Rabbitmq -Publish_Subscribe模式- python编码实现

what is Exchanges ??

Let‘s quickly go over what we covered in the previous tutorials:

  • producer is a user application that sends messages.
  • queue is a buffer that stores messages.
  • consumer is a user application that receives messages.

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn‘t even know if a message will be delivered to any queue at all.

# 个人简单翻译下: 这个核心的概念就是说在rabbitmy消息模式里面,消息生产者不会直接把消息发送到队列里面,实际上,生产者甚至都不知道信息如何被投送到所有的队列里面。

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

# 个人简单翻译下:

如下图所示:反而,这个生产这仅仅能够发送消息到exchangs上,这个exchangs 是 个非常简单的东西,它一边接受来自生产者的消息,一边把消息推送

给队列,这个exchangs 必须确切的知道如何处理接受到的消息,它收到了消息,应该把它添加到特别的队列里面吗?应该把它添加到许多队列吗?或者把它丢弃,这些规则都被exchange的类型定义了。

how many exchanges types ??

There are a few exchange types available: direct, topic, headers and fanout(扇出). We‘ll focus on the last one -- the fanout. Let‘s create an exchange of that type, and call it logs:

channel.exchange_declare(exchange=‘logs‘,type=‘fanout‘)

  

what is fanout(one of exchanges type)??

The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that‘s exactly what we need for our logger.

#简单的翻译下就是说: 扇出这个交换器非常简单,你都能够从这个名字里面猜出来,它就是把他接受到的信息广播到它所知道的队列里,这恰好是我们logger需要的

----------------------------------------------------------------------------------

Listing exchanges

To list the exchanges on the server you can run the ever useful rabbitmqctl:

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs      fanout
amq.direct      direct
amq.topic       topic
amq.fanout      fanout
amq.headers     headers
...done.

In this list there are some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you‘ll need to use them at the moment.

Nameless exchange

In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which we identify by the empty string ("").

#简单的翻译下: 在前面的教程中我们没有指定exchange也能够发送消息到队列里面,那很有可能我们使用了默认的exchange

Recall how we published a message before:

channel.basic_publish(exchange=‘‘,routing_key=‘hello‘,body=message)

The exchange parameter(参数) is the the name of the exchange. The empty string denotes the default or nameless exchange: messages are routed to the queue with the name specified byrouting_key, if it exists.

# 这个exchange的参数是exchange的名字,这里没写为空的话代表着使用默认的exchange或者不可能命名的exchange。消息能够被路由到队列是因为使用 routing_key这个特殊的字段。

此时我们修改下代码

Now, we can publish to our named exchange instead:

channel.basic_publish(exchange=‘logs‘,routing_key=‘‘,body=message)

  

Temporary queues

As you may remember previously we were using queues which had a specified name (rememberhello and task_queue?). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.

# 简单的翻译下:你可能还记得我们前面在使用队列的时候指定了特殊的管道名字,这样命名是为了对我们有用,我们需要把那些运行的程序去指向同一个队列,命名一个队列非常重要当我们在生产者和消费者共享队列的时候

But that‘s not the case for our logger. We want to hear about all log messages, not just a subset of them. We‘re also interested only in currently flowing messages not in the old ones. To solve that we need two things.

Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do it we could create a queue with a random name, or, even better - let the server choose a random queue name for us. We can do this by not supplying the queue parameter to queue_declare:

result=channel.queue_declare()

  

At this point result.method.queue contains a random queue name. For example it may look likeamq.gen-JzTY20BRgKO-HjmUJj0wLg.

Secondly, once we disconnect the consumer the queue should be deleted. There‘s an exclusive flag for that:

result=channel.queue_declare(exclusive=True)

  

# translate : 上面说的这样指定命名队列的情况不适用于我们的logger,我们想要监听所有的日志信息,不是监听一个子集,我们同样对只在当前信息流的信息感兴趣,而不是对旧信息感兴趣,为了解决这个问题,我们需要做两件事,第一,无论什么时候我们链接rabbit 我们都能够得到新鲜的消息,空队列,为了做到它,我们需要一个随机命名的队列,或者更好的是,让服务自己选择一个随机队列名字给我们,我们能够做到这点,通过提供这个队列参数queue_declare,在这点上,result.method.queue 包含了随机的队列名字,例如它可能看起来想amq.gen-JzTY20BRgKO-HjmUJj0wLg...第二,一旦我们和消费者市区了连接在,这个队列就应该删除,这里有一个独有的标志:

===================================================================

Bindings

We‘ve already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding

# translate:我们已经创建了扇出的交换器和一个队列,现在我们需要告诉这个交换器发送消息到我们队列,这时,交换器和队列他们之间两者的关系成为捆版

channel.queue_bind(exchange=‘logs‘,queue=result.method.queue)

  

From now on the logs exchange will append messages to our queue.

Listing bindings

You can list existing bindings using, you guessed it, rabbitmqctl list_bindings.

Putting it all together

The producer program, which emits log messages, doesn‘t look much different from the previous tutorial. The most important change is that we now want to publish messages to our logs exchange instead of the nameless one. We need to supply a routing_key when sending, but its value is ignored for fanout exchanges. Here goes the code for emit_log.py script:

# translate: 这个生产这程序,能够发送日志消息,看起来不不同于前面的教程,这个最重要的改变是我们现在想发送广播信息到我们的日志交换器而不是未命名的交换器,我们需要提供一个routeing_key当我们发送的时候,但是它的值是可以忽略的对于扇出交换器,这里能够得到这个代码:

github地址:https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/python/emit_log.py

#!/usr/bin/env pythonimport pika
import sys

connection=pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel=connection.channel()

channel.exchange_declare(exchange=‘logs‘,
                         type=‘fanout‘)

message=‘ ‘.join(sys.argv[1:]) or"info: Hello World!"channel.basic_publish(exchange=‘logs‘,
                      routing_key=‘‘,
                      body=message)
print(" [x] Sent %r"%message)
connection.close()

As you see, after establishing the connection we declared the exchange. This step is neccesary as publishing to a non-existing exchange is forbidden.

The messages will be lost if no queue is bound to the exchange yet, but that‘s okay for us; if no consumer is listening yet we can safely discard the message.The code for receive_logs.py:

# translate: 正如你所看到的,在建立连接后我们可以声明这个交换器,这一步的话广播到不存在的交换器是会被拒绝的,

这个信息会被丢失如果没有队列去限制这个交换器,但是这对我们来说是OK,如果没有消费者去监听的话,我们就可以安全的丢弃这个消息,这个代码如下:

The code for receive_logs.py:

#!/usr/bin/env pythonimport pika

connection=pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel=connection.channel()

channel.exchange_declare(exchange=‘logs‘,
                         type=‘fanout‘)

result=channel.queue_declare(exclusive=True)
queue_name=result.method.queuechannel.queue_bind(exchange=‘logs‘,
                   queue=queue_name)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

defcallback(ch, method, properties, body):
    print(" [x] %r"%body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

代码地址:https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/python/receive_logs.py

We‘re done. If you want to save logs to a file, just open a console and type:

$ python receive_logs.py > logs_from_rabbit.log

 If you wish to see the logs on your screen, spawn a new terminal and run:

$ python receive_logs.py

  And of course, to emit logs type:

$ python emit_log.py

  Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two receive_logs.py programs running you should see something like:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

  

The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that‘s exactly what we intended.

To find out how to listen for a subset of messages, let‘s move on to tutorial 4

时间: 2024-10-07 11:46:17

Rabbitmq -Publish_Subscribe模式- python编码实现的相关文章

Rabbitmq -Routeing模式- python编码实现

(using the pika 0.10.0 Python client) In the previous tutorial we built a simple logging system. We were able to broadcast log messages to many receivers. In this tutorial we're going to add a feature to it - we're going to make it possible to subscr

使用rabbitmq rpc 模式

服务器端 安装 ubuntu 16.04 server 安装 rabbitmq-server 设置 apt 源 curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.python.sh | bash 使用 apt-get install rabbitmq-server 安装 rabbitmq 服务器 按键Y或者 y 确认安装 rabbitmq-server 简单管理 rabbitm

VS2013+PTVS,python编码问题

1.调试,input('中文'),乱码2.调试,print('中文'),正常3.不调试,input('中文'),正常4.不调试,print('中文'),正常 页面编码方式已经加了"# -- coding:utf-8 --" 这是调试模式下的截图: VS2013+PTVS,python编码问题 >> python 这个答案描述的挺清楚的:http://www.goodpm.net/postreply/python/1010000008988526/VS2013PTVSpyth

python编码问题,从隐隐作痛到除去病根

查阅的资料链接 python编码为什么这么蛋疼 python2.7手册str函数 python源文件默认编码与内部默认编码 1.源文件默认编码为ASCII,所以,如果不显示声明当前代码用什么编码写的,python会用ASCII去解析,如果源文件中有UTF-8编码,由于ASCII不能翻译UTF8编码,则会报错了. #file test.py 使用UTF8保存 a='a' b='好' 运行后 SyntaxError: Non-ASCII character '\xe5' in file test.p

PYTHON编码处理-str与Unicode的区别

一篇关于str和Unicode的好文章 整理下python编码相关的内容 注意: 以下讨论为Python2.x版本, Py3k的待尝试 开始 用python处理中文时,读取文件或消息,http参数等等 一运行,发现乱码(字符串处理,读写文件,print) 然后,大多数人的做法是,调用encode/decode进行调试,并没有明确思考为何出现乱码 所以调试时最常出现的错误 错误1 Traceback (most recent call last): File "<stdin>"

Python编码规则

1. 命名规则 1.1 变量名.包名.模块名 变量名通常有字母.数字和下划线组成,且首字母必须是字母或下划线,并且不能使用python的保留字:包名.模块名通常用小写字母 1.2 类名.对象名 类名首字母用大写,其他字母采用小写:对象名用小写字母.类的属性和方法名以对象作为前缀,对象通过操作符"."访问属性和方法.类的私有变量.私有方法以两个下划线作为前缀. l.3 函数名     函数名通常采用小写,并用下划线或单词首字母大写来增加名称的可读性,导入的函数以模块名作为前缀. 2. 模

Python 编码

Python 编码 ASCII.Unicode.UTF-8 以及 gbk 在具体说明 Python 编码之前,先来理清 ASCII.Unicode.UTF-8.gbk 究竟是什么? 这边仅简单介绍下,具体请百度. ASCII:是现今最通用的单字节编码系统.ASCII(仅1~127) 仅可代表英文.数字及一些符号等,如,A 的 ASCII 码为65(十进制). Unicode:为了解决传统的字符编码方案的局限而产生,为每种语言中的每个字符设定了统一并且唯一的二进制编码,以满足跨语言.跨平台进行文本

说说Python编码规范

前言 已有近两个月没有发表过文章了,前段时间外甥和女儿过来这边渡暑假,平常晚上和周末时间都陪着她们了,趁这个周末有空,再抽空再把这块拾起来.         这么久没写了,再次拿起键盘,想想,发表些什么呢,想起上次公司的代码评审委员会下周其中一个议题是关于Python编码规范的整理,那就趁热打铁,整理一份关于Python编码规范的文章,也为那些写Python的人,提供一些编码注意的一些事项或者说是参考吧. 编码规范的作用         规范故明思义,就是通过不断的总结,吸取好的点,从而形成的一

python 编码问题:&#39;ascii&#39; codec can&#39;t encode characters in position 的解决方案

问题描述: Python在安装时,默认的编码是ascii,当程序中出现非ascii编码时,python的处理常常会报这样的错UnicodeDecodeError: 'ascii' codec can't decode byte 0x?? in position 1: ordinal not in range(128),python没办法处理非ascii编码的,此时需要自己设置将python的默认编码,一般设置为utf8的编码格式. 查询系统默认编码可以在解释器中输入以下命令: Python代码