RabbitMQ入门:工作队列(Work Queue)

在上一篇博客《RabbitMQ入门:HelloRabbitMQ代码实例》中,我们通过指定的队列发送和接收消息,代码还算是比较简单的。假设有这一些比较耗时的任务,按照上一次的那种方式,我们要一直等

大家好,又见面了,我是全栈君。

 

在上一篇博客《RabbitMQ入门:Hello RabbitMQ 代码实例》中,我们通过指定的队列发送和接收消息,代码还算是比较简单的。

假设有这一些比较耗时的任务,按照上一次的那种方式,我们要一直等前面的耗时任务完成了之后才能接着处理后面耗时的任务,那要等多久才能处理完?别担心,我们今天的主角–工作队列就可以解决该问题。我们将围绕下面这个索引展开:

  1. 什么是工作队列
  2. 代码准备
  3. 循环分发
  4. 消息确认
  5. 公平分发
  6. 消息持久化

废话少说,直接展开。

一、什么是工作队列

工作队列–用来将耗时的任务分发给多个消费者(工作者),主要解决这样的问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。

二、代码准备

  1. 生产者类:NewTask.java
    public class NewTask {
        //队列名称
        public static final String QUEUE_NAME = "TASK_QUEUE";
        //队列是否需要持久化
        public static final boolean DURABLE = false;
        
        //需要发送的消息列表
        public static final String[] msgs = {"task 1", "task 2", "task 3", "task 4", "task 5", "task 6"};
        
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = null;
            Channel channel = null;
    
            try {
                // 1.connection & channel
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                // 2.queue
                channel.queueDeclare(QUEUE_NAME, DURABLE, false, false, null);
    
                // 3.publish msg
                for (int i = 0; i < msgs.length; i++) {
                    channel.basicPublish("", QUEUE_NAME, null, msgs[i].getBytes());
                    System.out.println("** new task ****:" + msgs[i]);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                if (channel != null) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
    
            }
    
        }
    }

     

  2. 消费者类:Work.java
    public class Work {
    
        public static void main(String[] args) {
            System.out.println("*** Work ***");
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            try {
                //1.connection & channel
                final Channel channel = factory.newConnection().createChannel();
                
                //2.queue
                channel.queueDeclare(NewTask.QUEUE_NAME, NewTask.DURABLE, false, false, null);
    
                //3. consumer instance
                Consumer consumer = new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                            byte[] body) throws IOException {
                        String msg = new String(body, "UTF-8");
                        //deal task
                        doWork(msg);
    
                    }
                };
                
                //4.do consumer
                boolean autoAck = true;
                channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    
        private static void doWork(String msg) {
            try {
                System.out.println("**** deal task begin :" + msg);
                
                //假装task比较耗时,通过sleep()来模拟需要消耗的时间
                if ("sleep".equals(msg)) {
                    Thread.sleep(1000 * 60);
                } else {
                    Thread.sleep(1000);
                }
    
                System.out.println("**** deal task finish :" + msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }

     

  3. 再来一个消费者类:Work2.java,代码同Work.java一模一样。

 

三、循环分发

我们先启动Work和Work2,然后启动NewTask,运行结果如下:

NewTask运行结果:

RabbitMQ入门:工作队列(Work Queue)

Work运行结果:

RabbitMQ入门:工作队列(Work Queue)

Work2运行结果:

 RabbitMQ入门:工作队列(Work Queue)

我们发现,消息生产者发送了6条消息,消费者work和work2分别分到了3个消息,而且是循环轮流分发到的,这种分发的方式就是循环分发。

四、消息确认

假如我们在发送的消息里面添加“sleep”

//需要发送的消息列表
    public static final String[] msgs = {"sleep", "task 1", "task 2", "task 3", "task 4", "task 5", "task 6"};

根据代码中的实现,这个sleep要耗时1分钟,万一在这1分钟之内,工作进程崩溃了或者被kill了,会发生什么情况呢?根据上面的代码:

//4.do consumer
            boolean autoAck = true;
            channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer);

自动确认为true,每次RabbitMQ向消费者发送消息之后,会自动发确认消息(我工作你放心,不会有问题),这个时候消息会立即从内存中删除。如果工作者挂了,那将会丢失它正在处理和未处理的所有工作,而且这些工作还不能再交由其他工作者处理,这种丢失属于客户端丢失。

我们来验证下,和刚才的步骤一样执行程序:

1.NewTask的控制台打印结果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制台打印结果:
**** deal task begin :sleep

3.Work2的控制台打印结果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 5
**** deal task finish :task 5

根据上面的内容,消息生产者发送了7条消息, work2消费了1、3、5 三条,那剩下的sleep、2、4、6 这四条消息肯定是work来处理,只是sleep耗时一分钟 ,时间差后面的还没来得及处理,这个时候我们kill掉work,去看下RabbitMQ 管理页面,没有未处理的消息,消息随着work被kill也跟着丢失了。

RabbitMQ入门:工作队列(Work Queue)

是不是很可怕?

为了应对这种情况,RabbitMQ支持消息确认。消费者处理完消息之后,会发送一个确认消息告诉RabbitMQ,消息处理完了,你可以删掉它了。

代码修改(Work.java和Work2.java同步修改):1.将自动确认改为false,2.消息处理之后再通过channel.basicAck进行消息确认

RabbitMQ入门:工作队列(Work Queue)

 修改完后,执行程序:

1.NewTask的控制台打印结果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制台打印结果:
**** deal task begin :sleep

3.Work2的控制台打印结果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 5
**** deal task finish :task 5

然后kill掉work,去看RabbitMQ管理页面,会发现有4条未确认:

RabbitMQ入门:工作队列(Work Queue)

再去看下work2的控制台,work2将work未处理完和未来得及处理的消息都给处理了:

RabbitMQ入门:工作队列(Work Queue)

等work2处理完后,你再去看RabbitMQ管理页面,会发现页面的消息数值也都变成0 了。

 

五、公平分发

按照上面那种循环分发的方式,每个消费者会分到相同数量的任务,这样会有一个问题:假如有一些task非常耗时,之前的任务还没有完成,后面又来了那么多任务,来不及处理,那咋办? 有的消费者忙的不可开交,有的消费者却很快处理完事情然后无所事事浪费资源,那咋整?答案就是:公平分发。 怎么实现呢?

 发生上述问题的原因就是RabbitMQ收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量。因此我们可以使用basicQos方法,并将参数prefetchCount设为1,告诉RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样RabbitMQ就不会轮流分发了,而是寻找空闲的工作者进行分发。

代码修改(work和Work2同步修改):

RabbitMQ入门:工作队列(Work Queue)

执行代码:

1.NewTask的控制台打印结果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制台打印结果:
**** deal task begin :sleep
**** deal task finish :sleep

3.Work2的控制台打印结果:
**** deal task begin :task 1
**** deal task finish :task 1
**** deal task begin :task 2
**** deal task finish :task 2
**** deal task begin :task 3
**** deal task finish :task 3
**** deal task begin :task 4
**** deal task finish :task 4
**** deal task begin :task 5
**** deal task finish :task 5
**** deal task begin :task 6
**** deal task finish :task 6

Work只处理了sleep,Work2处理了1、2、3、4、5、6 这个六条消息。

六、消息持久化

上面说到消息确认的时候,提到了工作者被kill的情况。那如果RabbitMQ被stop掉了呢?我们来看下:

这次只启动Work和NewTask,不启动Work2,所有消息都交给Work来处理,控制台打印信息:

1.NewTask的控制台打印结果:
** new task ****:sleep
** new task ****:task 1
** new task ****:task 2
** new task ****:task 3
** new task ****:task 4
** new task ****:task 5
** new task ****:task 6

2.Work的控制台打印结果:
**** deal task begin :sleep

在work处理sleep的过程中,我们停掉RabbitMQ服务

RabbitMQ入门:工作队列(Work Queue)

然后重新start服务并执行rabbitmq-plugins enable rabbitmq_management命令,然后查看管理页面:

RabbitMQ入门:工作队列(Work Queue)

你会发现,所有消息都将被清空了。这种丢失属于服务端丢失

因此需要将消息进行持久化来应对这种情况。

持久化需要做两件事情:

  1. 队列持久化,在声明队列的时候,将第二个参数设为trueRabbitMQ入门:工作队列(Work Queue)

          RabbitMQ入门:工作队列(Work Queue)

     另外,由于RabbitMQ不允许重新定义已经存在的队列,否则就会报错(上一篇博客中已经提到过了),因此我们将这次的队列名改下:RabbitMQ入门:工作队列(Work Queue)

     

  2. 消息持久化,在发送消息的时候,将第三个参数设为2RabbitMQ入门:工作队列(Work Queue)

然后运行代码,在work处理sleep的时候将服务停掉,并重新启动且执行rabbitmq-plugins enable rabbitmq_management命令,然后查看管理页面:

 RabbitMQ入门:工作队列(Work Queue)

一共7条消息,未确认的1条(sleep)和ready的6条(1、2、3、4、5、6)。消息被保存了下来。

 重新启动Work,所有消息被消费:

RabbitMQ入门:工作队列(Work Queue)

RabbitMQ入门:工作队列(Work Queue)

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/120898.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 区块链入门——比特币科普

    区块链入门——比特币科普

  • 卸载oracle数据库

    卸载oracle数据库

    2021年11月14日
  • linux vlc乱码,一劳永逸解决VLC播放中文字幕乱码问题

    linux vlc乱码,一劳永逸解决VLC播放中文字幕乱码问题VLC对于Mac/Ubuntu用户来说算得上是必备软件。其相当于PC机上的“暴风影音”,但Mac/Ubuntu的新手使用VLC播放avi时都会碰到字幕乱码的问题。avi字幕的格式有多种,这里假设你使用常见的.srt字幕。VLC默认支持的字幕内码为utf-8,而网上提供的.srt字幕基本上都是GBK码,所以在初装VLC后的默认状态下,加载.srt字幕都会出现乱码。本教程以当前最新的VLC1.1…

  • k8s–证书签发

    k8s–证书签发1.准备签发证书环境运维主机hdss-1-200.host.com上:2.安装CFSSL证书签发工具CFSSL:R1.2cfssl下载地址https://pkg.cfssl.org/R1.2/cfssl_linux-amd64cfssl-json下载地址https://pkg.cfssl.org/R1.2/cfssljson_linux-amd64cfssl-certinfo下载地址https://pkg.cfssl.org/R1.2/cfssl-certinfo_li…

  • URLEncoder转换字符串问题

    URLEncoder转换字符串问题今天在开发过程中遇到了一个坑,关于使用URLEncoder去encode字符串的问题,是解析一个下载地址,由于下载文件名中含有空格,导致encode之后所有空格变成了“+”,url拼接自然就出错了,下载地址相应不到报了404异常,由于之前没接触过这方面的事情,也算是给自己挖了一个小坑,特此记录一下。这段是业务背景,不想看的直接跳到下一段看解决办法哈。大概是这样,公司网站原本下载各种附件的地…

  • centos7安装pycharm_pycharm配置环境变量

    centos7安装pycharm_pycharm配置环境变量Centos下pycharm的安装与配置1下载安装pycharm首先在下面的网址下载安装包:https://www.jetbrains.com/pycharm/download/#section=linux然后使用下列指令将安装包放入合适的目录下(本文将安装包放入了/usr/local目录下):cd/home/yue/Downloadsmvpycharm-community-2020.1.tar.gz/usr/local然后进行安装包所在目录,进行解压:tar-zxvfpyc

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号