Python-RabbitMQ消息分发机制

上一篇中的例子是一个生产者对应一个消费者,那能不能一个生产者对应一个消费者呢? 下面来测试一下,顺便观察一下它的分发策略。。。

步骤一:先编辑生产者代码(rabbit_send.py)

#top1:导入pika模块
import os
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
import pika

#top2:建立socket
connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))

#top3:声明管道
channel = connection.channel()

#top4:在管道中声明Queue,Queue的名字是‘exclusive‘(随意)
channel.queue_declare(queue=‘exclusive‘)

#top5:在管道内发送消息
channel.basic_publish(exchange=‘‘,
                      routing_key=‘exclusive‘,  #queue名称
                      body=‘Let s go!‘)             #消息内容

#top6:关闭队列
connection.close()

步骤二:编辑消费者代码(rabbit_receive.py)

#top1:导入pika模块
import os
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
import pika

#top2:建立scoket
connection = pika.BlockingConnection(pika.ConnectionParameters(
    ‘localhost‘))

#top3:声明管道
channel = connection.channel()

#top4:声明Queue
channel.queue_declare(queue=‘exclusive‘)

#top5:定义一个处理消息的函数(所说的回调函数)
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

#top6:接收消息
channel.basic_consume(#消费消息
                      callback,   #如果收到消息,就调用callback函数来处理消息
                      queue=‘exclusive‘,
                      no_ack=True)

#top7:此处的start只要一起动就一直运行了,因为它不止收一条
channel.start_consuming()
定义好生产者和消费者后,执行一个生产者多个消费者进行测试。测试结果是消息的接收机制是轮询的,生产者每发送一次消息,都由消费者轮流来接收。

接下来考虑一个情况,现在的代码是消费者接收到消息后调用callback函数去处理消息立刻打印,但是如果我的处理过程需要30秒的时间,恰好在这30秒的时间内消费者宕机了,这个消息还没有处理完,比如我有一个转账的业务,那转到一半宕机了,那咋整?应该有一个确认机制来确定到底是不是处理完了,消费者应该发送一个确认给生产者,然后生产者才把消息从消息队列里删除;还是纠结。。。。那消费者处理到一半宕机了,还怎么给生产者发确认。。。。
还用刚才的代码来测试,把在消费者处理消息的函数中加入一个time.sleep(30),再print一句话来模拟处理时间,再执行生产者和多个消费者,假如第一个消费者接收到消息我们把它停止,再观察别的消费者,没反应。。。。什么鬼?消息丢了!!!
那我们回过头来把no_ack=True注释掉,这个的意思是"不确认",再测试。结果是把第一个消费者断了,第二个消费者继续处理消息,保证消息被处理完,那为什么生产者知道消费者宕机了呢?因为socket断了,它是连接RabbitMQ的,它断了自然而然就知道消费者宕机了。。
一般我们不需要加no_ack=True参数,只有那些对生产者不关心的消息可以加上。

原文地址:http://blog.51cto.com/12730062/2059318

时间: 2024-10-30 17:08:50

Python-RabbitMQ消息分发机制的相关文章

【RabbitMQ】5、RabbitMQ任务分发机制

当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load.接下来我们分布讲解. 应用场景就是RabbitMQ Server会将queue的Message分发给不同的Consumer以处理计算密集型的任务: 1. Message acknowledgment 消息确认 每个Consumer可能需要一段时间才能处理完收到的数据.如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么 非常不幸,这段数

Cocos2d-x 3.0 屏幕触摸及消息分发机制

***************************************转载请注明出处:http://blog.csdn.net/lttree******************************************** 题外话: 唉. 开学了!    好烦. 这就已经大三了, 两年前的这时候,我还是懵懂的大一小学弟, 两年后.就要奔上社会就业了. 光阴似箭.日月如梭呀~ 正文: 好久没做cocos2d-x了,这次练习一下.屏幕触摸及消息分发机制. 这里,我用的是cocos2d-

delphi VCL研究之消息分发机制(转)

原文来源,http://blog.csdn.net/sushengmiyan/article/details/8635550 1.VCL 概貌 先看一下VCL类图的主要分支,如图4.1所示.在图中可以看到,TObject是VCL的祖先类,这也是Object Pascal语言所规定的.但实际上,TObject以及TObject声明所在的system.pas整个单元,包括在“编译器魔法”话题中提到的_ClassCreate等函数,都是编译器内置支持的.因此,无法修改.删除system.pas中的任何

RabbitMQ消息确认机制—消息发送确认和 消息接收确认

/** * RabbitMQ消息确认机制 * 关于rabbit的生产和消费方的一些实用的操作: * producer的confirm和consumer的ack,这两者使用的模式都是用来保证数据完整性,防止数据丢失 */ /** * producer的confirm模式 * 业务场景描述: * 促销系统在做活动前,需要给用户的手机发送一条活动内容短信希望用户来参加, * 因为用户量有点大,所以通过往短信mq中插入数据方式,让短信服务来消费mq发短信: * 此时插入mq消息的服务为了保证给所有用户发

RabbitMQ消息分发轮询和Message Acknowledgment

一.消息分发 RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费. 多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理. 启动3个消费者 生产者依次生成3条消息 可见3条消息分别被3个消费者获取,所以RabbitMQ是采用轮询机制将消息队列Queue中的消息依次发给不同的消费者 二.消息确认(Message Ac

RabbitMQ 消息确认机制

消息确认机制 在之前异常处理部分就已经写了,对于consumer的异常退出导致消息丢失,可以时候consumer的消息确认机制.重复的就不说了,这里说一些不一样的. consumer的消息确认机制 当一个消费者收到一个快递,但是这个包裹是破损的,这时候一般会有以下选择 拒收快递,让快递员把快递寄回. (如果有多个consumer可能这条消息会到其它的consumer中,如果只有一个,那么下次获取还是可以拿到) 签收快递,然后偷偷的扔了(钱多任性) 拒收快递,联系商家再给我补发一个 下面是具体的方

RabbitMQ消息分发轮询

我们首先下载pika,以及rabbitMQ,和ir语言,rabbitMQ是由ir语言编写的 消息队列的使用过程大概如下: (1)客户端连接到消息队列服务器,打开一个channel. channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务. (2)客户端声明一个exchange,并设置相关属性. Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列. (3)客户端声明一个queue,并设置相关属性. Queue:消息队列载体,每个

Android正在使用Handler实现消息分发机制(两)

在开始这篇文章之前,.首先,我们在总结前两篇文章Handler, Looper和MessageQueue像一些关键点: 0)在创建线程Handler之前,你必须调用Looper.prepare(), 创建一个线程局部变量Looper,然后调用Looper.loop() 进入轮循. 1)当Handler创建之后,就能够调用Handler的sendMessageAtTime方法发送消息.而实际上是调用MessageQueue的enqueueMessage方法.将相应的消息放入消息队列. 2)每个线程

Python RabbitMQ 消息队列

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用