RabbitMQ指南之四:路由(Routing)和直连交换机(Direct Exchange)

  在上一章中,我们构建了一个简单的日志系统,我们可以把消息广播给很多的消费者。在本章中我们将增加一个特性:我们可以订阅这些信息中的一些信息。例如,我们希望只将error级别的错误存储到硬盘中,同时可以将所有级别(error、info、warning等)的日志都打印在控制台上。

1、绑定(Bindings)

  在上一章中,我们已经创建了绑定关系,回顾一下代码:

1 channel.queueBind(queueName, EXCHANGE_NAME, "");

  一个绑定是一个交换器与队列之间的关系。意思是指:这个队列对这个交换器的消息感兴趣。

  该方法同时还有另一个routing Key参数,为了避免与basic_public参数产生中的路由键(routing key)混淆,我们称之为绑定键(bingind key),下面展示了如何通过一个绑定key创建一个绑定:

1 channel.queueBind(queueName, EXCHANGE_NAME, "black");

  注意,这个绑定键(这里是"black")的含义依赖于交换器的类型。比如在我们的日志系统中,交换器类型为fanout,此时,绑定键没有任何意义,会被忽略掉。

2、直连交换机(Direct Exchange)

  在我们之前的日志系统中,所有的消息被广播给所有的消费者,但是本章的需要是希望有一个程序可以只接收error级别的日志并保存到磁盘中,而不用浪费空间去存储那些info、warning级别的日志。

  我们正在用的广播模式的交换器并不够灵活,它只是不加思索地进行广播。因此,需要使用direct exchange来代替。直连交换器的路由算法非常简单:将消息推送到binding key与该消息的routing key相同的队列。

  为了说明这点,请看下图:

  

  在该图中,直连交换器X上绑定了两个队列。第一个队列绑定了绑定键orange,第二个队列有两个绑定键:black和green。在这种场景下,一个消息在布时指定了路由键为orange将会只被路由到队列Q1,路由键为black和green的消息都将被路由到队列Q2。其他的消息都将被丢失。

3、多重绑定

  

  同一个绑定键可以绑定到不同的队列上去,在上图中,我们也可以增加一个交换器X与队列Q2的绑定键,在这种情况下,直连交换器将会和广播交换器有着相同的行为,将消息推送到所有匹配的队列。一个路由键为black的消息将会同时被推送到队列Q1和Q2。

4、发送日志

  首先我们要一如既往地创建一个交换器:

1 channel.exchangeDeclare(EXCHANGE_NAME, "direct");

  并准备发送消息:

1 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

  我们需要确保在我们日志系统中参数"severity"是“info”、“warning”和“error”中的一个。

5、订阅

  创建接收消息与上一章基本相同,唯一不同的是,需要在创建绑定关系时,指定severity的值:

1 String queueName = channel.queueDeclare().getQueue();
2
3 for(String severity : argv){
4   channel.queueBind(queueName, EXCHANGE_NAME, severity);
5 }

6、完整的代码

  EmitLogDirect.java

 1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4
 5 public class EmitLogDirect {
 6
 7   private static final String EXCHANGE_NAME = "direct_logs";
 8
 9   public static void main(String[] argv) throws Exception {
10     ConnectionFactory factory = new ConnectionFactory();
11     factory.setHost("localhost");
12     try (Connection connection = factory.newConnection();
13          Channel channel = connection.createChannel()) {
14         channel.exchangeDeclare(EXCHANGE_NAME, "direct");
15
16         String severity = getSeverity(argv);
17         String message = getMessage(argv);
18
19         channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
20         System.out.println(" [x] Sent ‘" + severity + "‘:‘" + message + "‘");
21     }
22   }
23   //..
24 }

  ReceiveLogsDirect.java

 1 import com.rabbitmq.client.*;
 2
 3 public class ReceiveLogsDirect {
 4
 5   private static final String EXCHANGE_NAME = "direct_logs";
 6
 7   public static void main(String[] argv) throws Exception {
 8     ConnectionFactory factory = new ConnectionFactory();
 9     factory.setHost("localhost");
10     Connection connection = factory.newConnection();
11     Channel channel = connection.createChannel();
12
13     channel.exchangeDeclare(EXCHANGE_NAME, "direct");
14     String queueName = channel.queueDeclare().getQueue();
15
16     if (argv.length < 1) {
17         System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
18         System.exit(1);
19     }
20
21     for (String severity : argv) {
22         channel.queueBind(queueName, EXCHANGE_NAME, severity);
23     }
24     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
25
26     DeliverCallback deliverCallback = (consumerTag, delivery) -> {
27         String message = new String(delivery.getBody(), "UTF-8");
28         System.out.println(" [x] Received ‘" +
29             delivery.getEnvelope().getRoutingKey() + "‘:‘" + message + "‘");
30     };
31     channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
32   }
33 }

  为了测试方便,我们可以把"info"、"error"、"warning"都绑定到一个队列上去,然后生产者分别往"info"、"error"、"warning"发送消息:

  此时查看RabbitMq控制台:

  

  到此,发布-订阅涉及到的相关知识点都讲解完了,下一章将讲解Topic(主题模式)。

原文地址:https://www.cnblogs.com/wuhenzhidu/p/10801103.html

时间: 2024-10-10 01:58:24

RabbitMQ指南之四:路由(Routing)和直连交换机(Direct Exchange)的相关文章

Rabbit的直连交换机direct

直连交换机类型为:direct.加入了路由键routingKey的概念. 就是说 生产者投递消息给指定交换机的指定路由键. 只有绑定了此交换机指定路由键的消息队列才可以收到消息. 生产者: package com.kf.queueDemo.exchange.direct; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.kf.utils.RabbitConnectionUti

RabbitMQ官方中文入门教程(PHP版) 第四部分:路由(Routing)

路由(Routing) 在前面的教程中,我们实现了一个简单的日志系统.可以把日志消息广播给多个接收者. 本篇教程中我们打算新增一个功能——使得它能够只订阅消息的一个字集.例如,我们只需要把严重的错误日志信息写入日志文件(存储到磁盘),但同时仍然把所有的日志信息输出到控制台中 绑定(Bindings) 前面的例子,我们已经创建过绑定(bindings),代码如下: $exchange->publish($message, ''); 绑定(binding)是指交换器(exchange)和队列(que

RabbitMQ官方教程四 Routing(GOLANG语言实现)

在上一教程中,我们构建了一个简单的日志记录系统. 我们能够向许多消费者广播日志消息. 在本教程中,我们将向其中添加功能-我们将使仅订阅消息的子集成为可能. 例如,我们将只能将严重错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息. 绑定 在前面的示例中,我们已经在创建绑定. 您可能会想起类似的代码: err = ch.QueueBind( q.Name, // queue name "", // routing key "logs",

RabbitMQ(四) -- Routing

RabbitMQ(四) -- Routing `rabbitmq`可以通过路由选择订阅者来发布消息. Bindings 通过下面的函数绑定Exchange与消息队列: channel.queue_bind(exchange=exchange_name, queue=queue_name) 可以通过添加`routing_key`来做路由选择,如下: channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='b

asp.net MVC 5 路由 Routing

ASP.NET MVC ,一个适用于WEB应用程序的经典模型 model-view-controller 模式.相对于web forms一个单一的整块,asp.net mvc是由连接在一起的各种代码层所组成. 最近又接触了关于asp.net mvc的项目,又重拾以前的记忆,感觉忘了好多,特此记录. 首先,来说说路由Routing. ASP.NET MVC 不再是要依赖于物理页面了,你可以使用自己的语法自定义URL,通过这些语法来指定资源和操作.语法通过URL模式集合表达,也称为路由. 路由是代表

Asp.Net MVC4.0 官方教程 入门指南之四--添加一个模型

Asp.Net MVC4.0 官方教程 入门指南之四--添加一个模型 在这一节中,你将添加用于管理数据库中电影的类.这些类是ASP.NET MVC应用程序的模型部分. 你将使用.NET Framework框架下的实体框架(Entity Framework)数据访问技术,与模型类协同工作.实体框架(常简称为EF)支持一种称之为编码先行(Code First)的开发模式.编码先行使你通过编写简单的类(简称为POCO类,全称为"plain-old CLR objects."),来创建模型对象

Erlang Rebar 使用指南之四:依赖管理

Erlang Rebar 使用指南之四:依赖管理 全文目录: https://github.com/rebar/rebar/wiki 本章链接: https://github.com/rebar/rebar/wiki/Dependency-management 1 rebar依赖定义 Rebar取得和构建符合OTP/Rebar规范的项目.如果项目包含子项目,Rebar会自动递归地构建它们. 项目的依赖在project_dir/rebar.config中定义,形式如下: {deps, [Depen

springboot rabbitmq direct exchange和topic exchange 写法上关于路由键的区别

这是direct exchange写法中消息发送写法,可见下图红色框中路由键是queue队列中定义的路由键 这是topic exchange写法中消息发送写法,可见下图红色框中路由键是exchange交换中定义的路由键,这与上面的定义的是队列中的路由键有区别. 原文地址:https://www.cnblogs.com/Andrew520/p/8428802.html

RabbitMQ各种交换机类型Exchange Types介绍

最新版本的RabbitMQ有四种交换机类型,分别是Direct exchange.Fanout exchange.Topic exchange.Headers exchange. 一.Direct Exchange 它处理路由键.需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配.这是一个完整的匹配.如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog. 二.Fanou