RabbitMQ入门:路由(Routing)

在上一篇博客《RabbitMQ入门:发布/订阅(Publish/Subscribe)》中,我们认识了fanout类型的exchange,它是一种通过广播方式发送消息的路由器,所有和exchange建立

大家好,又见面了,我是全栈君。

在上一篇博客《RabbitMQ入门:发布/订阅(Publish/Subscribe)》中,我们认识了fanout类型的exchange,它是一种通过广播方式发送消息的路由器,所有和exchange建立的绑定关系的队列都会接收到消息。但是有一些场景只需要订阅到一部分消息,这个时候就不能使用fanout 类型的exchange了,这个就引出来今天的“猪脚”–Direct Exchange,通过Routing Key来决定需要将消息发送到哪个或者哪些队列中。

接下来请收看详细内容:

  1. Direct Exchange(直接路由器)
  2. 多重绑定
  3. 代码实例

一、Direct Exchange(直接路由器)

在上文中介绍exchange的时候,对direct exchange进行了简单介绍,它是一种完全按照routing key(路由关键字)进行投递的:当消息中的routing key和队列中的binding key完全匹配时,才进行会将消息投递到该队列中。这里提到了一个routing key和binding key(绑定关键字),是什么东东?

  1. routing key:

     在发送消息的时候,basicPublish的第二个参数就是routing key,由于上次是fanout 类型的exchange 进行广播方式投递,这个字段不会影响投递结果,因此我们这里就传入了“”,但是在direct 类型的exchange中我们就不能传入””了,需要指定具体的关键字。

    RabbitMQ入门:路由(Routing)

  2. binding key:

    我们在前文中建立绑定关系的时候,queueBind的第三个参数就是绑定关键字

    RabbitMQ入门:路由(Routing)

我们声明direact exchange的时候使用:

RabbitMQ入门:路由(Routing)

二、多重绑定

多个队列相同的绑定键绑定到同一个路由器的情况,我们称之为多重绑定

工作模型为(P代表生产者,X代表路由器,红色的Q代表队列,C代表消费者):

RabbitMQ入门:路由(Routing)

 

三、代码实例

 预备知识了解完了,现在来写个程序感受下。

  1. 生产者
    public class LogDirectSender {
        // exchange名字
        public static String EXCHANGE_NAME = "directExchange";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.创建连接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.为通道声明direct类型的exchange
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
                
                // 3.发送消息到指定的exchange,队列指定为空,由exchange根据情况判断需要发送到哪些队列
                String routingKey = "debug";
                String msg = " hello rabbitmq, I am " + routingKey;
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
                System.out.println("product send a msg: " + msg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 4.关闭连接
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    }

    和上次博客中生产者的区别就是黑字粗体部分:1.路由器类型改为direct 2.消息发布的时候指定了routing key

  2. 消费者
    public class LogDirectReciver {
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
            try {
                // 1.创建连接和通道
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.为通道声明direct类型的exchange
                channel.exchangeDeclare(LogDirectSender.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                // 3.创建随机名字的队列
                String queueName = channel.queueDeclare().getQueue();
    
                // 4.建立exchange和队列的绑定关系
                 String[] bindingKeys = { "error", "info", "debug" };
    //            String[] bindingKeys = { "error" };
                for (int i = 0; i < bindingKeys.length; i++) {
                    channel.queueBind(queueName, LogDirectSender.EXCHANGE_NAME, bindingKeys[i]);
                    System.out.println(" **** LogDirectReciver keep alive ,waiting for " + bindingKeys[i]);
                }
    
                // 5.通过回调生成消费者并进行监听
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                        // 获取消息内容然后处理
                        String msg = new String(body, "UTF-8");
                        System.out.println("*********** LogDirectReciver" + " get message :[" + msg + "]");
                    }
                };
                // 6.消费消息
                channel.basicConsume(queueName, true, consumer);
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    }

    和上次博客中消费者的区别就是黑字粗体部分:1.路由器类型改为direct 2.建立绑定关系的时候指定了binding key

  3. 执行消费者,控制台log打印如下:
     **** LogDirectReciver keep alive ,waiting for error
     **** LogDirectReciver keep alive ,waiting for info
     **** LogDirectReciver keep alive ,waiting for debug

    这个消费者我们视为消费者1,它会接收error,info,debug三个关键字的消息。

  4. 将String[] bindingKeys = { “error”, “info”, “debug” };改为String[] bindingKeys = { “error” };,然后再运行一次消费者。控制台log打印如下:
     **** LogDirectReciver keep alive ,waiting for error

    这个消费者我们视为消费者2,它只会接收error 关键字的消息。

  5. 执行生产者,然后将String routingKey = “debug”;的值分别改为“info”和”error”,然后分别执行,这样一共执行了三次生产者
    第一次执行:
    product send a msg:  hello rabbitmq, I am debug
    
    第二次执行:
    product send a msg:  hello rabbitmq, I am info
    
    第三次执行:
    product send a msg:  hello rabbitmq, I am error

  6. 再次查看两个消费者的控制台log:
    消费者1:
     **** LogDirectReciver keep alive ,waiting for error
     **** LogDirectReciver keep alive ,waiting for info
     **** LogDirectReciver keep alive ,waiting for debug
    *********** LogDirectReciver get message :[ hello rabbitmq, I am debug]
    *********** LogDirectReciver get message :[ hello rabbitmq, I am info]
    *********** LogDirectReciver get message :[ hello rabbitmq, I am error]
    
    消费者2:
     **** LogDirectReciver keep alive ,waiting for error
    *********** LogDirectReciver get message :[ hello rabbitmq, I am error]

     

  7. 查看RabbitMQ管理页面RabbitMQ入门:路由(Routing)

    exchanges标签页里面多了个direct类型的路由器。进入详细页面:RabbitMQ入门:路由(Routing)

    有4个绑定关系,其中三个的队列是同一个。切换到Queues标签页:RabbitMQ入门:路由(Routing)

    有两个临时队列。

  8. 如果关掉消费者1和消费者2,会发现队列自动删除了,绑定关系也不存在了。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/120896.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • java详细学习路线及路线图

    java详细学习路线及路线图java详细路线:原文出自点击打开链接本文将告诉你学习Java需要达到的30个目标,学习过程中可能遇到的问题,及学习路线。希望能够对你的学习有所帮助。对比一下自己,你已经掌握了这30条中的多少条了呢?路线Java发展到现在,按应用来分主要分为三大块:J2SE,J2ME和J2EE。这三块相互补充,应用范围不同。J2SE就是Java2的标准版,主要用于…

  • 时序数据库 mysql_时序数据库 应用场景

    时序数据库 mysql_时序数据库 应用场景influxDB介绍时间序列数据是以时间字段为每行数据的标示,比如股票市场的价格,环境中的温度,主机的CPU使用率等。但是又有什么数据是不包含timestamp的呢?几乎所有的数据都可以打上一个timestamp字段。时间序列数据更重要的一个属性是如何去查询它。在查询的时候,对于时间序列我们总是会带上一个时间范围去过滤数据。同时查询的结果里也总是会包含timestamp字段。InfluxDB是一…

  • linux之管道

    1.进程间通信概述进程是一个独立的资源分配单元,不同进程之间的资源是独立的,没有关联,不能在一个进程中直接访问另一个进程的资源。进程不是孤立的,不同的进程需要进行信息的交互和状态的传递等,因此需要

    2021年12月28日
  • 转录因子调控基因表达_转录因子的转录激活域

    转录因子调控基因表达_转录因子的转录激活域基因转录调控网络——转录因子调控网络分析转录因子(TranscriptionFactors,TFs)是指能够以序列特异性方式结合DNA并且调节转录的蛋白质。转录因子通过识别特定的DNA序列来控制染色质和转录,以形成指导基因组表达的复杂系统。转录水平的调控是基因调控的重要环节,其中转录因子(TranscriptionFactor,TF)和转录因子结合位点(TranscriptionFactorBindingSite,TFBS)是转录调控的重要组成部分。基因转录调控网络由于其可以直观地显示基

    2022年10月28日
  • 错误代码as-3_android studio is currently

    错误代码as-3_android studio is currently解决AS编译报错:Causedby:org.gradle.api.internal.plugins.PluginApplicationException:Failedtoapplyplugin[id‘com.android.application’]编译Android项目时总是遇到以上报错,所以记录一下。解决方法:添加android.overridePathCheck=true就可以了。其实解决的方法可以在EventLog窗口中找到,如图下:从日志第一句可以看出,应该是项目路径包

  • 安装Pytorch-gpu版本(第一次安装 或 已经安装Pytorch-cpu版本后)

    安装Pytorch-gpu版本(第一次安装 或 已经安装Pytorch-cpu版本后)由于已经安装了cpu版本了,如果再在该环境下安装gpu版本会造成环境污染.因此,再安装gpu版本时,需要再新建一个虚拟环境才能安装成功。然后去官网下载所适配的版本。安装完cuda和cudnn后,开始安装pytorch的gpu版本。1.安装cude首先查看windows电脑之前是否成功安装了CUDA第一步:同时按键盘上的“windows键+R”,输入“cmd”并回车,进入windows的命令行界面。第二步:命令行里输入“nvcc-V”并回车第三步:如果已经成功安装CUDA的话,.

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号