RabbitMQ实践体验

最近由于业务需要进行性能升级,将原来需要经过http进行数据交互的方式修改为消息队列的形式。于是原来的同步处理的方式变成了异步处理,在一定程度上提升我们系统的性能,不过debug的时候,不免哭了出来。因为每个环节都需要进行详细检查。
对于RabbitMQ,我们知道,其是AMQP的一种代理服服务器,具有一套严格的通信方式,即在核心产品进行通信的各个方面几乎都采用了RPC(Remote Procedure Call, 远程过程调用)模式。

AMQ与RabbitMQ进行交互

RabbitMQ通信时用到的类和方法与AMQP协议层面的类和方法一一对应。因此AMQP本质上是RPC的一种传输机制

高级消息队列模型

AMQ(Advanced Message Queuing)模型,这个模型是针对代理服务器软件例如(RabbitMQ)设计的,该模型在逻辑上定义了三种抽象组件用于指定消息的路由行为,分别是:

  • 交换器Exchange,消息代理服务器中用于把消息路由到队列的组件
    接收/发送到RabbitMQ中的消息并决定把他们投递到何处。
    定义消息的路由行为,通常这需要检查消息所携带的数据特性或者包含在消息体内的各种属性
  • 队列Queue,用来存储消息的数据结构,位于硬盘或内存中,以FIFO的顺序进行投递
    负责存储接收到的消息,同时也可能包含何如处理消息的配置信息。
  • 绑定Binding,一套规则,用于告诉交换器消息应该被存储到哪个队列
    • 定义队列和交换机之间的关系
    • 告知一个交换器应该将消息投递到哪些队列中。对于某些交换器类型,绑定同时告知交换器如何对消息进行过滤从而决定能够投递到队列的消息
    • 当发布一条消息到交换器时,应用程序使用路由键routing-key属性。路由可以是队列名称,也可以是一串用于描述消息、具有特定语法的字符串。当交换器对一条消息进行评估以决定路由到哪些合适的队列时,消息的路由就会和绑定进行比对。
    • 绑定是绑定队列到交换器的粘合剂,而路由则是用于比对的标准。 RabbitMQ的灵活性来自于消息如何通过交换器路由到队列的动态特性,介于交换器和队列之间的绑定,以及他们所创建的动态消息路由,构成了消息通信架构的基本组件。为了把消息路由到合适的目标地址,RabbitMQ所需的第一种信息就是用于控制路由的交换

python使用AMQP

在将消息发布到队列之前,我们需要经历过以下若干个步骤。至少,必须要设置交换器和队列,然后将他们绑定再一起。接下来我们将通过python来实现AMQP机制。
我用到了pika这个库,需要的话,需要通过以下指令安装。该库实现了绝大部分rabbitmq的api以及提供了相关的调优参数,后续有机会不妨可以详谈。

pip install pika

1. 声明交换器

交换器在AMQ模型中是非常重要的角色存在。因此,在AMQP规范中都有自己的类。声明一个交换器,我们可以直接在控制台界面进行创建。

不过这样仅仅是在极少数的情况下才适合,动手调戏鼠标对开发工程师的来说实在是太蠢啦,能玩键盘就别玩鼠标啊,我们不妨通过以下代码来声明(创建)一个交换器。pika内置函数会事先通过get的方式来检查我们待声明的交换器是否存在,如果存在则不创建,否则创建一个新的交换器。

 self.channel.exchange_declare(
            exchange=exchange,
            exchange_type="direct",
            passive=False,
            durable=True,
            auto_delete=False)

2. 声明队列

一旦交换器创建成功,就可以通过发送类似queue.declare命令让rabbitmq创建一个队列。同样的,我们仍然可以在图形化界面里面创建队列。

还是那句话,动手调戏鼠标对开发工程师的来说实在是太蠢啦,能玩键盘就别玩鼠标啊,我们不妨通过以下代码来声明(创建)若干个队列。pika内置函数会事先通过get的方式来检查我们待声明的队列是否存在,如果存在则不创建,否则创建一个新的队列。

self.channel.queue_declare(queue=queue, durable=True)

当队列同名时,即如果我们多次发送同一个queue.declare命令并不会有任何副作用,因为RabbitMQ并不会处理后续的队列声明,究其原因,每次创建都会先通过get的方式调用消息队列引擎查询队列是否存在。如果需要返回队列相关的有用信息,则将会返回队列中待处理消息的数量以及该队列的消费者数量。当然了如果队列同名,而且新队列的属性与原有的队列不一样,那么RabbitMQ将关闭发出的RPC请求的信道,返回403错误

3. 绑定队列到交换器

一旦创建了交换器和队列,之后就可以将它们绑定在一起了,如同queue.declare命令,将队列绑定到交换器Queue.Bind每次只能指定一个队列。我们既可以通过图形化界面进行绑定,也可以通过代码实现这个效果

 self.channel.queue_bind(
            queue=queue, exchange=exchange, routing_key=rk)

4. 发布消息

发布消息到RabbitMQ时,多个帧封装了发送到服务器的消息数据。在实际的消息内容到达rabbitMQ之前,客户端应用程序会发送一个basic.publish方法帧、一个内容头帧和至少一个消息体帧。
默认情况下,只要没有消费者正在监听队列,消息就会被存储在队列中。当添加更多消息时,队列大小也会随之增加。RabbitMQ可以将这些消息保存在内存或者写入磁盘。

def produce(self, body):
        self.channel.basic_publish(exchange=self.exchange, routing_key=self.route_key, body=body,
                                   properties=pika.BasicProperties(content_type=‘text/plain‘, delivery_mode=1)
                                   )

5. 消费消息

一旦发布消息被路由并且保存在一个或者多个队列中,剩下的就是如何对其进行消费。注意到,发送和消费是异步的。 消费时,可以让RabbitMQ知道如何消费他们
Basic.Consume命令中
no_ack为true时,RabbitMQ将连续发送消息直到消费者发送一个Basic.Cancel命令或者断开连接为止
如果为false,则需要发送一个Basic.Ack来确认收到每条消息的请求

def on_message(chan, method_frame, _header_frame, body, userdata=None):
            """Called when a message is received. Log message and ack it."""
            # LOGGER.info(‘Userdata: %s Message body: %s‘, userdata, body)
            # print(" [x] Received %r" % body.decode())
            data = body.decode()
            result = alarmFun(data)
            publish = Publish(exchange=‘spider‘, queue=‘alarm‘, rk=‘rk-alarm‘)
            publish.produce(result)
            # chan.basic_ack(delivery_tag=method_frame.delivery_tag)

on_message_callback = functools.partial(on_message)
self.channel.basic_consume(on_message_callback=on_message_callback,
                                   queue=self.queue,
                                   auto_ack=True
                                   )

基于python开发

经过前面的描述,我们需要理论联系实践,让我们通过python开发消费者角色和发布者角色。

发布者

按照配置流程,我们需要初始化连接、配置交换器、队列、绑定,然后才能通过连接件信息推送(publish)到队列中。

import logging
from random import randint

import pika

BROKER_USER = os.environ.get(‘BROKER_USER‘, ‘guest‘)
BROKER_PASSWD = os.environ.get(‘BROKER_PASSWD‘, ‘guest‘)
BROKER_IP = os.environ.get(‘BROKER_IP‘, ‘127.0.0.1‘)
BROKER_PORT = os.environ.get(‘BROKER_PORT‘, ‘5672‘)
BROKER_VHOST = os.environ.get(‘BROKER_VHOST‘, ‘my_vhost‘)
CELERYBEAT_SCHEDULER = ‘djcelery.schedulers.DatabaseScheduler‘
BROKER_URL = ‘amqp://{}:{}@{}:{}/{}‘.format(BROKER_USER, BROKER_PASSWD, BROKER_IP, BROKER_PORT, BROKER_VHOST)

# logging.basicConfig(level=logging.DEBUG)
# LOG_FORMAT = (‘%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ‘
#               ‘-35s %(lineno) -5d: %(message)s‘)
# LOGGER = logging.getLogger(__name__)

class Publish(object):
    def __init__(self, exchange, queue, rk):
        # LOGGER.info(‘Connecting to %s‘, BROKER_URL)
        # logging.basicConfig(level=logging.DEBUG)
        self.credentials = pika.PlainCredentials(BROKER_USER, BROKER_PASSWD)
        # 通过这个方式设置备用链路,保证connection稳定性
        self.parameters = (
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials),
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials, connection_attempts=5,
                                      retry_delay=1))
        self.connection = pika.BlockingConnection(self.parameters)
        self.channel = self.connection.channel()
        self.exchange = exchange
        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type="direct",
            passive=False,
            durable=True,
            auto_delete=False)
        self.channel.queue_declare(queue=queue, durable=True)
        self.route_key = rk

    def produce(self, body):
        self.channel.basic_publish(exchange=self.exchange, routing_key=self.route_key, body=body,
                                   properties=pika.BasicProperties(content_type=‘text/plain‘, delivery_mode=1)
                                   )

    def close(self):
        self.connection.close()

def test():
    publish = Publish(exchange=‘test_yerik‘, queue=‘test_test‘, rk=‘rk-test_test‘)
    for i in range(1, 10000):
        publish.produce(randint(1, 100).__str__())
    publish.close()

if __name__ == ‘__main__‘:
    test()

消费者

消费者的设计和生产者在初始化的时候设计大致相同,都是通过建立连接、开启channel、exange、queue、bind等过程,主要的区别在于commsum

import functools
import logging
import pika

BROKER_USER = os.environ.get(‘BROKER_USER‘, ‘guest‘)
BROKER_PASSWD = os.environ.get(‘BROKER_PASSWD‘, ‘guest‘)
BROKER_IP = os.environ.get(‘BROKER_IP‘, ‘127.0.0.1‘)
BROKER_PORT = os.environ.get(‘BROKER_PORT‘, ‘5672‘)
BROKER_VHOST = os.environ.get(‘BROKER_VHOST‘, ‘my_vhost‘)
CELERYBEAT_SCHEDULER = ‘djcelery.schedulers.DatabaseScheduler‘
BROKER_URL = ‘amqp://{}:{}@{}:{}/{}‘.format(BROKER_USER, BROKER_PASSWD, BROKER_IP, BROKER_PORT, BROKER_VHOST)

# print(‘pika version: %s‘ % pika.__version__)

# logging.basicConfig(level=logging.DEBUG)
# LOG_FORMAT = (‘%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ‘
#               ‘-35s %(lineno) -5d: %(message)s‘)
# LOGGER = logging.getLogger(__name__)
from apps.alarm.alarmfun import alarmFun
from apps.utils.rabbitmq.publish import Publish

class Consummer(object):
    def __init__(self, exchange, queue, rk):
        # LOGGER.info(‘Connecting to %s‘, BROKER_URL)
        self.credentials = pika.PlainCredentials(BROKER_USER, BROKER_PASSWD)
        self.parameters = (
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials),
            pika.ConnectionParameters(BROKER_IP, BROKER_PORT, BROKER_VHOST, self.credentials, connection_attempts=5,
                                      retry_delay=1))
        self.connection = pika.BlockingConnection(self.parameters)

        self.channel = self.connection.channel()
        self.exchange = exchange
        self.channel.basic_qos(prefetch_count=1)
        self.exchange = exchange
        self.queue = queue
        self.channel.exchange_declare(
            exchange=exchange,
            exchange_type="direct",
            passive=False,
            durable=True,
            auto_delete=False)
        self.channel.queue_declare(queue=queue, durable=True)
        self.channel.queue_bind(
            queue=queue, exchange=exchange, routing_key=rk)
        self.channel.basic_qos(prefetch_count=1)

    def consum_message(self):
        # LOGGER.info(‘Comsummer by {}‘.format(name))
        def on_message(chan, method_frame, _header_frame, body, userdata=None):
            """Called when a message is received. Log message and ack it."""
            # LOGGER.info(‘Userdata: %s Message body: %s‘, userdata, body)
            # print(" [x] Received %r" % body.decode())
            data = body.decode()
            result = alarmFun(data)
            publish = Publish(exchange=‘spider‘, queue=‘alarm‘, rk=‘rk-alarm‘)
            publish.produce(result)
            # chan.basic_ack(delivery_tag=method_frame.delivery_tag)

        on_message_callback = functools.partial(on_message)

        self.channel.basic_consume(on_message_callback=on_message_callback,
                                   queue=self.queue,
                                   auto_ack=True
                                   )
        try:
            self.channel.start_consuming()

        except KeyboardInterrupt:
            self.channel.stop_consuming()

    def cancel(self):
        self.connection.close()

def test():
    consummer = Consummer(‘test_yerik‘, ‘test_test‘, ‘rk-test_test‘)
    consummer.consum_message()
    print(consummer.receive)

if __name__ == ‘__main__‘:
    test()

参考文档:

  1. 深入RabbitMQ, Gavin M.Roy 著 汪佳南 郑天民 译

原文地址:https://blog.51cto.com/yerikyu/2386033

时间: 2024-10-05 04:28:51

RabbitMQ实践体验的相关文章

NET下RabbitMQ实践[配置篇]

这个系列目前计划写四篇,分别是配置,示例,WCF发布,实战.当然不排除加餐情况.  介绍: rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.他遵循Mozilla Public License开源协议.采用 Erlang 实现的工业级的消息队列(MQ)服务器. RabbitMQ的官方站:http://www.rabbitmq.com/          AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,作为线路层协议,而不是API(例如JMS),A

NET下RabbitMQ实践[示例篇]

在上一篇文章中,介绍了在window环境下安装erlang,rabbitmq-server,以免配置用户,权限,虚拟机等内容.         今天将会介绍如果使用rabbitmq进行简单的消息入队,出队操作,因为本文演示的环境要用到上文中配置的环境,所以要运行本文sample,请先按上一篇中完成相应环境配置.         首先,我们下载官方的.net客户端软件,链接:http://www.rabbitmq.com/dotnet.html         下载并安装之后,将安装目录下的这两个

NET下RabbitMQ实践[实战篇]

之前的文章中,介绍了如何将RabbitMQ以WCF方式进行发布.今天就介绍一下我们产品中如何使用RabbitMQ的!          在Discuz!NT企业版中,提供了对HTTP错误日志的记录功能,这一点对企业版非常重要,另外存储错误日志使用了MongoDB,理由很简单,MongoDB的添加操作飞快,即使数量过亿之后插入速度依旧不减.          在开始正文之前,先说明一下本文的代码分析顺序,即:程序入口==>RabbitMQ客户端===>RabbitMQ服务端.好了,闲话少说,开始

NET下RabbitMQ实践[WCF发布篇]

在之前的两篇文章中,主要介绍了RabbitMQ环境配置,简单示例的编写.今天将会介绍如何使用WCF将RabbitMQ列队以服务的方式进行发布.          注:因为RabbitMQ的官方.net客户端中包括了WCF的SAMPLE代码演示,很适合初学者,所以我就偷了个懒,直接对照它的SAMPLE来说明了,算是借花献佛吧,呵呵. 首先我们下载相应源码(基于.NET 3.0),本文主要对该源码包中的代码进行讲解,链接如下:        Binary, compiled for .NET 3.0

golang rabbitmq实践 (二 实现简单的消息收发)

1:驱动 本来打算自己写一个驱动的,后来发现github上面已经有了,那我就直接拿现成的了, 驱动采用 github.com/streadway/amqp ,直接import就可以啦! 2:exchange and queue 在上一篇文章中,我们已经创建好virtualhost .exchange and queue,所以我们先定义这些常量 const ( queueName = "push.msg.q" exchange = "t.msg.ex" mqurl =

golang rabbitmq实践 (一 rabbitmq配置)

1:环境选择 系统为ubuntu 15.04 ,我装在虚拟机里面的 2:rabbitmq tabbitmq 3.5.4  download url : http://www.rabbitmq.com/ 3:安装 在Ubuntu环境下,建议直接下载deb安装包,可以再ubuntu软件包管理中直接安装,并且安装其他依赖包 4:启动 如果是deb包直接安装的话,默认是直接启动的,也可以通过 sudo  rabbitmq-server start 启动.如果提示 node with name "rabb

golang rabbitmq实践(啰嗦)

1:背景简介 我是一个.net一线开发,今年6月份离开帝都来到魔都,后入职于莫江互联网在线教育公司.现刚刚转正,在这短短的三个月的时间,莫江给我的是职业路上颠覆性变化. .net技术迷 => nodejs/java/golang 在此之前,我认为我会在.net平台坚定不移的走下,因为我是如此的喜欢C#语言,认为它是那么优美简介,vs ide号称宇宙神器,coding如此happy,but 当我们每周技术讨论例会上,我才真正的认识到我自己原来就是井底之蛙 2:C#和其他我所接触的语言对比 网上关于

RabbitMQ 实践及使用

1. RabbitMQ的安装 1.1 配置好 epel # For EL5: rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm # For EL6: rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm # For EL7: rpm -Uvh http:/

CentOS 7 安装RabbitMQ 3.3

http://www.cnblogs.com/shanyou/p/3902905.html 1.安装erlang 语言环境 安装依赖文件 #yum install ncurses-devel 进入 http://www.erlang.org/download.html 选择源文件下载 wget http://www.erlang.org/download/ otp_src_17.1.tar.gz tar zxvf  otp_src_17.1.tar.gz cd  otp_src_17.1 阅读H