RabbitMQ延迟队列(Python版)

原创Bge的博客 最后发布于2019-02-13 18:20:39 阅读数 401 收藏
展开
欢迎访问个人博客
最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好。因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列。功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1、Time To Live(TTL)消息超时机制;2、Dead Letter Exchanges(DLX)死信队列。下面将具体描述实现原理以及实现代码。

延迟队列的基础原理
Time To Live(TTL)
RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
RabbitMQ消息的过期时间有两种方法设置。

通过队列(Queue)的属性设置,队列中所有的消息都有相同的过期时间。(本次延迟队列采用的方案)
对消息单独设置,每条消息TTL可以不同。
如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为死信(dead letter)

Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。

x-dead-letter-exchange:出现死信(dead letter)之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现死信(dead letter)之后将dead letter重新按照指定的routing-key发送
队列中出现死信(dead letter)的情况有:

消息或者队列的TTL过期。(延迟队列利用的特性)
队列达到最大长度
消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false
综合上面两个特性,将队列设置TTL规则,队列TTL过期后消息会变成死信,然后利用DLX特性将其转发到另外的队列就可以被重新消费,达到延迟消费效果。

延迟队列设计及实现(Python)
从上面描述,延迟队列的实现大致分为两步:

产生死信,有两种方式Per-Message TTL和 Queue TTL,因为我的需求中是所有的消息延迟处理时间相同,所以本实现中采用 Queue TTL设置队列的TTL,如果需要将队列中的消息设置不同的延迟处理时间,则设置Per-Message TTL(官方文档)

设置死信的转发规则,Dead Letter Exchanges设置方法(官方文档)
完整代码如下:

"""
Created on Fri Aug 3 17:00:44 2018

@author: Bge
"""
import pika,json,logging
class RabbitMQClient:
def __init__(self, conn_str=‘amqp://user:[email protected]:port/%2F‘):
self.exchange_type = "direct"
self.connection_string = conn_str
self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string))
self.channel = self.connection.channel()
self._declare_retry_queue() #RetryQueue and RetryExchange
logging.debug("connection established")
def close_connection(self):
self.connection.close()
logging.debug("connection closed")
def declare_exchange(self, exchange):
self.channel.exchange_declare(exchange=exchange,
exchange_type=self.exchange_type,
durable=True)
def declare_queue(self, queue):
self.channel.queue_declare(queue=queue,
durable=True,)
def declare_delay_queue(self, queue,DLX=‘RetryExchange‘,TTL=60000):
"""
创建延迟队列
:param TTL: ttl的单位是us,ttl=60000 表示 60s
:param queue:
:param DLX:死信转发的exchange
:return:
"""
arguments={}
if DLX:
#设置死信转发的exchange
arguments[ ‘x-dead-letter-exchange‘]=DLX
if TTL:
arguments[‘x-message-ttl‘]=TTL
print(arguments)
self.channel.queue_declare(queue=queue,
durable=True,
arguments=arguments)
def _declare_retry_queue(self):
"""
创建异常交换器和队列,用于存放没有正常处理的消息。
:return:
"""
self.channel.exchange_declare(exchange=‘RetryExchange‘,
exchange_type=‘fanout‘,
durable=True)
self.channel.queue_declare(queue=‘RetryQueue‘,
durable=True)
self.channel.queue_bind(‘RetryQueue‘, ‘RetryExchange‘,‘RetryQueue‘)
def publish_message(self,routing_key, msg,exchange=‘‘,delay=0,TTL=None):
"""
发送消息到指定的交换器
:param exchange: RabbitMQ交换器
:param msg: 消息实体,是一个序列化的JSON字符串
:return:
"""
if delay==0:
self.declare_queue(routing_key)
else:
self.declare_delay_queue(routing_key,TTL=TTL)
if exchange!=‘‘:
self.declare_exchange(exchange)
self.channel.basic_publish(exchange=exchange,
routing_key=routing_key,
body=msg,
properties=pika.BasicProperties(
delivery_mode=2,
type=exchange
))
self.close_connection()
print("message send out to %s" % exchange)
logging.debug("message send out to %s" % exchange)
def start_consume(self,callback,queue=‘#‘,delay=1):
"""
启动消费者,开始消费RabbitMQ中的消息
:return:
"""
if delay==1:
queue=‘RetryQueue‘
else:
self.declare_queue(queue)
self.channel.basic_qos(prefetch_count=1)
try:
self.channel.basic_consume( # 消费消息
callback, # 如果收到消息,就调用callback函数来处理消息
queue=queue, # 你要从那个队列里收消息
)
self.channel.start_consuming()
except KeyboardInterrupt:
self.stop_consuming()
def stop_consuming(self):
self.channel.stop_consuming()
self.close_connection()
def message_handle_successfully(channel, method):
"""
如果消息处理正常完成,必须调用此方法,
否则RabbitMQ会认为消息处理不成功,重新将消息放回待执行队列中
:param channel: 回调函数的channel参数
:param method: 回调函数的method参数
:return:
"""
channel.basic_ack(delivery_tag=method.delivery_tag)
def message_handle_failed(channel, method):
"""
如果消息处理失败,应该调用此方法,会自动将消息放入异常队列
:param channel: 回调函数的channel参数
:param method: 回调函数的method参数
:return:
"""
channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

发布消息代码如下:

from MQ.RabbitMQ import RabbitMQClient
print("start program")
client = RabbitMQClient()
msg1 = ‘{"key":"value"}‘
client.publish_message(‘test-delay‘,msg1,delay=1,TTL=10000)
print("message send out")

消费者代码如下:

from MQ.RabbitMQ import RabbitMQClient
import json
print("start program")
client = RabbitMQClient()
def callback(ch, method, properties, body):
msg = body.decode()
print(msg)
# 如果处理成功,则调用此消息回复ack,表示消息成功处理完成。
RabbitMQClient.message_handle_successfully(ch, method)
queue_name = "RetryQueue"
client.start_consume(callback,queue_name,delay=0)
————————————————
版权声明:本文为CSDN博主「Bge的博客」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_43437629/article/details/87196729

原文地址:https://www.cnblogs.com/fengff/p/12560630.html

时间: 2024-10-10 06:15:13

RabbitMQ延迟队列(Python版)的相关文章

C# RabbitMQ延迟队列功能实战项目演练

一.需求背景 当用户在商城上进行下单支付,我们假设如果8小时没有进行支付,那么就后台自动对该笔交易的状态修改为订单关闭取消,同时给用户发送一份邮件提醒.那么我们应用程序如何实现这样的需求场景呢?在之前的<C# Redis缓存过期实现延迟通知实战演练>分享课程中阿笨最后总结的时候说过Redis Pub/Sub是一种并不可靠地消息机制,他不会做信息的存储,只是在线转发,那么肯定也没有ack确认机制,另外只有订阅段监听时才会转发!我们是否有更好的方式去实现呢?今天给大家分享的比较好的解决方案就是通过

C#实现rabbitmq 延迟队列功能

最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就有

实现rabbitmq 延迟队列功能

最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就有

RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知

在第三方支付中,例如支付宝.或者微信,对于订单请求,第三方支付系统采用的是消息同步返回.异步通知+主动补偿查询的补偿机制. 由于互联网通信的不可靠性,例如双方网络.服务器.应用等因素的影响,不管是同步返回.异步通知.主动查询报文都可能出现超时无响应.报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制. 例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台.后台通知的

Spring Boot(十四)RabbitMQ延迟队列

一.前言 延迟队列的使用场景:1.未按时支付的订单,30分钟过期之后取消订单:2.给活跃度比较低的用户间隔N天之后推送消息,提高活跃度:3.过1分钟给新注册会员的用户,发送注册邮件等. 实现延迟队列的方式有两种: 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能: 使用rabbitmq-delayed-message-exchange插件实现延迟功能: 注意: 延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上

rabbitmq延迟队列demo

工程结构: 定义jar包依赖的版本,版本很重要,rabbit依赖spring,必须一致,否则报错: <properties> <springframework.version>4.2.7.RELEASE</springframework.version> <spring-rabbit.version>1.6.1.RELEASE</spring-rabbit.version> <junit.version>4.12</junit.

SpringBoot RabbitMQ 延迟队列代码实现

场景 用户下单后,如果30min未支付,则删除该订单,这时候就要可以用延迟队列 准备 利用rabbitmq_delayed_message_exchange插件: 首先下载该插件:https://www.rabbitmq.com/community-plugins.html 然后把该插件放到rabbitmq安装目录plugins下: 进入到sbin目录下,执行"rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange";

SpringBoot:RabbitMQ 延迟队列

SpringBoot 是为了简化 Spring 应用的创建.运行.调试.部署等一系列问题而诞生的产物,自动装配的特性让我们可以更好的关注业务本身而不是外部的XML配置,我们只需遵循规范,引入相关的依赖就可以轻易的搭建出一个 WEB 工程 初探RabbitMQ消息队列中介绍了RabbitMQ的简单用法,顺带提及了下延迟队列的作用.所谓延时消息就是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 延迟队列 延迟队列能做什么? 订单业务: 在电商/点餐

【RabbitMQ】一文带你搞定RabbitMQ延迟队列

本文口味:鱼香肉丝? ?预计阅读:10分钟 一.说明 在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列.相信通过上一篇的学习,对于死信队列已经有了更多的了解,这一篇的内容也跟死信队列息息相关,如果你还不了解死信队列,那么建议你先进行上一篇文章的阅读. 这一篇里,我们将继续介绍RabbitMQ的高级特性,通过本篇的学习,你将收获: 什么是延时队列 延时队列使用场景 RabbitMQ中的TTL 如何利用RabbitMQ来实现延时队列 二.本文大纲