我的RabbitMQ学习之旅3 (发布/订阅)

在前面的教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务只被传递给一个工作人员。在这一部分,我们将做一些完全不同的事情 - 我们会向多个消费者传递信息。这种模式被称为“发布/订阅”。

本质上,发布的日志消息将被广播给所有的接收者

生产者 是发送消息的用户的应用程序。

队列 是存储消息的缓冲器。

消费者 是接收消息的用户的应用程序。

RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列中。实际上,生产者通常甚至不知道一个消息是否会被传送到任何队列中。

相反,制作人只能发送消息给交易所。交换是一件非常简单的事情。一方面它接收来自生产者的消息,另一方则推动他们排队。交易所必须知道如何处理收到的消息。是否应该附加到特定的队列?是否应该附加到许多队列?还是应该丢弃。这些规则是由交换类型定义的 。

有几种可用的交换类型: direct, topic, headers , fanout

channel.ExchangeDeclare("logs", "fanout");

fanout 交换非常简单。正如你可能从名字中猜到的那样,它只是将所有收到的消息广播到它所知道的所有队列中。这正是我们记录器所需要的。

列出交易所

要列出服务器上的交换,命令

rabbitmqctl list_exchanges

在这个列表中将会有一些amq。* 交换和默认(未命名)交换。这些是默认创建的。

默认交换

在本教程的前面部分,我们对交换一无所知,但仍能够将消息发送到队列。这是可能的,因为我们使用了一个默认的交换,我们用空字符串(“”)来标识。

    channel.BasicPublish(exchange:"",//默认交换
                         routingKey:"hello",
                         basicProperties:null,
                         body:"发送内容");

第一个参数是交易所的名称。空字符串表示默认或无名交换:消息被路由到具有由 routingKey 指定的名称的队列(如果存在)

我们可以发布到我们的命名交换

channel.BasicPublish(exchange:"logs",
                     routingKey:"",
                     basicProperties:null,
                     body:"发送内容");

临时队列

正如你以前可能记得我们使用的是具有指定名称的队列(请记住hello和task_queue?)。能够列出队列对我们至关重要 - 我们需要指出工人队列。当你想分享生产者和消费者之间的队列时,给队列一个名字是很重要的。

但是我们的记录器并不是这样。我们希望了解所有日志消息,而不仅仅是其中的一部分。我们也只对目前流动的消息感兴趣,而不是旧消息。要解决这个问题,我们需要两件事。

首先,每当我们连接到 Rabbit ,我们需要一个新的,空的队列。要做到这一点,我们可以创建一个随机名称的队列,或者,甚至更好 - 让服务器为我们选择一个随机队列名称。

其次,一旦我们断开消费者,队列应该被自动删除。

在.NET客户端中,当我们不向queueDeclare() 提供参数时,我们 使用生成的名称创建一个非持久的独占的自动删除队列:

var queueName = channel.QueueDeclare().QueueName;

此时queueName包含一个随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

绑定

我们已经创建了一个 fanout 交换和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换和队列之间的关系被称为绑定

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

从现在起,日志交换将把消息附加到我们的队列中。

列出绑定

您可以使用列出现有的绑定命令

rabbitmqctl list_bindings

把它放在一起

发出日志消息的生产者程序与前面的教程没有什么不同。最重要的变化是,我们现在要发布消息到我们的日志交换,而不是无名的。发送时我们需要提供一个路由密钥,但是对于扇出交换,它的值将被忽略。

1.建立 发布者(生产者)

public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "logs",
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0)
               ? string.Join(" ", args)
               : "info: Hello World!");
    }

如你所见,建立连接后,我们宣布交换。这一步是必要的,因为发布到一个不存在的交易所是被禁止的。

如果没有队列绑定到交换机上,消息将会丢失,但对我们来说没关系; 如果没有消费者正在听,我们可以放心地丢弃消息。

1.建立 订阅者(消费者)

 public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {       // 设置交易所的名称 以及类型
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            //让系统生成队列名称
            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,
                              exchange: "logs",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

如果您想将日志保存到文件中,只需打开一个控制台并输入:

cd ReceiveLogs
dotnet run > logs_from_rabbit.log
如果你想看到屏幕上的日志,产生一个新的终端,并运行:
cd ReceiveLogs
dotnet run

要发射日志类型:
cd EmitLog
dotnet run

使用rabbitmqctl list_bindings,你可以验证代码实际上是否创建了绑定和队列。有两个接受程序运行,你应该看到类似于:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

结果的解释很简单:交换日志中的数据转到两个带有服务器分配名称的队列中。这正是我们的意图。
 


原文地址:https://www.cnblogs.com/missliu/p/8079307.html

时间: 2024-07-29 14:36:39

我的RabbitMQ学习之旅3 (发布/订阅)的相关文章

RabbitMQ学习之旅(一)

RabbitMQ学习总结(一) RabbitMQ简介 RabbitMQ是一个消息代理,其接收并转发消息.类似于现实生活中的邮局:你把信件投入邮箱的过程,相当于往队列中添加信息,因为所有邮箱中的信件最终都会汇集到邮局中:当邮递员把你的新建发送给收件人的时候,相当于消息的转发. RabbitMQ中的常见术语 生产者(Provider):生产者负责生产消息,并将其发送到消息队列中 队列(Queue):消息代理(Proxy)角色,从生产者那里接收消息,并将其转发到消费者进行消费.队列主要受限于主机的内存

RabbitMQ系列教程之三:发布/订阅(Publish/Subscribe)

(本教程是使用Net客户端,也就是针对微软技术平台的) 在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅". 为了说明这种模式,我们将构建一个简单的日志系统.它将包括两个程序,第一个将发出日志消息,第二个将接收并打印它们. 在我们的日志系统中每个接收程序的运行副本都会得到消息.这样我们就可以运行一个接收者程序,将日志记录到磁盘:同时我们可以运行另

学习javascript设计模式之发布-订阅(观察者)模式

1.发布-订阅模式又叫观察者模式,它定义对象之间一种一对多的依赖关系. 2.如何实现发布-订阅模式 2-1.首先指定好发布者 2-2.给发布者添加一个缓冲列表,用户存放回调函数以便通知订阅者 2-3.最后发布消息时候,发布者会遍历这个缓存列表,依次触发里面存放的订阅者回调函数 例子: var salesOffice = {};salesOffice.clientList = [];salesOffice.listen = function(key,fn){    if(!this.clientL

分享一个分布式消息总线,基于.NET Socket Tcp的发布-订阅框架,附代码下载

一.分布式消息总线 在很多MIS项目之中都有这样的需求,需要一个及时.高效的的通知机制,即比如当使用者A完成了任务X,就需要立即告知使用者B任务X已经完成,在通常的情况下,开发人中都是在使用者B所使用的程序之中写数据库轮循代码,这样就会产品一个很严重的两个问题,第一个问题是延迟,轮循机制要定时执行,必须会引起延迟,第二个问题是数据库压力过大,当进行高频度的轮循会生产大量的数据库查询,并且如果有大量的使用者进行轮循,那数据库的压力就更大了. 那么在这个时间,就需要一套能支持发布-订阅模式的分布式消

RabbitMQ学习(三)订阅/发布

RabbitMQ学习(三)订阅/发布 1.RabbitMQ模型 前面所学都只用到了生产者.队列.消费者.如上图所示,其实生产者并不直接将信息传输到队列中,在生产者和队列中间有一个交换机(Exchange),我们之前没有使用到交换机是应为我们没有配置交换机,使用了默认的交换机. 有几个可供选择的交换机类型:直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout) 这里我们使用扇形交换机做一个简单的广播模型:一个生产者和多个消费者接受相同消息

RabbitMQ学习第三记:发布/订阅模式(Publish/Subscribe)

工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有邮箱.手机号,那么在注册完后会向邮箱和手机号都发送注册完成信息.利用MQ实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列.注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息.但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处

RabbitMQ学习(三).NET Client之Publish/Subscribe

3 Publish/Subscribe Sending messages to many consumers at once Python | Java | Ruby | PHP| C# 转载请注明出处:jiq?钦's technical Blog Publish/Subscribe (using the .NET Client) 前面的教程我们已经学习了如何创建工作队列,工作队列背后的假设是每一个任务都被准确地递送给一个worker进行处理.这里我们将介绍完全不同的模式,即一个消息可以递送给多

Android热修复学习之旅——HotFix完全解析

在上一篇博客Android热修复学习之旅开篇--热修复概述中,简单介绍了各个热修复框架的原理,本篇博客我将详细分析QQ空间热修复方案. Android dex分包原理介绍 QQ空间热修复方案基于Android dex分包基础之上,简单概述android dex分包的原理就是:就是把多个dex文件塞入到app的classloader之中,但是android dex拆包方案中的类是没有重复的,如果classes.dex和classes1.dex中有重复的类,当classes.dex和classes1

RabbitMQ学习之:(十)AMQP和RabbitMQ介绍 (转贴+我的评论)

From: http://www.infoq.com/cn/articles/AMQP-RabbitMQ 准备开始 高级消息队列协议(AMQP1)是一个异步消息传递所使用的应用层协议规范.作为线路层协议,而不是API(例如JMS2),AMQP客户端能够无视消息的来源任意发送和接受信息.现在,已经有相当一部分不同平台的服务器3和客户端可以投入使用4. AMQP的原始用途只是为金融界提供一个可以彼此协作的消息协议,而现在的目标则是为通用消息队列架构提供通用构建工具.因此,面向消息的中间件(MOM)系