Go RabbitMQ 死信消息队列(二)

实现原理:

/**

(1)创建一个正常的队列 Q1,目的是处理业务逻辑,比如发送订单消息等 ,对应交换器和绑定键  分别为  E1 和  Bingkey1

(2)创建一个延时消息队列 Q2,设定队列的延时时间为10s,对应的交换器和绑定键分别为 E2和Bingkey2;并在该队列创建时候,设定队列的  (a)超时时间 (b) 超时后跳转的 路由E1和绑定Bingkey1,即超时后跳到     队列Q1上

(3) 将消息先发送到 队列Q2 上,然后等着队列超时,执行逻辑

* 主要测试一个死信队列,功能主要实现延时消费,原理是先把消息发到正常队列,

* 正常队列有超时时间,当达到时间后自动发到死信队列,然后由消费者去消费死信队列里的消息. */

延迟队列的应用场景

1、未支付订单定时取消
2、定时清理缓存对象、空闲连接等
3、下单成功后30分钟内,按不同时间间隔发送通知等(1min、3min、10min发一次)

1、设置队列的过期时间

$this->channel->queue_declare(
            $this->retry_queue(),
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(
                [
                    # 不设置x-dead-letter-routing-key,使用原先的routing_key,10s过期后自动重回原先的队列里面,那x-dead-letter-exchange交换机就需绑定原先队列
                    ‘x-dead-letter-exchange‘ => $this->retry_exchange(),

                    # 10s
                    ‘x-message-ttl‘ => 10000,
                ]
            )
        );

推送到该队列的所有消息(不设ttl),10s之后都会过期,根据原来的routing_key,进入到指定的exchange,进而进到指定队列。

2、设置消息的过期时间

$message = new AMQPMessage(
    ‘msg‘,
    array(
        # 消息持久化
        ‘delivery_mode‘ => AMQPMessage::DELIVERY_MODE_PERSITENT,

        # ttl过期时间
        ‘expiration‘ => 50000,
    )
);

每个消息都设置相同的过期时间,到期后消息就会失效。

3、同时设置队列、消息的过期时间

如果同时设置,则消息的过期时间会取决于较小的值,比如队列的‘x-message-ttl’设置为10s,消息的‘expiration’设置为50s,则10s之后这个消息就会失效。

4、后续

单单设置队列的ttl,或者单单设置相同的消息过期时间,死信队列是能正常工作的。但是设置不同的消息过期时间,就可能无法正常使用死信队列了。

队列不设ttl

$this->channel->queue_declare(
            $this->retry_queue(),
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(
                [
                    # 不设置x-dead-letter-routing-key,使用原先的routing_key,10s过期后自动重回原先的队列里面,那x-dead-letter-exchange交换机就需绑定原先队列
                    ‘x-dead-letter-exchange‘ => $this->retry_exchange(),
                ]
            )
        );

第一个消息设置500s过期,先推进队列

第一个消息设置500s过期,先推进队列

$message = new AMQPMessage(
    ‘msg‘,
    array(
        # 消息持久化
        ‘delivery_mode‘ => AMQPMessage::DELIVERY_MODE_PERSITENT,

        # ttl过期时间
        ‘expiration‘ => 500000,
    )
);

第二个消息设置5s过期,后推进队列

$message = new AMQPMessage(
    ‘msg‘,
    array(
        # 消息持久化
        ‘delivery_mode‘ => AMQPMessage::DELIVERY_MODE_PERSITENT,

        # ttl过期时间
        ‘expiration‘ => 5000,
    )
);

结果发现,5s之后,队列里还存在2个消息。说明第二个消息并没有“真的过期失效”。原因是位于队列首部的消息没有过期。而rabbitmq的死信队列,是基于首部消息实现的。

5、结论

当MQ检查队列中的第一个消息时,发现其并未过期,则不会继续检查之后的消息了。即使之后的消息过期了,也会因为没在队列头部而无法流转到其他队列,这是MQ队列的特性决定的。你不能去消费队列中间的消息,队列必须先进先出。

对于设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而设置消息头部属性,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期时在即将投递到消费者之前判定的,为什么两者得处理方法不一致?因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期消息即可,而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息,势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期,再进行删除。

官方的叙述

"Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered)." 只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。
 

原文地址:https://www.cnblogs.com/fengchuiyizh/p/12297830.html

时间: 2024-10-09 11:27:17

Go RabbitMQ 死信消息队列(二)的相关文章

RabbitMQ (消息队列)专题学习07 RPC

(使用Java客户端) 一.概述 在Work Queue的章节中我们学习了如何使用Work Queue分配耗时的任务给多个工作者,但是如果我们需要运行一个函数在远程计算机上,这是一个完全不同的情景,这种模式通常被称之为RPC. 在本章节的学习中,我们将使用RabbitMQ来构建一个RPC系统:一个远程客户端和一个可扩展的RPC服务器,我们没有任何费时的任务进行分配,我们将创建一个虚拟的RPC服务返回Fibonacci数. 1.1.客户端接口(Client Interface) 为了说明一个RPC

RabbitMQ分布式消息队列服务器(一、Windows下安装和部署)

RabbitMQ消息队列服务器在Windows下的安装和部署-> 一.Erlang语言环境的搭建 RabbitMQ开源消息队列服务是使用Erlang语言开发的,因此我们要使用他就必须先进行Erlang语言环境的搭建,其实是非常简单的. 登录Erlang官网,进入下载页,官网地址->http://www.erlang.org/downloads 然后按照自己的系统环境来选择需要下载的安装文件. 我选择 64-bit下载包,因为我的操作系统是64位的 接下来我们需要对Erlang语言的环境变量的配

RabbitMQ (消息队列)专题学习06 Topic

(使用Java客户端) 一.概述 在路由消息分发的学习中,对日志记录系统做了改进,使用direct exchange来替换fanout exchange进行消息分发,可以使日志系统有了直接.并且可以有选择的接收消息. 尽管使用direct exchange改进了系统,但是它仍然有局限性,就是不能根据多个标准来分发消息. 在日志系统中,我们也许想订阅的不仅仅是基于日志消息的严重程度,而且可能是基于日志消息的发送源. 这将给我们带来很多的灵活,我可能想坚挺的错误来自"cron"的消息源,而

RabbitMQ (消息队列)专题学习05 routing(路由)

(使用Java客户端) 一.概述 在前面的学习中,构建了一个简单的日志记录系统,能够广播所有的日志给多个接收者,在该部分学习中,将添加一个新的特点,就是可以只订阅一个特定的消息源,也就是说能够直接把关键的错误日志消息发送到日志文件保存起来,不重要的日志信息文件不保存在磁盘中,但是仍然能够在控制台输出,那么这便是我们这部分要学习的消息的路由分发机制. 二.路由功能实现 2.1.绑定(bindings) 在前面的学习中已经创建了绑定(bindings),代码如下: channel.queueBind

RabbitMQ (消息队列)专题学习03 Work Queues(工作队列)

一.概述 工作队列(Work queues) (使用Java客户端) 在前面的专题学习中,我们使用Java语言实现了一个简单的从名为"hello"的队列中发送和接收消息的程序,在这部内容中我们将创建一个工作队列被用来分配定时消息任务,而且通过多个接收者(工作者)实现. 工作队列(又名任务队列),主要的思想是为了避免立即做一个资源密集型的任务(多消息同时密集发送),不必等待它完成,当运行许多工作者的让任务都在她们之间共享. 它在web应用中是非常有用的,因为在很短的时间内http请求窗口

RabbitMQ (消息队列)专题学习02 Hello World

一.概述 RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱.邮局.投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处. RabbitMQ和邮局的主要区别就是RabbitMQ接收.存储和发送的是二进制数据----消息. 在专题学习一中我们已经简单提到了一些概念,在此我们更为深入的学习下RabbitMQ相关的专有名词. 1.生产

RabbitMQ (消息队列)专题学习01 RabbitMQ部署

一.概述 RabbitMQ(Message Queue)是当前流行的开源的消息队列系统,用ERLang语言开发,按照AMQP(Advanced Message Queue Protocol)的标准实现,消息队列是一种应用程序对应用程序之间的通信方法,应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,则无需专用链接来链接它们,RabbitMQ便是这样一种用于应用程序之间通信的中间件. 二.架构 RabbitMQ的架构图如下: 图-1 在此有几个概念需要说明一下: 1.Exchange:消

C#调用RabbitMQ实现消息队列

我在刚接触使用中间件的时候,发现,中间件的使用并不是最难的,反而是中间件的下载,安装,配置才是最难的. 所以,这篇文章我们从头开始学习RabbitMq,真正的从头开始. 关于消息队列 其实消息队列没有那么神秘,我们这样想一下,用户访问网站,最终是要将数据以HTTP的协议的方式,通过网络传输到主机的某个端口上的. 那么,接收数据的方式是什么呢?自然是端口监听啦. 那消息队列是什么就很好解释了? 它就是端口监听,接到数据后,将数据排列起来. 那这件事,我们不用中间件能做吗? 当然能做啦,写个TCP/

RabbitMQ(消息队列)集群配置与使用篇

介绍 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等. MQ特点 MQ是消费-生产者模型的一个典型