Springboot集成RocketMQ

Springboot集成RocketMQ什么是RocketMQ?官方说明:随着使用越来越多的队列和虚拟主题,ActiveMQIO模块遇到了瓶颈。我们尽力通过节流,断路器或降级来解决此问题,但效果不佳。因此,我们那时开始关注流行的消息传递解决方案Kafka。不幸的是,Kafka不能满足我们的要求,特别是在低延迟和高可靠性方面。看到这里可以很清楚的知道RcoketMQ是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具…

大家好,又见面了,我是你们的朋友全栈君。

什么是RocketMQ?

官方说明:

随着使用越来越多的队列和虚拟主题,ActiveMQ IO模块遇到了瓶颈。我们尽力通过节流,断路器或降级来解决此问题,但效果不佳。因此,我们那时开始关注流行的消息传递解决方案Kafka。不幸的是,Kafka不能满足我们的要求,特别是在低延迟和高可靠性方面。

看到这里可以很清楚的知道RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。

具有以下特性:

  • 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
  • 能够保证严格的消息顺序,在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
  • 提供丰富的消息拉取模式,支持拉(pull)和推(push)两种消息模式
  • 单一队列百万消息的堆积能力,亿级消息堆积能力
  • 支持多种消息协议,如 JMS、MQTT 等
  • 分布式高可用的部署架构,满足至少一次消息传递语义

为什么选择RocketMQ消息队列?

  • 首先RocketMQ是阿里巴巴自研出来的,也已开源。其性能和稳定性从双11就能看出来,借用阿里的一句官方介绍:历年双 11 购物狂欢节零点千万级 TPS、万亿级数据洪峰,创造了全球最大的业务消息并发以及流转纪录(日志类消息除外);
  • 在始终保证高性能前提下,支持亿级消息堆积,不影响集群的正常服务,在削峰填谷(蓄洪)、微服务解耦的场景下尤为重要;这,就能说明RocketMQ的强大。

RocketMQ的特点和优势(可跳过看三的整合代码)

  • 削峰填谷(主要解决诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,海量消息堆积能力强)Springboot集成RocketMQ
  • 异步解耦(高可用松耦合架构设计,对高依赖的项目之间进行解耦,当下游系统出现宕机,不会影响上游系统的正常运行,或者雪崩)

    L3Byb3h5L2h0dHBzL2ltZzIwMTguY25ibG9ncy5jb20vYmxvZy8xMzExNjgxLzIwMTkwOC8xMzExNjgxLTIwMTkwODE3MTQxMTAxODI0LTY4MTkxNDMxLnBuZw==.jpg

    uploading.4e448015.gif

    转存失败重新上传取消Springboot集成RocketMQ

  • 顺序消息(顺序消息即保证消息的先进先出,比如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等)Springboot集成RocketMQ
  • 分布式事务消息(确保数据的最终一致性,大量引入 MQ 的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性,减少系统间的交互)Springboot集成RocketMQ

RocketMQ环境安装

参考我另外一篇文档 windows本地安装部署RocketMQ

SpringBoot环境中使用RocketMQ

SpringBoot 入门:SpringBoot入门 – SimpleWu – 博客园
SpringBoot 常用start:SpringBoot企业常用的starter – SimpleWu – 博客园

项目基于之前搭建 SpringCloud搭建Nacos项目 增加RocketMQ功能,项目搭建参考 springcloud集成nacos的配置中心,注册中心_lockie的博客-CSDN博客

当前项目环境版本为:

  • SpringBoot 2.2.2.RELEASE
  • RocketMQ 4.7.0

 生产者项目,消费者项目都增加配置文件

<!-- rocketmq -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.0</version>
</dependency>

MQ生产者配置

mq生产者项目 boot-order-service 端口号 8802

配置文件配置:

spring.application.name=boot-order-service
server.port=8802

# nacos配置地址
nacos.config.server-addr=127.0.0.1:8848
# nacos注册地址
nacos.discovery.server-addr=127.0.0.1:8848

spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8

# 是否开启自动配置
rocketmq.producer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.producer.groupName=${spring.application.name}
# mq的nameserver地址
rocketmq.producer.namesrvAddr=127.0.0.1:9876
# 消息最大长度 默认 1024 * 4 (4M)
rocketmq.producer.maxMessageSize = 4096
# 发送消息超时时间,默认 3000
rocketmq.producer.sendMsgTimeOut=3000
# 发送消息失败重试次数,默认2
rocketmq.producer.retryTimesWhenSendFailed=2

新增一个 MQProducerConfigure 配置类,用来初始化MQ生产者

package com.lockie.cloudorder.rocketmq;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: lockie
 * @Date: 2020/4/21 10:28
 * @Description: mq生产者配置
 */
@Getter
@Setter
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MQProducerConfigure {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerConfigure.class);

    private String groupName;
    private String namesrvAddr;
    // 消息最大值
    private Integer maxMessageSize;
    // 消息发送超时时间
    private Integer sendMsgTimeOut;
    // 失败重试次数
    private Integer retryTimesWhenSendFailed;

    /**
     * mq 生成者配置
     * @return
     * @throws MQClientException
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.producer", value = "isOnOff", havingValue = "on")
    public DefaultMQProducer defaultProducer() throws MQClientException {
        LOGGER.info("defaultProducer 正在创建---------------------------------------");
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setVipChannelEnabled(false);
        producer.setMaxMessageSize(maxMessageSize);
        producer.setSendMsgTimeout(sendMsgTimeOut);
        producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendFailed);
        producer.start();
        LOGGER.info("rocketmq producer server 开启成功----------------------------------");
        return producer;
    }
}

MQ消费者配置

mq消费者项目 boot-user-service 端口号 8801

增加配置参数

spring.application.name=boot-user-service
server.port=8801

# nacos配置地址
nacos.config.server-addr=127.0.0.1:8848
# nacos注册地址
nacos.discovery.server-addr=127.0.0.1:8848

spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8

# 是否开启自动配置
rocketmq.consumer.isOnOff=on
# 发送同一类消息设置为同一个group,保证唯一默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标识
rocketmq.consumer.groupName=${spring.application.name}
# mq的nameserver地址
rocketmq.consumer.namesrvAddr=127.0.0.1:9876
# 消费者订阅的主题topic和tags(*标识订阅该主题下所有的tags),格式: topic~tag1||tag2||tags3;
rocketmq.consumer.topics=TestTopic~TestTag;TestTopic~HelloTag;HelloTopic~HelloTag;MyTopic~*
# 消费者线程数据量
rocketmq.consumer.consumeThreadMin=5
rocketmq.consumer.consumeThreadMax=32
# 设置一次消费信心的条数,默认1
rocketmq.consumer.consumeMessageBatchMaxSize=1

新建一个MQConsumerConfigure 类用来初始化MQ消费者

package com.lockie.bootuser.rocketmq;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: lockie
 * @Date: 2020/4/21 10:28
 * @Description: mq消费者配置
 */
@Getter
@Setter
@ToString
@Configuration
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MQConsumerConfigure {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumerConfigure.class);

    private String groupName;
    private String namesrvAddr;
    private String topics;
    // 消费者线程数据量
    private Integer consumeThreadMin;
    private Integer consumeThreadMax;
    private Integer consumeMessageBatchMaxSize;

    @Autowired
    private MQConsumeMsgListenerProcessor consumeMsgListenerProcessor;
    /**
     * mq 消费者配置
     * @return
     * @throws MQClientException
     */
    @Bean
    @ConditionalOnProperty(prefix = "rocketmq.consumer", value = "isOnOff", havingValue = "on")
    public DefaultMQPushConsumer defaultConsumer() throws MQClientException {
        LOGGER.info("defaultConsumer 正在创建---------------------------------------");
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        // 设置监听
        consumer.registerMessageListener(consumeMsgListenerProcessor);

        /**
         * 设置consumer第一次启动是从队列头部开始还是队列尾部开始
         * 如果不是第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        /**
         * 设置消费模型,集群还是广播,默认为集群
         */
//        consumer.setMessageModel(MessageModel.CLUSTERING);

        try {
            // 设置该消费者订阅的主题和tag,如果订阅该主题下的所有tag,则使用*,
            String[] topicArr = topics.split(";");
            for (String topic : topicArr) {
                String[] tagArr = topic.split("~");
                consumer.subscribe(tagArr[0], tagArr[1]);
            }
            consumer.start();
            LOGGER.info("consumer 创建成功 groupName={}, topics={}, namesrvAddr={}",groupName,topics,namesrvAddr);
        } catch (MQClientException e) {
            LOGGER.error("consumer 创建失败!");
        }
        return consumer;
    }
}

这个只是初始化操作,实际对消费者对消息处理放在 consumer.registerMessageListener(consumeMsgListenerProcessor); 这个监听类里面了,实际接收消息,处理消息都放在监听类里

新建一个监听类处理消息

package com.lockie.bootuser.rocketmq;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
 * @author: lockie
 * @Date: 2020/4/21 11:05
 * @Description: 消费者监听
 */
@Component
public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);


    /**
     * 默认msg里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
     * 不要抛异常,如果没有return CONSUME_SUCCESS ,consumer会重新消费该消息,直到return CONSUME_SUCCESS
     * @param msgList
     * @param consumeConcurrentlyContext
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(msgList)) {
            LOGGER.info("MQ接收消息为空,直接返回成功");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = msgList.get(0);
        LOGGER.info("MQ接收到的消息为:" + messageExt.toString());
        try {
            String topic = messageExt.getTopic();
            String tags = messageExt.getTags();
            String body = new String(messageExt.getBody(), "utf-8");

            LOGGER.info("MQ消息topic={}, tags={}, 消息内容={}", topic,tags,body);
        } catch (Exception e) {
            LOGGER.error("获取MQ消息内容异常{}",e);
        }
        // TODO 处理业务逻辑
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

测试消息发送接收

生产者 boot-order-service 新建一个controller,再新建一个send方法,发送消息

package com.lockie.cloudorder.rocketmq;

import com.lockie.cloudorder.model.Results;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: lockie
 * @Date: 2020/4/21 11:17
 * @Description:
 */
@RestController
@RequestMapping("/mqProducer")
public class MQProducerController {
    public static final Logger LOGGER = LoggerFactory.getLogger(MQProducerController.class);

    @Autowired
    DefaultMQProducer defaultMQProducer;

    /**
     * 发送简单的MQ消息
     * @param msg
     * @return
     */
    @GetMapping("/send")
    public Results send(String msg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        if (StringUtils.isEmpty(msg)) {
            return new Results().succeed();
        }
        LOGGER.info("发送MQ消息内容:" + msg);
        Message sendMsg = new Message("TestTopic", "TestTag", msg.getBytes());
        // 默认3秒超时
        SendResult sendResult = defaultMQProducer.send(sendMsg);
        LOGGER.info("消息发送响应:" + sendResult.toString());
        return new Results().succeed(sendResult);
    }

}

浏览器请求发送send接口  http://127.0.0.1:8802/mqProducer/send?msg=hello

Springboot集成RocketMQ

修改topic和tags为MyTopic,MyTags,再发送一次

Springboot集成RocketMQ

我们进入rocketmq控制台查看

Springboot集成RocketMQ

Springboot集成RocketMQ

项目代码地址:GitHub – LockieZou/springcloud-nacos-demo

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/136380.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • ubuntu16.04安装教程_刚安装ETC不能马上使用吗

    ubuntu16.04安装教程_刚安装ETC不能马上使用吗Ubuntu的安装与使用1、请参考:https://jingyan.baidu.com/album/ca00d56c2b5257e99febcf41.html?picindex=29注意网络连接选

  • 汇编语言指令大全(详细)「建议收藏」

    汇编语言指令大全(详细)「建议收藏」汇编语言指令大全8080汇编手册数据传输指令──────────────────────────────它们在存贮器和寄存器、寄存器和输入输出端口之间传送数据。1。通用数据传送指令。MOV传送字或字节。MOVSX先符号扩展,再传送。MOVZX先零扩展,再传送。PUSH把字压入堆栈。POP把字弹出堆栈。PUSHA把AX,CX,DX,BX,SP,BP,SI,DI依次

  • Qt中的QFile读写文件操作

    Qt中的QFile读写文件操作1.首先记录一下QString,QByteArray,char*之间的转换(1)QString-&amp;gt;QByteArrayQStringbuf=&quot;123&quot;;QByteArraya=buf.toUtf8();//中文a=buf.toLocal8Bit();//本地编码(2)QByteArray-&amp;gt;char*char*b=a.data…

  • python精彩编程200例-Python创意编程200例turtle篇[通俗易懂]

    简介:Python是一种高阶计算机语言。它更接近自然语言,学习成本低,开发效率高。如今越来越多的中小学生都在开始学习Python了。我们可以预见,全民会Python的日子不久就会到来,各行各业的人未来都能用Python解决各自领域的问题或创造出独特魅力的作品。在Python的普及过程中,海龟模块(turtle)将会功不可没。它来源于上个世纪60年代的logo计算机语言,就是通过指挥一只小海龟移动…

  • 计算机网络原理(谢希仁第八版)第五章课后习题答案

    计算机网络原理(谢希仁第八版)第五章课后习题答案第五章1.试说明运输层在协议栈中的地位和作用,运输层的通信和网络层的通信有什么重要区别?为什么运输层是必不可少的?答:运输层处于面向通信部分的最高层,同时也是用户功能中的最低层,向它上面的应用层提供服务运输层为应用进程之间提供端到端的逻辑通信,但网络层是为主机之间提供逻辑通信(面向主机,承担路由功能,即主机寻址及有效的分组交换)。各种应用进程之间通信需要“可靠或尽力而为”的两类服务质量,必须由运输层以复用和分用的形式加载到网络层。2.网络层提供数据报或虚电路服务对上面的运输层有何影响

  • tcpip协议族有哪些

    tcpip协议族有哪些tcpip协议族有哪些有五层应用层运输层网络层数据链路层物理层

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号