基于Python语言使用RabbitMQ消息队列(二)

工作队列

在第一节我们写了程序来向命名队列发送和接收消息 。在本节我们会创建一个工作队列(Work Queue)用来在多个工人(worker)中分发时间消耗型任务(time-consuming tasks)。

工作队列(又叫做: Task Queues)背后的主体思想是 避免立刻去执行耗时任务并且等待它们完成。 相反我们可以安排这样的任务稍后执行. 我们可以把任务封装成一个消息并发送到队列中. 一个在后台运行的工人进程会接收任务并最终执行工作。当你使很多工人(workers)程序运行时,多个任务就会由它们共同承担。 
这个概念在web应用中尤其有用,因为在一次短期的HTTP请求中处理复杂任务几乎是不可能的。

准备

在前一节我们发送了消息 “Hello World!”. 现在我们会发送一个代表复杂任务的字符串. 目前我们没有一个真实情境下的任务,像重置图片大小或者pdf文件渲染,所以我们就做一个伪装,假装我们很忙就行了:通过time.sleep()方法的使用,我们让字符串中存在的点(.)的数量代表任务的复杂性,一个点占用一个工作的一秒钟。例如,“Hello…”会耗用三秒钟。

我们将会稍微修改先前的 send.py 代码, 允许从命令行发送任意的消息. 这个程序会安排任务给我们的工作队列,所以重命名为new_task.py:

import sys

message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange=‘‘,
                      routing_key=‘task_queue‘,
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message                     #persistent
                      ))
print(" [x] Sent %r" % message)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

我们之前的 receive.py 脚本也需要做些改变: 假装让消息体中的每个点”.”耗费一秒钟的工作。它需要从队列中提取消息并且完成任务 ,我们把它命名为worker.py:

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b‘.‘))
    print(" [x] Done")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

轮询派发(Round-robin dispatching)

使用任务队列的一个优势是简化并行任务的能力。如果我们正在建立一个后台记录的任务,只需要多添加些工人(worker),这很容易做到。

首先我们同时运行起两个worker.py脚本,它们都会从队列中获取消息,到底是怎么回事呢,我们来看一下 。

你需要打开三个控制台,两个运行worker.py脚本。这两个控制台会成为我们的两个消费者–C1和C2。

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

下图为在我的Ubuntu终端上的运行结果: 
shell1 
 
shell2 
 
shell3 

消息通知

RabbitMQ会默认把每条消息按次序发送给下一个消费者,平均每个消费者会获取到相同数量的消息,这种分发消息的方式就是轮询(round-robin),你可以使用三个或者更多工人试一下效果。 
做一件任务需要耗费数秒钟的时间。你可能疑惑如果一个消费者开展了一个长时间任务,但只完成了一部分时就死掉了,这时候会发生什么呢? 就我们当前的代码来说,一旦RabbitMQ把消息传递给了它的客户,RabbitMQ会立刻从内存中把这条消息删除掉,这样的话如果你杀死掉一个工人进程,我们就会丢掉它正在处理的这条消息。我们也会丢掉所有派发给这个特定工人进程的还有没被处理的消息。

但我们不想丢掉任何任务,如果一个工人进程死掉了,我们希望任务会被传递给另一个工人。 
为了确保消息没有丢,RabbitMQ支持消息通知机制(message acknowledgments)。一条通知(ack)会从消费者处返回来告知RabbitMQ特定的消息已经被接收,被处理并且RabbitMQ可以删掉它。

如果一个消费者挂了(它的渠道(channel)被关闭,连接被关闭或者TCP连接丢失)但没有发送通知,会理解为消息没有被完整地处理并且会重新把它推入队列。这时如果有其他消费者存在,它会迅速重新把它传递给其他消费者。这样的话你就可以确定不会有消息被丢掉,哪怕是工人进程意外挂了。

不会出现任何的消息超时问题,当消费者挂掉RabbitMQ会重新发送消息即便处理一条消息花费了很长很长时间。

消息通知默认是打开的。在前面的例子中我们通过设置no_ack=True 显式地关闭了他们flag. 是时候把它拿掉了,并且一旦完成了一个任务就让工人发送一条通知。

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count(‘.‘) )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue=‘task_queue‘)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

使用上面的代码我们可以确保什么也不会丢失,即便你通过CTRL+C退出了一个正在处理消息的工人进程。工人进程挂掉后,所有未返回通知的消息都会被重新发送。

忘了通知

一个常见错误是我们忘了basic_ack ,这看上去是个小错误, 
但后果很严重。当你退出客户端时消息会重新发送(看上去像是随机发送),但RabbitMQ会吃掉越来越多的内存,因为它不会释放任何未返回通知的消息。

调试这种类型的错误你可以使用rabbitmqctl打印messages_unacknowledged字段

sudo rabbitmqctl list_queues name messages_ready 
messages_unacknowledged

在 Windows上, 不用 sudo:

rabbitmqctl.bat list_queues name messages_ready 
messages_unacknowledged

消息持久化(durability)

我们已经了解如何确保即便消费者死掉任务也不会丢失,但是如果RabbitMQ服务停止我们的任务仍然会丢失。 
当RabbitMQ退出或崩溃时,它会遗忘掉队列和消息,除非你告诉它不要这样做。确保消息不会丢失我们有两件事需要做:把队列和消息都标记为持久化的。

首先,我们确保RabbitMQ不会丢失我们的队列,为了达到这个目的需要把它声明为持久化的:

channel.queue_declare(queue=‘hello‘, durable=True)
  • 1

就这条命令自身来说它是正确的,但在我们的设置中它无法正常工作。因为我们已经定义了一个叫做hello的非持久化的队列。RabbitMQ 不允许你使用不同的参数重新定义一个已经存在的队列并且会向任何试图那样做的程序返回一个错误。 但有一个变通方案(workaround)-我们用不同的名字声明一个队列,例如 task_queue:

channel.queue_declare(queue=‘task_queue‘, durable=True)
  • 1

这queue_declare 的改变 需要应用到生产者和消费者代码上面(其实我在前面早已经这样做了) 
这样我们确定task_queue 队列不会被丢掉即便 RabbitMQ 重启。 现在我们需要标记我们的消息为持久化——通过提供一个值为2的delivery_mode 属性。

channel.basic_publish(exchange=‘‘,
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2,
                      ))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

公平派发

你可能已经注意到派发过程仍然不太合适。例如有两个工人的情况, 当所有编号为偶数的消息是重量级,奇数消息是轻量级时,一个工人进程会持续繁忙,另一个却没做什么工作。好吧,RabbitMQ对此一无所知,并且继续若无其事地派发消息。 
发生这种情况是因为当消息进入队列时,RabbitMQ只是进行派发,它不会查看一个消费者的未返回通知的数量。它只是忙目地把第n条消息派发给第n条消费者。 
 
为了应对这种情况,我们可以使用basic.qos方法,设置prefetch_count=1 。这会告诉 RabbitMQ 不要同时给一个工人超过一条消息。或者换句话说,在一个工人处理完先前的消息并且返回通知前不要给他派发新的消息。相反的,它会把消息派发给下一个不忙的工人。

channel.basic_qos(prefetch_count=1)
  • 1

注意队列大小

如果所有工人都在繁忙中, 你的队列可能会被填满. 你会留意到这种情况,并且可能添加更多工人或者使用 message TTL(一个队列和消息存活时间的扩展,在此不做过多介绍)

整合

new_task.py脚本完整代码:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘task_queue‘, durable=True)

message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange=‘‘,
                      routing_key=‘task_queue‘,
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

worker.py脚本完成代码:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘task_queue‘, durable=True)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b‘.‘))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue=‘task_queue‘)

channel.start_consuming()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

使用消息通知和prefetch_count你可以建立一个工作队列 ,持久化选项会使任务仍然存在即便RabbitMQ重启。 
下一节我们会了解如何把相同的消息传递给多个消费者。

原文地址:https://www.cnblogs.com/ExMan/p/10281577.html

时间: 2024-08-23 23:09:26

基于Python语言使用RabbitMQ消息队列(二)的相关文章

基于Python语言使用RabbitMQ消息队列(一)

介绍 RabbitMQ 是一个消息中间人(broker): 它接收并且发送消息. 你可以把它想象成一个邮局: 当你把想要寄出的信放到邮筒里时, 你可以确定邮递员会把信件送到收信人那里. 在这个比喻中, RabbitMQ 就是一个邮筒, 同时也是邮局和邮递员 . 和邮局的主要不同点在于RabbitMQ不处理纸质信件, 而是 接收(accepts), 存储(stores) 和转发(forwards)二进制数据块 —— 消息(messages). 在RabbitMQ中有一些自己的行业术语要了解 . 生

Python并发编程-RabbitMq消息队列

消息中间件 --->就是消息队列 异步方式:不需要立马得到结果,需要排队 同步方式:需要实时获得数据,坚决不能排队 subprocess 的Q也提供不同进程之间的沟通 应用场景: 买票,抢购 堡垒机批量发送文件 Centos6.x系统编译安装RabbitMQ 一.系统环境 [[email protected] ~]# cat /etc/redhat-release CentOS release 6.6 (Final) [[email protected] ~]# uname -r 2.6.32-

(转)(二)RabbitMQ消息队列-RabbitMQ消息队列架构与基本概念

http://blog.csdn.net/super_rd/article/details/70238869 没错我还是没有讲怎么安装和写一个HelloWord,不过快了,这一章我们先了解下RabbitMQ的基本概念. RabbitMQ架构 说是架构其实更像是应用场景下的架构(自己画的有点丑,勿嫌弃) 从图中可以看出RabbitMQ主要由Exchange和Queue两部分组成,然后通过RoutingKey关联起来,消息投递到Exchange然后通过Queue接收. RabbitMQ消息队列基本概

RabbitMQ 消息队列 应用

安装参考    详细介绍   学习参考 RabbitMQ 消息队列 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.

RabbitMQ消息队列应用

RabbitMQ消息队列应用 消息通信组件Net分布式系统的核心中间件之一,应用与系统高并发,各个组件之间解耦的依赖的场景.本框架采用消息队列中间件主要应用于两方面:一是解决部分高并发的业务处理:二是通过消息队列传输系统日志.目前业界使用较多的消息队列组件有RabbitMQ.ActiveMQ.MSMQ.kafka.zeroMQ等,本文对系统架构之MQ Component诠释,并采用RabbitMQ作为消息队列中间件. 图1- 消息队列组件示意图 一.RabbitMQ介绍 RabbitMQ是一款基

Golang调用Rabbitmq消息队列和封装

前言 介绍Rabbimq Rabbitmq消息队列是干嘛的? 简单的说,消息队列,引申一下就是传递消息用的队列,也可以称为传递消息的通信方法.用争抢订单的快车举个例子,假如,A用户发送了一个用车的消息,那么消息队列要做的就是把A用户用车的这个消息广而告之,发送到一个公用队列当中,司机只管取到消息,而不管是谁发布的,这就是一个简单的消息队列例子,Rabbitmq其实就是消息队列的一种,用的比较多的还可能有Redis,kafka,ActiceMq等等,这个后面的博文里面我会说,这次我们只说Rabbi

关于《selenium2自动测试实战--基于Python语言》

关于本书的类型: 首先在我看来技术书分为两类,一类是“思想”,一类是“操作手册”. 对于思想类的书,一般作者有很多年经验积累,这类书需要细读与品位.高手读了会深有体会,豁然开朗.新手读了不止所云,甚至会说,都在扯犊子,看了半天也不知道如何下手. 对于操作手册的书,一般会提供大量的实例,告诉你详细的步骤.对于高手来说,这不就是翻译了一下官方文档嘛,好意思拿来骗钱.但对于新手来说,反而认为是好处,跟着上面的步骤操作就掌握了某种技术能力. 显然,本书属于后者,书中提供了大量代码实例,并没有太多思想层面

(转)RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

基于python语言的tensorflow的‘端到端’的字符型验证码识别源码整理(github源码分享)

基于python语言的tensorflow的‘端到端’的字符型验证码识别 1   Abstract 验证码(CAPTCHA)的诞生本身是为了自动区分 自然人 和 机器人 的一套公开方法, 但是近几年的人工智能技术的发展,传统的字符验证已经形同虚设. 所以,大家一方面研究和学习此代码时,另外一方面也要警惕自己的互联网系统的web安全问题. Keywords: 人工智能,Python,字符验证码,CAPTCHA,识别,tensorflow,CNN,深度学习 2   Introduction 全自动区