RabbitMQ官网教程---工作队列

(使用python的pika 0.9.8客户端)

在第一个教程中,我们写了一个从命名的队列中发送和接收消息的程序。在这一个里面,我们将创建一个Work Queue来用于在多个工作者之间分类耗时任务。

Work Queues后面的主要思想是避免理解做一些资源密集的任务并且需要等待它完成。我们用计划任务在后面完成它。我们把一个任务封装为一个消息发送给队列。一个在后台执行的工作者队列将弹出任务并且完全的执行这个工作。当你运行许多工作者的时候,这个任务将在它们之间共享。

这种概念在web应用程序中是特别有用的,因为在那里它不可能在一个短的HTTP请求窗口中处理一个复杂的任务。

准备

在这个教程的前面我们发送了一个包含"Hello World!"的消息。现在我们将发送字符串来代表复杂的任务。我们没有像重置一个图片大小或者渲染pdf文件的工作任务,因此我们就通过伪装假装我们很忙-通过使用time.sleep()方法。我们将在字符串中取许多远点作为复杂性;每个远点将对"work"占用两秒。例如,一个捏造的任务通过Hello...被描述的将花费3秒。

我们将从我们前面的例子里稍微修改send.py代码,允许任意的消息从命令行发送。这个程序将给我们的工作队列计划任务,因此我们把它命名为new_task.py:

import sys

message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange=‘‘,routing_key=‘hello‘,body=message)
print "[x] sent %r"%(message,)

我们的就receive.py脚本也需要一些改变;它需要给在消息体重的每一个圆点捏造一个2秒的工作。它将从队列中弹出消息并且执行任务,因此我们就叫它worker.py:

import time

def callback(ch, method, properties, body):
    print "[x] Received %r"%(body,)
    time.sleep(body.count(‘.‘))
    print "[x] one"

循环分发

使用任务队列的其中一个高级功能是容易的并行任务。如果我们生成工作后台日志,我们可以添加更多的工作者并且用那种方式收放自如。

首先,我们尝试在相同时间运行两个worker.py脚本。它们将都从队列中获取消息,但是正确的是如何的呢?我们来看看。

你需要打开三个控制台。两个将运行worker.py脚本。这连个控制台僵尸我们的消费者-C1和C2。

shell1$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py
[*] Waiting for messages. To exit press CTRL+C

在第三个控制台中我们将发布新任务。一旦你已经开始了消费者你可以发送一些消息了:

shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....

我们来看看传递给我们的工作者的是什么:

shell1$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received ‘First message.‘
 [x] Received ‘Third message...‘
 [x] Received ‘Fifth message.....‘
shell2$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received ‘Second message..‘
 [x] Received ‘Fourth message....‘

默认的,RabbitMQ将给下面的消费者按照顺序发送每一个消息。平均每个消费者将得到相同数量的消息。这种分发消息的方式被叫做round-robin。尝试用三个或者更多的工作者试试。

消息确认

做一些能消耗几秒的任务。你也许想知道如果其中一个消费者开始了一个长任务并且完成了部分之后死掉了会发生什么事情。就当前我们的代码一旦RabbitMQ给消费者传递了消息它会立即从内存中移除。在这个例子中,如果你杀死了一个工作者我们将丢掉它正在处理的消息。我们也将丢掉所有的被分发到这个特别的工作者的任然不会被处理的消息。

但是我们不想丢掉任何任务。如果一个工作者死掉了,我们想让这个任务传递给另一个工作者。

为了确保一个消息永远不会丢失,RabbitMQ支持消息确认。一个ack是从消费者中发送反馈给RabbitMQ告诉它一个特别的消息已经被接收,处理并且RabbitMQ可以自由的删除它。

如果消费者在没有发送一个ack的时候就死掉了,RabbitMQ将认为一个消息没有完全的被处理并且将给另一个消费者重新发送。用这种方式你可以确保没有消息丢失,甚至如果工作者偶尔死掉。

没有任何消息超市;RabbitMQ将重新传递消息仅仅是当工作者连接断掉的时候。甚至如果处理一个消息任务花费了很长时间那也没事。

消息确认默认是打开的。在前面的例子中我们显式的通过no_ack=True标记将它关掉了。现在该移除它并且一旦我们用这个任务完成了,就从一个工作者中发送一个合适的确认了。

def callback(ch, method, properties, body):
    print "[x] received %x"%(body,)
    time.sleep(body.count(‘.‘))
    print "[x] Done"
    ch.basic_ack(delivery_tag=method.delivery_tag)
    
channel.basic_consume(callback, queue=‘hello‘)

用这个代码我们可以确保甚至如果当工作者正在处理一个消息的时候你使用CTRL+C杀死了工作者,那也不会丢失任何消息。在工作者死掉之后所有被确认的消息很快将被重新传递。

忘了确认

通常容易犯的错误就是丢掉了basic_ack。这是一个容易的错误,但是结果是严重的。当你的客户端退出的时候消息将被重新传递,但是RabbitMQ由于不能释放未被确认的消息,将吃掉越来越多的内存。

为了debug这种错误你可以使用rabbitmqctl打印messages_unacknowledged域:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues...
hello 0 0
...done.

消息持久性

我们已经了解了如何确保甚至当消费者死掉的时候任务不被丢失。但是如果RabbitMQ服务器停止了,我们的任务任然将被丢失。

当RabbitMQ退出或者奔溃那将丢掉队列和消息,除非你告诉它不要这样做。有两件事情被要求确保消息不会被丢失:我们需要标记队列和消息为持久的。

首先,我们需要确保RabbitMQ将永远不会丢掉我们的队列。为了这样做,我们需要把它定义为持久的:

channel.queue_declare(queue=‘hello‘,durable=True)

尽管这个命令设置是正确的,但是它不会再我们的设置中生效。那是因为我们还没有定义一个叫做hello的队列。RabbitMQ不允许你用不同的参数重新定义已经存在的队列并且对于尝试这样做的程序会返回一个错误。但是有一种快捷的变通方案-我们用不同的名字定义队列,例如task_queue:

channel.queue_declare(queue=‘task_queue‘,durable=True)

这个queue_declare改变需要被应用到生产者和消费者代码中。

在那时我们确保了甚至如果RabbitMQ重新开始task_queue队列也不会被丢掉。现在我们需要标记我们的消息为持久化-通过应用一个使用值2的delivery_mode属性。

channel.basic_publish(exchange=‘‘,routing_key=‘task_queue‘,body=message,properties=pika.BasicProperties(delivery_mode=2,))
在消息持久化上的注意点

把消息标记为持久化也不能完全确保消息将不会被丢掉。尽管它告诉RabbitMQ把消息保存到磁盘上,当RabbitMQ已经接收了一个消息并且还没有保存的时候任然后一个短时间的空隙。还有RabbitMQ给没有消息不是fsync(2)--它也许被缓存保存并且还没有真正写到磁盘中。这种持久确保不是健壮的,但是对我们的简单任务队列也足够了。如果你需要一个更健壮的担保那么你可以使用publisher confirms。

公平的分发

你也许已经注意到分发任然不是我们完全想要的那种。例如在一种有两个工作者的情况下,当所有的偶数消息是巨大的并且单数消息是轻量的,一个工作者将

不断的忙碌并且另一个将努力的做一些工作。RabbitMQ不知道任何关于这个情况的事情并且任然会均匀的分发消息。

这种意外是因为当消息进入队列的时候RabbitMQ刚刚分发了一个消息。它不会考虑消费者的许多确认消息。它仅仅是给n-th消费者绑定分发n-th消息。

为了避免这种情况我们可以使用basic.qos方法设置prefetch_count=1.这回告诉RabbitMQ不要在同一时间给一个工作者给超过一个消息。或者,换句话说,不要给一个工作者分发一个新消息知道它已经处理并且确认了前面的一个完成了。代替,它将分发给接下来的不忙碌的工作者。

channel.basic_qos(prefetch_count=1)
关于队列大小的注意点

如果所有的工作者都是忙碌的,你的队列可能填满。你就想对这个保持监控,并且也许添加更多工作者或者做一些其它策略。

把它们放一起

new_task.py脚本最后的代码:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘task_queue‘, durable=True)

message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange=‘‘,
                      routing_key=‘task_queue‘,
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()

我们的工作者:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘task_queue‘, durable=True)
print ‘ [*] Waiting for messages. To exit press CTRL+C‘

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count(‘.‘) )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue=‘task_queue‘)

channel.start_consuming()
时间: 2024-10-29 13:44:27

RabbitMQ官网教程---工作队列的相关文章

RabbitMQ官网教程---发布/订阅

(使用python客户端pika 0.9.8) 在前面的教程中我们创建了一个工作队列.假设在一个工作队列后面是每一个被传递给正确的工作者的任务.在这部分中我们将做一些完全不同的事情--我们将给多个消费者传递一个消息.这种模式被称为"发布/订阅". 为了阐明这个模式,我们将构建一个简单的日志系统.它将由两个程序构成--第一个将发出日志消息并且第二个将接收并且打印它们. 在我们的日志系统中每个运行的接收程序副本将获得这个消息.用这种方式我们将可以运行一个接收器并且直接日志到磁盘:而且同时我

RabbitMQ官网教程---路由

(使用python客户端pika 0.9.8) 在前面的教程中我们构建了一个简单的日志系统.我们可以给许多接收者广播日志消息. 在这个教程中我们将添加一个特性给它-我们将订阅仅仅一种消息子集成为可能.例如,我们可以指挥仅仅错误消息到日志文件(保存到磁盘空间),它任然可以在控制台打印所有的日志消息. 绑定 在前面的例子中我们已经创建了绑定,你可以重新调用像这样的代码: channel.queue_bind(exchange=exchange_name, queue=queue_name) 绑定是e

RabbitMQ官网教程---主题

(使用python客户端pika 0.9.8) 在前面的教程中我们提高了我们的日志系统.我们使用一个direct类型的exchange代替使用一个fanout类型的exchange的虚拟广播,获取一个可选的接收日志. 尽管使用direct类型的exchange提高了我们的系统,但是它任然是有限的-它不基于多个判断标准路由. 在我们的日志系统中,我们也许想不仅订阅基于严重程度的日志,而且还有基于源的生产日志.你也许从unix的syslog工具中知道了这个概念,它基于严重程度路由日志和设施. 那将给

Java Web框架-----------------------struts2(官网教程版HelloWorld)

Java Web框架------------------struts2(官网教程版HelloWorld) 我们都知道Struts是Java Web 常用的三大框架之一,另外还有Spring.Hibernate.学习Struts很有必 要!那么怎么学习呢?我的建议是: 1.对于英语能力还可以的人,学技术我们要从官网文档学起,再结合中文别人所写的论文.博客,视频 等.这样可以达到事半功倍的效果. 2.对于阅读英语稍微吃力的人,我们可以下载有道词典,再来本计算机专业英语书,不懂就查,但是, 我们决不能

Django学习笔记 官网教程纠正 代码

原文: Django学习笔记 官网教程纠正 代码 Django学习笔记 4.模板初学中,照书例django book 出现以下异常 raise ImportError("Settings cannot be imported, because environment variable %s is undefined." % ENVIRONMENT_VARIABLE) ImportError: Settings cannot be imported, because environmen

[pytorch] 官网教程+注释

pytorch官网教程+注释 Classifier import torch import torchvision import torchvision.transforms as transforms transform = transforms.Compose( [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) trainset = torchvision.datasets.CIF

Unity 官网教程 -- Multiplayer Networking

教程网址:https://unity3d.com/cn/learn/tutorials/topics/multiplayer-networking/introduction-simple-multiplayer-example?playlist=29690 1. 新建一个3D工程,在菜单 "File"  - "Save Scenes" ,保存场景为 "Main".注意,保存的文件放在Assets目录下. 2.菜单"GameObject&

Gatling官网教程翻译之Advanced Tutorial

高级教程 在这一部分,我们假设读者已经完成了前面Quickstart的学习部分.而且你已经有了一个基础的simulation. 我们在这一部分将通过一系列的重构,向读者介绍更多Gatling的高级用法和DSL结构. 回顾下已有的Simulation: 1 package computerdatabase 2 import io.gatling.core.Predef._ 3 import io.gatling.http.Predef._ 4 import scala.concurrent.dur

Postman 官网教程,重点内容,翻译笔记,

json格式的提交数据需要添加:Content-Type :application/x-www-form-urlencoded,否则会导致请求失败 1. 创建 + 测试: 创建和发送任何的HTTP请求,请求可以保存到历史中再次执行2. Organize: 使用Postman Collections为更有效的测试及集成工作流管理和组织APIs3. document: 依据你创建的Clollections自动生成API文档,并将其发布成规范的格式4. collarorate: 通过同步连接你的tea