消息消费失败处理方式:
一 进入死信队列(进入死信的三种方式)
1.消息被拒绝(basic.reject or basic.nack)并且requeue=false
2.消息TTL过期过期时间
3.队列达到最大长度
DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列, publish可以监听这个队列中消息做相应的处理, 这个特性可以弥补R abbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。
rabbitmq的三种模式:
一. Fanout Exchange 广播
所有发送到Fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的所有Queue上。Fanout Exchange 不需要处理RouteKey 。只需要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的所有队列上。类似子网广播,每台子网内的主机都获得了一份复制的消息。所以,Fanout Exchange 转发消息是最快的。
二. Direct Exchange 点对点
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。
三. Topic Exchange 模糊匹配
所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上,Exchange 将RouteKey 和某Topic 进行模糊匹配。此时队列需要绑定一个Topic。可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“log.#”能够匹配到“log.info.oa”,但是“log.*” 只会匹配到“log.error”。所以,Topic Exchange 使用非常灵活。
死信队列实现:
详见官网说明: http://www.rabbitmq.com/ttl.html
在声明期间使用x参数为队列定义消息TTL
如:spring 中配置如下:
<rabbit:connection-factory id="connectionFactory" host="47.104.203.101" password="admin" username="admin" port="5672" channel-cache-size="30" virtual-host="/citpay" publisher-confirms="true" publisher-returns="true"/> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" mandatory="true" confirm-callback="payOrderConfirmCallBack" return-callback="payOrderReturnCallBack"/> <rabbit:admin connection-factory="connectionFactory" /> <rabbit:queue id="pay_order_queue" name="citpay.pay_order_queue" durable="true" auto-delete="false" > <rabbit:queue-arguments> <entry key="x-message-ttl"> <value type="java.lang.Long">5000</value> </entry> <entry key="x-dead-letter-exchange"> <value type="java.lang.String">citpay.pay_order_dead_exchange</value> </entry> </rabbit:queue-arguments></rabbit:queue> <rabbit:direct-exchange name="citpay.direct_pay_order_exchange" id="direct_pay_order_exchange" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="citpay.pay_order_queue" key="pay_order_routekey"/> </rabbit:bindings></rabbit:direct-exchange>
<rabbit:queue id="pay_dead_order_queue" name="citpay.pay_dead_order_queue" durable="true" auto-delete="false"></rabbit:queue> <rabbit:direct-exchange name="citpay.pay_order_dead_exchange" id="pay_order_dead_exchange" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="citpay.pay_dead_order_queue" key="pay_order_routekey"/> </rabbit:bindings></rabbit:direct-exchange> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" max-concurrency="1" > <rabbit:listener queues="citpay.pay_dead_order_queue" ref="payOrderDeadReceiveConfirmListener" /></rabbit:listener-container>
使用策略为队列定义队列TTL
命令形式: rabbitmqctl set_policy expiry“。*”‘{“expires”:1800000}‘ - apply-to queues
管理控制台设置策略:
原文地址:https://www.cnblogs.com/8341-jack/p/9829392.html