Spring Boot 实现 RabbitMQ 延迟消费和延迟重试队列

本文主要摘录自:详细介绍Spring Boot + RabbitMQ实现延迟队列

并增加了自己的一些理解,记录下来,以便日后查阅。

项目源码:

背景

何为延迟队列?

顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。
延迟队列能做什么?延迟队列多用于需要延迟工作的场景。最常见的是以下两种场景:

  • 延迟消费。比如:用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单;用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
  • 延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。

如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。

实现思路

在介绍具体的实现思路之前,我们先来介绍一下RabbitMQ的两个特性,一个是Time-To-Live Extensions,另一个是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档

Dead Letter Exchange

刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

  • 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
  • 消息因为设置了TTL而过期。
  • 消息进入了一条已经达到最大长度的队列。

如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。更多资料请查阅官方文档

流程图

聪明的你肯定已经想到了,如何将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。

针对于上述的延迟队列的两个场景,我们分别有以下两种流程图:

延迟消费

延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。

延迟重试

延迟重试本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍。

如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。

代码实现

配置队列

从上述的流程图中我们可以看到,一个延迟队列的实现,需要一个缓冲队列以及一个实际的消费队列。又由于在RabbitMQ中,我们拥有两种消息过期的配置方式,所以在代码中,我们一共配置了三条队列:

  • delay_queue_per_message_ttl:TTL配置在消息上的缓冲队列。
  • delay_queue_per_queue_ttl:TTL配置在队列上的缓冲队列。
  • delay_process_queue:实际消费队列。

我们通过Java Config的方式将上述的队列配置为Bean。由于我们添加了spring-boot-starter-amqp扩展,Spring Boot在启动时会根据我们的配置自动创建这些队列。为了方便接下来的测试,我们将delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置为同一个,且过期的消息都会通过DLX转发到delay_process_queue。

delay_queue_per_message_ttl

首先介绍delay_queue_per_message_ttl的配置代码:

@Bean
Queue delayQueuePerMessageTTL() {
    return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)
                       .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter发送到的exchange
                       .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
                       .build();
}

其中,x-dead-letter-exchange声明了队列里的死信转发到的DLX名称,x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称。

delay_queue_per_queue_ttl

类似地,delay_queue_per_queue_ttl的配置代码:

@Bean
Queue delayQueuePerQueueTTL() {
    return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME)
                       .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX
                       .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key
                       .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 设置队列的过期时间
                       .build();
}

delay_queue_per_queue_ttl队列的配置比delay_queue_per_message_ttl队列的配置多了一个x-message-ttl,该配置用来设置队列的过期时间。

delay_process_queue

delay_process_queue的配置最为简单:

@Bean
Queue delayProcessQueue() {
    return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME)
                       .build();
}

配置Exchange

配置DLX

首先,我们需要配置DLX,代码如下:

@Bean
DirectExchange delayExchange() {
    return new DirectExchange(DELAY_EXCHANGE_NAME);
}

然后再将该DLX绑定到实际消费队列即delay_process_queue上。这样所有的死信都会通过DLX被转发到delay_process_queue:

@Bean
Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
    return BindingBuilder.bind(delayProcessQueue)
                         .to(delayExchange)
                         .with(DELAY_PROCESS_QUEUE_NAME);
}

配置延迟重试所需的Exchange

从延迟重试的流程图中我们可以看到,消息处理失败之后,我们需要将消息转发到缓冲队列,所以缓冲队列也需要绑定一个Exchange。在本例中,我们将delay_process_per_queue_ttl作为延迟重试里的缓冲队列。

定义消费者

我们创建一个最简单的消费者ProcessReceiver,这个消费者监听delay_process_queue队列,对于接受到的消息,他会:

  • 如果消息里的消息体不等于FAIL_MESSAGE,那么他会输出消息体。
  • 如果消息里的消息体恰好是FAIL_MESSAGE,那么他会模拟抛出异常,然后将该消息重定向到缓冲队列(对应延迟重试场景)。

另外,我们还需要新建一个监听容器用于存放消费者,代码如下:

@Bean
SimpleMessageListenerContainer processContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 监听delay_process_queue
    container.setMessageListener(new MessageListenerAdapter(processReceiver));
    return container;
}

至此,我们前置的配置代码已经全部编写完成,接下来我们需要编写测试用例来测试我们的延迟队列。

编写测试用例

延迟消费场景

首先我们编写用于测试TTL设置在消息上的测试代码。

我们借助spring-rabbit包下提供的RabbitTemplate类来发送消息。由于我们添加了spring-boot-starter-amqp扩展,Spring Boot会在初始化时自动地将RabbitTemplate当成bean加载到容器中。

解决了消息的发送问题,那么又该如何为每个消息设置TTL呢?这里我们需要借助MessagePostProcessor。MessagePostProcessor通常用来设置消息的Header以及消息的属性。我们新建一个ExpirationMessagePostProcessor类来负责设置消息的TTL属性:

/**
 * 设置消息的失效时间
 */
public class ExpirationMessagePostProcessor implements MessagePostProcessor {
    private final Long ttl; // 毫秒

    public ExpirationMessagePostProcessor(Long ttl) {
        this.ttl = ttl;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties()
               .setExpiration(ttl.toString()); // 设置per-message的失效时间
        return message;
    }
}

然后在调用RabbitTemplate的convertAndSend方法时,传入ExpirationMessagePostPorcessor即可。我们向缓冲队列中发送3条消息,过期时间依次为1秒,2秒和3秒。具体的代码如下所示:

@Test
public void testDelayQueuePerMessageTTL() throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(3);
    for (int i = 1; i <= 3; i++) {
        long expiration = i * 1000;
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,
                (Object) ("Message From delay_queue_per_message_ttl with expiration " + expiration), new ExpirationMessagePostProcessor(expiration));
    }
    ProcessReceiver.latch.await();
}

细心的朋友一定会问,为什么要在代码中加一个CountDownLatch呢?这是因为如果没有latch阻塞住测试方法的话,测试用例会直接结束,程序退出,我们就看不到消息被延迟消费的表现了。

那么类似地,测试TTL设置在队列上的代码如下:

@Test
public void testDelayQueuePerQueueTTL() throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(3);
    for (int i = 1; i <= 3; i++) {
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME,
                "Message From delay_queue_per_queue_ttl with expiration " + QueueConfig.QUEUE_EXPIRATION);
    }
    ProcessReceiver.latch.await();
}

我们向缓冲队列中发送3条消息。理论上这3条消息会在4秒后同时过期。

延迟重试场景

我们同样还需测试延迟重试场景。

@Test
public void testFailMessage() throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(6);
    for (int i = 1; i <= 3; i++) {
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE);
    }
    ProcessReceiver.latch.await();
}

我们向delay_process_queue发送3条会触发FAIL的消息,理论上这3条消息会在4秒后自动重试。

我的理解

延迟消费过程(每个消息可以单独设置失效时间):

  • 1. 声明 delay_queue_per_message_ttl 队列:死信队列,设置 DLX 参数,包含 x-dead-letter-exchange 表示失效后进入的 exchange(值为 delay_exchange,即实际消费交换机)、x-dead-letter-routing-key 表示失效后的路由键(值为 delay_process_queue,即实际消费队列)。
  • 2. 声明 delay_process_queue 队列:实际消费队列。
  • 3. 声明 delay_exchange 交换机:实际消费交换机,类型为 Direct(一一对应)。
  • 4. 声明 dlx_binding 绑定:将实际消费队列和实际消费交换机绑定(路由键规则值为 delay_process_queue)。
  • 5. 发布一个消息,路由键为 delay_queue_per_message_ttl(发送到死信队列),并通过 header 单独设置每个消息的过期时间:当过期时间生效后,消息会转到实际消费队列。
  • 6. 声明一个消费者,监听 delay_process_queue 队列(即实际消费队列):消息正常被消费掉,达到延迟消费的目的。

延迟消费过程(所有消息统一设置失效时间):

  • 1. 声明 delay_queue_per_queue_ttl 队列:死信队列,设置 DLX 参数,包含 x-dead-letter-exchange 表示失效后进入的 exchange(值为 delay_exchange,即实际消费交换机)、x-dead-letter-routing-key 表示失效后的路由键(值为 delay_process_queue,即实际消费队列)、x-message-ttl 表示队列消息过期时间。
  • 2. 声明 delay_process_queue 队列:实际消费队列。
  • 3. 声明 delay_exchange 交换机:实际消费交换机,类型为 Direct(一一对应)。
  • 4. 声明 dlx_binding 绑定:将实际消费队列和实际消费交换机绑定(路由键规则值为 delay_process_queue)。
  • 5. 发布一个消息,路由键为 delay_queue_per_queue_ttl(发送到死信队列):当过期时间生效后,消息会转到实际消费队列。
  • 6. 声明一个消费者,监听 delay_process_queue队列(即实际消费队列):消息正常被消费掉,达到延迟消费的目的。

延迟重试过程

  • 1. 声明 delay_process_queue 队列:实际消费队列。
  • 2. 声明 delay_queue_per_queue_ttl 队列:死信队列,设置 DLX 参数,包含 x-dead-letter-exchange 表示失效后进入的 exchange(值为 delay_exchange,即实际消费交换机)、x-dead-letter-routing-key 表示失效后的路由键(值为 delay_process_queue,即实际消费队列)、x-message-ttl 表示队列消息过期时间。
  • 3. 声明 delay_exchange 交换机:实际消费交换机,类型为 Direct(一一对应)。
  • 4. 声明 per_queue_ttl_exchange 交换机:死信交换机,类型为 Direct(一一对应)。
  • 5. 声明 dlx_binding 绑定:将实际消费队列和实际消费交换机绑定(路由键规则值为 delay_process_queue)。
  • 6. 声明 queue_ttl_binding 绑定:将死信队列和死信交换机绑定(路由键规则值为 delay_queue_per_queue_ttl)。
  • 7. 发布一个消息,路由键为 delay_process_queue(发送到实际消费队列)。
  • 8. 声明一个消费者,监听 delay_process_queue 队列(即实际消费队列):消费者监听到消息,当处理过程中发生异常,消息重新发送到私信队列,然后等待过期时间生效后,消息再转到实际消费队列,重新消费,以达到延迟重试的目的。

需要注意:在延迟消费的过程中,我们是没有创建死信交换机的,那为什么还可以发布消息呢?原因是 RabbitMQ 会使用默认的 Exchange,并且创建一个默认的 Binding(类型为 Direct),通过rabbitmqadmin list bindings命令,可以看到结果。

Spring Cloud Stream RabbitMQ DLX 的实现:_rabbitmq_consumer_properties

原文地址:https://www.cnblogs.com/xishuai/p/spring-boot-rabbitmq-delay-queue.html

时间: 2024-11-09 05:43:47

Spring Boot 实现 RabbitMQ 延迟消费和延迟重试队列的相关文章

Spring Boot 揭秘与实战(六) 消息队列篇 - RabbitMQ

文章目录 1. 什么是 RabitMQ 2. Spring Boot 整合 RabbitMQ 3. 实战演练4. 源代码 3.1. 一个简单的实战开始 3.1.1. Configuration 3.1.2. 消息生产者 3.1.3. 消息消费者 3.1.4. 运行 3.1.5. 单元测试 3.2. 路由的实战演练 3.2.1. Configuration 3.2.2. 消息生产者 3.2.3. 消息消费者 3.2.4. 运行 3.2.5. 单元测试 本文,讲解 Spring Boot 如何集成

Spring Boot之RabbitMQ

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用. 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache,当然了今天的主角还是讲 RabbitMQ.消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的.在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式

Spring Boot (十三): Spring Boot 整合 RabbitMQ

1. 前言 RabbitMQ 是一个消息队列,说到消息队列,大家可能多多少少有听过,它主要的功能是用来实现应用服务的异步与解耦,同时也能起到削峰填谷.消息分发的作用. 消息队列在比较主要的一个作用是用来做应用服务的解耦,消息从消息的生产者传递到消息队列,消费者从消息队列中获取消息并进行消费,生产者不需要管是谁在消费消息,消费者也无需关注消息是由谁来生产的.在分布式的系统中,消息队列也会被用在其他地方,比如分布式事务的支持,代表如阿里开源的 RocketMQ . 当然,我们本篇文章的主角还是 Ra

消息队列 - Spring Boot 对rabbitmq批量处理数据的支持

消息队列 - Spring Boot 对rabbitmq批量处理数据的支持 一丶前言 在生产中,存在一些场景,需要对数据进行批量操作.如,可以先将数据存放到redis,然后将数据进行批量写进数据库.但是使用redis,不得不面对一个数据容易丢失的问题.也可以考虑使用消息队列进行替换,在数据持久化,数据不丢失方面,消息队列确实比redis好一点,毕竟设计不一样.是不是使用消息队列,就一定好呢?不是的,首先使用消息队列,不能确保数据百分百不丢失,(如果要做到百分百不丢失,设计上就会比较复杂),除此之

Spring Boot (25) RabbitMQ消息队列

MQ全程(Message Queue)又名消息队列,是一种异步通讯的中间件.可以理解为邮局,发送者将消息投递到邮局,然后邮局帮我们发送给具体的接收者,具体发送过程和时间与我们无关,常见的MQ又kafka.activemq.zeromq.rabbitmq等等. RabbitMQ RabbitMQ是一个遵循AMQP协议,由面向高并发的erlang语言开发而成,用在实时的对可靠性要求比较高的消息传递上,支持多种语言客户端,支持延迟队列. 基础概念 Broker:消息队列的服务器实体 Exchange:

spring boot整合RabbitMQ(Fanout模式)

1.Fanout Exchange介绍Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略. 如上图所示,即当使用fanout交换器时,他会将消息广播到与该交换器绑定的所有队列上,这有利于你对单条消息做不同的反应. 例如存在以下场景:一个web服务要在用户完善信息时,获得积分奖励,这样你就可以创建两个对列,一个用来处理用户信息的请求,另一个对列获取这条消息是来完成积分奖励的任务. 2.代码示例 1).Q

spring boot Rabbitmq集成,延时消息队列实现

本篇主要记录Spring boot 集成Rabbitmq,分为两部分, 第一部分为创建普通消息队列, 第二部分为延时消息队列实现: spring boot提供对mq消息队列支持amqp相关包,引入即可: [html] view plain copy <!-- rabbit mq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-

RabbitMq 集成 spring boot 消息队列 入门Demo

spring boot 集成 RabbitMq还是很方便的.现在来一个简单的例子来集成rabbitmq.入门demo. 主要概念: 其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定. 虚拟主机:一个虚拟主机持有一组交换机.队列和绑定.为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制. 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机.每一个RabbitMQ服务器都有一个默认的虚拟主机"/"

详解Spring Boot中的RabbitMQ

RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用. 消息中间件在互联网公司的使用中越来越多,刚才还看到新闻阿里将 RocketMQ 捐献给了 Apache,当然了今天的主角还是讲 RabbitMQ.消息中间件最主要的作用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的.在分布式的系统中,消息队列也会被用在很多其它的方面,比如:分布式