基于Python语言使用RabbitMQ消息队列(一)

介绍

RabbitMQ 是一个消息中间人(broker): 它接收并且发送消息. 你可以把它想象成一个邮局: 当你把想要寄出的信放到邮筒里时, 你可以确定邮递员会把信件送到收信人那里. 在这个比喻中, RabbitMQ 就是一个邮筒, 同时也是邮局和邮递员 . 
和邮局的主要不同点在于RabbitMQ不处理纸质信件, 而是 接收(accepts), 存储(stores) 和转发(forwards)二进制数据块 —— 消息(messages). 
在RabbitMQ中有一些自己的行业术语要了解 . 
生产(producing)在这里的意思就是发送(sending). 一个发送消息的程序就是生产者( producer) : 
 
队列(queue) 可以看做是邮筒的别名 ,它存在于RabbitMQ中. 虽然消息在RabbitMQ和你的应用程序中流转, 但它只能被存储在队列当中. 一个队列只受到主机的内存和磁盘的限制, 它实际上是个大的消息缓冲区. 许多生产者可以发送消息到一个队列, 许多消费者可以从队列中接收数据. 下面是队列的示意图: 
 
消费(consuming) 与接收(receiving)有相似的含义. 消费者(consumer)就是等待接收消息的程序 : 
 
要注意的是 生产者, 消费者, 和中间人不必在相同的主机上,实际上大多数情况下它们都不在同一台主机上 
(using the pika 0.10.0 Python client)

在教程的这部分里我们用Python写两个小程序; 一个 发送消息的生产者 (sender), 一个接收消息并把它打印出来的消费者consumer (receiver)

在下面的图例中, “P” 代表我们的生产者 , “C”代表我们的消费者. 中间的盒子是一个队列—由RabbitMQ 维持的消息缓冲区.

我们的整体设计大致如下图所示: 
 
生产者发送消息到名为 “hello”的 队列. 消费者从那个队列中接收消息

RabbitMQ 库

RabbitMQ遵循 AMQP 0.9.1, 这是一个开源的, 多用途(general-purpose)的消息发送协议. 
针对RabbitMQ,在不同语言中有多种客户端可用. 在本教程系列中我们将使用 Pika, 这是由RabbitMQ团队推荐的 Python客户端. 你可以使用pip安装.

发送 
 
我们的第一个程序send.py 将会发送一条消息到队列中. 我们要做的第一件事是和 RabbitMQ 服务建立连接.

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
channel = connection.channel()
  • 1
  • 2
  • 3
  • 4
  • 5

现在我们已经建立了一个到本地机器的中间人(broker)的连接, 如果想要连接到不同的机器上的中间人,只要把‘localhost’替换成指定的名字和IP地址即可.

下一步, 在发送前我们要确保接收的队列存在. 如果我们发送消息到一个不存在的地址, RabbitMQ 会把消息丢弃掉. 我们创建一个名为‘hello’的队列 ,把消息发送到这个队列中:

channel.queue_declare(queue=‘hello‘)
  • 1

到这里我们准备好要发送消息了,第一条消息只是一个简单的字符串“hello world!”,把它发送到队列中

In RabbitMQ 一条消息从不会被直接发送到队列, 它会先经过一个交换所(exchange). 但我我们不要被细节缠住 ? 你会在教程的第三部分了解更多关于交换所的内容. 目前我们需要知道的就是如何使用有空字符串所指定的默认交换所。这个交换所允许我们准确指定消息应该前往哪个队列。 队列名由 “routing_key”参数指定:

channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘,
                      body=‘Hello World!‘)
print(" [x] Sent ‘Hello World!‘")
  • 1
  • 2
  • 3
  • 4

退出程序前我们需要确保网络缓冲区(network buffers)被冲刷(flushed),并且我们的消息真的被发送到了RabbitMQ. 这只需要通过关闭连接来完成:

connection.close()
  • 1

接收 
 
我们的第二个程序 receive.py 将会从队列接收消息并且打印出来。

同样,我们首先要连接到RabbitMQ 服务。 连接到Rabbit的代码同前面的一样 。

下一步,同先前一样,要确保队列存在. 使用queue_declare 创建队列是一个幂等(idempotent)操作 ? 我们想运行多少次这个命令都可以, 但只有一个队列被创建.

channel.queue_declare(queue=‘hello‘)
  • 1

你可能会问为什么又一次声明队列 ? 我们在前面的代码中已经声明过一次. 如果我们确定队列存在的话的话可以避免那么做. 例如 send.py 已经运行了. 但我们不确定哪个程序先运行. 在这种情况下最好在两个程序中都声明一下,这是一个好的习惯。

列出所有队列

如果你想查看RabbitMQ 拥有哪些队列,有多少消息在其中.你可以使用 rabbitmqctl 工具:

sudo rabbitmqctl list_queues 
在 Windows中:

rabbitmqctl.bat list_queues

从队列中接收消息会稍微复杂一些. 通过给队列提供一个callback 函数来实现. 无论何时接收到消息, 这个callback 函数都会被 Pika 库调用. 在我们这里,这个函数会打印出接收到的消息.

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
  • 1
  • 2

下一步, 我们需要告诉 RabbitMQ 这个callback函数应该从我们的 “hello”队列中接收消息:

channel.basic_consume(callback,
                      queue=‘hello‘,
                      no_ack=True)
  • 1
  • 2
  • 3

这里的“no_ack ”参数会在后面有介绍.

最后我们加一个等待接收数据并且在必要时运行回调函数的永远不会终止的循环.

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
  • 1
  • 2

整合 
send.py的完整代码:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘hello‘)

channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘,
                      body=‘Hello World!‘)
print(" [x] Sent ‘Hello World!‘")
connection.close()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

receive.py的完整代码:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘hello‘)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue=‘hello‘,
                      no_ack=True)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

现在在终端运行我们的程序. 首先,启动一个消费者程序, 这会持续运行来等待接收消息:

python receive.py
  • 1

下面是在我的Ubuntu终端上的运行结果: 
 
现在来启动生产者. 生产者程序在运行完会退出:

python send.py
  • 1

在回头看之前打开的消费者程序终端,已经接到了消息: 

我们已经学会了如何向一个命名队列中发送和接收消息. 下一节我们来构建一个简单的工作队列(work queue)

原文地址:https://www.cnblogs.com/ExMan/p/10281517.html

时间: 2024-11-04 17:23:51

基于Python语言使用RabbitMQ消息队列(一)的相关文章

基于Python语言使用RabbitMQ消息队列(二)

工作队列 在第一节我们写了程序来向命名队列发送和接收消息 .在本节我们会创建一个工作队列(Work Queue)用来在多个工人(worker)中分发时间消耗型任务(time-consuming tasks). 工作队列(又叫做: Task Queues)背后的主体思想是 避免立刻去执行耗时任务并且等待它们完成. 相反我们可以安排这样的任务稍后执行. 我们可以把任务封装成一个消息并发送到队列中. 一个在后台运行的工人进程会接收任务并最终执行工作.当你使很多工人(workers)程序运行时,多个任务

Python并发编程-RabbitMq消息队列

消息中间件 --->就是消息队列 异步方式:不需要立马得到结果,需要排队 同步方式:需要实时获得数据,坚决不能排队 subprocess 的Q也提供不同进程之间的沟通 应用场景: 买票,抢购 堡垒机批量发送文件 Centos6.x系统编译安装RabbitMQ 一.系统环境 [[email protected] ~]# cat /etc/redhat-release CentOS release 6.6 (Final) [[email protected] ~]# uname -r 2.6.32-

RabbitMQ 消息队列 应用

安装参考    详细介绍   学习参考 RabbitMQ 消息队列 RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.

RabbitMQ消息队列应用

RabbitMQ消息队列应用 消息通信组件Net分布式系统的核心中间件之一,应用与系统高并发,各个组件之间解耦的依赖的场景.本框架采用消息队列中间件主要应用于两方面:一是解决部分高并发的业务处理:二是通过消息队列传输系统日志.目前业界使用较多的消息队列组件有RabbitMQ.ActiveMQ.MSMQ.kafka.zeroMQ等,本文对系统架构之MQ Component诠释,并采用RabbitMQ作为消息队列中间件. 图1- 消息队列组件示意图 一.RabbitMQ介绍 RabbitMQ是一款基

Golang调用Rabbitmq消息队列和封装

前言 介绍Rabbimq Rabbitmq消息队列是干嘛的? 简单的说,消息队列,引申一下就是传递消息用的队列,也可以称为传递消息的通信方法.用争抢订单的快车举个例子,假如,A用户发送了一个用车的消息,那么消息队列要做的就是把A用户用车的这个消息广而告之,发送到一个公用队列当中,司机只管取到消息,而不管是谁发布的,这就是一个简单的消息队列例子,Rabbitmq其实就是消息队列的一种,用的比较多的还可能有Redis,kafka,ActiceMq等等,这个后面的博文里面我会说,这次我们只说Rabbi

(转)RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

RabbitMQ消息队列(六):使用主题进行消息分发

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统.Consumer可以监听不同severity的log.但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定.不支持更多的标准. 比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...).这可能更是我们想要的:我们可以仅仅需要cron模块的log. 为了实现类似的功能,我们需

(转)RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)

在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成.那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例. 1. 客户端接口 Client interface 为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class. 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果.代码如下: [python] vie

RabbitMQ消息队列1: Detailed Introduction 详细介绍

1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现.AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco .Redhat.iMatix 等联合制定了 AMQP 的公开标