RabbitMQ(二):mandatory标志的作用

本文转自:http://m.blog.csdn.net/article/details?id=54311277

在生产者通过channel的basicPublish方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看Channel接口,会发现存在3个重载的basicPublish方法

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
            throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            throws IOException;

他们共有的参数分别是:
        exchange:交换机名称
        routingKey:路由键
        props:消息属性字段,比如消息头部信息等等
        body:消息主体部分
        除此之外,还有mandatory和immediate这两个参数,鉴于RabbitMQ3.0不再支持immediate标志,因此我们重点讨论mandatory标志
        mandatory的作用:

当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;

下面我们通过几个实例测试下mandatory标志的作用:
        测试1:设置mandatory标志,且exchange未绑定队列

public class ProducerTest {
    public static void main(String[] args) {
        String exchangeName = "confirmExchange";
        String queueName = "confirmQueue";
        String routingKey = "confirmRoutingKey";
        String bindingKey = "confirmBindingKey";
        int count = 3;

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.16.151.74");
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setPort(5672);

        //创建生产者
        Sender producer = new Sender(factory, count, exchangeName, routingKey);
        producer.run();
    }
}

class Sender
{
    private ConnectionFactory factory;
    private int count;
    private String exchangeName;
    private String routingKey;

    public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
        this.factory = factory;
        this.count = count;
        this.exchangeName = exchangeName;
        this.routingKey = routingKey;
    }

    public void run() {
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //创建exchange
            channel.exchangeDeclare(exchangeName, "direct", true, false, null);
            //发送持久化消息
            for(int i = 0;i < count;i++)
            {
                //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
                channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

第45行我们将basicPublish的第三个参数mandatory设置成了true,表示开启了mandatory标志,但我们没有为当前exchange绑定任何队列;

通过wireshark抓包看到下面输出:  

可以看到最后执行了basic.return方法,将发布者发出的消息返还给了发布者,查看协议的Arguments参数部分可以看到,Reply-Text字段值为:NO_ROUTE,表示消息并没有路由到合适的队列中;

那么我们该怎么获取到没有被正确路由到合适队列的消息呢?这时候可以通过为channel信道设置ReturnListener监听器来实现,具体实现代码见下:

public class ProducerTest {
    public static void main(String[] args) {
        String exchangeName = "confirmExchange";
        String queueName = "confirmQueue";
        String routingKey = "confirmRoutingKey";
        String bindingKey = "confirmBindingKey";
        int count = 3;

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.16.151.74");
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setPort(5672);

        //创建生产者
        Sender producer = new Sender(factory, count, exchangeName, routingKey);
        producer.run();
    }
}

class Sender
{
    private ConnectionFactory factory;
    private int count;
    private String exchangeName;
    private String routingKey;

    public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
        this.factory = factory;
        this.count = count;
        this.exchangeName = exchangeName;
        this.routingKey = routingKey;
    }

    public void run() {
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //创建exchange
            channel.exchangeDeclare(exchangeName, "direct", true, false, null);
            //发送持久化消息
            for(int i = 0;i < count;i++)
            {
                //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
                //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
                //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
                channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
            }
            channel.addReturnListener(new ReturnListener() {

                @Override
                public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
                        throws IOException {
                    //此处便是执行Basic.Return之后回调的地方
                    String message = new String(arg5);
                    System.out.println("Basic.Return返回的结果:  "+message);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在设置了ReturnListener监听器之后,broker(代理服务器)发出basic.return方法之后,就会回调第52行的handleReturn方法,在这个方法里面我们就可以进行消息的重新发布操作啦;

测试2:设置mandatory标志,且为exchange绑定队列(路由键和绑定键一致)

public class ProducerTest {
    public static void main(String[] args) {
        String exchangeName = "confirmExchange";
        String queueName = "confirmQueue";
        String routingKey = "confirmRoutingKey";
        String bindingKey = "confirmRoutingKey";
        //String bindingKey = "confirmBindingKey";
        int count = 3;

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.16.151.74");
        factory.setUsername("test");
        factory.setPassword("test");
        factory.setPort(5672);

        //创建生产者
        Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
        producer.run();
    }
}

class Sender
{
    private ConnectionFactory factory;
    private int count;
    private String exchangeName;
    private String     queueName;
    private String routingKey;
    private String bindingKey;

    public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
        this.factory = factory;
        this.count = count;
        this.exchangeName = exchangeName;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.bindingKey = bindingKey;
    }

    public void run() {
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            //创建exchange
            channel.exchangeDeclare(exchangeName, "direct", true, false, null);
            //创建队列
            channel.queueDeclare(queueName, true, false, false, null);
            //绑定exchange和queue
            channel.queueBind(queueName, exchangeName, bindingKey);
            //发送持久化消息
            for(int i = 0;i < count;i++)
            {
                //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
                //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
                //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
                channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
            }
            channel.addReturnListener(new ReturnListener() {

                @Override
                public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
                        throws IOException {
                    //此处便是执行Basic.Return之后回调的地方
                    String message = new String(arg5);
                    System.out.println("Basic.Return返回的结果:  "+message);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

通过抓包发现并不会有basic.return方法被调用,查看RabbitMQ管理界面发现消息已经到达了队列;


测试3:设置mandatory标志,且exchange绑定队列(路由键和绑定键不一致)

代码就是把测试2中第6行注释,第7行注释打开,注意到此时的routingKey和bindingKey是不一致的,此时我们运行程序,同时抓包得到下面截图:

注意一点,我们发送了三条消息,那么相应的应该执行三次basic.return,其中第一次和第二次basic.return显示在一行上了,第三次是单独一行,不要误认为只执行了两次,从协议的具体返回内容里我们同样看到了Reply-Text字段值是NO_ROUTE,这种现象在测试1中已经见过了;

到此,我们明白了mandatory标志的作用:在消息没有被路由到合适队列情况下会将消息返还给消息发布者,同时我们测试了哪些情况下消息不会到达合适的队列,测试1演示的是创建了exchange但是没有为他绑定队列导致的消息未到达合适队列,测试3演示的是创建了exchange同时创建了queue,但是在将两者绑定的时候,使用的bindingKey和消息发布者使用的rountingKey不一致导致的消息未到达合适队列;

时间: 2024-10-12 07:31:16

RabbitMQ(二):mandatory标志的作用的相关文章

深入学习RabbitMQ(一):mandatory标志的作用

在生产者通过channel的basicPublish方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看Channel接口,会发现存在3个重载的basicPublish方法 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String excha

RabbitMQ(二):理解消息通信RabbitMQ

原文:RabbitMQ(二):理解消息通信RabbitMQ 一.消费者.生产者和信道 生产者(producer):生产者创建消息,然后发布(发送)到代理服务器(RabbitMQ),可以说发送消息的程序就是生产者.什么是消息?消息包含两部分:有效载荷和标签.有效载荷就是传输的数据,可以是任何内容,包括json数据和图片等等.而标签(一个叫交换器名称和可选的主题标记)描述了有效载荷,RabbitMQ用它来决定谁将获得这个消息. 消费者(consumer):消费者就是接收消息并处理消息的程序,他们连接

一个winform带你玩转rabbitMQ(二)

接上一篇内容 安装,简介和初探 下面我们接着来学习下RabbitMQ 一.  exchange属性 Type 前一章我们说了exchange的类型分为fanout,direct,topic.还有一种不常用的headers. headers这种类型的exchange绑定的时候会忽略掉routingkey,Headers是一个键值对,可以定义成成字典等.发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息.匹配有两种方式all和any.这两

rabbitmq(二)原理

一.基本概念1.1 可以看到提供方提供一个Broker(消息队列实体)当中的虚拟主机->>包含了Exchange(交换器)通过binding绑定一个队列Queue 客户端再通过连接不同渠道(Channel)给客户端提供消息而一个消息队列又分几种模式 1.2VirtualHost虚拟主机.表示一批交换器,消息队列和相关对象.虚拟主机是共享相同的身份认证和加密环境的独立服务器域.每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列.交换器.绑定和权限机制.vh

RabbitMQ-linux安装rabbitmq(二)

说明 本地装了个虚拟机模拟集群 所以记下安装步骤 安装Erlang 安装类库 yum -y install ncurses-devel yum -y install openssl-devel yum -y install unixODBC-devel yum -y install gcc-c++ 下载otp_src资源包并安装 1.下载资源包(可以通过-P ~/download file.name 指定下载文件地址默认再~目录) wget http://erlang.org/download/

SpringBoot整合RabbitMq(二)

       本文序列化和添加package参考:https://www.jianshu.com/p/13fd9ff0648d RabbitMq安装 [[email protected] ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE elasticsearch latest 874179f19603 11 days ago 771 MB springbootdemo4docker latest cd13bc7f56a0 2 week

rabbitMQ(二):Fanout Exchange

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上. 1.可以理解为路由表的模式 2.这种模式不需要RouteKey 3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定. 4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃. 原文地址:https://www.cnblogs.com/dwxblogs/p

RabbitMQ特性

使用默认的exchange channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 如果用空字符串去申明一个exchange,那么系统就会使用"amq.direct"这个exchange.我们在创建一个queue的时候,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去 在方法中的第一个参数是需要输入一个exchange.在RabbitMQ中,所有的消

RabbitMQ中的消息不可达returnlistener和mandatory的使用

return listener 用于处理一些不可路由的消息.    我们的消息生产者,通过指定一个exchange和routingkey,把消息送达到某一个队列中,然后我们的消费者监听队列,进行消费处理操作.    但是在某种情况下,如果我们在发送消息的时候,当前的exchange不存在或者制定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用return listener.    mandatory, 设置为true,则监听器会接收到路由不可达的消息, 然后进行处理,如果