RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知

在第三方支付中,例如支付宝、或者微信,对于订单请求,第三方支付系统采用的是消息同步返回、异步通知+主动补偿查询的补偿机制。

由于互联网通信的不可靠性,例如双方网络、服务器、应用等因素的影响,不管是同步返回、异步通知、主动查询报文都可能出现超时无响应、报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制。

例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台、后台通知的,以后台异步通知结果为准),但由于前台跳转、后台结果通知都可能失效,因此还以定时补单+请求方主动查询接口作为辅助手段。
常见的补单操作,任务调度策略一般设定30秒、60秒、3分钟、6分钟、10分钟调度多次(以自己业务需要),如果调度接收到响应确认报文,补单成功,则中止对应订单的调度任务;如果超过补单上限次数,则停止补单,避免无谓的资源浪费。请求端随时可以发起请求报文查询对应订单的状态。

在日常开发中,对于网站前端来说,支付计费中心对于订单请求信息的处理也是通过消息同步返回、异步通知+主动补偿查询相结合的机制,其中对于订单的异步通知,目前的通知策略为3s、30s、60s、120s、180、300s的阶梯性通知。返回成功情况下就不继续通知了,本来打算使用将失败的消息写到数据库等待发送,然后每秒查询数据库获取消息通知前端。但觉得这样的处理方式太粗暴。存在以下缺点:

1 、每秒请求有点儿浪费资源; 2 、通知方式不稳定; 3 、无法承受大数据量等等

所以最终打算使用rabbitmq的消息延迟+死信队列来实现。消息模型如下:

producer发布消息,通过exchangeA的消息会被分发到QueueA,Consumer监听queueA,一旦有消息到来就被消费,这边的消费业务就是通知前端,如果通知失败,就创建一个延迟队列declareQueue,设置每个消息的ttl然后通过declare_exchange将消息分发到declare_queue,因为declare_queue没有consumer并且declare_queue中的消息设置了ttl,当ttl到期后,将通过DEX路由到queueA,被重新消费。

代码如下:DeclareQueue.java


  1. package org.delayQueue;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. public class DeclareQueue {
  7. public static String EXCHANGE_NAME = "notifyExchange";
  8. public static void init() {
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("localhost");
  11. factory.setPort(5672);
  12. Connection connection = null;
  13. try {
  14. connection = factory.newConnection();
  15. Channel channel = connection.createChannel();
  16. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  17. String routingKey = "AliPaynotify";
  18. String message = "http://localhost:8080/BossCenter/payGateway/notifyRecv.jsp?is_success=T?ify_id=4ab9bed148d043d0bf75460706f7774a?ify_time=2014-08-29+16%3A22%3A02?ify_type=trade_status_sync&out_trade_no=1421712120109862&total_fee=424.42&trade_no=14217121201098611&trade_status=TRADE_SUCCESS";
  19. channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
  20. System.out.println(" [x] Sent :" + message);
  21. } catch (Exception e) {
  22. // TODO Auto-generated catch block
  23. e.printStackTrace();
  24. } finally {
  25. if (connection != null) {
  26. try {
  27. connection.close();
  28. } catch (Exception ignore) {
  29. }
  30. }
  31. }
  32. }
  33. public static void main(String args[]) {
  34. init();
  35. }
  36. }

DeclareConsumer.java


  1. package org.delayQueue;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import java.util.ArrayList;
  6. import java.util.HashMap;
  7. import java.util.List;
  8. import java.util.Map;
  9. import java.util.Map.Entry;
  10. import org.apache.http.HttpResponse;
  11. import org.apache.http.client.ClientProtocolException;
  12. import org.apache.http.client.HttpClient;
  13. import org.apache.http.client.methods.HttpPost;
  14. import org.apache.http.impl.client.DefaultHttpClient;
  15. import com.rabbitmq.client.AMQP;
  16. import com.rabbitmq.client.Channel;
  17. import com.rabbitmq.client.Connection;
  18. import com.rabbitmq.client.ConnectionFactory;
  19. import com.rabbitmq.client.Consumer;
  20. import com.rabbitmq.client.DefaultConsumer;
  21. import com.rabbitmq.client.Envelope;
  22. public class DeclareConsumer {
  23. public static String EXCHANGE_NAME = "notifyExchange";
  24. public static String QU_declare_15S = "Qu_declare_15s";
  25. public static String EX_declare_15S = "EX_declare_15s";
  26. public static String ROUTINGKEY = "AliPaynotify";
  27. public static Connection connection = null;
  28. public static Channel channel = null;
  29. public static Channel DECLARE_15S_CHANNEL = null;
  30. public static String declare_queue = "init";
  31. public static String originalExpiration = "0";
  32. public static void init() throws Exception {
  33. ConnectionFactory factory = new ConnectionFactory();
  34. factory.setHost("localhost");
  35. factory.setPort(5672);
  36. connection = factory.newConnection();
  37. channel = connection.createChannel();
  38. DECLARE_15S_CHANNEL = connection.createChannel();
  39. }
  40. public static void consume() {
  41. try {
  42. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  43. final String queueName = channel.queueDeclare().getQueue();
  44. channel.queueBind(queueName, EXCHANGE_NAME, ROUTINGKEY);
  45. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  46. final Consumer consumer = new DefaultConsumer(channel) {
  47. @Override
  48. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  49. String message = new String(body, "UTF-8");
  50. Map<String, Object> headers = properties.getHeaders();
  51. if (headers != null) {
  52. List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
  53. System.out.println("xDeath--- > " + xDeath);
  54. if (xDeath != null && !xDeath.isEmpty()) {
  55. Map<String, Object> entrys = xDeath.get(0);
  56. // for(Entry<String, Object>
  57. // entry:entrys.entrySet()){
  58. // System.out.println(entry.getKey()+":"+entry.getValue());
  59. // }
  60. originalExpiration = entrys.get("original-expiration").toString();
  61. }
  62. }
  63. System.out.println(" [x] Received ‘" + envelope.getRoutingKey() + "‘:‘" + message + "‘" + "time" + System.currentTimeMillis());
  64. HttpClient httpClient = new DefaultHttpClient();
  65. HttpPost post = new HttpPost(message);
  66. HttpResponse response = httpClient.execute(post);
  67. BufferedReader inreader = null;
  68. if (response.getStatusLine().getStatusCode() == 200) {
  69. inreader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), "UTF-8"));
  70. StringBuffer responseBody = new StringBuffer();
  71. String line = null;
  72. while ((line = inreader.readLine()) != null) {
  73. responseBody.append(line);
  74. }
  75. if (!responseBody.equals("success")) {
  76. // putDeclre15s(message);
  77. if (originalExpiration.equals("0")) {
  78. putDeclreQueue(message, 3000, QU_declare_15S);
  79. }
  80. if (originalExpiration.equals("3000")) {
  81. putDeclreQueue(message, 30000, QU_declare_15S);
  82. }
  83. if (originalExpiration.equals("30000")) {
  84. putDeclreQueue(message, 60000, QU_declare_15S);
  85. }
  86. if (originalExpiration.equals("60000")) {
  87. putDeclreQueue(message, 120000, QU_declare_15S);
  88. }
  89. if (originalExpiration.equals("120000")) {
  90. putDeclreQueue(message, 180000, QU_declare_15S);
  91. }
  92. if (originalExpiration.equals("180000")) {
  93. putDeclreQueue(message, 300000, QU_declare_15S);
  94. }
  95. if (originalExpiration.equals("300000")) {
  96. // channel.basicConsume(QU_declare_300S,true, this);
  97. System.out.println("finish notify");
  98. }
  99. }
  100. } else {
  101. System.out.println(response.getStatusLine().getStatusCode());
  102. }
  103. }
  104. };
  105. channel.basicConsume(queueName, true, consumer);
  106. } catch (Exception e) {
  107. e.printStackTrace();
  108. } finally {
  109. }
  110. }
  111. static Map<String, Object> xdeathMap = new HashMap<String, Object>();
  112. static List<Map<String, Object>> xDeath = new ArrayList<Map<String, Object>>();
  113. static Map<String, Object> xdeathParam = new HashMap<String, Object>();
  114. public static void putDeclre15s(String message) throws IOException {
  115. channel.exchangeDeclare(EX_declare_15S, "topic");
  116. Map<String, Object> args = new HashMap<String, Object>();
  117. args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange
  118. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
  119. builder.expiration("3000").deliveryMode(2);// 设置消息TTL
  120. AMQP.BasicProperties properties = builder.build();
  121. channel.queueDeclare(QU_declare_15S, false, false, false, args);
  122. channel.queueBind(QU_declare_15S, EX_declare_15S, ROUTINGKEY);
  123. channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());
  124. System.out.println("send message in QA_DEFERRED_15S" + message + "time" + System.currentTimeMillis());
  125. }
  126. public static void putDeclreQueue(String message, int mis, String queue) throws IOException {
  127. channel.exchangeDeclare(EX_declare_15S, "topic");
  128. Map<String, Object> args = new HashMap<String, Object>();
  129. args.put("x-dead-letter-exchange", EXCHANGE_NAME);// 死信exchange
  130. AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
  131. builder.expiration(String.valueOf(mis)).deliveryMode(2);// 设置消息TTL
  132. AMQP.BasicProperties properties = builder.build();
  133. channel.queueDeclare(queue, false, false, false, args);
  134. channel.queueBind(queue, EX_declare_15S, ROUTINGKEY);
  135. channel.basicPublish(EX_declare_15S, ROUTINGKEY, properties, message.getBytes());
  136. System.out.println("send message in " + queue + message + "time============" + System.currentTimeMillis());
  137. }
  138. public static void main(String args[]) throws Exception {
  139. init();
  140. consume();
  141. }
  142. }

消息通过dlx转发的情况下,header头部会带有x-death的一个数组,里面包含消息的各项属性,比如说消息成为死信的原因reason,original-expiration这个字段表示消息在原来队列中的过期时间,根据这个值来确定下一次通知的延迟时间应该是多少秒。

运行结果如下:

原文地址:https://www.cnblogs.com/jpfss/p/9908853.html

时间: 2024-11-05 23:28:41

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知的相关文章

C# RabbitMQ延迟队列功能实战项目演练

一.需求背景 当用户在商城上进行下单支付,我们假设如果8小时没有进行支付,那么就后台自动对该笔交易的状态修改为订单关闭取消,同时给用户发送一份邮件提醒.那么我们应用程序如何实现这样的需求场景呢?在之前的<C# Redis缓存过期实现延迟通知实战演练>分享课程中阿笨最后总结的时候说过Redis Pub/Sub是一种并不可靠地消息机制,他不会做信息的存储,只是在线转发,那么肯定也没有ack确认机制,另外只有订阅段监听时才会转发!我们是否有更好的方式去实现呢?今天给大家分享的比较好的解决方案就是通过

C#实现rabbitmq 延迟队列功能

最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就有

实现rabbitmq 延迟队列功能

最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就有

Spring Boot(十四)RabbitMQ延迟队列

一.前言 延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单:2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度:3.过1分钟给新注册会员的用户,发送注册邮件等. 实现延迟队列的方式有两种: 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能: 使用rabbitmq-delayed-message-exchange插件实现延迟功能: 注意: 延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上

SpringBoot RabbitMQ 延迟队列代码实现

场景 用户下单后,如果30min未支付,则删除该订单,这时候就要可以用延迟队列 准备 利用rabbitmq_delayed_message_exchange插件: 首先下载该插件:https://www.rabbitmq.com/community-plugins.html 然后把该插件放到rabbitmq安装目录plugins下: 进入到sbin目录下,执行"rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange";

【RabbitMQ】一文带你搞定RabbitMQ延迟队列

本文口味:鱼香肉丝? ?预计阅读:10分钟 一.说明 在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列.相信通过上一篇的学习,对于死信队列已经有了更多的了解,这一篇的内容也跟死信队列息息相关,如果你还不了解死信队列,那么建议你先进行上一篇文章的阅读. 这一篇里,我们将继续介绍RabbitMQ的高级特性,通过本篇的学习,你将收获: 什么是延时队列 延时队列使用场景 RabbitMQ中的TTL 如何利用RabbitMQ来实现延时队列 二.本文大纲

RabbitMQ延迟队列简单示例

简介 延迟队列存储的消息是不希望被消费者立刻拿到的,而是等待特定时间后,消费者才能拿到这个消息进行消费.使用场景比较多,例如订单限时30分钟内支付,否则取消,再如分布式环境中每隔一段时间重复执行某操作. 下面举一个简单的例子,例子大概意思是分别在首次发送消息后的10秒.40秒.100秒后重新读取到消息.为了直观,不使用RabbitMQ其他多余的特性. 准备工作 在Centos7下安装RabbitMQ,版本为3.6.12单机版本(非集群),IP是127.0.0.1,端口是15672,使用web管理

rabbitmq延迟队列demo

工程结构: 定义jar包依赖的版本,版本很重要,rabbit依赖spring,必须一致,否则报错: <properties> <springframework.version>4.2.7.RELEASE</springframework.version> <spring-rabbit.version>1.6.1.RELEASE</spring-rabbit.version> <junit.version>4.12</junit.

SpringBoot:RabbitMQ 延迟队列

SpringBoot 是为了简化 Spring 应用的创建.运行.调试.部署等一系列问题而诞生的产物,自动装配的特性让我们可以更好的关注业务本身而不是外部的XML配置,我们只需遵循规范,引入相关的依赖就可以轻易的搭建出一个 WEB 工程 初探RabbitMQ消息队列中介绍了RabbitMQ的简单用法,顺带提及了下延迟队列的作用.所谓延时消息就是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 延迟队列 延迟队列能做什么? 订单业务: 在电商/点餐