RabbitMQ实战-死信队列

RabbitMQ死信队列

  • 场景说明
  • 代码实现
    • 简单的Util
    • 生产者
    • 消费者

场景说明

场景: 当队列的消息未正常被消费时,如何解决?

  1. 消息被拒绝并且不再重新投递
  2. 消息超过有效期
  3. 队列超载

方案: 未被消费的消息,可通过"死信队列"重新被消费

死信队列含义,发生以上情况时,该队列上的消息,可通过配置转发到死信队列,被重新消费

模拟实现:

  1. 1个生产者,2个交换机和队列(普通和死信),1个消费者(死信消费者)
  2. 通过消息超时,模拟未正常消费场景
  3. 启动死信队列消费者,等待消息...
  4. 启动生产者,绑定死信队列并发送消息
  5. 消息超时后,由死信队列消费者消费

代码实现

简单的Util

package com.lyf.springboot.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MqUtil {

    private static Connection connection = null;
    private static Channel channel = null;

    /**
     * 获取channel
     * @return
     */
    public static Channel getChannel(){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.37.200");
        factory.setUsername("lyf");
        factory.setPassword("123456");
        factory.setVirtualHost("/lyf");
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return channel;
    }

    /**
     * 关闭channel和connection
     */
    public static void close(){
        try {
            if(channel != null){
                channel.close();
            }
            if(connection != null){
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

生产者

package com.lyf.springboot.mq;

import com.lyf.springboot.utils.MqUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class Sender {
    private static String QUEUE_NAME="hello";
    private static String EXCHANGE_NAME="exchange";

    private static String DL_EXCHANGE_NAME="dl_exchange";

    public static void main(String []args) throws IOException {
        Channel channel = MqUtil.getChannel();

        // 普通队列
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        Map<String, Object> arguments = new HashMap<>();
        /*--------------↓↓↓最关键一步,设置队列的死信队列↓↓↓----------------*/
        // x-dead-letter-exchange属性用于指定死信队列
        arguments.put("x-dead-letter-exchange", DL_EXCHANGE_NAME);
        channel.queueDeclare(QUEUE_NAME,false,false,false,arguments);
        /*--------------↑↑↑最关键一步,设置队列的死信队列↑↑↑----------------*/
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");

        // 设置超时时间5000ms
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("5000").build();
        String msg = "hello";
        channel.basicPublish(EXCHANGE_NAME, "info", properties, msg.getBytes());
        System.out.println("Se: " + msg);

        MqUtil.close();
    }
}

消费者

package com.lyf.springboot.mq;

import com.lyf.springboot.utils.MqUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Dl_Reciver {
    private static String DL_EXCHANGE_NAME="dl_exchange";
    private static String DL_QUEUE_NAME="dl_hello";

    public static void main(String []args) throws IOException {
        Channel channel = MqUtil.getChannel();

        channel.exchangeDeclare(DL_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        channel.queueDeclare(DL_QUEUE_NAME,false,false,false,null);
        channel.queueBind(DL_QUEUE_NAME,DL_EXCHANGE_NAME,"#");
        // 消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("DL_Re: " + msg);
            }
        };
        channel.basicConsume(DL_QUEUE_NAME,false,consumer);
    }
}

启动顺序: 先启动消费者监听,后启动生产者.消息5s后被死信队列消费

参考:

原文地址:https://www.cnblogs.com/linyufeng/p/11332423.html

时间: 2024-08-30 13:50:33

RabbitMQ实战-死信队列的相关文章

RabbitMQ项目使用之死信队列

消息消费失败处理方式: 一 进入死信队列(进入死信的三种方式) 1.消息被拒绝(basic.reject or basic.nack)并且requeue=false 2.消息TTL过期过期时间 3.队列达到最大长度 DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列, publish可以监听这个队列中消息

Rabbitmq消费失败死信队列

Rabbitmq 重消费处理 一 处理流程图: 业务交换机:正常接收发送者,发送过来的消息,交换机类型topic AE交换机: 当业务交换机无法根据指定的routingkey去路由到队列的时候,会全部发送到AE交换机.发送到此队列的消息属于,业务垃圾消息,或者攻击消息类型,交换机类型fanout 死信交换机:用于处理消费者,消费失败回退的消息,根据死信交换机的routingkey发送到死信队列,交换机类型 topic EXAMPLE: 业务routingkey: hello/task_queue

RabbitMQ与.net core(四) 消息的优先级 与 死信队列

1.消息的优先级 假如现在有个需求,我们需要让一些优先级最高的通知推送到客户端,我们可以使用redis的sortedset,也可以使用我们今天要说的rabbit的消息优先级属性 Producer代码 using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; namesp

RabbitMQ 死信队列DLX

死信队列的简单介绍 利用dlx,当消息在一个队列中变成死信之后,它能被重新publish到另一个exchange,这个exchange就是dlx消息变成死信的以下几种情况 消息被拒绝,并且requeue= false 消息ttl过期 队列达到最大的长度dlx也是一个正常的exchange,和一般的exchange没什么区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性.当这个队列中有死信时,rabbitmq就会自动的将这个消息重新发布到设置的exchange上,进而被路由到另一个队列.

RabbitMQ实现延时队列(死信队列)

基于队列和基于消息的TTL TTL是time to live 的简称,顾名思义指的是消息的存活时间.rabbitMq可以从两种维度设置消息过期时间,分别是队列和消息本身. 队列消息过期时间-Per-Queue Message TTL: 通过设置队列的x-message-ttl参数来设置指定队列上消息的存活时间,其值是一个非负整数,单位为微秒.不同队列的过期时间互相之间没有影响,即使是对于同一条消息.队列中的消息存在队列中的时间超过过期时间则成为死信. 死信交换机DLX 队列中的消息在以下三种情况

RabbitMQ死信队列

死信队列DLX,全称为Dead-Letter Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列.消息变成死信-般是由于以下几种情况:1.消息被拒绝(basic.reject或basic.nack)并且requeue=false.2.消息TTL过期[消息由于消息有效期(per-message TTL)过期]3.队列达到最大长度(队列满了,

rabbitmq~消息失败后重试达到 TTL放到死信队列(事务型消息补偿机制)

这是一个基于消息的分布式事务的一部分,主要通过消息来实现,生产者把消息发到队列后,由消费方去执行剩下的逻辑,而当消费方处理失败后,我们需要进行重试,即为了最现数据的最终一致性,在rabbitmq里,它有消息重试和重试次数的配置,但当你配置之后,你的TTL达到 后,消息不能自动放入死信队列,所以这块需要手工处理一下. rabbitmq关于消息重试的配置 rabbitmq: host: xxx port: xxx username: xxx password: xxx virtual-host: x

Spring Boot系列(8)——RabbitMQ确认、退回模式及死信队列

〇.什么是消息队列 参考:新手也能看懂,消息队列其实很简单    RabbitMQ运行模型与名词解释 一.应答模式 1.什么是应答? 消息投递到交换器(exchange)中,交换器给我们的反馈,是保障消息投递成功的一种机制. 2.测试 配置: 1 #选择确认类型为交互 2 spring.rabbitmq.publisher-confirm-type=correlated 测试方法: 1 @Test 2 /** 3 * the test is testing confirm-function in

Java SpringBoot集成RabbitMq实战和总结

目录 交换器.队列.绑定的声明 关于消息序列化 同一个队列多消费类型 注解将消息和消息头注入消费者方法 关于消费者确认 关于发送者确认模式 消费消息.死信队列和RetryTemplate RPC模式的消息(不常用) 关于消费模型 关于RabbitMq客户端的线程模型 在公司里一直在用RabbitMQ,由于api已经封装的很简单,关于RabbitMQ本身还有封装的实现没有了解,最近在看RabbitMQ实战这本书,结合网上的一些例子和spring文档,实现了RabbitMQ和spring的集成,对着