RabbitMQ消息监听异常问题探究「建议收藏」

RabbitMQ消息监听异常问题探究「建议收藏」问题场景在使用SpringRabbitMQ做消息监听时,如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常。为了更好的描述问题,下面写个简单的例子。通过访问null对象来引发空指针异常,消息监听处理程序代码清单:packageamqp;importorg.springframework.amqp.core.Message;importorg.springfram

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺

问题场景

在使用Spring RabbitMQ做消息监听时,如果监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,然后一直抛异常。为了更好的描述问题,下面写个简单的例子。

通过访问null对象来引发空指针异常,消息监听处理程序代码清单:

package amqp;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;


@Component
public class FooMessageListener implements MessageListener { 
   

    @Override
    public void onMessage(Message message) { 
   
        String messageBody = new String(message.getBody());
        System.out.println(" [x] Received '" + messageBody + "'");
        String nullStr = null;
        nullStr.toString();
    }
}

往消息监听队列发送一条消息,控制台不停打印异常日志:

18:55:32.816 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:32.979 [pool-1-thread-10] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:32.979 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'Hello, world!' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=directQueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-s5myKVHHeP4FbTGIH0hyeA, consumerQueue=directQueue])
 [x] Received 'Hello, world!'
18:55:38.191 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: null
	at amqp.FooMessageListener.onMessage(FooMessageListener.java:16)
	at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:282)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777)
	... 10 common frames omitted
18:55:38.192 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Rejecting messages (requeue=true)
18:55:38.193 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:38.194 [pool-1-thread-3] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:38.195 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'Hello, world!' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=directQueue, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-s5myKVHHeP4FbTGIH0hyeA, consumerQueue=directQueue])
 [x] Received 'Hello, world!'
18:55:55.226 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95)
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: null
	at amqp.FooMessageListener.onMessage(FooMessageListener.java:16)
	at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:282)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777)
	... 10 common frames omitted
18:55:55.226 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Rejecting messages (requeue=true)
18:55:55.227 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:55.229 [pool-1-thread-4] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Storing delivery for Consumer: tags=[{amq.ctag-s5myKVHHeP4FbTGIH0hyeA=directQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://it@192.168.48.59:5672/,1), conn: Proxy@fd85a26 Shared Rabbit Connection: SimpleConnection@68887242 [delegate=amqp://it@192.168.48.59:5672/, localPort= 49412], acknowledgeMode=AUTO local queue size=0
18:55:55.230 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-2] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'Hello, world!' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=directQueue, receivedDelay=null, deliveryTag=3, messageCount=0, consumerTag=amq.ctag-s5myKVHHeP4FbTGIH0hyeA, consumerQueue=directQueue])

问题来了:为什么异常时会一直重复接收这条消息?

抓包验证

消息监听程序异常的过程到底发生了什么?为了一探究竟,笔者使用Wireshark抓包工具来查看消息处理过程。

首先,修改下监听程序,收到字符串exception时产生异常

 @Override
 public void onMessage(Message message) { 
   
     String messageBody = new String(message.getBody());
     System.out.println(" [x] Received '" + messageBody + "'");
     if(messageBody.equals("exception")){ 
   
         String nullStr = null;
         nullStr.toString();
     }
 }

1.监听程序正常处理情况

启动监听程序,Wireshark抓包情况:

启动监听程序

我们主要关注最后一列,这一列展示了请求的AMQP协议方法信息,AMQP协议方法包含类名+方法名+参数,这一列主要展示了类名和方法名,点击对应行可以查看参数信息。比如上图:

  • Connection.Start:请求服务端开始建立连接
  • Channel.Open:请求服务端建立信道
  • Queue.Declare:声明队列
  • Basic.Consume:开始一个消费者,请求指定队列的消息

AMQP协议方法更详细介绍可以查看官网

然后,通过客户端发送一条消息

RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
template.convertAndSend("ok");

抓包:

ack

分析:

  • Basic.Publish: 客户端发送Basic.Publish方法请求,将消息发布到exchange,rabbitmq server会根据路由规则转发到队列中
  • Basic.Deliver: 服务端发送Basic.Deliver方法请求,投递消息到监听队列的客户端消费者
  • Basic.Ack: 客户端发送Basic.Ack方法请求,告知rabbimq server,消息已接收处理

2.监听程序异常处理情况

通过客户端发送exception字符串,制造异常

RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
template.convertAndSend("exception");

抓包:

reject

分析:

  • Basic.Reject: 客户端发送Basic.Reject方法请求,表示无法处理消息,拒绝消息,此时的requeue参数为true,将消息返回原来的队列
  • Basic.Deliver: 服务端调用Basic.Deliver方法,和第一次Basic.Deliver方法不同的是,此时的redeliver参数为true,表示重新投递消息到监听队列的消费者

然后这两步会一直重复下去。对于Basic.Reject方法,可以设置requeue参数为false,这样消息无法处理的时候就不会重新入队了,他会根据异常类型选择直接丢弃或加入dead-letter-exchange中。Spring RabbitMQ配置:

<!--配置监听-->
<rabbit:listener-container connection-factory="connectionFactory" requeue-rejected="false">
    <rabbit:listener ref="fooMessageListener" queue-names="directQueue" />
</rabbit:listener-container>

结论

RabbitMQ消息监听程序异常时,消费者会向rabbitmq server发送Basic.Reject,表示消息拒绝接受,由于Spring默认requeue-rejected配置为true,消息会重新入队,然后rabbitmq server重新投递,造成了程序一直异常的情况。所以说了这么多,我们通过rabbitmq监听消息的时候,程序一定要添加try…catch语句!!!当然你也可以根据实际情况,选择设置requeue-rejected为false来丢弃消息。

参考

[1] AMQP协议方法
[2] Wireshark抓包教程
[3] http://blog.csdn.net/u013256816/article/details/55515234
[4] http://fengchj.com/?p=2234
[5] https://yemengying.com/2017/01/30/how-does-rabbitmq-handle-exception/

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/170742.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号