SpringBoot整合RabbitMQ之实战

SpringBoot整合RabbitMQ之实战实战前言RabbitMQ作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务、数据延迟处理等。其中课程的学习链接地址:https://edu.csdn.net/course/detail/9314RabbitMQ官网拜读首先,让我们先拜读Ra…

大家好,又见面了,我是你们的朋友全栈君。

实战前言

RabbitMQ 作为目前应用相当广泛的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具有重要的作用,比如业务服务模块解耦、异步通信、高并发限流、超时业务、数据延迟处理等。

其中课程的学习链接地址:https://edu.csdn.net/course/detail/9314 

RabbitMQ 官网拜读

首先,让我们先拜读 RabbitMQ 官网的技术开发手册以及相关的 Features,感兴趣的朋友可以耐心的阅读其中的相关介绍,相信会有一定的收获,地址可见:http://www.rabbitmq.com/getstarted.html

阅读该手册过程中,我们可以得知 RabbitMQ 其实核心就是围绕 “消息模型” 来展开的,其中就包括了组成消息模型的相关组件:生产者,消费者,队列,交换机,路由,消息等!而我们在实战应用中,实际上也是紧紧围绕着 “消息模型” 来展开撸码的!

下面,我就介绍一下这一消息模型的演变历程,当然,这一历程在 RabbitMQ 官网也是可以窥览得到的!

enter image description here

上面几个图就已经概述了几个要点,而且,这几个要点的含义可以说是字如其名!

生产者:发送消息的程序
消费者:监听接收消费消息的程序
消息:一串二进制数据流
队列:消息的暂存区/存储区
交换机:消息的中转站,用于接收分发消息。其中有 fanout、direct、topic、headers 四种
路由:相当于密钥/第三者,与交换机绑定即可路由消息到指定的队列!
正如上图所展示的消息模型的演变,接下来我们将以代码的形式实战各种典型的业务场景!

SpringBoot 整合 RabbitMQ 实战

工欲善其事,必先利其器。我们首先需要借助 IDEA 的 Spring Initializr 用 Maven 构建一个 SpringBoot 的项目,并引入 RabbitMQ、Mybatis、Log4j 等第三方框架的依赖。搭建完成之后,可以简单的写个 RabbitMQController 测试一下项目是否搭建是否成功(可以暂时用单模块方式构建)

紧接着,我们进入实战的核心阶段,在项目或者服务中使用 RabbitMQ,其实无非是有几个核心要点要牢牢把握住,这几个核心要点在撸码过程中需要“时刻的游荡在自己的脑海里”,其中包括:

我要发送的消息是什么
我应该需要创建什么样的消息模型:DirectExchange+RoutingKey?TopicExchange+RoutingKey?等
我要处理的消息是实时的还是需要延时/延迟的?
消息的生产者需要在哪里写,消息的监听消费者需要在哪里写,各自的处理逻辑是啥
基于这样的几个要点,我们先小试牛刀一番,采用 RabbitMQ 实战异步写日志与异步发邮件。当然啦,在进行实战前,我们需要安装好 RabbitMQ 及其后端控制台应用,并在项目中配置一下 RabbitMQ 的相关参数以及相关 Bean 组件。

RabbitMQ 安装完成后,打开后端控制台应用:http://localhost:15672  输入guest guest 登录,看到下图即表示安装成功

enter image description here

然后是项目配置文件层面的配置 application.properties

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.concurrency=10
spring.rabbitmq.listener.max-concurrency=20
spring.rabbitmq.listener.prefetch=5

其中,后面三个参数主要是用于“并发量的配置”,表示:并发消费者的初始化值,并发消费者的最大值,每个消费者每次监听时可拉取处理的消息数量。

接下来,我们需要以 Configuration 的方式配置 RabbitMQ 并以 Bean 的方式显示注入 RabbitMQ 在发送接收处理消息时相关 Bean 组件配置其中典型的配置是 RabbitTemplate 以及 SimpleRabbitListenerContainerFactory,前者是充当消息的发送组件,后者是用于管理  RabbitMQ监听器listener 的容器工厂,其代码如下:

@Configuration
    public class RabbitmqConfig {
    private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class);
 
    @Autowired
    private Environment env;
 
    @Autowired
    private CachingConnectionFactory connectionFactory;
 
    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
 
    /**
     * 单一消费者
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }
 
    /**
     * 多个消费者
     * @return
     */
    @Bean(name = "multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factoryConfigurer.configure(factory,connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.NONE);
        factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));
        factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));
        factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));
        return factory;
    }
 
    @Bean
    public RabbitTemplate rabbitTemplate(){
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
            }
        });
        return rabbitTemplate;
    }}

RabbitMQ 实战:业务模块解耦以及异步通信

在一些企业级系统中,我们经常可以见到一个执行 function 通常是由许多子模块组成的,这个 function 在执行过程中,需要 同步的将其代码从头开始执行到尾,即执行流程是 module_A -> module_B -> module_C -> module_D,典型的案例可以参见汇编或者 C 语言等面向过程语言开发的应用,现在的一些 JavaWeb 应用也存在着这样的写法。

而我们知道,这个执行流程其实对于整个 function 来讲是有一定的弊端的,主要有几点:

整个 function 的执行响应时间将很久;
如果某个 module 发生异常而没有处理得当,可能会影响其他 module 甚至整个 function 的执行流程与结果;
整个 function 中代码可能会很冗长,模块与模块之间可能需要进行强通信以及数据的交互,出现问题时难以定位与维护,甚至会陷入 “改一处代码而动全身”的尴尬境地!
故而,我们需要想办法进行优化,我们需要将强关联的业务模块解耦以及某些模块之间实行异步通信!下面就以两个场景来实战我们的优化措施!

场景一:异步记录用户操作日志

对于企业级应用系统或者微服务应用中,我们经常需要追溯跟踪记录用户的操作日志,而这部分的业务在某种程度上是不应该跟主业务模块耦合在一起的,故而我们需要将其单独抽出并以异步的方式与主模块进行异步通信交互数据。

下面我们就用 RabbitMQ 的 DirectExchange+RoutingKey 消息模型也实现“用户登录成功记录日志”的场景。如前面所言,我们需要在脑海里回荡着几个要点:

消息模型:DirectExchange+RoutingKey 消息模型
消息:用户登录的实体信息,包括用户名,登录事件,来源的IP,所属日志模块等信息
发送接收:在登录的 Controller 中实现发送,在某个 listener 中实现接收并将监听消费到的消息入数据表;实时发送接收
首先我们需要在上面的 RabbitmqConfig 类中创建消息模型:包括 Queue、Exchange、RoutingKey 等的建立,代码如下:

enter image description here

上图中 env 获取的信息,我们需要在 application.properties 进行配置,其中 mq.env=local

enter image description here

此时,我们将整个项目/服务跑起来,并打开 RabbitMQ 后端控制台应用,即可看到队列以及交换机及其绑定已经建立好了,如下所示:

SpringBoot整合RabbitMQ之实战

SpringBoot整合RabbitMQ之实战

接下来,我们需要在 Controller 中执行用户登录逻辑,记录用户登录日志,查询获取用户角色视野资源信息等,由于篇幅关系,在这里我们重点要实现的是用MQ实现 “异步记录用户登录日志” 的逻辑,即在这里 Controller 将充当“生产者”的角色,核心代码如下:

@RestController
    public class UserController {
 
    private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);
 
    private static final String Prefix="user";
 
    @Autowired
    private ObjectMapper objectMapper;
 
    @Autowired
    private UserMapper userMapper;
 
    @Autowired
    private UserLogMapper userLogMapper;
 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @Autowired
    private Environment env;
 
    @RequestMapping(value = Prefix+"/login",method = RequestMethod.POST,consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
    public BaseResponse login(@RequestParam("userName") String userName,@RequestParam("password") String password){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        try {
            //TODO:执行登录逻辑
            User user=userMapper.selectByUserNamePassword(userName,password);
            if (user!=null){
                //TODO:异步写用户日志
                try {
                    UserLog userLog=new UserLog(userName,"Login","login",objectMapper.writeValueAsString(user));
                    userLog.setCreateTime(new Date());
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    rabbitTemplate.setExchange(env.getProperty("log.user.exchange.name"));
                    rabbitTemplate.setRoutingKey(env.getProperty("log.user.routing.key.name"));
 
                    Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(userLog)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
                    message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON); 
                    rabbitTemplate.convertAndSend(message);         
                }catch (Exception e){
                    e.printStackTrace();
                }
 
                //TODO:塞权限数据-资源数据-视野数据
            }else{
                response=new BaseResponse(StatusCode.Fail);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        return response;
    }}

在上面的“发送逻辑”代码中,其实也体现了我们最开始介绍的演进中的几种消息模型,比如我们是将消息发送到 Exchange 的而不是 Queue,消息是以二进制流的形式进行传输等等。当用 postman 请求到这个 controller 的方法时,我们可以在 RabbitMQ 的后端控制台应用看到一条未确认的消息,通过 GetMessage 即可看到其中的详情,如下:

SpringBoot整合RabbitMQ之实战

SpringBoot整合RabbitMQ之实战

最后,我们将开发消费端的业务代码,如下:

@Component
    public class CommonMqListener {
 
    private static final Logger log= LoggerFactory.getLogger(CommonMqListener.class);
 
    @Autowired
    private ObjectMapper objectMapper;
 
    @Autowired
    private UserLogMapper userLogMapper;
 
    @Autowired
    private MailService mailService;
 
    /**
     * 监听消费用户日志
     * @param message
     */
    @RabbitListener(queues = "${log.user.queue.name}",containerFactory = "singleListenerContainer")
    public void consumeUserLogQueue(@Payload byte[] message){
        try {
            UserLog userLog=objectMapper.readValue(message, UserLog.class);
            log.info("监听消费用户日志 监听到消息: {} ",userLog);
            //TODO:记录日志入数据表
            userLogMapper.insertSelective(userLog);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

将服务跑起来之后,我们即可监听消费到上面 Queue 中的消息,即当前用户登录的信息,而且,我们也可以看到“记录用户登录日志”的逻辑是由一条异于主业务线程的异步线程去执行的:

“异步记录用户操作日志”的案例我想足以用于诠释上面所讲的相关理论知识点了,在后续篇章中,由于篇幅限制,我将重点介绍其核心的业务逻辑!

场景二:异步发送邮件

发送邮件的场景,其实也是比较常见的,比如用户注册需要邮箱验证,用户异地登录发送邮件通知等等,在这里我以 RabbitMQ 实现异步发送邮件。实现的步骤跟场景一几乎一致!

1. 消息模型的创建

2. 配置信息的创建

3. 生产端

4. 消费端

RabbitMQ 实战:并发量配置与消息确认机制

实战背景

对于消息模型中的 listener 而言,默认情况下是“单消费实例”的配置,即“一个 listener 对应一个消费者”,这种配置对于上面所讲的“异步记录用户操作日志”、“异步发送邮件”等并发量不高的场景下是适用的。但是在对于秒杀系统、商城抢单等场景下可能会显得很吃力!

我们都知道,秒杀系统跟商城抢单均有一个共同的明显的特征,即在某个时刻会有成百上千万的请求到达我们的接口,即瞬间这股巨大的流量将涌入我们的系统,我们可以采用下面一图来大致体现这一现象:

enter image description here

当到了“开始秒杀”、“开始抢单”的时刻,此时系统可能会出现这样的几种现象:

应用系统配置承载不了这股瞬间流量,导致系统直接挂掉,即传说中的“宕机”现象;
接口逻辑没有考虑并发情况,数据库读写锁发生冲突,导致最终处理结果跟理论上的结果数据不一致(如商品存库量只有 100,但是高并发情况下,实际表记录的抢到的用户记录数据量却远远大于 100);
应用占据服务器的资源直接飙高,如 CPU、内存、宽带等瞬间直接飙升,导致同库同表甚至可能同 host 的其他服务或者系统出现卡顿或者挂掉的现象;
于是乎,我们需要寻找解决方案!对于目前来讲,网上均有诸多比较不错的解决方案,在此我顺便提一下我们的应用系统采用的常用解决方案,包括:

我们会将处理抢单的整体业务逻辑独立、服务化并做集群部署;
我们会将那股巨大的流量拒在系统的上层,即将其转移至 MQ 而不直接涌入我们的接口,从而减少数据库读写锁冲突的发生以及由于接口逻辑的复杂出现线程堵塞而导致应用占据服务器资源飙升;
我们会将抢单业务所在系统的其他同数据源甚至同表的业务拆分独立出去服务化,并基于某种 RPC 协议走 HTTP 通信进行数据交互、服务通信等等;
采用分布式锁解决同一时间同个手机号、同一时间同个 IP 刷单的现象;
下面,我们用 RabbitMQ 来实战上述的第二点!即我们会在“请求” -> “处理抢单业务的接口” 中间架一层消息中间件做“缓冲”、“缓压”处理,如下图所示:

enter image description here

并发量配置与消息确认机制

正如上面所讲的,对于抢单、秒杀等高并发系统而言,如果我们需要用 RabbitMQ 在 “请求” – “接口” 之间充当限流缓压的角色,那便需要我们对 RabbitMQ 提出更高的要求,即支持高并发的配置,在这里我们需要明确一点,“并发消费者”的配置其实是针对 listener 而言,当配置成功后,我们可以在 MQ 的后端控制台应用看到 consumers 的数量,如下所示:

SpringBoot整合RabbitMQ之实战

其中,这个 listener 在这里有 10 个 consumer 实例的配置,每个 consumer 可以预监听消费拉取的消息数量为 5 个(如果同一时间处理不完,会将其缓存在 mq 的客户端等待处理!)

另外,对于某些消息而言,我们有时候需要严格的知道消息是否已经被 consumer 监听消费处理了,即我们有一种消息确认机制来保证我们的消息是否已经真正的被消费处理。在 RabbitMQ 中,消息确认处理机制有三种:Auto – 自动、Manual – 手动、None – 无需确认,而确认机制需要 listener 实现 ChannelAwareMessageListener 接口,并重写其中的确认消费逻辑。在这里我们将用 “手动确认” 的机制来实战用户商城抢单场景。

1.在 RabbitMQConfig 中配置确认消费机制以及并发量的配置

SpringBoot整合RabbitMQ之实战

2.消息模型的配置信息

enter image description here

3.RabbitMQ 后端控制台应用查看此队列的并发量配置

SpringBoot整合RabbitMQ之实战

4.listener 确认消费处理逻辑:在这里我们需要开发抢单的业务逻辑,即“只有当该商品的库存 >0 时,抢单成功,扣减库存量,并将该抢单的用户信息记录入表,异步通知用户抢单成功!”

enter image description here

enter image description here

紧接着我们采用 CountDownLatch 模拟产生高并发时的多线程请求(或者采用 jmeter 实施压测也可以!),每个请求将携带产生的随机数:充当手机号 -> 充当消息,最终入抢单队列!在这里,我模拟了 50000 个请求,相当于 50000 手机号同一时间发生抢单的请求,而设置的产品库存量为 100,这在 product 数据库表即可设置

enter image description here

6.将抢单请求的手机号信息压入队列,等待排队处理

enter image description here

7.在最后我们写个 Junit 或者写个 Controller,进行 initService.generateMultiThread(); 调用模拟产生高并发的抢单请求即可

@RestController
    public class ConcurrencyController {
 
    private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);
 
    private static final String Prefix="concurrency";
 
    @Autowired
    private InitService initService;
 
    @RequestMapping(value = Prefix+"/robbing/thread",method = RequestMethod.GET)
    public BaseResponse robbingThread(){
        BaseResponse response=new BaseResponse(StatusCode.Success);
        initService.generateMultiThread();
        return response;
    }}

8.最后,我们当然是跑起来,在控制台我们可以观察到系统不断的在产生新的请求(线程)– 相当于不断的有抢单的手机号涌入我们的系统,然后入队列,listener 监听到请求之后消费处理抢单逻辑!最后我们可以观察两张数据库表:商品库存表、商品成功抢单的用户记录表 – 只有当库存表中商品对应的库存量为 0、商品成功抢单的用户记录刚好 100 时 即表示我们的实战目的以及效果已经达到了!!

enter image description here

总结:如此一来,我们便将 request 转移到我们的 mq,在一定程度缓解了我们的应用以及接口的压力!当然,实际情况下,我们的配置可能远远不只代码层次上的配置,比如我们的 mq 可能会做集群配置、负载均衡、商品库存的更新可能会考虑分库分表、库存更新可能会考虑独立为库存 Dubbo 服务并通过 Rest Api 异步通信交互并独立部署等等。这些优化以及改进的目的其实无非是为了能限流、缓压、保证系统稳定、数据的一致等!而我们的 MQ,在其中可以起到不可磨灭的作用,其字如其名:“消息队列”,而队列具有 “先进先出” 的特点,故而所有进入 MQ 的消息都将 “乖巧” 的在 MQ 上排好队,先来先排队,先来先被处理消费,由此一来至少可以避免 “瞬间时刻一窝蜂的 request 涌入我们的接口” 的情况!

附注:在用 RabbitMQ 实战上述高并发抢单解决方案,其实我也在数据库层面进行了优化,即在读写存库时采用了“类似乐观锁”的写法,保证:抢单的请求到来时有库存,更新存库时保证有库存可以被更新!

彩蛋:本博文介绍了几个典型的RabbitMQ实战的业务场景,相关源码数据库可以来这里下载

(1)https://download.csdn.net/download/u013871100/10654482

(2)https://pan.baidu.com/s/1KUuz_eeFXOKF3XRMY2Jcew

 

 

 

 

原文:https://blog.csdn.net/u013871100/article/details/82982235 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/146496.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • php递归算法经典实例_递归算法1加到100

    php递归算法经典实例_递归算法1加到100在前面的文章《PHP递归算法(一)》中,我们为大家介绍了如何利用静态变量的方法来实现递归算法。本篇文章我们就继续为大家介绍另一种实现递归算法的方法即通过全局变量的方法。下面我们结合代码示例,为大家介绍通过全局变量Global实现递归的方法。代码如下:…

  • freehosting申请空间和ssh -D设置

    freehosting申请空间和ssh -D设置前段时间申请了website.org的免费空间,可是有广告.在这时向大家推荐freehosting.com.Freehosting.com是一家创建于1996年的美国网站,国内在2006年有介绍过它的免费PHP空间,不过没能找到演示,目前免费空间的主机放在德国,提供1G存储空间,月流量为10G,采用CPanel控制管理面板(有简体中文版),支持FTP和Web在线文件管理(可在线解压缩),…

  • Java解惑五:类之谜

    Java解惑五:类之谜

    2021年11月23日
  • Hybrid开发框架一、Weex

    Hybrid开发框架一、Weex前言最近开始试水Weex开发,使用这么长一段时间,感觉写Weex还是非常方便的。作为一个Android开发,免不了要追查一下weex的sdk源码。今天,就以WeexSDKforAndroid为例,分析SDK的认识WeexSDK源码https://github.com/alibaba/weex/tree/dev/android整体分析下拉,按照js文件的渲染过程,绘制出了下面…

  • sqlyog错误号码2058_将设备连接至你的电脑时出错

    sqlyog错误号码2058_将设备连接至你的电脑时出错远在天边,近在眼前。

  • 图片加载失败替换图片解决方案

    图片加载失败替换图片解决方案图片加载失败在不同浏览器表现有差异,比如google可能会一片空白、img的宽高是0*0,ie会在图片位置会出现一个碎片图标,火狐会显示一个边框像这样:一个页面如果很多这种好难看,一般会用默认图片替换显示,解决方法:1、css方案:不好使。在img的伪类加替换图片,但火狐的img没有伪类,google有,有时候就算给img标签加了width、height也没有,图片加载失败img的宽高…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号