rabbitmq~消息失败后重试达到 TTL放到死信队列(事务型消息补偿机制)

这是一个基于消息的分布式事务的一部分,主要通过消息来实现,生产者把消息发到队列后,由消费方去执行剩下的逻辑,而当消费方处理失败后,我们需要进行重试,即为了最现数据的最终一致性,在rabbitmq里,它有消息重试和重试次数的配置,但当你配置之后,你的TTL达到 后,消息不能自动放入死信队列,所以这块需要手工处理一下.

rabbitmq关于消息重试的配置

  rabbitmq:
    host: xxx
    port: xxx
    username: xxx
    password: xxx
    virtual-host: xxx
    ###开启消息确认机制 confirms
    publisher-confirms: true
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual #设置确认方式
        prefetch: 1 #每次处理1条消息
        retry.max-attempts: 3 # 最大重试次数
        retry.enabled: true #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
        retry.initial-interval: 2000 #重试间隔时间(单位毫秒)
        default-requeue-rejected: true #该配置项是决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为true,需要手动basicNack时这些参数谅失效了

手工实现消息重试并放入死信的方式

定义队列的相关配置

/**
   * 创建普通交换机.
   */
  @Bean
  public TopicExchange lindExchange() {
    //消息持久化
    return (TopicExchange) ExchangeBuilder.topicExchange(EXCHANGE).durable(true).build();
  }

  @Bean
  public TopicExchange deadExchange() {
    return (TopicExchange) ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true).build();
  }

  /**
   * 基于消息事务的处理方式,当消费失败进行重试,有时间间隔,当达到超时时间,就发到死信队列,等待人工处理.
   * @return
   */
  @Bean
  public Queue testQueue() {
    //设置死信交换机
    return QueueBuilder.durable(QUEUE).withArgument("x-dead-letter-exchange", LIND_DL_EXCHANGE)
        //毫秒
        .withArgument("x-message-ttl", CONSUMER_EXPIRE)
        //设置死信routingKey
        .withArgument("x-dead-letter-routing-key", LIND_DEAD_QUEUE).build();
  }
  @Bean
  public Queue deadQueue() {
    return new Queue(LIND_DEAD_QUEUE);
  }
  @Bean
  public Binding bindBuildersRouteKey() {
    return BindingBuilder.bind(testQueue()).to(lindExchange()).with(ROUTER);
  }

  @Bean
  public Binding bindDeadBuildersRouteKey() {
    return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(LIND_DEAD_QUEUE);
  }

消费者实现的代码

/**
   * 延时队列:不应该有RabbitListener订阅者,应该让它自己达到超时时间后自动转到死信里去消费
   * 消息异常处理:消费出现异常后,延时几秒,然后从新入队列消费,直到达到TTL超时时间,再转到死信,证明这个信息有问题需要人工干预
   *
   * @param message
   */
  @RabbitListener(queues = MqConfig.QUEUE)
  public void testSubscribe(Message message, Channel channel) throws IOException, InterruptedException {
    try {
      System.out.println(LocalDateTime.now() + ":Subscriber:" + new String(message.getBody(), "UTF-8"));
      //当程序处理出现问题时,消息使用basicReject上报
      int a = 0;
      int b = 1 / a;
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
    } catch (Exception ex) {
      //出现异常手动放回队列
      Thread.sleep(2000);
      channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }

  }
 /**
   * 死信队列.
   *
   * @param message
   */
  @RabbitListener(queues = MqConfig.LIND_DEAD_QUEUE)
  public void dealSubscribe(Message message, Channel channel) throws IOException {
    System.out.println("Dead Subscriber:" + new String(message.getBody(), "UTF-8"));
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  }

这边尝试让消费者执行出错,然后走到catch里使用basicNack方法把消息从新放里队列里,并让线程让休息2秒,以避免频繁操作,之后就是我们希望看到的代码

2019-12-20T17:21:31.190:Subscriber:send a message to mq
2019-12-20T17:21:33.200:Subscriber:send a message to mq
2019-12-20T17:21:35.206:Subscriber:send a message to mq
2019-12-20T17:21:37.213:Subscriber:send a message to mq
2019-12-20T17:21:39.221:Subscriber:send a message to mq
Dead Subscriber:send a message to mq

这就是一个消息队列的补偿机制,使用死信队列也可以实现延时消息的机制,有时间再给大家分享!

原文地址:https://www.cnblogs.com/lori/p/12074299.html

时间: 2024-08-30 09:02:49

rabbitmq~消息失败后重试达到 TTL放到死信队列(事务型消息补偿机制)的相关文章

聊聊数据库乐观锁和悲观锁,乐观锁失败后重试

在写入数据库的时候需要有锁,比如同时写入数据库的时候会出现丢数据,那么就需要锁机制. 数据锁分为乐观锁和悲观锁,那么它们使用的场景如下: 1. 乐观锁适用于写少读多的情景,因为这种乐观锁相当于JAVA的CAS,所以多条数据同时过来的时候,不用等待,可以立即进行返回. 2. 悲观锁适用于写多读少的情景,这种情况也相当于JAVA的synchronized,reentrantLock等,大量数据过来的时候,只有一条数据可以被写入,其他的数据需要等待.执行完成后下一条数据可以继续. 他们实现的方式上有所

简单聊聊消息队列的事务补偿机制

转自:https://my.oschina.net/u/1589819/blog/1503241 因为一直学习与尝试负责公司的推送相关业务,包括整个应用的实现,其中就采用了基于消息队列的异步事件驱动模型来做解耦异步处理,所以就要去做了解一些相关的知识点,这边稍作总结,并整理一下消息补偿机制的一套简单实现的代码设计图. 采用基于消息队列的异步事件驱动模型来解决问题的时候,一个计较棘手的问题就是事务的一致性. 案例:现在用户发起一个创建订单的请求,如果我们是单系统架构,那么修改订单表,修改库存表可能

Oracle 数据库 JOB 失败后的重试规律解密

由于官方文档上没有找到相关的说明,所以这里进行了如下测试,为了找到oracle数据库中 job 失败后重试时间的规律. 数据库版本:11.2.0.3 测试说明:这里创建了一个日志表以及一个运行时必定出错的procedure,用于job的运行.这里只要记录下每次job执行时视图user_jobs 中的 next_date就可以推断出job 执行失败后的重试规律. 为了测试job的重试规律我做了如下工作 日志表以及序列: create table job_exec_logs (id number ,

Rabbitmq消费失败死信队列

Rabbitmq 重消费处理 一 处理流程图: 业务交换机:正常接收发送者,发送过来的消息,交换机类型topic AE交换机: 当业务交换机无法根据指定的routingkey去路由到队列的时候,会全部发送到AE交换机.发送到此队列的消息属于,业务垃圾消息,或者攻击消息类型,交换机类型fanout 死信交换机:用于处理消费者,消费失败回退的消息,根据死信交换机的routingkey发送到死信队列,交换机类型 topic EXAMPLE: 业务routingkey: hello/task_queue

RabbitMQ与.net core(四) 消息的优先级 与 死信队列

1.消息的优先级 假如现在有个需求,我们需要让一些优先级最高的通知推送到客户端,我们可以使用redis的sortedset,也可以使用我们今天要说的rabbit的消息优先级属性 Producer代码 using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; namesp

RabbitMQ死信队列

死信队列DLX,全称为Dead-Letter Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列.消息变成死信-般是由于以下几种情况:1.消息被拒绝(basic.reject或basic.nack)并且requeue=false.2.消息TTL过期[消息由于消息有效期(per-message TTL)过期]3.队列达到最大长度(队列满了,

RabbitMQ实战-死信队列

RabbitMQ死信队列 场景说明 代码实现 简单的Util 生产者 消费者 场景说明 场景: 当队列的消息未正常被消费时,如何解决? 消息被拒绝并且不再重新投递 消息超过有效期 队列超载 方案: 未被消费的消息,可通过"死信队列"重新被消费 死信队列含义,发生以上情况时,该队列上的消息,可通过配置转发到死信队列,被重新消费 模拟实现: 1个生产者,2个交换机和队列(普通和死信),1个消费者(死信消费者) 通过消息超时,模拟未正常消费场景 启动死信队列消费者,等待消息... 启动生产者

Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

应用场景 前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略: 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的成功率. 自定义错误处理逻辑:如果业务上,消息处理失败之后有明确的降级逻辑可以弥补的,可以采用这种方式,但是2.0.x版本有Bug,2.1.x版本修复. 那么如果代码本身存在逻辑错误,无论重试多少次都不可能成功,也没有具体的降级业务逻辑,之前在深入思考中讨论过,可以通过日志,或者降级逻辑记录的方式把错

Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

应用场景 之前我们已经通过<Spring Cloud Stream消费失败后的处理策略(一):自动重试>一文介绍了Spring Cloud Stream默认的消息重试功能.本文将介绍RabbitMQ的binder提供的另外一种重试功能:重新入队. 动手试试 准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑: @EnableBinding(TestApplication.TestTopic.class) @SpringBootApplication pub