RabbitMQ - exchange

总结一下几种ExchangeTypes。

之前写发布/订阅模式时第一次提到了exchange type。
即producer不是将消息直接放到队列中,而是先到exchange中,exchange主要用于控制消息到队列的路由,根据具体的exchange type将消息传给需要的队列或者直接废弃。
在这一篇中总结一下那些用到的exchange type。

一.Direct Exchange
direct exchange算是最基本的了。
direct exchange用于将带上routing key的消息传值拥有相同routing key的队列中。


当我们想用一个简单的标识符区别所有传入同一个exchange中的消息时direct exchange就非常合适。

private static String DIRECT_EXCHANGE = "DIRECT_EXCHAGNE";

    static class FanoutProducer {
        public static void main(String[] args) throws IOException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            Connection connection = connectionFactory.newConnection();
            Channel channel= connection.createChannel();;

            String content = "I miss the conversation";
            channel.exchangeDeclare(DIRECT_EXCHANGE, ExchangeTypes.DIRECT);
            channel.basicPublish(DIRECT_EXCHANGE, "alvez", null, content.getBytes());
        }
    }

    static class FanoutConsumer {
        public static void main(String[] args) throws IOException, InterruptedException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            Connection connection = connectionFactory.newConnection();
            Channel channel= connection.createChannel();

            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, DIRECT_EXCHANGE, "alvez");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            String s = channel.basicConsume(queueName, true, consumer);
            System.out.println(s);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                String routingKey = delivery.getEnvelope().getRoutingKey();

                System.out.println("From:" + routingKey + "‘:‘" + message + "‘");
            }

        }}

二.Fanout Exchange
fanout和routing key无关,它将消息无差别地(indiscriminately)传送给所有队列。

fanout exchange通常用于发布/订阅模式。
将消息传送给不同的队列,不同的队列对同一种消息采取不同的行为。
比如,现在有一个客户订单消息被三个队列接收,队列1完成该订单,队列2将订单写入日志,队列3将订单发给别的部门什么的。
比如下面的代码,消费者可以获得routing key并输出,但能否获取与routing key无关:

    private static String FANOUT_EXCHANGE = "FANOUT_EXCHANGE";

    static class DirectProducer {
        public static void main(String[] args) throws IOException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            Connection connection = connectionFactory.newConnection();
            Channel channel= connection.createChannel();;

            String content = "I miss the conversation";
            channel.exchangeDeclare(FANOUT_EXCHANGE, ExchangeTypes.FANOUT);
            channel.basicPublish(FANOUT_EXCHANGE, "alvez", null, content.getBytes());
        }
    }

    static class DirectConsumer {
        public static void main(String[] args) throws IOException, InterruptedException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            Connection connection = connectionFactory.newConnection();
            Channel channel= connection.createChannel();

            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, FANOUT_EXCHANGE, "");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            String s = channel.basicConsume(queueName, true, consumer);
            System.out.println(s);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                String routingKey = delivery.getEnvelope().getRoutingKey();

                System.out.println("From:" + routingKey + "‘:‘" + message + "‘");
            }

        }

    }

三.Topic Exchange
如果根据topic exchange用法说明其特征的话反而更麻烦。
topic exchange正如其名,就是根据某种主题而不是特定的标题,也就是可以匹配routing key的一部分或者全部。
topic exchange的routing key可以有多个词组成,词用‘.‘分隔。
routing key中可以包括‘*‘或者‘#‘,‘*‘表示一个词,‘#‘表示0~N个词。

比如消息发布时routing key为"honda.civic.navy",
能接收消息的队列的routing key可以是"honda.civic.navy"或"*.civic.*"或"honda.#"或"#",
但不能是"honda.accord.navy"或"honda.accord.silver"或"*.accord.*"或"ford.#"。

    private static String TOPIC_EXCHANGE = "TOPIC_EXCHAGNE";

    static class TopicProducer {
        public static void main(String[] args) throws IOException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            Connection connection = connectionFactory.newConnection();
            Channel channel= connection.createChannel();;

            String content = "I miss the conversation";
            channel.exchangeDeclare(TOPIC_EXCHANGE, ExchangeTypes.TOPIC);
            channel.basicPublish(TOPIC_EXCHANGE, "alvez.dep.FBI.map", null, content.getBytes());
        }
    }

    static class TopicConsumer {
        public static void main(String[] args) throws IOException, InterruptedException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            Connection connection = connectionFactory.newConnection();
            Channel channel= connection.createChannel();

            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, TOPIC_EXCHANGE, "alvez.#");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            String s = channel.basicConsume(queueName, true, consumer);
            System.out.println(s);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                String routingKey = delivery.getEnvelope().getRoutingKey();

                System.out.println("From:" + routingKey + "‘:‘" + message + "‘");
            }

        }
}

四.Headers Exchange
即消息头和队列中声明的消息头匹配时可以通信。
就可以定义多个条件进行匹配这一点来说,headers exchange和topic exchange有些相似。
有时候会给人"为什么会有这种东西?"的感觉,相比topic exchage有什么优势?
如果仅仅说"headers exchange是基于headers的,topic exchange是基于routing key的",这种回答没什么意义。

代码如下,可以看到producer和consumer的routing key是不同的,producer的header通过properties对象传输:

private static String HEADERS_EXCHANGE = "HEADERS_EXCHANGE";

    static class HeadersProducer {
        public static void main(String[] args) throws IOException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();

            String content = "I miss the conversation";
            channel.exchangeDeclare(HEADERS_EXCHANGE, ExchangeTypes.HEADERS);
            AMQP.BasicProperties properties = new AMQP.BasicProperties();
            Map<String,Object> map = new HashMap<>();
            map.put("key1","val1");
            properties.setHeaders(map);

            channel.basicPublish(HEADERS_EXCHANGE, "alvez", properties, content.getBytes());

        }
    }

    static class HeadersConsumer {
        public static void main(String[] args) throws IOException, InterruptedException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();

            String queueName = channel.queueDeclare().getQueue();
            Map<String, Object> headers = new HashMap<>();
            headers.put("key1","val1");
            headers.put("key2","val2");
            headers.put("key3","val3");
            headers.put("key4","val4");
            channel.queueBind(queueName, HEADERS_EXCHANGE, "",headers);

            QueueingConsumer consumer = new QueueingConsumer(channel);
            String s = channel.basicConsume(queueName, true, consumer);
            System.out.println(s);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                String routingKey = delivery.getEnvelope().getRoutingKey();

                System.out.println("From:" + routingKey + "‘:‘" + message + "‘");
            }

        }

    }

(ps:图不错,感谢图片作者。)

时间: 2024-12-16 16:47:33

RabbitMQ - exchange的相关文章

5、RabbitMQ - Exchange之 fanout \ 【direct 关键字发送】 \ topic

pytho系列之 RabbitMQ - Exchange几种模式 RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 RabbitMQ提供了四种Exchange:fanout, direct, topic, headerheader模式在实际使用中较少,本文只对前三种模式进行比较.性能排序:fanout > direct >> topic .比例大约为11:10:6 六.关键字发送 exchange type =

RabbitMQ Exchange类型详解

前言 在上一篇文章中,我们知道了RabbitMQ的消息流程如下: 但在具体的使用中,我们还需知道exchange的类型,因为不同的类型对应不同的队列和路由规则. 在rabbitmq中,exchange有4个类型:direct,topic,fanout,header. direct exchange 此类型的exchange路由规则很简单: exchange在和queue进行binding时会设置routingkey channel.QueueBind(queue: "create_pdf_que

rabbitmq exchange type

This is the fourth installment to the series: RabbitMQ for Windows.  In thelast installment, we reviewed our Hello World example and introduced the concept of Exchanges.  In this installment, we'll discuss the four basic types of RabbitMQ exchanges.

RabbitMQ Exchange中的fanout类型

fanout 多播 在之前都是使用direct直连类型的交换机,通过routingkey来决定把消息推到哪个queue中. 而fanout则是把拿到消息推到与之绑定的所有queue中. 分析业务,怎样的场景需要它呢?某个用户注册了网站的用户,一般我们需要发送短信和邮件通知,莫非要在同一个consumer中把这两件事都做了?这不符合单一职责,可是发送的消息是一样的,只是方式不一样.要使用两种routingkey都发送一次?这显然也不是我们想要的.所以fanout出现了 fanout类型的excha

Rabbitmq Exchange Type 说明

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息 fanout 所有bind到此exchange的queue都可以接收消息 direct 通过routingKey和exchange决定的那个唯一的queue可以接收消息 topic 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息 表达式符号说明:#代表一个或多个字符,*代表任何字符 例:#.a会匹配a.a,aa.a,aaa.a等 *.a会匹配a.

RabbitMQ Exchange分类学习 渣渣翻译

Default Exchange 默认交换机是broker事先声明的一个无名字(空字符串)的直接交换机(direct Exchange),它有一个让它很有用或对一般简单应用很起作用的特点:每创建一个队列,都会以队列名称作为路由键(Route Key)被绑定到该交换机上.   举个例子:创建队列“search-indexing-online”,AMQP broker会把“search-indexing-online”作为路由键,绑定到默认交换机上.因此,发送以“search-indexing-on

RabbitMQ指南之五:主题交换器(Topic Exchange)

在上一章中,我们完善了我们的日志系统,用direct交换器替换了fanout交换器,使得我们可以有选择性地接收消息.尽管如此,仍然还有限制:不能基于多个标准进行路由.在我们的日志系统中,我们可能不仅希望根据日志等级订阅日志,还希望根据日志来源订阅日志.这个概念来自于unix工具syslog,它不仅可以根据日志等级(info/warn/crit...)来路由日志,同时还可以根据设备(auth/cron/kern...)来路由日志.这将更加灵活,我们可能希望只监听来自'cron'的error级别日志

SpringCloud中Rabbitmq的使用

1.pom配置 <dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> 2.配置文件配置 #RabbitMqspring.rabbitmq.host=${dev.spring.rabbitmq.host}spring.rabbi

rabbitMQ学习(七)

反馈模式 在消费端接收到消息后,会反馈给服务器信息. 连接代码: import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class GetChannel { private static Connection connection=null;