Part1.2 、RabbitMQ -- Publish/Subscribe 【发布和订阅】

python 目录

(一)、交换 (Exchanges)

-- 1.1 武sir 经典 Exchanges 案例展示。

(二)、临时队列( Temporary queues )

(三)、绑定(Bindings)

(四)、汇总(Putting it all together)



python系列之 RabbitMQ -- Publish/Subscribe 【发布和订阅】

>>前面的部分我们创建了一个工作队列(work queue). 设想是每个任务都能分发到一个 workerqueue】,这一部分我们将会做一些完全不同的事情

-- 我们将会分发一个消息到多个消费方(consumer),这种模式被誉为 发布/订阅(publish/subscribe)模式

>>为了阐明这种模式,我们将要创建一个简单的日志系统。

由两部分程序组成 --

第一部分将要发布日志消息,第二部分接收并打印 

在我们的日志系统中每个接收程序(receiver)将接收消息并复制消息内容,这样我们将会运行一个receiver  记录日志到磁盘;与此同时我们运行另一个receiver输入日志到屏幕查看。

本质上,发布日志消息将会广播到所有的 receivers。


前奏:交换 (Exchanges) 【武SIR前-序曲】

exchange类型可用: direct , topic , headers 和 fanout 。 
我们将要对最后一种进行讲解 --- fanout

一、消息发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
exchange type = fanout

1、发布者\生产者
import pikaimport sys

connection = pika.BlockingConnection(pika.ConnectionParameters(        host=‘localhost‘))channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,                         type=‘fanout‘)

message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!"channel.basic_publish(exchange=‘logs‘,                      routing_key=‘‘,                      body=message)print(" [x] Sent %r" % message)connection.close()

2、订阅者\消费者。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(        host=‘localhost‘))channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,                         type=‘fanout‘)

result = channel.queue_declare(exclusive=True)queue_name = result.method.queue

channel.queue_bind(exchange=‘logs‘,                   queue=queue_name)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):    print(" [x] %r" % body)

channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)

channel.start_consuming()

(一)、交换 (Exchanges)

在前面的部分我们从一个队列来回发送并接收消息。现在介绍Rabbit中的完整消息模式。

让我们迅速回顾一下前面的章节的内容:

>>  一个 Producer 是一个发送消息的用户程序

>>  一个 queue 是一个存储消息的缓冲区

>>  一个 Consumer 是一个接收消息的用户程序

RabbitMQ的消息模式的核心思想是:


    生产者(Producer) 从不将消息直接发送到一个队列(queue)中,实际上,很多时候生产者甚至不知道一个消息是否要分发到所有队列.

    换言之,

    生产者(producer)只能够发送消息到一个交换区 Exchange.. 对exchange发送消息是 -->分发到所有队列照中.

    从生产者producer方接收消息,从另一边将消息push到队列中。exchange必须清楚知道接收到的消息要如何处理. 是要将消息发送到一个指定queue? 是要将消息发送到多个queue? 还是丢弃? 

    这个规则需要通过 exchange type 来定义。

这里有几种exchange类型可用: direct  ,  topic ,  headers  和 fanout  

我们将要对最后一种进行讲解  ---  fanout  。 我们创建一个 这种类型的exchange 并命名为logs:

channel.exchange_declare(exchange=‘logs‘,
                         type=‘fanout‘)

fanout exchange非常简单,你从这个名字中就能猜出来,它将从Producer方收到的消息广播给所有他知道的receiver方。而这正是我们的logger记录所需要的消息。

关于Exchange 的几种模式:

http://blog.csdn.net/songfreeman/article/details/50953288

exchanges列表

使用rabbitmqclt管理工具显示服务器上的exchanges列表

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done.

在这个列表中有一些amq.*  exchange和默认的exchange,这些都是默认创建的,但是这些未必是你所需要的。

匿名的exchange

在前面部分我们知道空的exchange, 但仍然能够发送消息到队列中,只是因为我们使用的是我们定义的空字符串“ ”exchange(默认的exchange)

回忆一下我们之前怎么发布一个消息:

channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘, #指定哪个队列。
                      body=message)

这个exchange参数就是这个exchange的名字. 空字符串标识默认的或者匿名的exchange:如果存在routing_key, 消息路由到routing_key指定的队列中。

现在我们可能够发布消息到我们自己命名的exchange:


channel.basic_publish(exchange=‘logs‘,
                      routing_key=‘‘,
                      body=message)

(二)、临时队列( Temporary queues )

你应该记得我们之前使用有一个特定名字的队列( hello、task_queue). 设置队列名对我们来说是至关重要的 --- 我们需要给消费方指定同样的队列名字。 要在生产者和消费者之间共享队列,给队列设置一个名字是非常重要的。

但是这不是我们日志应用的关键,我们希望获取到所有的日志消息,而不是他们的一个子集。我们只对当前活动的消息敢兴趣,对已经发过的旧的消息不关心。为了解决这个问题我们需要做两件事:

首先:

无论什么时候我们连接到Rabbit我们需要一个空的、新的队列。为了实现这个我们可以创建个随机名的队列

或者,

更好的-让服务端选择一个随机的队列名给我们,我们可以不给queue_declare方法设置 queue参数来实现。

result = channel.queue_declare()
  • 这样, result.method.queue 包含一个随机的队列名, 比如:看起来像 amq.gen-JzTY20BRgKO-HjmUJj0wLg.
  • 其次,一旦我们断开consumer连接,这个队列名将自动删除。这里有一个标识设置:


result = channel.queue_declare(exclusive=True)

(三)、绑定(Bindings)

我们已经创建了一个 fanout exchange 和一个队列queue, 现在我们需要告诉exchange发送消息到我们的queue中,  这个exchange与队列queue之间的关系我们成为 绑定(Bindings)


channel.queue_bind(exchange=‘logs‘,
                   queue=result.method.queue) #固定的写法

现在logs exchange 将要发送消息到我们的队列

你可以在Server端通过rabbitmqctl list_bindings命令查看绑定信息

(四)、汇总(Putting it all together)

生产者(Producer)程序发出log消息,和前面介绍的章节没什么太大的区别。

>> 主要的改变是我们想要发送消息到我们指定的logs exchange,而不是之前的那种匿名的exchange,那样的routing_key 还要指定发送的队列 如"myqueue",很死性的。

  • 我们之前使用匿名exchange发送消息时Producer需要提供一个 routing_key,这样就直接绑定到__队列上,跳过了中间件的 exchange。
  • 但当我们指定exchangefanout exchange时,这个值(routing_key)将忽略。

下面是 emit_log.py 脚本:



import pikaimport sys

connection = pika.BlockingConnection(pika.ConnectionParameters(        host=‘localhost‘))channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,                         type=‘fanout‘)

message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!"channel.basic_publish(exchange=‘logs‘,                      routing_key=‘‘,                      body=message)print(" [x] Sent %r" % message)connection.close()

如你所见, 当建立连接之后我们定义了一个exchange名logs, 由于发布一个消息到一个不存在的exchange是禁止的,所以这一步是必须有的。

发送消息时,如果还没有队列绑定到这个exchange上的话,消息将会丢失。 但这个对我们来说是OK的;如果还没有消费者(consumer) 监听上我们可以安全的放弃这条消息。


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(        host=‘localhost‘))channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,                         type=‘fanout‘)

result = channel.queue_declare(exclusive=True)  # 队列断开后自动删除临时队列queue_name = result.method.queue                # 队列名采用服务端分配的临时队列

channel.queue_bind(exchange=‘logs‘,                   queue=queue_name)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):    print(" [x] %r" % body)

channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)

channel.start_consuming()

我们已经完成了,如果你想保存日志到一个文件,只需要打开终端执行:


$ python receive_logs.py > logs_from_rabbit.log

如果你想在屏幕上查看输出的日志,新开一个终端并运行:


$ python receive_logs.py 

当然,发出日志信息:


$ python emit_log.py

使用 rabbitmqlctl list_bindings 你能验证代码确实创建了你想要的binding和队列。运行两个 receive_logs.py 程序你可以看到:


$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

这个结果的解释非常直白: 从 logs  exchange 出来的数据发送服务端自动分配的到两个队列名中,这也是我们预期的。







来自为知笔记(Wiz)

时间: 2024-11-06 04:27:46

Part1.2 、RabbitMQ -- Publish/Subscribe 【发布和订阅】的相关文章

RabbitMQ - Publish/Subscribe in Java

这次我们试试publish / subscribe模式, 也就是将一个消息发送给多个consumer. 这里用一个简单的小程序来说明publish / subscribe. 由一个provider提供消息,这个消息会被多个consumer接收. consumer对同一个消息做出不同的反应,比如打印.保存到文件.数据库什么的. 之前的例子可能会给人这种感觉: producer将消息发送到队列中,消息缓冲在队列中,consumer从队列获得消息. 但这并不正确. 在rabbit中,producer从

3.6.4 RabbitMQ教程四 - Publish/Subscribe

Publish/Subscribe发布/订阅 What This Tutorial Focuses On In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we

rabbitmq学习3:Publish/Subscribe

在前面的Work Queue中的消息是均匀分配消息给消费者:如果我想把消息分发给所有的消费者呢?那应当怎么操作呢?这就是要下面提到的Publish/Subscribe(分布/订阅).让我们开始Publish/Subscribe之旅吧! Publish/Subscribe的工作示意图如下: 在上图中的X表示Exchange(交换区);Exchange的类型有:direct , topic , headers 和 fanout Publish/Subscribe的Exchang的类型为fanout;

消息队列 RabbitMQ系列 第四篇:发布/订阅 Publish/Subscribe

上篇中我们实现了Work Queue的创建,在Work Queue背后,其实是rabbitMQ把每条任务消息只发给一个消费者.本篇中我们将要研究如何把一条消息推送给多个消费者,这种模式被称为publish/subscribe(发布/订阅). 为了说明这个模式,我们将会构建一个简单的日志系统.这将会包含两部分程序,第一个是发送日志信息,第二个将会接收并打印它们. 在我们的日志系统里,每个运行的消费者程序都能接收到消息.这样我就运行一个receiver并把日志写到磁盘上,同时我们再运行另外一个消费者

RabbitMQ学习第三记:发布/订阅模式(Publish/Subscribe)

工作队列模式是直接在生产者与消费者里声明好一个队列,这种情况下消息只会对应同类型的消费者. 举个用户注册的列子:用户在注册完后一般都会发送消息通知用户注册成功(失败).如果在一个系统中,用户注册信息有邮箱.手机号,那么在注册完后会向邮箱和手机号都发送注册完成信息.利用MQ实现业务异步处理,如果是用工作队列的话,就会声明一个注册信息队列.注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息.但是实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应该放在一块处

RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe)

原文:RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe) 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/78628659 分享一个朋友的人工智能教程.比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 简介 本节主要演示交换机的广播类型fanout,广播类型不需要routingKey,交换机会将所有

RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)

发布/订阅 在上篇教程中,我们搭建了一个工作队列.每个任务之分发给一个工作者(worker).在本篇教程中,我们要做的之前完全不一样——分发一个消息给多个消费者(consumers).这种模式被称为“发布/订阅”. 为了描述这种模式,我们将会构建一个简单的日志系统.它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容. 在我们的这个日志系统中,所有正在运行的接收方程序都会接受消息.我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接受者(receiver

Mina、Netty、Twisted一起学(七):发布/订阅(Publish/Subscribe)

消息传递有很多种方式,请求/响应(Request/Reply)是最常用的.在前面的博文的例子中,很多都是采用请求/响应的方式,当服务器接收到消息后,会立即write回写一条消息到客户端.HTTP协议也是基于请求/响应的方式. 但是请求/响应并不能满足所有的消息传递的需求,有些需求可能需要服务端主动推送消息到客户端,而不是被动的等待请求后再给出响应. 发布/订阅(Publish/Subscribe)是一种服务器主动发送消息到客户端的消息传递方式.订阅者Subscriber连接到服务器客户端后,相当

NATS学习 -- 概念学习之消息(Message)与发布订阅(Publish Subscribe)

1 理论篇 1.1 来自官方的介绍 NATS acts as a central nervous system for distributed systems such as mobile devices, IoT networks, enterprise microservices and cloud native infrastructure. Unlike traditional enterprise messaging systems, NATS provides an always o