rabbitmq延迟任务的处理

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

最近的一个项目遇到了这种情况,如果运单30分钟还没有被接单,则状态自动变为已取消。实现延迟消息原理如下,借用一张图:

php代码如下:

/**
 * 死信
 * 创建交换机1、交换机2、队列1、队列2
 * 交换机1绑定队列1、交换机2绑定队列2
 * 其中交换机1为死信交换机,队列1处理消息的ttl,队列1没有消费者
 *
 * 由于队列1没有消费者,所以它里面的消息过期后会变成死信,再定义规则,让死信进入交换机2,交换机2再把消息路由到队列2
 * 我们的客户端只需要消费队列2即可
 *
 * 需要确保业务上每个任务的延迟时间是一致的。如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。
 */
public function actionDeadletter(){
    $connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost);
    $channel = $connection->channel();

    // 死信交换机和队列的设置
    $channel->exchange_declare(‘dead_exchange‘, ‘direct‘, false, true, false);
    $channel->queue_declare(‘queue1‘, false, true, false, false, false, [
            ‘x-dead-letter-exchange‘ => [‘S‘, ‘normal_exchange‘], // 死信被转发到哪个交换机
            ‘x-dead-letter-routing-key‘ => [‘S‘, ‘normal_routingkey‘] // 死信路由
    ]);
    $channel->queue_bind(‘queue1‘, ‘dead_exchange‘, ‘dead_routingkey‘);

    // 正常交换机和队列的设置
    $channel->exchange_declare(‘normal_exchange‘, ‘direct‘, false, true, false);
    $channel->queue_declare(‘queue2‘, false, true, false, false, false);
    $channel->queue_bind(‘queue2‘, ‘normal_exchange‘, ‘normal_routingkey‘);

    $msg = new AMQPMessage(‘hello world‘, [
            ‘delivery_mode‘ => 2,
            ‘expiration‘ => 10*1000 //毫秒
    ]);

    $channel->basic_publish($msg, ‘dead_exchange‘, ‘dead_routingkey‘);

    echo " [x] Sent ‘Hello World!‘\n";
    $channel->close();
    $connection->close();
}

运行程序,打开rabbitmq的web管理界面,可以看到消息先进入队列1,当消息过期后会自动进入队列2

参考:https://stackoverflow.com/questions/21942063/how-to-delay-php-amqplib

参考:http://www.cnblogs.com/haoxinyue/p/6613706.html

时间: 2024-11-02 10:13:55

rabbitmq延迟任务的处理的相关文章

使用RabbitMQ实现延迟任务

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时. 场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单. 上述类似的需求是我们经常会遇见的问题.最常用的方法是定期轮训数据库,设置状态.在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮

延迟任务的实现总结

上一篇写了使用RabbitMQ来实现延迟任务的实现,其实实现延迟任务的方式有很多,各有利弊,有单机和分布式的.在这里做一个总结,在遇到这类问题的时候希望给大家一个参考和思路. 延迟任务有别于定式任务,定式任务往往是固定周期的,有明确的触发时间.而延迟任务一般没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件.延迟任务相关的业务场景如下: 场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时. 场景二:订单下单之后30分钟

C#实现rabbitmq 延迟队列功能

最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就有

RabbitMQ延时任务

概念: 消息的TTL(Time To Live)消息的TTL就是消息的存活时间.RabbitMQ可以对队列和消息分别设置TTL.对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置.超过了这个时间,我们认为这个消息就死了,称之为死信.如果队列设置了,消息也设置了,那么会取小的.所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置).这里单讲单个消息的TTL,因为它才是实现延迟任务的关键.可以通过设置消息的expiration字段或者x-

实现rabbitmq 延迟队列功能

最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就有

C# RabbitMQ延迟队列功能实战项目演练

一.需求背景 当用户在商城上进行下单支付,我们假设如果8小时没有进行支付,那么就后台自动对该笔交易的状态修改为订单关闭取消,同时给用户发送一份邮件提醒.那么我们应用程序如何实现这样的需求场景呢?在之前的<C# Redis缓存过期实现延迟通知实战演练>分享课程中阿笨最后总结的时候说过Redis Pub/Sub是一种并不可靠地消息机制,他不会做信息的存储,只是在线转发,那么肯定也没有ack确认机制,另外只有订阅段监听时才会转发!我们是否有更好的方式去实现呢?今天给大家分享的比较好的解决方案就是通过

java中延迟任务的处理方式

1.利用延迟队列 延时队列,第一他是个队列,所以具有对列功能第二就是延时,这就是延时对列,功能也就是将任务放在该延时对列中,只有到了延时时刻才能从该延时对列中获取任务否则获取不到…… 应用场景比较多,比如延时1分钟发短信,延时1分钟再次执行等,下面先看看延时队列demo之后再看延时队列在项目中的使用: 简单的延时队列要有三部分:第一实现了Delayed接口的消息体.第二消费消息的消费者.第三存放消息的延时队列,那下面就来看看延时队列demo. 一.消息体 package com.delqueue

windows上部署rabbitmq遇到的一些问题及解决方法

在目前这家公司,刚进公司的时候接手了一个服务,算是个比较完备的服务,其中几台电脑之间通信用到了rabbitmq,一开始没出什么问题,然后后来勒索病毒wanner cry来的时候,系服把所有服务器装了一个什么杀毒软件,重启之后rabibtmq集群就出现了一些问题,经过一番学习,把这些问题都搞定了,现在做一个总结. 一开始,我按照官网的描述,把四台服务器加入了一个集群,但是不知道为什么,除了主节点外,另外三台都看不了集群状态,由于并不影响什么,就先放在那没管,其实想起来,是因为之前集群的配置文件没删

Spring rabbitMq 中 correlationId或CorrelationIdString 消费者获取为null的问题

问题 在用Spring boot 的 spring-boot-starter-amqp   快速启动 rabbitMq 是遇到了个坑 消费者端获取不到:correlationId或CorrelationIdString 问题产生的原因 correlationId 的在 spring rabbitmq 2.0 以后 byte方式会被放弃,所以 目前 代码中有些地方没有改过来,应该算一个BUG @SuppressWarnings("deprecation") public class De