RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发

RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发

内容翻译自:RabbitMQ Tutorials Java版


RabbitMQ(一):Hello World程序

RabbitMQ(二):Work Queues、循环分发、消息确认、持久化、公平分发

RabbitMQ(三):Exchange交换器–fanout

RabbitMQ(四):Exchange交换器–direct

RabbitMQ(五):Exchange交换器–topic

RabbitMQ(六):回调队列callback queue、关联标识correlation id、实现简单的RPC系统

RabbitMQ(七):常用方法说明 与 学习小结


Work Queues:

在上一篇博客中,我们实现了从一个指定的队列中发送和接收消息。在这一部分,我们将会创建一个工作队列:用来将耗时的任务分发给多个工作者。

工作队列的主要思想是避免这样的情况:直接去做一件资源密集型的任务,并且还得等它完成。相反,我们将任务安排到之后再去做。我们将任务封装为一个消息,并发到队列中。一个工作进程将会在后台取出任务并最终完成工作。如果开启多个工作进程,任务将会在这多个工作进程间共享。

这个概念在web应用中是非常有用的,因为web应用不可能在一个HTTP请求中去处理一个复杂的任务。


准备:

在上一篇博客中,我们发送了“hello world”的消息。现在,我们会发送一些代表复杂任务的字符串。我们没有真实的任务(比如调整图片大小、PDF文件加载等),所以我们使用 Thread.sleep() 方法来伪造耗时任务。我们用字符串中的点号.来表示任务的复杂性,一个点就表示需要耗时1秒,比如一个描述为hello...的假任务,它需要耗时3秒。

将上个教程中的Send.java中的代码稍作修改。因为这个程序会调度任务到工作队列,所以我们将它命名为NewTask.java

String message = "1.";

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

之前的Recv.java同样也要做些修改,它需要模拟消息中的点代表的耗时。因为它负责接收消息并处理任务,所以,将它命名为Worker.java

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
    }
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

我们的假任务的执行:

private static void doWork(String task) throws InterruptedException {
    for (char ch: task.toCharArray()) {
        if (ch == '.') Thread.sleep(1000);
    }
}   

循环分发:

使用任务队列的一个优势在于容易并行处理。如果积压了大量的工作,我们只需要添加更多的工作者(上文中的Worker.java中的概念),这样很容易扩展。

首先,我们来尝试同时运行两个工作者实例(Worker.java)。它们都会从队列中获取消息,但具体是如何获取的呢?

启动NewTask,之后,可以依次将message修改为”2..”、”3…”、”4….”、”5…..”等,每修改一次就运行一次。
观察console中两个工作者的接收消息情况:

//其中之一的worker
 [x] Received '1.'
 [x] Done
 [x] Received '3...'
 [x] Done
 [x] Received '5....'
 [x] Done
 [x] Received '7....'
 [x] Done
//另一个worker
 [x] Received '2..'
 [x] Done
 [x] Received '4....'
 [x] Done
 [x] Received '6....'
 [x] Done
 [x] Received '8....'
 [x] Done

可以看出,默认情况下,RabbitMQ是轮流发送消息给下一个消费者,平均每个消费者接收到的消息数量是相等的。这种分发消息的方式叫做循环分发。你可以试一下开3个或更多工作者的情况。


消息确认:

完成一项任务可能会耗费几秒钟,你可能会问,假如其中一个消费者开始了一个非常耗时的任务,并在执行这个任务的时候崩溃了(也就是没有完成这个任务),将会发生什么事情。按照上面的代码,一旦RabbitMQ向消费者发出消息,消息就会立即从内存中移除。在这种情况下,如果你杀死一个工作者,我们将会失去它正在处理的消息,同时也会丢失所有发给这个工作者但这个工作者还未处理的消息。

但我们不想丢掉任务,如果一个工作者死掉,我们想将这个任务发给其他的工作者。

为了确保消息永远不会丢失,RabbitMQ支持消息确认。消费者将会发送一个确认信息来告诉RabbitMQ,我已经接收到了消息,并且处理完了,你可以随便删它了。

如果一个消费者在发送确认信息前死去(连接或通道关闭、TCP连接丢失等),RabbitMQ将会认为该消息没有被完全处理并会重新将消息加入队列。如果此时有其他的消费者,RabbitMQ很快就会重新发送该消息到其他的消费者。通过这种方式,你完全可以保证没有消息丢失,即使某个消费者意外死亡。

对RabbitMQ而言,没有消息超时这一说。如果消费者死去,RabbitMQ将会重新发送消息。即使处理一个消息需要耗时很久很久也没有关系。

消息确认机制是默认打开的。只是在前面的代码中,我们显示地关掉了:boolean autoAck=true。将代码做如下修改:

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

注意到最上面的那句代码:

channel.basicQos(int prefetchCount);

其中的参数prefetchCount表示:maximum number of messages that the server will deliver

这样,就可以确保即使消费者挂了,消息也不会丢失。


消息持久化:

通过上面的教程,我们知道如何确保消费者挂掉也不会丢失消息。但是,加入RabbitMQ服务器挂掉了怎么办?

如果关闭RabbitMQ服务或者RabbitMQ服务崩溃了,RabbitMQ就会丢掉所有的队列和消息:除非你告诉它不要这样。要确保RabbitMQ服务关闭或崩溃后消息不会丢失,要做两件事情:持久化队列、持久化消息。

首先,我们要确保RabbitMQ永远不会丢失我们的队列。怎么做呢?在声明队列的时候,指定durable参数为true。

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

尽管上面的代码没有错,但是它不会按所想的那样将队列持久化:因为之前我们已经将hello这个队列设置了不持久化,RabbitMQ不允许重新定义已经存在的队列,否则就会报错。但是,我们有一个快速的解决办法:声明另外一个队列就行了,只要不叫hello,比如task_queue

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

现在,我们已经确保队列不会丢失了,那么如何将消息持久化呢:将MessageProperties的值设置为PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

将消息标记为持久化并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘中,但是在RabbitMQ接收到消息和保存消息之间会与一个很短的时间窗。同时,RabbitMQ不会为每个消息做fsync(2)处理,消息可能仅仅保存到缓存中而不会真正地写入到磁盘中。这种持久化保证尽管不够健壮,但已经远远足够我们的简单任务队列。如果你需要更强大的保证,可以使用[publisher confirms](https://www.rabbitmq.com/confirms.html)


公平分发:

你可能已经发现,循环消息分发并不是我们想要的。比如,有两个工作者,当奇数消息(如上文中的”1…”、”3…”、”5…”、”7…”)很耗时,而偶数消息(如上文中的”2.”、”4.”、”6.”、”8.”)很简单的时候,其中一个工作者就会一直很忙而另一个工作者就会闲。然而RabbitMQ对这些一概不知,它只是在轮流平均地发消息。

这种情况的发生是因为,RabbitMQ 只是当消息进入队列时就分发出去,而没有查看每个工作者未返回确认信息的数量。

为了改变这种情况,我们可以使用basicQos方法,并将参数prefetchCount设为1。这样做,工作者就会告诉RabbitMQ:不要同时发送多个消息给我,每次只发1个,当我处理完这个消息并给你确认信息后,你再发给我下一个消息。这时候,RabbitMQ就不会轮流平均发送消息了,而是寻找闲着的工作者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意,如果所有的工作者都很忙,你的队列可能会装满,你必须留意这种情况:或者添加更多的工作者,或者采取其他策略。

完整代码:

NewTask.java

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv)
                      throws java.io.IOException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    String message = getMessage(argv);

    channel.basicPublish( "", TASK_QUEUE_NAME,
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }      
  //...
}

Worker.java

import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    final Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
          doWork(message);
        } finally {
          System.out.println(" [x] Done");
          channel.basicAck(envelope.getDeliveryTag(), false);
        }
      }
    };
    boolean autoAck = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
      if (ch == '.') {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException _ignored) {
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}

 


说明

①与原文略有出入,如有疑问,请参考原文。
②原文是直接用javacp命令运行代码,用IDE更方便。

博客转自:https://www.jianshu.com/p/37c23ed0a5f1
 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/114608.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • (一)如何让selenium爬我们需要的东西

    (一)如何让selenium爬我们需要的东西

  • cozmo vector的起源最详细的说明「建议收藏」

    VECTORvector,向量,从一个点,往一个方向无限延申。anki公司最初给他们的第一个家庭机器人取名就复用了vector这个众所周知的名字。要谈vector,我还是先从vector的小兄弟cozmo谈起……其实他们最初是个玩具公司,他们开发的赛车玩具我也没玩过,直到可爱的cozmo出现,它的特点就是很可爱。有手有脚有情绪,虽然跑起来没有赛车快。我赶紧买了一个回来。哈哈,说白了就是一个树莓派包装下。anki公司对他的定位还只是一个玩具,可以在手机上安装APP,APP.

  • SpringBoot2+Netty+WebSocket(netty实现websocket,支持URL参数)

    SpringBoot2+Netty+WebSocket(netty实现websocket,支持URL参数)关于NettyNetty是一个利用Java的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的API的客户端/服务器框架。MAVEN依赖 <dependencies> <!–https://mvnrepository.com/artifact/io.netty/netty-all–> <dependency> <g…

  • Udp攻击_dns是udp协议还是tcp协议

    Udp攻击_dns是udp协议还是tcp协议UDP洪流攻击是导致基于主机的服务拒绝攻击的一种。用户数据报协议(UDP)是一种无连接协议,它不需要用任何程序建立连接来传输数据。当数据包经由UDP协议发送时,发送双方无需通过三次握手建立连接,接收方必须接收处理该资料包。因此大量的发往受害主机UDP报文能使网络饱和。…

  • JavaScript实现页面前进后退「建议收藏」

    JavaScript实现页面前进后退「建议收藏」function pagebackward()   {     window.history.back();   }      function pageforward()   {     window.history.forward();   }      click=”pageforward()”>

  • export方法_import怎么用

    export方法_import怎么用基础命令学习目录首页export的基本作用就是将父shell中的局部变量设置为环境变量,使得该变量可以在子shell中使用。下面设置两种情景对export进行原理解析。情景1.有一个名为myexport.sh的脚本,内容如下:#!/bin/shexportMY_PATH=/usr/local12在linux环境中打开终端运行该shell$shmy…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号