RocketMQ 入门使用详解[通俗易懂]

RocketMQ 入门使用详解[通俗易懂]RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,已经于2016年11月成为 Apache 孵化项目,相信RocketMQ的未来会发挥着越来越大的作用,将有更多的开发者因此受益。 本文仅对RocketMQ的简单实用做入门性介绍,不对RocketMQ的底层原理进行深入介绍,后续文章将对RocketMQ的原理做详细介绍。

大家好,又见面了,我是你们的朋友全栈君。

       RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,已经于2016年11月成为 Apache 孵化项目,相信RocketMQ的未来会发挥着越来越大的作用,将有更多的开发者因此受益。

        本文仅对RocketMQ的简单实用做入门性介绍,不对RocketMQ的底层原理进行深入介绍,后续文章将对RocketMQ的原理做详细介绍。

        RocketMQ的Maven依赖:

   

<!-- https://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.6</version>
</dependency>

     MQ的消费类RocketMQConsumer.java:

    

package com.lance.rocketMQ.RocketMQ;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;

import java.util.UUID;


/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQConsumer {

    private DefaultMQPushConsumer consumer;

    private MessageListener listener;

    protected String nameServer;

    protected String groupName;

    protected String topics;

    public RocketMQConsumer(MessageListener listener, String nameServer, String groupName, String topics) {
        this.listener = listener;
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }

    public void init() {
        consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(nameServer);
        try {
            consumer.subscribe(topics, "*");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        consumer.setInstanceName(UUID.randomUUID().toString());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener((MessageListenerConcurrently) this.listener);

        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        System.out.println("RocketMQConsumer Started! group=" + consumer.getConsumerGroup() + " instance=" + consumer.getInstanceName()
        );
    }


}

       MQ消息的监听接口类RocketMQListener.java

      

        

package com.lance.rocketMQ.RocketMQ;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQListener  implements MessageListenerConcurrently {


    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//        System.out.println("get data from rocketMQ:" + msgs);
        for (MessageExt message : msgs) {

            String msg = new String(message.getBody());
            System.out.println("msg data from rocketMQ:" + msg);
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

        MQ消息的生产者类RocketMQProducer.java

package com.lance.rocketMQ.RocketMQ;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.SendStatus;
import com.alibaba.rocketmq.common.message.Message;

import java.util.UUID;

/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQProducer {

    private DefaultMQProducer sender;

    protected String nameServer;

    protected String groupName;

    protected String topics;

    public void init() {
        sender = new DefaultMQProducer(groupName);
        sender.setNamesrvAddr(nameServer);
        sender.setInstanceName(UUID.randomUUID().toString());
        try {
            sender.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public RocketMQProducer(String nameServer, String groupName, String topics) {
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }

    public void send(Message message) {

        message.setTopic(topics);

        try {
            SendResult result = sender.send(message);
            SendStatus status = result.getSendStatus();
            System.out.println("messageId=" + result.getMsgId() + ", status=" + status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

       测试RocketMQ的消费 RocketMQConsumerTest.java

package com.lance.rocketMQ.RocketMQ;

/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQConsumerTest {


    public static void main(String[] args) {


        String mqNameServer = "172.10.254.2:9876";
        String mqTopics = "MQ-MSG-TOPICS-TEST";

        String consumerMqGroupName = "CONSUMER-MQ-GROUP";
        RocketMQListener mqListener = new RocketMQListener();
        RocketMQConsumer mqConsumer = new RocketMQConsumer(mqListener, mqNameServer, consumerMqGroupName, mqTopics);
        mqConsumer.init();


        try {
            Thread.sleep(1000 * 60L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

          run RocketMQConsumerTest.java 之后,控制台输出:

       

RocketMQConsumer Started! group=CONSUMER-MQ-GROUP instance=1eb7d308-4414-4658-90b5-e2cae3b793eb

            结果分析: 此时MQ对应的TOPIC中并没有响应的消息,故收不到消息,仅看到MQ消费者正常启动信息。

       

        MQ的生产者测试类:RocketMQProducerTest.java

       

package com.lance.rocketMQ.RocketMQ;

import com.alibaba.rocketmq.common.message.Message;

/**
 * Created by lance on 2017/2/10.
 */
public class RocketMQProducerTest {

    public static void main(String[] args) {

        String mqNameServer = "172.10.254.2:9876";
        String mqTopics = "MQ-MSG-TOPICS-TEST";

        String producerMqGroupName = "PRODUCER-MQ-GROUP";
        RocketMQProducer mqProducer = new RocketMQProducer(mqNameServer, producerMqGroupName, mqTopics);
        mqProducer.init();


        for (int i = 0; i < 5; i++) {

            Message message = new Message();
            message.setBody(("I send message to RocketMQ " + i).getBytes());
            mqProducer.send(message);
        }



    }

}

          run RocketMQProducerTest.java 之后,RocketMQProducerTest.java 对应的控制台输出为:

                

messageId=0A71290100002A9F00000003D0BB0832, status=SEND_OK
messageId=0A71290100002A9F00000003D0BB08BB, status=SEND_OK
messageId=0A71290100002A9F00000003D0BB0944, status=SEND_OK
messageId=0A71290100002A9F00000003D0BB09CD, status=SEND_OK
messageId=0A71290300002A9F000000005440AEED, status=SEND_OK

         结果分析:表明所有消息都已经正常发送,且被RocketMQ正常接收。

          此时查看RocketMQConsumerTest.java对应的控制台输出发生改变,输出内容变更如下:

         

RocketMQConsumer Started! group=CONSUMER-MQ-GROUP instance=1eb7d308-4414-4658-90b5-e2cae3b793eb
msg data from rocketMQ:I send message to RocketMQ 1
msg data from rocketMQ:I send message to RocketMQ 0
msg data from rocketMQ:I send message to RocketMQ 3
msg data from rocketMQ:I send message to RocketMQ 2
msg data from rocketMQ:I send message to RocketMQ 4

看,简单吧!

备注:小编自己使用了Apache版本的RocketMQ(即RocketMQ 4.*),发现只需要更改import的package的路径而已,不需要修改其他代码,请参考。

RocketMQ的重复问题解决方式:
a.MQ的消费端执行的操作具有幂等性,即无论多少次重复执行,其结果是一样的;
b.MQ的消费端做重复校验,比如将受到MQ消息的唯一编号保存到Redis中,即每次收到消息时,将检查唯一编号是否已经在Redis中,如果存在说明消息重复;否则将唯一编号放入到Redis中,可以根据系统需要设置唯一编号在Redis中的过期时间,以防止Redis溢出。

 

参考链接:1.分布式开放消息系统(RocketMQ)的原理与实践 (强烈推荐)

                    2.RocketMQ捐赠给Apache那些鲜为人知的故事

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/149571.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 图像标注-自动标注图像

    图像标注-自动标注图像常见的图像标注工具有Yolo_mark,labelImg,以下两篇文章是对这两款工具的说明Yolo_mark使用教程labelImg标注图像深度学习图像标注工具汇总这里需要提供另一款标注工具—百度物体检测模型,不同于以上的标注工具,百度提供的物体检测模型在标注一定数量(100张)后,提供智能标注功能,可以对数据集中的其他图片自动标注,详细介绍确认百度自动标注结果后可以将数据集…

  • SpringAOP学习–SpringAOP简介及原理

    SpringAOP学习–SpringAOP简介及原理前文对AOP做了介绍,实际项目中,一般不会直接上手手动实现aop,而是使用一些高级封装的aop实现,如SpringAOP。Spring是一个广泛应用的框架,SpringAOP则是Spring提供的一个标准易用的aop框架,依托Spring的IOC容器,提供了极强的AOP扩展增强能力,对项目开发提供了极大地便利。前文提到AOP的实现有AspectJ、JDK动态代理、CGLIB动态代理,SpringAOP不是一种新的AOP实现,其底层采用的是JDK/CGLIB动态代理。JDK动态代理回顾上一篇简单介绍了

  • 四足机器人|机器狗|仿生机器人|多足机器人|Adams仿真|Simulink仿真|基于CPG的四足机器人Simulink与Adams虚拟样机|源码可直接执行|绝对干货!需要资料及指导的可以联系我!

    四足机器人|机器狗|仿生机器人|多足机器人|Adams仿真|Simulink仿真|基于CPG的四足机器人Simulink与Adams虚拟样机|源码可直接执行|绝对干货!需要资料及指导的可以联系我!四足机器人、行走控制。附带源码及虚拟样机设计方案。针对目前仿生四足机器人控制中存在的稳定性低、控制精度不高、可控性差等问题,本文引入一种基于CPG(中央模式发生器)的步态控制算法模型,CPG生成的节律运动具有独立性与稳定性,还具有反馈调整功能,对波形调制处理后,能够实现仿生机器人的前进、后退、转弯、侧移、原地踏步等运动控制。针对仿生机器人研发周期长与成本高的问题,本课题利用Simulink与Adams构建虚拟样机对步态控制模型进行联合仿真验证。

  • cisco交换机常用命令[通俗易懂]

    一台全新交换机,不同模式命令大全交换机基本状态显示及各个状态切换:hostname&gt;用户模式hostname#特权模式hostname(config)#全局配置模式hostname(config-if)#接口或者vlan多个接口配置模式hostname(vlan)#…

  • redis的问题_redis高级数据类型

    redis的问题_redis高级数据类型备注:针对基本问题做一些基本的总结,不是详细解答!1.Redis在项目中的主要作用是是什么?怎么用的?(应用场景)2.Redis支持的数据类型(必考)3.zset跳表的数据结构(必考)4.Redis的数据过期策略(必考)5.Redis的LRU过期策略的具体实现6.如何解决Redis缓存雪崩,缓存穿透问题7.Redis的持久化机制(必考)8.Redis的管道pipel…

  • PHP中heredoc和nowdoc的用法

    PHP中heredoc和nowdoc的用法

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号