Using RabbitMQ (Part 3)

Publish/Subscribe

In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".

To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.

In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen.

Essentially, published log messages are going to be broadcast to all the receivers.

Exchanges

In previous parts of the tutorial we sent and received messages to and from a queue. Now it's time to introduce the full messaging model in Rabbit.

Let's quickly go over what we covered in the previous tutorials:

  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers, and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded? The rules for that are defined by the exchange type.
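The routing decision can be pictured with a small, purely illustrative Python sketch (no RabbitMQ involved; the `Exchange` class and its methods are invented for this example, and queues are plain lists):

```python
class Exchange:
    """Toy model of an exchange: it stores no messages, it only routes them."""

    def __init__(self, exchange_type):
        self.exchange_type = exchange_type
        self.bindings = []  # list of (routing_key, queue) pairs

    def bind(self, queue, routing_key=''):
        self.bindings.append((routing_key, queue))

    def publish(self, message, routing_key=''):
        for bound_key, queue in self.bindings:
            if self.exchange_type == 'fanout':
                queue.append(message)   # fanout: copy to every bound queue
            elif self.exchange_type == 'direct' and bound_key == routing_key:
                queue.append(message)   # direct: copy only on an exact key match
        # if no binding matched, the message is simply discarded

# A fanout exchange copies the message to all bound queues:
fan = Exchange('fanout')
q1, q2 = [], []
fan.bind(q1)
fan.bind(q2)
fan.publish('info: Hello World!')
```

After the `publish` call both `q1` and `q2` hold their own copy of the message, which is the behaviour our logger relies on.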

There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's create an exchange of that type, and call it logs:

channel.exchange_declare(exchange='logs',
                         type='fanout')

The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for our logger.

Now, we can publish to our named exchange instead:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

Temporary queues

As you may remember, previously we were using queues that had specified names (remember hello and task_queue?). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.

But that's not the case for our logger. We want to hear about all log messages, not just a subset of them. We're also interested only in currently flowing messages, not in the old ones. To solve that we need two things.

Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better, let the server choose a random queue name for us. We can do this by not supplying the queue parameter to queue_declare:

result = channel.queue_declare()

At this point result.method.queue contains a random queue name. For example it may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.

Secondly, once we disconnect the consumer the queue should be deleted. There's an exclusive flag for that:

result = channel.queue_declare(exclusive=True)

Bindings

We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. The relationship between an exchange and a queue is called a binding.

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

From now on the logs exchange will append messages to our queue.

Putting it all together

The producer program, which emits log messages, doesn't look much different from the previous tutorial. The most important change is that we now want to publish messages to our logs exchange instead of the nameless one. We need to supply a routing_key when sending, but its value is ignored for fanout exchanges. Here goes the code for the emit_log.py script:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % (message,))
connection.close()

(emit_log.py source)

As you see, after establishing the connection we declared the exchange. This step is necessary as publishing to a non-existing exchange is forbidden.

The messages will be lost if no queue is bound to the exchange yet, but that's okay for us; if no consumer is listening yet we can safely discard the message.
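This drop-when-unbound behaviour can be pictured with a toy fanout model (pure Python, no broker; the function name is invented for illustration):

```python
def fanout_publish(bound_queues, message):
    """Toy fanout: copy the message to every bound queue. With no
    bindings there is nowhere to route it, so it simply vanishes --
    which is what RabbitMQ does with an unroutable fanout message."""
    for queue in bound_queues:
        queue.append(message)
    return len(bound_queues)  # number of copies delivered

# Publishing before any queue is bound delivers zero copies:
assert fanout_publish([], "info: Hello World!") == 0

# Once two consumers have bound their queues, each gets its own copy:
q1, q2 = [], []
fanout_publish([q1, q2], "info: Hello World!")
assert q1 == q2 == ["info: Hello World!"]
```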

The code for receive_logs.py:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % (body,))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

(receive_logs.py source)

We're done. If you want to save logs to a file, just open a console and type:

$ python receive_logs.py > logs_from_rabbit.log

If you wish to see the logs on your screen, spawn a new terminal and run:

$ python receive_logs.py

And of course, to emit logs type:

$ python emit_log.py

Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two receive_logs.py programs running you should see something like:

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.
Posted: 2024-10-09 00:51:24
