RabbitMQ项目使用之死信队列

消息消费失败处理方式:

一 进入死信队列(进入死信的三种方式)

1.消息被拒绝(basic.reject or basic.nack)并且requeue=false

2.消息TTL过期过期时间

3.队列达到最大长度

DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列, publish可以监听这个队列中消息做相应的处理, 这个特性可以弥补R abbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。

rabbitmq的三种模式:

一. Fanout Exchange  广播

所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。Fanout Exchange  不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。所以,Fanout Exchange 转发消息是最快的。

二. Direct Exchange  点对点

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

三. Topic Exchange  模糊匹配

所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”。所以,Topic Exchange 使用非常灵活。

死信队列实现:

详见官网说明: http://www.rabbitmq.com/ttl.html

在声明期间使用x参数为队列定义消息TTL

如:spring 中配置如下:

<rabbit:connection-factory id="connectionFactory" host="47.104.203.101" password="admin"                           username="admin" port="5672"                           channel-cache-size="30" virtual-host="/citpay"                           publisher-confirms="true"                           publisher-returns="true"/>

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" mandatory="true"                 confirm-callback="payOrderConfirmCallBack"                 return-callback="payOrderReturnCallBack"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue id="pay_order_queue" name="citpay.pay_order_queue" durable="true" auto-delete="false" >  <rabbit:queue-arguments>      <entry key="x-message-ttl">          <value type="java.lang.Long">5000</value>      </entry>      <entry key="x-dead-letter-exchange">          <value type="java.lang.String">citpay.pay_order_dead_exchange</value>      </entry>  </rabbit:queue-arguments></rabbit:queue>

<rabbit:direct-exchange name="citpay.direct_pay_order_exchange" id="direct_pay_order_exchange" durable="true" auto-delete="false">    <rabbit:bindings>        <rabbit:binding queue="citpay.pay_order_queue" key="pay_order_routekey"/>    </rabbit:bindings></rabbit:direct-exchange>
<rabbit:queue id="pay_dead_order_queue" name="citpay.pay_dead_order_queue" durable="true" auto-delete="false"></rabbit:queue>

<rabbit:direct-exchange name="citpay.pay_order_dead_exchange" id="pay_order_dead_exchange" durable="true" auto-delete="false">    <rabbit:bindings>        <rabbit:binding queue="citpay.pay_dead_order_queue" key="pay_order_routekey"/>    </rabbit:bindings></rabbit:direct-exchange>

<rabbit:listener-container        connection-factory="connectionFactory" acknowledge="manual" max-concurrency="1" >    <rabbit:listener queues="citpay.pay_dead_order_queue"                     ref="payOrderDeadReceiveConfirmListener" /></rabbit:listener-container>

使用策略为队列定义队列TTL

  命令形式:   rabbitmqctl set_policy expiry“。*”‘{“expires”:1800000}‘ -  apply-to queues

管理控制台设置策略:

原文地址:https://www.cnblogs.com/8341-jack/p/9829392.html

时间: 2024-11-02 19:54:55

RabbitMQ项目使用之死信队列的相关文章

Rabbitmq消费失败死信队列

Rabbitmq 重消费处理 一 处理流程图: 业务交换机:正常接收发送者,发送过来的消息,交换机类型topic AE交换机: 当业务交换机无法根据指定的routingkey去路由到队列的时候,会全部发送到AE交换机.发送到此队列的消息属于,业务垃圾消息,或者攻击消息类型,交换机类型fanout 死信交换机:用于处理消费者,消费失败回退的消息,根据死信交换机的routingkey发送到死信队列,交换机类型 topic EXAMPLE: 业务routingkey: hello/task_queue

RabbitMQ与.net core(四) 消息的优先级 与 死信队列

1.消息的优先级 假如现在有个需求,我们需要让一些优先级最高的通知推送到客户端,我们可以使用redis的sortedset,也可以使用我们今天要说的rabbit的消息优先级属性 Producer代码 using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; namesp

RabbitMQ 死信队列DLX

死信队列的简单介绍 利用dlx,当消息在一个队列中变成死信之后,它能被重新publish到另一个exchange,这个exchange就是dlx消息变成死信的以下几种情况 消息被拒绝,并且requeue= false 消息ttl过期 队列达到最大的长度dlx也是一个正常的exchange,和一般的exchange没什么区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性.当这个队列中有死信时,rabbitmq就会自动的将这个消息重新发布到设置的exchange上,进而被路由到另一个队列.

RabbitMQ实现延时队列(死信队列)

基于队列和基于消息的TTL TTL是time to live 的简称,顾名思义指的是消息的存活时间.rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身. 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒.不同队列的过期时间互相之间没有影响,即使是对于同一条消息.队列中的消息存在队列中的时间超过过期时间则成为死信. 死信交换机DLX 队列中的消息在以下三种情况

RabbitMQ实战-死信队列

RabbitMQ死信队列 场景说明 代码实现 简单的Util 生产者 消费者 场景说明 场景: 当队列的消息未正常被消费时,如何解决? 消息被拒绝并且不再重新投递 消息超过有效期 队列超载 方案: 未被消费的消息,可通过"死信队列"重新被消费 死信队列含义,发生以上情况时,该队列上的消息,可通过配置转发到死信队列,被重新消费 模拟实现: 1个生产者,2个交换机和队列(普通和死信),1个消费者(死信消费者) 通过消息超时,模拟未正常消费场景 启动死信队列消费者,等待消息... 启动生产者

RabbitMQ死信队列

死信队列DLX,全称为Dead-Letter Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列.消息变成死信-般是由于以下几种情况:1.消息被拒绝(basic.reject或basic.nack)并且requeue=false.2.消息TTL过期[消息由于消息有效期(per-message TTL)过期]3.队列达到最大长度(队列满了,

rabbitmq~消息失败后重试达到 TTL放到死信队列(事务型消息补偿机制)

这是一个基于消息的分布式事务的一部分,主要通过消息来实现,生产者把消息发到队列后,由消费方去执行剩下的逻辑,而当消费方处理失败后,我们需要进行重试,即为了最现数据的最终一致性,在rabbitmq里,它有消息重试和重试次数的配置,但当你配置之后,你的TTL达到 后,消息不能自动放入死信队列,所以这块需要手工处理一下. rabbitmq关于消息重试的配置 rabbitmq: host: xxx port: xxx username: xxx password: xxx virtual-host: x

Spring Boot系列(8)——RabbitMQ确认、退回模式及死信队列

〇.什么是消息队列 参考:新手也能看懂,消息队列其实很简单    RabbitMQ运行模型与名词解释 一.应答模式 1.什么是应答? 消息投递到交换器(exchange)中,交换器给我们的反馈,是保障消息投递成功的一种机制. 2.测试 配置: 1 #选择确认类型为交互 2 spring.rabbitmq.publisher-confirm-type=correlated 测试方法: 1 @Test 2 /** 3 * the test is testing confirm-function in

ActiveMQ_5死信队列

activemq死信队列 DLQ-死信队列(Dead Letter Queue)用来保存处理失败或者过期的消息. 出现以下情况时,消息会被redelivered: A transacted session is used and rollback() is called. A transacted session is closed before commit is called. A session is using CLIENT_ACKNOWLEDGE and Session.recover