【译】RabbitMQ:发布-订阅(Publish/Subscribe)

在前一篇教程中,我们创建了一个工作队列,我们假设在工作队列后的每一个任务都只被调度给一个消费者。在这一部分,我们将做一些完全不一样的事情,调度同一条消息给多个消费者,也就是有名的“发布-订阅”模式。为了阐述这种模式,我们将构建一个简单的日志系统。该系统将由两部分组成:一部分发送日志消息,另一部分接收并且打印日志消息,在这个日志系统中,每一份运行着的接收程序都将会收到消息。这样我们可以运行一个接收者把日志写入到磁盘中,同时可以运行另一个接收者将日志打印到显示器上面。也就是说,发布的日志消息会被广播到所有的接收者。

交换器

在前面的教程中,我们发送消息到队列,然后从队列中接收消息。现在开始介绍RabbitMQ完整的消息模式。

让我们快速的复习一下在前面的教程中讲过的内容:

  • 生产者是一个发送消息的应用程序。
  • 队列是存储消息的缓存。
  • 消费者是一个接收消息的应用程序。

RabbitMQ消息模式的核心是生产者从不直接发送消息到队列。事实上,生产者往往不知道他产生的消息会被分发到哪些队列,它只能将消息发送到一个交换器。交换器非常简单,它一方面从生产者接收消息,另一方面又将消息压入队列中。交换器必须清楚的知道要用接收到的消息做什么,是应当追加到某个指定的队列?或者追加到很多队列?或者应当丢弃?要完成这些的规则都被定义在交换器的类型中。

有几种可用的交换器类型:direct、topic、headers和fanout。本文主要关注最后一种类型:fanout,让我们创建一个这种类型的交换器,命名为logs:

1 channel.ExchangeDeclare("logs", "fanout");

类型为fanout的交换器非常简单,顾名思义,它会广播所有收到的消息到它知道的所有的队列,而这也正是我们的日志系统所需要的。

交换器清单

为了展示服务器上交换器的清单,你可以运行在任何时候都特别有用的rabbitmqctl:

 1 $ sudo rabbitmqctl list_exchanges
 2 Listing exchanges ...
 3         direct
 4 amq.direct      direct
 5 amq.fanout      fanout
 6 amq.headers     headers
 7 amq.match       headers
 8 amq.rabbitmq.log        topic
 9 amq.rabbitmq.trace      topic
10 amq.topic       topic
11 logs    fanout
12 ...done.

在清单里,有一些amp.*样式的交换器和一个默认(未命名)的交换器,这些都是默认创建的,但并不是说你马上就需要使用它们。

匿名交换器

在前面的教程中我们并不知晓交换器的任何信息,但是任然可以将消息发送到队列中,那是因为我们使用了默认的交换器,使用空字符串表示("")。

回忆一下之前是如何发布消息的:

1 var message = GetMessage(args);
2 var body = Encoding.UTF8.GetBytes(message);
3 channel.BasicPublish(exchange: "",
4                      routingKey: "hello",
5                      basicProperties: null,
6                      body: body);

第一个参数就是交换器的名称,空字符串指代的是默认交换器或者是匿名交换器,如果队列存在,消息将通过指定的routingKey路由到队列。

现在我们可以将消息发布到上面定义的命名交换器了:

1 var message = GetMessage(args);
2 var body = Encoding.UTF8.GetBytes(message);
3 channel.BasicPublish(exchange: "logs",
4                      routingKey: "",
5                      basicProperties: null,
6                      body: body);

临时队列

你或许还记得我们之前使用的有指定名称的队列(还记得hello和task_queue么?)。能为队列命名对我们来说是至关重要的,我们需要指定给消费者相同的队列。当你想在生产者和消费者间共享队列时,给队列指定一个名字就显得特别重要了。

但是这并不是我们日志系统的问题。我们希望能监听到所有消息,而不仅仅是其中一个子集;我们对当前流入的消息感兴趣而不是之前的旧信息。为了解决这个问题,我们需要做两件事:第一、无论何时连接到RabbitMQ,我们需要一个新的空队列,为此我们可以创建一个拥有随机名称的队列或者更好的是直接让RabbitMQ服务替我们生成一个随机名称;第二、一旦消费者断开连接,队列应当被自动删除。

在.NET 客户端,我们通过提供无参数的QueueDeclare()函数可以创建一个不持久化、独占的、自动删除的拥有随机名称的队列:

1 var queueName = channel.QueueDeclare().QueueName;

这样queueName就是一个随机的队列名称,看起来会是这样的:amq.gen-JzTY20BRgKO-HjmUJj0wLg。

绑定

我们已经创建了一个fanout类型的交换器和一个队列,现在需要告诉交换器把消息发送到我们的队列。交换器和队列的关系就叫做绑定。

1 channel.QueueBind(queue: queueName,
2                   exchange: "logs",
3                   routingKey: "");

到目前为止,交换器logs将能添加消息到我们的队列中了。

绑定清单

你可以通过rabbitmqctl list_bingdings命令查看绑定清单。

组合在一起

发送日志的生产者程序和之前教程里面的没有太多不同,最重要的改变是现在我们希望将消息发送到logs交换器,而不是之前的匿名交换器。当发送消息的时候,我们需要指定一个routingKey,但是在使用fanout类型交换器的时候,它的值将被忽略。下面是EmitLog.cs文件里面的代码:

 1 using System;
 2 using RabbitMQ.Client;
 3 using System.Text;
 4
 5 class EmitLog
 6 {
 7     public static void Main(string[] args)
 8     {
 9         var factory = new ConnectionFactory() { HostName = "localhost" };
10         using(var connection = factory.CreateConnection())
11         using(var channel = connection.CreateModel())
12         {
13             channel.ExchangeDeclare(exchange: "logs", type: "fanout");
14
15             var message = GetMessage(args);
16             var body = Encoding.UTF8.GetBytes(message);
17             channel.BasicPublish(exchange: "logs",
18                                  routingKey: "",
19                                  basicProperties: null,
20                                  body: body);
21             Console.WriteLine(" [x] Sent {0}", message);
22         }
23
24         Console.WriteLine(" Press [enter] to exit.");
25         Console.ReadLine();
26     }
27
28     private static string GetMessage(string[] args)
29     {
30         return ((args.Length > 0)
31                ? string.Join(" ", args)
32                : "info: Hello World!");
33     }
34 }

如你所见,在创建链接之后我们申明了交换器,这一步用于禁止发布到不存在的交换器是很有必要的。如果没有队列绑定到交换器发布的消息将会丢失,这是没有问题的;如果没有消费者监听消息,我们可以安全的销毁它。

ReceiveLog.cs中的代码:

 1 using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5
 6 class ReceiveLogs
 7 {
 8     public static void Main()
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.ExchangeDeclare(exchange: "logs", type: "fanout");
15
16             var queueName = channel.QueueDeclare().QueueName;
17             channel.QueueBind(queue: queueName,
18                               exchange: "logs",
19                               routingKey: "");
20
21             Console.WriteLine(" [*] Waiting for logs.");
22
23             var consumer = new EventingBasicConsumer(channel);
24             consumer.Received += (model, ea) =>
25             {
26                 var body = ea.Body;
27                 var message = Encoding.UTF8.GetString(body);
28                 Console.WriteLine(" [x] {0}", message);
29             };
30             channel.BasicConsume(queue: queueName,
31                                  noAck: true,
32                                  consumer: consumer);
33
34             Console.WriteLine(" Press [enter] to exit.");
35             Console.ReadLine();
36         }
37     }
38 }

像之前那样编译,工作就完成了。

1 $ csc /r:"RabbitMQ.Client.dll" EmitLogs.cs
2 $ csc /r:"RabbitMQ.Client.dll" ReceiveLogs.cs

如果你想将日志保存到文件中,打开控制台然后输入:

1 $ ReceiveLogs.exe > logs_from_rabbit.log

如果你想在屏幕上看到日志,打开一个新的终端,执行下面的代码:

1 $ ReceiveLogs.exe

当然,发送日志输入:

1 $ EmitLog.exe

使用rabbitmqctl list_bindings命令,可以看到代码确如我们希望的那样创建了绑定和队列。如果同时运行两个消费者(ReceiveLogs.cs)你将能看到下面这样的信息:

1 $ sudo rabbitmqctl list_bindings
2 Listing bindings ...
3 logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
4 logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
5 ...done.

结果非常的直观:数据从交换器logs发送到两个服务自动指定名称的队列,这正是我们之前预期的。

要了解如何监听消息的子集,让我们进入下一篇。

原文链接:http://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

时间: 2024-10-10 06:55:55

【译】RabbitMQ:发布-订阅(Publish/Subscribe)的相关文章

RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe)

原文:RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe) 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78628659 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 本节主要演示交换机的广播类型fanout,广播类型不需要routingKey,交换机会将所有

消息队列 RabbitMQ系列 第四篇:发布/订阅 Publish/Subscribe

上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者.本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅). 为了说明这个模式,我们将会构建一个简单的日志系统.这将会包含两部分程序,第一个是发送日志信息,第二个将会接收并打印它们. 在我们的日志系统里,每个运行的消费者程序都能接收到消息.这样我就运行一个receiver并把日志写到磁盘上,同时我们再运行另外一个消费者

3.6.4 RabbitMQ教程四 - Publish/Subscribe

Publish/Subscribe发布/订阅 What This Tutorial Focuses On In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we

rabbitmq学习3:Publish/Subscribe

在前面的Work Queue中的消息是均匀分配消息给消费者:如果我想把消息分发给所有的消费者呢?那应当怎么操作呢?这就是要下面提到的Publish/Subscribe(分布/订阅).让我们开始Publish/Subscribe之旅吧! Publish/Subscribe的工作示意图如下: 在上图中的X表示Exchange(交换区);Exchange的类型有:direct , topic , headers 和 fanout Publish/Subscribe的Exchang的类型为fanout;

RabbitMQ 发布/订阅

我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式). 为了验证这种模式,我们准备构建一个简单的日志系统.这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志. 在我们的日志系统中,每一个运行的接收者程序都会收到日志.然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上.本质上来说,就是发布的日志消息会转发给所有的接收者. 1.转发器(Exchanges) RabbitMQ消息模型的核心理念是生产者永

.Net下RabbitMQ发布订阅模式实践

一.概念AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然.AMQP的主要特征是面向消息.队列.路由(包括点对点和发布/订阅).可靠性.安全.RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.ActionScri

RabbitMQ 发布订阅持久化

RabbitMQ是一种重要的消息队列中间件,在生产环境中,稳定是第一考虑.RabbitMQ厂家也深知开发者的声音,稳定.可靠是第一考虑,为了消息传输的可靠性传输,RabbitMQ提供了多种途径的消息持久化保证:Exchange持久化.Queue持久化及Message的持久化.以保证RabbitMQ在退出或Crash等异常情况下,消息不会丢失.RabbitMQ提供了简单的参数配置来实现持久化操作. 简单说明一下各种持久化方式:(描述代码采用的是Rabbit.Client  SDK,  C#代码)

RabbitMQ发布订阅模式

这个可能是消息队列中最重要的队列了,其他的都是在它的基础上进行了扩展.功能实现:一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者. 思路解读(重点理解): (1)一个生产者,多个消费者(2)每一个消费者都有自己的一个队列(3)生产者没有直接发消息到队列中,而是发送到交换机(4)每个消费者的队列都绑定到交换机上(5)消息通过交换机到达每个消费者的队列该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的所有队列以用户发

RabbitMQ - 发布订阅

这次我们试试publish / subscribe模式,也就是将一个消息发送给多个consumer. 这里用一个简单的小程序来说明publish / subscribe.由一个provider提供消息,这个消息会被多个consumer接收.consumer对同一个消息做出不同的反应,比如打印.保存到文件.数据库什么的. 之前的例子可能会给人这种感觉:producer将消息发送到队列中,消息缓冲在队列中,consumer从队列获得消息. 但这并不正确.在rabbit中,producer从来不会直接

Redis 命令参考——PubSub(发布订阅)

PubSub(发布订阅)PUBLISH PUBLISH channel message 将信息 message 发送到指定的频道 channel . 可用版本: >=2.0.0 时间复杂度: O(N+M),其中 N 是频道 channel 的订阅者数量,而 M 则是使用模式订阅(subscribed patterns)的客户端的数量. 返回值: 接收到信息 message 的订阅者数量. # 对没有订阅者的频道发送信息 redis>publish bad_channel "can a