RabbitMQ中的消息不可达returnlistener和mandatory的使用

return listener 用于处理一些不可路由的消息。
    我们的消息生产者,通过指定一个exchange和routingkey,把消息送达到某一个队列中,然后我们的消费者监听队列,进行消费处理操作。
    但是在某种情况下,如果我们在发送消息的时候,当前的exchange不存在或者制定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用return listener。
    mandatory, 设置为true,则监听器会接收到路由不可达的消息, 然后进行处理,如果设置为false,那么broker端自动删除该消息。

消费者:

package com.flying.rabbitmq.api.returnlistener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;

public class Consumer {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingKey = "return.#";
        String queueName = "test_return_queue";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.basicConsume(queueName, true, queueingConsumer);

        while(true){

            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消费者: " + msg);
        }

    }
}

生产者:

package com.flying.rabbitmq.api.returnlistener;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 *  当mandatory标志位设置为true时,
 *  如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
 *  那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,
 *  出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,
 *  否则就将消息return给发送者;
 */
public class Producer {

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_return_exchange";
        String routingKey = "return.save";
        String routingKeyError = "abc.save";

        String msg = "Hello RabbitMQ Return Message";

        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.err.println("---------handle  return----------");
                System.err.println("replyCode: " + replyCode);
                System.err.println("replyText: " + replyText);
                System.err.println("exchange: " + exchange);
                System.err.println("routingKey: " + routingKey);
                System.err.println("properties: " + properties);
                System.err.println("body: " + new String(body));
            }
        });

        channel.basicPublish(exchange,routingKey,true,null,msg.getBytes());

    }
}

原文地址:https://www.cnblogs.com/lflying/p/11107327.html

时间: 2024-11-05 15:53:50

RabbitMQ中的消息不可达returnlistener和mandatory的使用的相关文章

RabbitMQ 中 Connection 和 Channel 详解

我们知道无论是生产者还是消费者,都需要和 RabbitMQ Broker 建立连接,这个连接就是一条 TCP 连接,也就是 Connection. 一旦 TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道(Channel),每个信道都会被指派一个唯一的 ID. 信道是建立在 Connection 之上的虚拟连接,RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的. 我们完全可以使用 Connection 就能完成信道的工作,为什么还要引入信道呢? 试想这样一个场景,一个应

在C#中使用消息队列RabbitMQ

参考文章:http://www.cnblogs.com/qy1141/p/4054135.html 开发环境&工具: VS2017 RabbitMq Erlang运行环境 先安装Erlang运行环境然后再安装RabbitMq 安装和配置就不说了 默认安装路径:C:\Program Files\RabbitMQ Server,在rabbitmq_server-3.6.11\sbin文件夹下有bat文件 默认配置文件路径: C:\Users\wangshibang\AppData\Roaming\R

Python实现RabbitMQ中6种消息模型

RabbitMQ与Redis对比 ? RabbitMQ是一种比较流行的消息中间件,之前我一直使用redis作为消息中间件,但是生产环境比较推荐RabbitMQ来替代Redis,所以我去查询了一些RabbitMQ的资料.相比于Redis,RabbitMQ优点很多,比如: 具有消息消费确认机制 队列,消息,都可以选择是否持久化,粒度更小.更灵活. 可以实现负载均衡 RabbitMQ应用场景 异步处理:比如用户注册时的确认邮件.短信等交由rabbitMQ进行异步处理 应用解耦:比如收发消息双方可以使用

spring boot Rabbitmq集成,延时消息队列实现

本篇主要记录Spring boot 集成Rabbitmq,分为两部分, 第一部分为创建普通消息队列, 第二部分为延时消息队列实现: spring boot提供对mq消息队列支持amqp相关包,引入即可: [html] view plain copy <!-- rabbit mq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-

RabbitMQ中 exchange、route、queue的关系

从AMQP协议可以看出,MessageQueue.Exchange和Binding构成了AMQP协议的核心,下面我们就围绕这三个主要组件    从应用使用的角度全面的介绍如何利用Rabbit MQ构建消息队列以及使用过程中的注意事项. 1. 声明MessageQueue 在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue.这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确: a)消费者是无法订阅或者获取不存在的Me

RabbitMQ Consumer获取消息的两种方式(poll,subscribe)解析

以下转自:http://blog.csdn.net/yangbutao/article/details/10395599 rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message, Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息, 最近翻阅了基于java的客户端的相关源码,简单做个分析. 编程模型伪代码如下: ConnectionFactory

RabbitMq中的交换机

Rabbitmq的核心概念(如下图所示):有虚拟主机.交换机.队列.绑定: 交换机可以理解成具有路由表的路由程序,仅此而已.每个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串. 最新版本的RabbitMQ有四种交换机类型,分别是Direct exchange.Fanout exchange.Topic exchange.Headers exchange. Direct Exchange – 处理路由键.需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹

如何保证MQ消息必达

此文章属于笔记,原属58沈剑 一.MQ消息必达,架构上的两个核心设计点: 消息落地 消息超时.重传.确认 四大部件:发送端 接收端 服务端 固化存储组成 二.上半场消息必达以及消息重复问题 上半场的流程 发送端MQ-client 将消息发送给服务端MQ-server 服务端MQ-server将消息落地 服务端MQ-server 回ACK(表示确认) 2.如果3丢失 发送端在超时后,又会发送一遍,此时重发是MQ-client发起的,消息处理的是MQ-server 为了避免2 重复落地,对每条MQ消

第二百九十一节,RabbitMQ多设备消息队列

RabbitMQ多设备消息队列-安装与简介 RabbitMQ简介 解释RabbitMQ,就不得不提到AMQP(Advanced Message Queuing Protocol)协议. AMQP协议是一种基于网络的消息传输协议,它能够在应用或组织之间提供可靠的消息传输.RabbitMQ是该AMQP协议的一种实现,利用它,可以将消息安全可靠的从发 送方传输到接收方.简单的说,就是消息发送方利用RabbitMQ将信息安全的传递给接收方. RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息