RabbitMQ 死信队列DLX

死信队列的简单介绍

利用dlx,当消息在一个队列中变成死信之后,它能被重新publish到另一个exchange,这个exchange就是dlx
消息变成死信的以下几种情况
消息被拒绝,并且requeue= false
消息ttl过期
队列达到最大的长度
dlx也是一个正常的exchange,和一般的exchange没什么区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
当这个队列中有死信时,rabbitmq就会自动的将这个消息重新发布到设置的exchange上,进而被路由到另一个队列。
可以监听这个队列中消息做相应的处理,这个特性可以弥补rabbitMq3.0以前支持的immediate参数的功能。

消费端:

package com.flying.rabbitmq.api.dlx;

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 这就是一个普通的交换机 和 队列 以及路由
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        String queueName = "test_dlx_queue";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);

        Map<String, Object> agruments = new HashMap<String, Object>();
        agruments.put("x-dead-letter-exchange", "dlx.exchange");
        //这个agruments属性,要设置到声明队列上
        channel.queueDeclare(queueName, true, false, false, agruments);
        channel.queueBind(queueName, exchangeName, routingKey);

        //要进行死信队列的声明:
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");

        channel.basicConsume(queueName, true, new MyConsumer(channel));

    }
}

自定义消费端:

package com.flying.rabbitmq.api.dlx;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }

}

生产端:

package com.flying.rabbitmq.api.dlx;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.11.76");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_dlx_exchange";
        String routingKey = "dlx.save";

        String msg = "Hello RabbitMQ DLX Message";

        for(int i =0; i<1; i ++){

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .expiration("10000")
                    .build();
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }

    }
}

原文地址:https://www.cnblogs.com/lflying/p/11107435.html

时间: 2024-11-01 14:37:16

RabbitMQ 死信队列DLX的相关文章

RabbitMQ死信队列

死信队列DLX,全称为Dead-Letter Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列.消息变成死信-般是由于以下几种情况:1.消息被拒绝(basic.reject或basic.nack)并且requeue=false.2.消息TTL过期[消息由于消息有效期(per-message TTL)过期]3.队列达到最大长度(队列满了,

RabbitMQ实战-死信队列

RabbitMQ死信队列 场景说明 代码实现 简单的Util 生产者 消费者 场景说明 场景: 当队列的消息未正常被消费时,如何解决? 消息被拒绝并且不再重新投递 消息超过有效期 队列超载 方案: 未被消费的消息,可通过"死信队列"重新被消费 死信队列含义,发生以上情况时,该队列上的消息,可通过配置转发到死信队列,被重新消费 模拟实现: 1个生产者,2个交换机和队列(普通和死信),1个消费者(死信消费者) 通过消息超时,模拟未正常消费场景 启动死信队列消费者,等待消息... 启动生产者

RabbitMQ项目使用之死信队列

消息消费失败处理方式: 一 进入死信队列(进入死信的三种方式) 1.消息被拒绝(basic.reject or basic.nack)并且requeue=false 2.消息TTL过期过期时间 3.队列达到最大长度 DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列, publish可以监听这个队列中消息

RabbitMQ与.net core(四) 消息的优先级 与 死信队列

1.消息的优先级 假如现在有个需求,我们需要让一些优先级最高的通知推送到客户端,我们可以使用redis的sortedset,也可以使用我们今天要说的rabbit的消息优先级属性 Producer代码 using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; namesp

RabbitMQ实现延时队列(死信队列)

基于队列和基于消息的TTL TTL是time to live 的简称,顾名思义指的是消息的存活时间.rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身. 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒.不同队列的过期时间互相之间没有影响,即使是对于同一条消息.队列中的消息存在队列中的时间超过过期时间则成为死信. 死信交换机DLX 队列中的消息在以下三种情况

Spring Boot系列(8)——RabbitMQ确认、退回模式及死信队列

〇.什么是消息队列 参考:新手也能看懂,消息队列其实很简单    RabbitMQ运行模型与名词解释 一.应答模式 1.什么是应答? 消息投递到交换器(exchange)中,交换器给我们的反馈,是保障消息投递成功的一种机制. 2.测试 配置: 1 #选择确认类型为交互 2 spring.rabbitmq.publisher-confirm-type=correlated 测试方法: 1 @Test 2 /** 3 * the test is testing confirm-function in

Rabbitmq消费失败死信队列

Rabbitmq 重消费处理 一 处理流程图: 业务交换机:正常接收发送者,发送过来的消息,交换机类型topic AE交换机: 当业务交换机无法根据指定的routingkey去路由到队列的时候,会全部发送到AE交换机.发送到此队列的消息属于,业务垃圾消息,或者攻击消息类型,交换机类型fanout 死信交换机:用于处理消费者,消费失败回退的消息,根据死信交换机的routingkey发送到死信队列,交换机类型 topic EXAMPLE: 业务routingkey: hello/task_queue

rabbitmq~消息失败后重试达到 TTL放到死信队列(事务型消息补偿机制)

这是一个基于消息的分布式事务的一部分,主要通过消息来实现,生产者把消息发到队列后,由消费方去执行剩下的逻辑,而当消费方处理失败后,我们需要进行重试,即为了最现数据的最终一致性,在rabbitmq里,它有消息重试和重试次数的配置,但当你配置之后,你的TTL达到 后,消息不能自动放入死信队列,所以这块需要手工处理一下. rabbitmq关于消息重试的配置 rabbitmq: host: xxx port: xxx username: xxx password: xxx virtual-host: x

Go RabbitMQ 死信消息队列(二)

实现原理: /** (1)创建一个正常的队列 Q1,目的是处理业务逻辑,比如发送订单消息等 ,对应交换器和绑定键  分别为  E1 和  Bingkey1 (2)创建一个延时消息队列 Q2,设定队列的延时时间为10s,对应的交换器和绑定键分别为 E2和Bingkey2;并在该队列创建时候,设定队列的  (a)超时时间 (b) 超时后跳转的 路由E1和绑定Bingkey1,即超时后跳到     队列Q1上 (3) 将消息先发送到 队列Q2 上,然后等着队列超时,执行逻辑 * 主要测试一个死信队列,