RabbitMQ官网教程---发布/订阅

(使用python客户端pika 0.9.8)

在前面的教程中我们创建了一个工作队列。假设在一个工作队列后面是每一个被传递给正确的工作者的任务。在这部分中我们将做一些完全不同的事情--我们将给多个消费者传递一个消息。这种模式被称为“发布/订阅”。

为了阐明这个模式,我们将构建一个简单的日志系统。它将由两个程序构成--第一个将发出日志消息并且第二个将接收并且打印它们。

在我们的日志系统中每个运行的接收程序副本将获得这个消息。用这种方式我们将可以运行一个接收器并且直接日志到磁盘;而且同时我们将运行另一个接收器并且在屏幕上看日志。

本质上,被发布的消息将广播到所有的接收器。

交换

在这个教程的前面部分我们从一个队列中发送和接收消息。现在该到介绍在Rabbit中的完整的消息模型了。

我们来快速的复习一下前面教程涉及到的一些东西:

?生产者是一个发送消息的用户应用程序。

?队列是一个存储消息的缓冲

?消费者是一个接收消息的用户应用程序。

在RabbitMQ中在消息模型中的核心思想是生产者永远不直接发送任何消息给一个队列。事实上,生产者甚至一点都不知道是否一个消息将被传递给任何队列。

相反,生产者只能发送消息到一个exchange。一个exchange是一件非常简单的事情。一边它从生产者中接收消息另一边它把消息推送到队列中。这个exchange必须知道它接收到的这个消息是做什么的。它应该被追加到一个特别的队列中吗?它应该被追加到许多队列中吗?或者它应该被取消吗?这些规则通过exchange的type被定义。

有几个exchange有效的type:direct,topic,headers和fanout。我们将聚焦于最后一个--fanout。我们来创建一个这种类型的exchange,并且调用它的日志:

channel.exchange_declare(exchange=‘logs‘,type=‘fanout‘)

fanout 的exchange是非常简单的。正如你可能从名字中猜到的,它仅仅是广播所有的它接收到的消息给所有它知道的队列。而且那就是我们的日志器需要的。

Listing exchanges

为了列出在服务器上的exchange你可以使用rabbitmqctl运行:

$ sudo rabbitmqctl list_exchanges
Listing exchanges...
logs fanout
amq.direct direct
amq.topic  topic
amq.fanout  fanout
amq.headers  headers
... done.

在这个列表中有一些amq.*的exchange和默认的exchange。那些事默认被创建的,但是你不可能在同时使用它们。

Nameless exchange

在这个教程的前面部分我们关于exchange完全不了解,但是它任何能给队列发送消息。那是可能的因为我们正在使用一个默认的exchange,这个默认的exchange是我们用空字符串标记的。

在我们发布一个消息以前回调:

channel.basic_publish(exchange=‘‘,routing_key=‘hello‘, body=message)

这个exchange参数是以exchange命名的。空字符串表示默认或者未命名的exchange:如果消息存在,消息被指定的routing_key名字路由到队列。

现在,我们可以发布到一个被命名的exchange:

channel.basic_publish(exchange=‘log‘,routing_key=‘‘,body=message)

临时队列

像你也许还记得我们前面使用的用一个指定的名字(记得hello和task_queue)队列。能够命名一个队列对我们是非常重要的--我们需要把工作者指向相同的队列。当你想在生产者和消费者之间共享这个队列的时候,为一个队列取一个名字是重要的。

但是对我们的日志器那不是重要的。我们想接听到关于所有日志的消息,不仅仅是一个它们的子集。我们也对下面的消息而不是在老的队列里的感兴趣。解决这个问题我们需要两件事情。

首先,无论我们什么时候连接到Rabbit我们需要一个最新的空的队列。为了做这个我们需要用一个随机的名字创建一个队列或者甚至更好的-让服务器选择一个随机队列为我们命名。我们能够通过给queue_declare不应用queue参数来做这件事情:

result=channel.queue_declare()

在这时,result.method.queue包含了一个随机队列名字。例如它也许看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg.

其次,一旦我们跟消费者连接断掉了那就删除这个队列。exclusive是为这个的标识:

result=channel.queue_declare(exclusive=True)

绑定

我们已经创建了fanout类型的exchange和一个队列。现在我们需要告诉这个exchange给我们的队列发送消息。在exchange和队列之间的关系被叫做binding。

channel.queue_bind(exchange=‘logs‘,queue=result.method.queue)

从现在起这个日志exchange将给我们的队列追加消息。

Listing Bindings

你可以列出已经存的绑定,你猜到了,rabbitmqctl list_bindings。

把代码合在一起

生产者程序,它发出日志消息,跟前面的教程看起来没有很大程度的不同。最终要的改变是我们现在想发布消息给我们的日志exhcnage代替没有名字的exchange。我们需要在发送的时候提供一个routing_key,但是它的值会因为fanout类型exchange而被忽略。这是emit_log.py脚本的代码:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,
                         type=‘fanout‘)

message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange=‘logs‘,
                      routing_key=‘‘,
                      body=message)
print " [x] Sent %r" % (message,)
connection.close()

正如你看到的,在建立了连接之后我们定义了这个exchange。这一步是必须的,同样地发布到一个不存在的exchange是被拒绝的。

如果还没有队列绑定到这个exchange,那么消息将被丢失,但是对我们那是没问题的;如果到现在还没有消费者监听我们可以安全的取消这个消息。

receive_logs.pyt的代码:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,
                         type=‘fanout‘)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange=‘logs‘,
                   queue=queue_name)

print ‘ [*] Waiting for logs. To exit press CTRL+C‘

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

?

时间: 2025-01-13 13:28:23

RabbitMQ官网教程---发布/订阅的相关文章

RabbitMQ官网教程---路由

(使用python客户端pika 0.9.8) 在前面的教程中我们构建了一个简单的日志系统.我们可以给许多接收者广播日志消息. 在这个教程中我们将添加一个特性给它-我们将订阅仅仅一种消息子集成为可能.例如,我们可以指挥仅仅错误消息到日志文件(保存到磁盘空间),它任然可以在控制台打印所有的日志消息. 绑定 在前面的例子中我们已经创建了绑定,你可以重新调用像这样的代码: channel.queue_bind(exchange=exchange_name, queue=queue_name) 绑定是e

RabbitMQ官网教程---工作队列

(使用python的pika 0.9.8客户端) 在第一个教程中,我们写了一个从命名的队列中发送和接收消息的程序.在这一个里面,我们将创建一个Work Queue来用于在多个工作者之间分类耗时任务. Work Queues后面的主要思想是避免理解做一些资源密集的任务并且需要等待它完成.我们用计划任务在后面完成它.我们把一个任务封装为一个消息发送给队列.一个在后台执行的工作者队列将弹出任务并且完全的执行这个工作.当你运行许多工作者的时候,这个任务将在它们之间共享. 这种概念在web应用程序中是特别

RabbitMQ官网教程---主题

(使用python客户端pika 0.9.8) 在前面的教程中我们提高了我们的日志系统.我们使用一个direct类型的exchange代替使用一个fanout类型的exchange的虚拟广播,获取一个可选的接收日志. 尽管使用direct类型的exchange提高了我们的系统,但是它任然是有限的-它不基于多个判断标准路由. 在我们的日志系统中,我们也许想不仅订阅基于严重程度的日志,而且还有基于源的生产日志.你也许从unix的syslog工具中知道了这个概念,它基于严重程度路由日志和设施. 那将给

RabbitMQ(三) ——发布订阅

RabbitMQ(三) --发布订阅 (转载请附上本文链接--linhxx) 一.概述 RabbitMQ的发布订阅(Publish/Subscribe),其将生产者和消费者进一步解耦,生产者生产消息后,交付给交换机,消费者上线后,主动主动去队列中取数据进行处理.该模式也符合上一节工作队列中的ack.预取等规则. 发布订阅模式如下图所示: 二.交换机(exchange) 生产者生产完消息之后,都是将消息通过channel交给交换机,即生产者并不直接和队列联系.在没有定义交换机的时候,RabbitM

Java Web框架-----------------------struts2(官网教程版HelloWorld)

Java Web框架------------------struts2(官网教程版HelloWorld) 我们都知道Struts是Java Web 常用的三大框架之一,另外还有Spring.Hibernate.学习Struts很有必 要!那么怎么学习呢?我的建议是: 1.对于英语能力还可以的人,学技术我们要从官网文档学起,再结合中文别人所写的论文.博客,视频 等.这样可以达到事半功倍的效果. 2.对于阅读英语稍微吃力的人,我们可以下载有道词典,再来本计算机专业英语书,不懂就查,但是, 我们决不能

Django学习笔记 官网教程纠正 代码

原文: Django学习笔记 官网教程纠正 代码 Django学习笔记 4.模板初学中,照书例django book 出现以下异常 raise ImportError("Settings cannot be imported, because environment variable %s is undefined." % ENVIRONMENT_VARIABLE) ImportError: Settings cannot be imported, because environmen

[pytorch] 官网教程+注释

pytorch官网教程+注释 Classifier import torch import torchvision import torchvision.transforms as transforms transform = transforms.Compose( [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) trainset = torchvision.datasets.CIF

rabbitmq系列三 之发布/订阅

1.发布/订阅 在上篇教程中,我们搭建了一个工作队列,每个任务只分发给一个工作者(worker).在本篇教程中,我们要做的跟之前完全不一样 -- 分发一个消息给多个消费者(consumers).这种模式被称为"发布/订阅". 为了描述这种模式,我们将会构建一个简单的日志系统.它包括两个程序--第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容. 在我们的这个日志系统中,所有正在运行的接收方程序都会接受消息.我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接受

Unity 官网教程 -- Multiplayer Networking

教程网址:https://unity3d.com/cn/learn/tutorials/topics/multiplayer-networking/introduction-simple-multiplayer-example?playlist=29690 1. 新建一个3D工程,在菜单 "File"  - "Save Scenes" ,保存场景为 "Main".注意,保存的文件放在Assets目录下. 2.菜单"GameObject&