RabbitMQ 原文译04--发布和订阅

发布/订阅

在之前的案例中我们创建了一个工作队列,这个工作队列的实现思想就是一个把每一个任务平均分配给每一个执行者,在这个篇文章我们会做一些不一样的东西,把一个消息发送给多个消费者,这种模式就被称作"发布/订阅".

为了说明这个模式,我们将要创建一个简单的日志系统,一个负责发布消息,另外一个负责接收打印他们.

在我们的日志系统中,每一个运行中的接收者副本将都会获得消息,这种方式可以让我们在运行一个接收者直接把消息保存在磁盘的同时,另外一个消费者可以把消息打印到屏幕上.

本质上,发布一个日志消息将会广播给所有的接收者

交换机(Exchanges)

在之前的文章中,我们接受和发送消息都是通过一个队列来完成了,现在是时候引入RabbitMQ的全部工作模型了.

让我们快速回忆一下之前涉及到的模型

--生产者(发布者),是一个负责发送消息的用户应用程序.

--队列,负责存储消息

--消费者(接收者),负责接收消息的用户程序.

RabbitMQ的核心思想是生产者永远不会直接把消息发送给队列,事实上生产者甚至经常不知道一个发出去的消息是否可以有队列去接收它.

相应的,生产者只能消息发送给交换机,交换机的工作机制非常简单,一方面它从生产者那里接收到消息,另一方面它会把消息发送给相应的队列上.交换机必须要知道怎么处理接收到的消息,它应该被放入一个特殊的队列吗?它是否应该被放入多个队列?或者它是否需要被忽略.

处理这工作的方式是通过交换机类型来实现的.

这里有几个可用的交换机类型:direct,topic,headers,fanout 我们将会关注最后一个(fanout),让我们创建一个fanout的交换机,名字叫做‘logs‘

channel.ExchangeDeclare("logs", "fanout");

这个fanout的交换机功能非常简单(你也许已经从名字中猜到了他的方式),把接收到的消息广播给所有已知的队列,这个这是我们的日志系统需要的.

列出RabbitMQ已添加的交换机:

cmd:rabbitmqctl list_exchanges

无命名的交换机:在之前的案例中我们对于交换机一无所知,但是仍然可以把消息发送到队列上,这是因为我们使用的是一个默认的交互机,名字为空(""),回顾一下我们之前发送消息的方式

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",routingKey: "hello", basicProperties: null,body: body);

第一个参数就是交换机的名称,空字符串表示默认的无命名的交换机:消息通过存在的RoutingKey被发送到队列上.

现在我们发送命名的交换机代替:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null,  body: body);

临时队列

在之前的案例中,我们使用的队列是一个指定了名字的队列(记得hello 和task_queue 吗),给一个队名命名是严格的,我们需要执行者连接的同样的队列来工作,当你想在生产者和消费者之间共享队列的时候指定一个队列名是非常重要的.但是我们的日志系统则不在此列,

我们想要监听到所有的日志消息,而不仅仅是他们的子集,我们也仅仅对当前正在流转的消息感兴趣,而不是老的消息,结局这个问题我们需要2件事情.

首先,无论何时我们连接到队列,我们都需要一个新鲜的,空的队列,为了实现这个目标我们可以每次创建一个随机名称的队列,或者更加便捷的方式--让服务为我们的队列随机命名.

第二,一旦我们断开到消费者到队列的连接,我们需要自动删除队列.

在.Net客户端,我们使用无参的queueDeclare()方法来创建一个随机命名的非持久的,自动删除的排他队列.

var queueName = channel.QueueDeclare().QueueName;

queueName就是一个随机的队列名,如:amq.gen-JzTY20BRgKO-HjmUJj0wLg.

绑定

我们已经创建了一个fanout的交换机和一个队列,现在我们需要告诉我们交换机发送消息到我们的队列,交换机和队列之间的关系叫做绑定.

channel.QueueBind(queue: queueName,exchange: "logs", routingKey: "");

从现在开始logs 交换机将会把消息放入我们的队列当中.

列出队列cmd: rabbitmqctl list_bindings

汇总

负责发送消息的生产者可之前案例基本上是一样的,最大的不同是我们将消息发送到了我们的命名队列logs上而不是默认的队列上,发送的时候我们需要使用routingKey,但是它的值是被fanout交换机忽略的.

EmitLog.cs

class EmitLog
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0)
               ? string.Join(" ", args)
               : "info: Hello World!");
    }
}

正如你看到的,我们在建立连接之后创建了一个队列,这一步是必须的,因为发送到一个不存在的交换机是不被允许的。

当队列还没有绑定到交换机是发送的消息将会丢失,但是这对我们日志系统来说没有问题,当没有消费者监听时我们可以安全的忽略这个消息。

ReceiveLogs.cs:

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,exchange: "logs",routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 noAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

同时运行两个receive,可以看到两个接收端可以同时接收到一个消息。

时间: 2024-10-09 00:08:35

RabbitMQ 原文译04--发布和订阅的相关文章

Part1.2 、RabbitMQ -- Publish/Subscribe 【发布和订阅】

python 目录 (一).交换 (Exchanges) -- 1.1 武sir 经典 Exchanges 案例展示. (二).临时队列( Temporary queues ) (三).绑定(Bindings) (四).汇总(Putting it all together) python系列之 RabbitMQ -- Publish/Subscribe [发布和订阅] >>前面的部分我们创建了一个工作队列(work queue). 设想是每个任务都能分发到一个 worker[queue],这一

RabbitMQ 原文译05--路由

在前一篇文章中我们构建了一个简单的日志系统,我们可以向多个接受者广播消息. 在这篇文章我,我们将要添加一些功能使得针对部分消息的接受成为可能,例如我们只对错误的消息进行磁盘记录,同时又可以把所有的消息打印到屏幕上. 绑定 在之前的案例中,我们已经创建了一个绑定,可以重新调用如下的代码: channel.QueueBind(queue: queueName,exchange: "logs",routingKey: ""); 绑定是交换机和队列之间的关系,可以简单的理解

文成小盆友python-num12 Redis发布与订阅补充,python操作rabbitMQ

本篇主要内容: redis发布与订阅补充 python操作rabbitMQ 一,redis 发布与订阅补充 如下一个简单的监控模型,通过这个模式所有的收听者都能收听到一份数据. 用代码来实现一个redis的订阅者何消费者. 定义一个类: import redis class Redis_helper(): def __init__(self): self.__conn = redis.Redis(host='192.168.11.87') #创建一个连接 def pub(self, mes, c

Python-RabbitMQ消息队列的发布与订阅

RabbitMQ消息队列的发布与订阅类似于广播,一端发送消息,多个客户端可以同时接收到消息 fanout:所有绑定到exchange的queue都可以接收消息 消息发布端 # -*- coding:utf-8 -*- __author__ = "MuT6 Sch01aR" import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1')) channel = conn

RabbitMQ(三) ——发布订阅

RabbitMQ(三) --发布订阅 (转载请附上本文链接--linhxx) 一.概述 RabbitMQ的发布订阅(Publish/Subscribe),其将生产者和消费者进一步解耦,生产者生产消息后,交付给交换机,消费者上线后,主动主动去队列中取数据进行处理.该模式也符合上一节工作队列中的ack.预取等规则. 发布订阅模式如下图所示: 二.交换机(exchange) 生产者生产完消息之后,都是将消息通过channel交给交换机,即生产者并不直接和队列联系.在没有定义交换机的时候,RabbitM

rabbitmq系列三 之发布/订阅

1.发布/订阅 在上篇教程中,我们搭建了一个工作队列,每个任务只分发给一个工作者(worker).在本篇教程中,我们要做的跟之前完全不一样 -- 分发一个消息给多个消费者(consumers).这种模式被称为"发布/订阅". 为了描述这种模式,我们将会构建一个简单的日志系统.它包括两个程序--第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容. 在我们的这个日志系统中,所有正在运行的接收方程序都会接受消息.我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接受

分布式环境下rabbitmq发布与订阅端

假设rabbitmq配置了集群,且客户端连接rabbitmq-server通过lvs实现HA但一般情况下不建议做LB.在分布式系统的环境下,由于节点的非预知性,使用spring amqp模板进行配置不足以灵活到满足弹性扩展的需求,因此,更加方便的方式是通过rabbitmq原生的java client进行订阅和发布.在我们的场景中,某些节点需要同时是发布端和订阅端以便做到弹性扩展,无需额外的配置.以fanout类型为例,如下所示: 发布端: /** * @Title: Send.java * @P

RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)

发布/订阅 在上篇教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全不一样——分发一个消息给多个消费者(consumers).这种模式被称为“发布/订阅”. 为了描述这种模式,我们将会构建一个简单的日志系统.它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容. 在我们的这个日志系统中,所有正在运行的接收方程序都会接受消息.我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接受者(receiver

自学总结redis第三部分(安全性、主从、哨兵、事物、持久化、发布与订阅、虚拟内存)

八.redis的安全性 因为redis速度相当快,所以在一台比较好的服务器下,一个外部用户在一秒内可以进行15W次的密码尝试,这意味着需要设定非常强大的密码来防止暴力破解. 可以通过设置密码以及登录redis方式来操作,具体参考 九.redis主从复制 9.1简介 1.Master可以拥有多个slave. 2.多个slave可以连接同一个master外,还可以连接到其他的slave. 3.主从复制不会阻塞master,在同步数据时,master可以继续处理client请求. 4.提供系统的伸缩性