RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

在前面的两篇博客中RabbitMQ入门:HelloRabbitMQ代码实例RabbitMQ入门:工作队列(WorkQueue)遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息

大家好,又见面了,我是全栈君。

在前面的两篇博客中

遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息模型分别为(P代表生产者,C代表消费者,红色代表队列):

RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

这次我们来看下将一个消息发送给多个消费者(工作者),这种模式一般被称为“发布/订阅”模式。其工作模型为(P代表生产者,X代表Exchange(路由器/交换机),C代表消费者,红色代表队列):

RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

我们发现,工作模型中首次出现路由器,并且每个消费者有单独的队列。生产者生成消息后将其发送给路由器,然后路由器转送到队列,消费者各自到自己的队列里面获取消息进行消费。在实际的应用场景中,生产者一般不会直接将消息发送给队列,而是发送给路由器进行中转,Exchange必须清楚的知道怎么处理收到的消息:是将消息发送到一个特定队列还是多有队列,或者直接废弃消息。这种才符合RabbitMQ消息模型的核心思想

接下来我们详细展开今天的话题:

一、Exchange

Exchange在我们的工作模型中首次出现,因此需要详细介绍下。

Exchange分为4种类型:

Direct:完全根据key进行投递的,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
Topic:对key进行模式匹配后进行投递,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。
Fanout:不需要key,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
Headers:我们可以不考虑它。

今天我们的实例采用fanout类型的exchange。

尽管首次出现,但是其实我们前面的案例中也有用到exchange,只是我们没有给他名字,用的是RabbitMQ默认的,比如下面这段代码,我们将路由器名这个参数传入了“”,如果我们需要自己声明exchange的话,这个就不能传入“”了,而是传入自己定义好的值。

RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

二、临时队列

前面两篇博客中,我们都在使用队列的时候给出了定义好的名字,这在生产者和消费者共用相同队列的时候很有必要,但是我们有了exchange,生产者不需要知道有哪些队列,因此队列名字可以不用指定了,而是通过RabbitMQ 接口自己去生成临时队列,队列名字也由RabbitMQ自动生成。通过

RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

可以声明一个非持久的、通道独占的、自动删除的队列,getQueue()方法可以获取随机队列名字。这个名字用来在队列和exchange之间建立binding关系的时候使用:

RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

 

三、代码实现

基于上面exchange和临时队列的知识铺垫,可以展开今天的代码实现了。

  1.  生产者
    public class Product {
        //exchange名字
        public static String EXCHANGE_NAME = "exchange";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.创建连接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.为通道声明exchange和exchange的类型
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
                
                String msg = " hello rabbitmq, this is publish/subscribe mode";
                // 3.发送消息到指定的exchange,队列指定为空,由exchange根据情况判断需要发送到哪些队列
                channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
                System.out.println("product send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 4.关闭连接
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

     

  2. 消费者1
    public class Consumer1 {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.创建连接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.为通道声明exchange以及exchange类型
                channel.exchangeDeclare(Product.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
                // 3.创建随机名字的队列
                String queueName = channel.queueDeclare().getQueue();
    
                // 4.建立exchange和队列的绑定关系
                channel.queueBind(queueName, Product.EXCHANGE_NAME, "");
                System.out.println(" **** Consumer1 keep alive ,waiting for messages, and then deal them");
                // 5.通过回调生成消费者并进行监听
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                        // 获取消息内容然后处理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** Consumer1" + " get message :[" + msg + "]");
                    }
                };
                // 6.消费消息
                channel.basicConsume(queueName, true, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }

     

  3. 消费者2,核心代码同消费者1一样,只是在日志打印上将”Consumer1″改为”Consumer2″而已。这里不再列出具体代码。
  4. 先运行消费者1和2,然后运行生产者,观察控制台log打印情况:
    生产者:
    product send a msg:  hello rabbitmq, this is publish/subscribe mode
    
    消费者1**** Consumer1 keep alive ,waiting for messages, and then deal them
    *********** Consumer1 get message :[ hello rabbitmq, this is publish/subscribe mode]
    
    消费者2: **** Consumer2 keep alive ,waiting for messages, and then deal them
    *********** Consumer2 get message :[ hello rabbitmq, this is publish/subscribe mode]

    可以看到,当生产者发出消息后,两个消费者最终都收到了消息。

  5. 我们去查看RabbitMQ管理页面:RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

    在Exchanges 标签页里面多了一个名为“exchange”的路由器,他的类型是fanout。点exchange 的link进入详细页面:RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

    发现在binding项目中有了两条绑定关系,队列的名字也可以看到。将页面切换到Queues标签页:RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

    出现了两个新的队列,队列名字和绑定关系中的一样,并且队列都是自动删除的、通道独占的。

  6. 然后将消费者1和消费者2都停掉,重新查看管理页面,我们发现exchange还在,binding关系不存在了,临时队列也自动删除了RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

    RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

    RabbitMQ入门:发布/订阅(Publish/Subscribe)[通俗易懂]

     

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/120897.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Mysql text MEDIUMTEXT 在5.5和5.7中的差异及扩容测试

    Mysql text MEDIUMTEXT 在5.5和5.7中的差异及扩容测试#textLENGTH#TINYTEXT:256bytes#TEXT:65,535bytes=>~64kb#MEDIUMTEXT:16,777,215bytes=>~16MB#LONGTEXT:4,294,967,295bytes=>~4GBselectversion();#5.7.17createtabletestTB(idintnotnu

  • 爬虫–简单woff文件的处理

    爬虫–简单woff文件的处理woff文件Web开放字体格式(WebOpenFontFormat,简称WOFF)是一种网页所采用的字体格式标准。此字体格式发展于2009年,现在正由万维网联盟的Web字体工作小组标准化,以求成为推荐标准。此字体格式不但能够有效利用压缩来减少档案大小,并且不包含加密也不受DRM(数位著作权管理)限制。而且有时候再页面接受的时候其返回的是一个字符串:如下不过看其这个url,其实简单理解就是data:font/truetype;charset=utf-8;base64,+字符串有要尝试的可以

    2022年10月26日
  • Microsoft Platform SDK Febrary 2003下载(更新VC6的SDK)

    Microsoft Platform SDK Febrary 2003下载(更新VC6的SDK)http://www.x86pro.com/article/sdk-update-for-vc6VC6自带的SDK实在太旧了, 因此很多人抱怨,有很多网上下载的代码在VC6中无法编译. 所以我们需要更新一下SDK,但是不能太新,因为太新可能不支持VC6. 支持VC++6.0的SDK,就只有2003年2月的那版了. 更新SDK后,你的VC6会重新焕发生机. 另外,如果再安装个VisualAs

  • MVP模式从入门到精通

    MVP模式从入门到精通首先附上自己写的一个MVP的demo,这是一个很标准的MVP,Github地址如下:https://github.com/SilasGao/MVPDemo首先MVP是从经典的MVC架构演变而来,那我们是不是要先说下何为MVC模式?系统C/S(Client/Server)三层架构模型:1)视图层(View):一般采用XML文件对应用的界面进行描述,使用的时候可以直接引入…

    2022年10月30日
  • C# CultureInfo列表详细说明

    C# CultureInfo列表详细说明””(空字符串)固定区域性 af 南非荷兰语 af-ZA 南非荷兰语(南非) sq 阿尔巴尼亚语 sq-AL 阿尔巴尼亚语(阿尔巴尼亚) ar 阿拉伯语 ar-DZ 阿拉伯语(阿尔及利亚) ar-BH 阿拉伯语(巴林) ar-EG 阿拉伯语(埃及) ar-IQ 阿拉伯语(伊拉克) 

  • win7系统opc服务器配置,win7 设置opc服务器

    win7系统opc服务器配置,win7 设置opc服务器win7设置opc服务器内容精选换一换创建媒体处理服务配置项。媒体处理服务配置项用于媒体处理服务中获取相关授权。参数和对应说明如表1。MpcConfigmpcConfig=newMpcConfig();mpcConfig.setEndPoint(“endPoint”);//设置转码节点地址mpcConfig.setProjectId(华为云帮助中心,为用户提供产品简介、价格说明、购买…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号