RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知[通俗易懂]

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知[通俗易懂]在第三方支付中,例如支付宝、或者微信,对于订单请求,第三方支付系统采用的是消息同步返回、异步通知+主动补偿查询的补偿机制。 由于互联网通信的不可靠性,例如双方网络、服务器、应用等因素的影响,不管是同步返回、异步通知、主动查询报文都可能出现超时无响应、报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制。例如一个支付结果的通知,一方面会在支付页…

大家好,又见面了,我是你们的朋友全栈君。

在第三方支付中,例如支付宝、或者微信,对于订单请求,第三方支付系统采用的是消息同步返回、异步通知+主动补偿查询的补偿机制。

 

由于互联网通信的不可靠性,例如双方网络、服务器、应用等因素的影响,不管是同步返回、异步通知、主动查询报文都可能出现超时无响应、报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制。
例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台、后台通知的,以后台异步通知结果为准),但由于前台跳转、后台结果通知都可能失效,因此还以定时补单+请求方主动查询接口作为辅助手段。
常见的补单操作,任务调度策略一般设定30秒、60秒、3分钟、6分钟、10分钟调度多次(以自己业务需要),如果调度接收到响应确认报文,补单成功,则中止对应订单的调度任务;如果超过补单上限次数,则停止补单,避免无谓的资源浪费。请求端随时可以发起请求报文查询对应订单的状态。

在日常开发中,对于网站前端来说,支付计费中心对于订单请求信息的处理也是通过消息同步返回、异步通知+主动补偿查询相结合的机制,其中对于订单的异步通知,目前的通知策略为3s、30s、60s、120s、180、300s的阶梯性通知。返回成功情况下就不继续通知了,本来打算使用将失败的消息写到数据库等待发送,然后每秒查询数据库获取消息通知前端。但觉得这样的处理方式太粗暴。存在以下缺点:

1 、每秒请求有点儿浪费资源; 2 、通知方式不稳定; 3 、无法承受大数据量等等

所以最终打算使用rabbitmq的消息延迟+死信队列来实现。消息模型如下:

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知[通俗易懂]

producer发布消息,通过exchangeA的消息会被分发到QueueA,Consumer监听queueA,一旦有消息到来就被消费,这边的消费业务就是通知前端,如果通知失败,就创建一个延迟队列declareQueue,设置每个消息的ttl然后通过declare_exchange将消息分发到declare_queue,因为declare_queue没有consumer并且declare_queue中的消息设置了ttl,当ttl到期后,将通过DEX路由到queueA,被重新消费。

代码如下:DeclareQueue.java

 
  1. package org.delayQueue;

  2.  
  3. import com.rabbitmq.client.BuiltinExchangeType;

  4. import com.rabbitmq.client.Channel;

  5. import com.rabbitmq.client.Connection;

  6. import com.rabbitmq.client.ConnectionFactory;

  7.  
  8. public class DeclareQueue {

  9. public static String EXCHANGE_NAME = "notifyExchange";

  10.  
  11. public static void init() {

  12. ConnectionFactory factory = new ConnectionFactory();

  13. factory.setHost("localhost");

  14. factory.setPort(5672);

  15.  
  16. Connection connection = null;

  17. try {

  18. connection = factory.newConnection();

  19. Channel channel = connection.createChannel();

  20. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

  21. String routingKey = "AliPaynotify";

  22. String message = "http://localhost:8080/BossCenter/payGateway/notifyRecv.jsp?is_success=T¬ify_id=4ab9bed148d043d0bf75460706f7774a¬ify_time=2014-08-29+16%3A22%3A02¬ify_type=trade_status_sync&out_trade_no=1421712120109862&total_fee=424.42&trade_no=14217121201098611&trade_status=TRADE_SUCCESS";

  23. channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

  24. System.out.println(" [x] Sent :" + message);

  25. } catch (Exception e) {

  26. // TODO Auto-generated catch block

  27. e.printStackTrace();

  28. } finally {

  29. if (connection != null) {

  30. try {

  31. connection.close();

  32. } catch (Exception ignore) {

  33. }

  34. }

  35. }

  36. }

  37.  
  38. public static void main(String args[]) {

  39. init();

  40. }

  41.  
  42. }

DeclareConsumer.java

 
  1. package org.delayQueue;

  2.  
  3. import java.io.BufferedReader;

  4. import java.io.IOException;

  5. import java.io.InputStreamReader;

  6. import java.util.ArrayList;

  7. import java.util.HashMap;

  8. import java.util.List;

  9. import java.util.Map;

  10. import java.util.Map.Entry;

  11.  
  12. import org.apache.http.HttpResponse;

  13. import org.apache.http.client.ClientProtocolException;

  14. import org.apache.http.client.HttpClient;

  15. import org.apache.http.client.methods.HttpPost;

  16. import org.apache.http.impl.client.DefaultHttpClient;

  17.  
  18. import com.rabbitmq.client.AMQP;

  19. import com.rabbitmq.client.Channel;

  20. import com.rabbitmq.client.Connection;

  21. import com.rabbitmq.client.ConnectionFactory;

  22. import com.rabbitmq.client.Consumer;

  23. import com.rabbitmq.client.DefaultConsumer;

  24. import com.rabbitmq.client.Envelope;

  25.  
  26. public class DeclareConsumer {

  27. public static String EXCHANGE_NAME = "notifyExchange";

  28. public static String QU_declare_15S = "Qu_declare_15s";

  29. public static String EX_declare_15S = "EX_declare_15s";

  30. public static String ROUTINGKEY = "AliPaynotify";

  31. public static Connection connection = null;

  32. public static Channel channel = null;

  33. public static Channel DECLARE_15S_CHANNEL = null;

  34. public static String declare_queue = "init";

  35. public static String originalExpiration = "0";

  36. public static void init() throws Exception {

  37. ConnectionFactory factory = new ConnectionFactory();

  38. factory.setHost("localhost");

  39. factory.setPort(5672);

  40. connection = factory.newConnection();

  41. channel = connection.createChannel();

  42. DECLARE_15S_CHANNEL = connection.createChannel();

  43. }

  44.  
  45. public static void consume() {

  46. try {

  47. channel.exchangeDeclare(EXCHANGE_NAME, "topic");

  48. final String queueName = channel.queueDeclare().getQueue();

  49.  
  50. channel.queueBind(queueName, EXCHANGE_NAME, ROUTINGKEY);

  51. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  52. final Consumer consumer = new DefaultConsumer(channel) {

  53. @Override

  54. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

  55. String message = new String(body, "UTF-8");

  56. Map<String, Object> headers = properties.getHeaders();

  57. if (headers != null) {

  58. List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");

  59. System.out.println("xDeath--- > " + xDeath);

  60. if (xDeath != null && !xDeath.isEmpty()) {

  61. Map<String, Object> entrys = xDeath.get(0);

  62. // for(Entry<String, Object>

  63. // entry:entrys.entrySet()){

  64. // System.out.println(entry.getKey()+":"+entry.getValue());

  65. // }

  66. originalExpiration = entrys.get("original-expiration").toString();

  67. }

  68. }

  69. System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'" + "time" + System.currentTimeMillis());

  70. HttpClient httpClient = new DefaultHttpClient();

  71. HttpPost post = new HttpPost(message);

  72. HttpResponse response = httpClient.execute(post);

  73. BufferedReader inreader = null;

  74. if (response.getStatusLine().getStatusCode() == 200) {

  75. inreader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "UTF-8"));

  76. StringBuffer responseBody = new StringBuffer();

  77. String line = null;

  78. while ((line = inreader.readLine()) != null) {

  79. responseBody.append(line);

  80. }

  81. if (!responseBody.equals("success")) {

  82. // putDeclre15s(message);

  83. if (originalExpiration.equals("0")) {

  84. putDeclreQueue(message, 3000, QU_declare_15S);

  85. }

  86. if (originalExpiration.equals("3000")) {

  87. putDeclreQueue(message, 30000, QU_declare_15S);

  88. }

  89. if (originalExpiration.equals("30000")) {

  90. putDeclreQueue(message, 60000, QU_declare_15S);

  91. }

  92. if (originalExpiration.equals("60000")) {

  93. putDeclreQueue(message, 120000, QU_declare_15S);

  94. }

  95. if (originalExpiration.equals("120000")) {

  96. putDeclreQueue(message, 180000, QU_declare_15S);

  97. }

  98. if (originalExpiration.equals("180000")) {

  99. putDeclreQueue(message, 300000, QU_declare_15S);

  100. }

  101. if (originalExpiration.equals("300000")) {

  102. // channel.basicConsume(QU_declare_300S,true, this);

  103. System.out.println("finish notify");

  104. }

  105. }

  106. } else {

  107. System.out.println(response.getStatusLine().getStatusCode());

  108. }

  109. }

  110. };

  111.  
  112. channel.basicConsume(queueName, true, consumer);

  113. } catch (Exception e) {

  114. e.printStackTrace();

  115. } finally {

  116. }

  117. }

  118.  
  119.  
  120.  
  121. static Map<String, Object> xdeathMap = new HashMap<String, Object>();

  122. static List<Map<String, Object>> xDeath = new ArrayList<Map<String, Object>>();

  123. static Map<String, Object> xdeathParam = new HashMap<String, Object>();

  124.  
  125. public static void putDeclre15s(String message) throws IOException {

  126. channel.exchangeDeclare(EX_declare_15S, "topic");

  127. Map<String, Object> args = new HashMap<String, Object>();

  128. args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange

  129. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

  130. builder.expiration("3000").deliveryMode(2);// 设置消息TTL

  131. AMQP.BasicProperties properties = builder.build();

  132. channel.queueDeclare(QU_declare_15S, false, false, false, args);

  133. channel.queueBind(QU_declare_15S, EX_declare_15S, ROUTINGKEY);

  134. channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());

  135. System.out.println("send message in QA_DEFERRED_15S" + message + "time" + System.currentTimeMillis());

  136. }

  137.  
  138. public static void putDeclreQueue(String message, int mis, String queue) throws IOException {

  139. channel.exchangeDeclare(EX_declare_15S, "topic");

  140. Map<String, Object> args = new HashMap<String, Object>();

  141. args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange

  142. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

  143. builder.expiration(String.valueOf(mis)).deliveryMode(2);// 设置消息TTL

  144. AMQP.BasicProperties properties = builder.build();

  145. channel.queueDeclare(queue, false, false, false, args);

  146. channel.queueBind(queue, EX_declare_15S, ROUTINGKEY);

  147. channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());

  148. System.out.println("send message in " + queue + message + "time============" + System.currentTimeMillis());

  149. }

  150.  
  151. public static void main(String args[]) throws Exception {

  152. init();

  153. consume();

  154. }

  155. }

消息通过dlx转发的情况下,header头部会带有x-death的一个数组,里面包含消息的各项属性,比如说消息成为死信的原因reason,original-expiration这个字段表示消息在原来队列中的过期时间,根据这个值来确定下一次通知的延迟时间应该是多少秒。

运行结果如下:

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知[通俗易懂]

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知[通俗易懂]

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知[通俗易懂]

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/145842.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Oracle如何创建数据库[通俗易懂]

    Oracle如何创建数据库[通俗易懂]C:\Users\爸爸>sqlplus–执行OracleSQL*Plus:Release11.2.0.1.0Productionon星期四3月1014:14:052022Copyright(c)1982,2010,Oracle.Allrightsreserved.请输入用户名:system–用户名输入口令:–密码连接到:OracleDatabase11gEnterpriseEditionRelease11.2.0.1….

  • dfs算法java(java算法预测)

    packagecom.yangkaile.generator;importlombok.extern.slf4j.Slf4j;importorg.junit.jupiter.api.Test;importjava.util.*;/***@description:DFA算法案例*@className:ApplicationTest*@author:wangdong*@Date:2021/7/2615:56*/@Slf4jpublicclas.

  • 【Shader】Shader官方示例[通俗易懂]

    官方示例原文地址:https://docs.unity3d.com/Manual/SL-SurfaceShaderExamples.htmlSurfaceShader示例在表面着色器。此页面上的示例显示如何使用内置照明模型。有关如何实现自定义光照模型的示例,请参阅SurfaceShader光照示例。简单着色器例我们将从一个非常简单的Shader开始,并在此基础上进行构建。这是一个将…

  • python中unittest框架_unittest接口自动化

    python中unittest框架_unittest接口自动化unittest简介参考:https://urlify.cn/e6rAr2为什么要使用unittest在编写接口自动化用例时,我们一般针对一个接口建立一个.py文件,一条测试用例封装为一个函数(方法),但是在批量执行的过程中,如果其中一条出错,后面的用例就无法执行。使用测试框架可以互不影响的用例执行及更灵活的执行控制。unittest特点•python自带的单元测试框架,无需安装;•用例执行互不干扰;•提供不同范围的setUp(测试准备)和tearDown(测试清理)方法;•

    2022年10月14日
  • matlab的解决反复激活问题的license.lic文件[通俗易懂]

    matlab的解决反复激活问题的license.lic文件[通俗易懂]%%%%%%%%%%%%%%%%%%%%%%%%%%%%INCREMENTAerospace_BlocksetMLM99permanentuncounted\A05070F00D1EB1F92326VENDOR_STRING=QQ=47399897HOSTID=ANY\ck=216SN=888888TS_OKINCREMENTAerospace_ToolboxMLM99permanentuncounted\6090F

  • 领域驱动实践总结(基本理论总结与分析V+架构分析与代码设计+具体应用设计分析)

    领域驱动实践总结(基本理论总结与分析V+架构分析与代码设计+具体应用设计分析)领域驱动实践总结一:基本理论总结与分析一、领域驱动设计两大设计:战略设计和战术设计二、理解和分析领域+子域+核心域+通用域+支撑域三、理解和分析界限上下文,定义领域边界四、理解和分析实体和值对象五、理解和分析聚合思想:聚合和聚合根六、理解很分析领域事件来解耦微服务…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号