RabbitMQ 学习

一、概要

官网: http://www.rabbitmq.com/

一个不错的入门教程: http://blog.csdn.net/linvo/article/details/5750987  写的挺好的,只是刚开始看可能不太懂,模模糊糊,多看几遍,试着写点代码之后,再看。就比较清晰了。

官方文档使用了 using the pika 0.9.8 Python client 。本文使用      http://github.com/celery/py-amqp   amqp 1.4.6

至于安装,自己找下教程吧。不难,先安装 Erlang,再安装RabbitMQ。然后配置一下,有个web控制台。之后就是python编程使用了。



MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。

Broker:简单来说就是消息队列服务器实体。
  Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
  producer:消息生产者,就是投递消息的程序。
  consumer:消息消费者,就是接受消息的程序。
  channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

消息队列的使用过程大概如下:

(1)客户端连接到消息队列服务器,打开一个channel。
  (2)客户端声明一个exchange,并设置相关属性。
  (3)客户端声明一个queue,并设置相关属性。
  (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
  (5)客户端投递消息到exchange。

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了 routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符 号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还 有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
  (1)exchange持久化,在声明时指定durable => 1
  (2)queue持久化,在声明时指定durable => 1
  (3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。



二、基本使用

入门教程看会之后,就差不多了。

下面示例代码:

consumer 消费者

# amqp_consumer.py

# -*- coding: utf-8 -*-

__author__ = ‘lpe234‘
__date__ = ‘2014-12-15‘

import amqp

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
chan = conn.channel()

chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)
chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False, )

chan.queue_bind(queue="po_box", exchange="sorting_room", routing_key="1111")

def receive_callback(msg):
    print ‘Received: ‘ + msg.body + ‘ from channel #‘ + str(msg.channel.channel_id)

chan.basic_consume(queue=‘po_box‘, no_ack=True, callback=receive_callback, consumer_tag="consumer")
while True:
    chan.wait()
chan.basic_cancel("consumer")

chan.close()
conn.close()

producer 生产者

# amqp_publisher.py

# -*- coding: utf-8 -*-

__author__ = ‘lpe234‘
__date__ = ‘2014-12-15‘

import amqp
import json

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/", insist=False)
chan = conn.channel()

for x in xrange(10):
    msg = json.dumps({‘id‘: str(x)+‘111‘, ‘lists‘: [{‘id‘: 12345}, {‘id‘: 12345}, {‘id‘: 15656}, {‘id‘: ‘4545‘}, ]})
    print msg
    msg = amqp.Message(msg)
    msg.properties["delivery_mode"] = 2
    chan.basic_publish(msg, exchange="sorting_room", routing_key="1111")

chan.close()
conn.close()

代码基本都是在 csdn 那个博客里面弄下来的。稍微的修改了以下。

启动时,先运行 consumer 消费者进程,它会先连接, 并创建 Queue和 Exchange ,然后一直等待队列中的消息。

然后,启动 publisher ,它会先连接,然后向指定 Exchange 交换机推送带有特定 routing_key 路由键的消息。

如果消费者对应的 Queue 队列与 Exchange 交换机 的 routing_key 路由键 相对应的话。那么消费者就会接收到相应消息。至此,整个传递过程结束。



三、补充

注释代码

# -*- coding: utf-8 -*-

__author__ = ‘lpe234‘
__date__ = ‘2014-12-15‘

import amqp

"""
amqp rabbitmq DEMO测试
先启动 amqp_consumer.py 消费者,创建
"""

conn = amqp.Connection(host=‘localhost:5672‘, userid=‘guest‘, password=‘guest‘, virtual_host=‘/‘, insist=False)
# 每个channel都被分配了一个整数标识,自动由Connection()类的.channel()方法维护。可以使用.channel(x)来指定channel标识。
chan = conn.channel(channel_id=1)
# 当多个 channel_id 相同时,实际为同一 channel
# 现在已经有了一个可用的连接和channel。
# 现在将代码分为两类,生产者(producer)和消费者(consumer)。

# 创建一个消费者程序,会创建一个"po_box"的队列和一个叫"sorting_room"的交换机。
chan.queue_declare(queue=‘po_box‘, durable=True, exclusive=False, auto_delete=False)
chan.exchange_declare(exchange=‘sorting_room‘, type=‘direct‘, durable=True, auto_delete=False)
# 创建了"po_box" 的队列,durable重启之后会重新建立,auto_delete=False最后一个消费者断开之后不会自动删除,exclusive私有队列
# 创建了"sorting_room"的交换机,type指定交换机类型,
# 现在已经有了一个可以接收消息的队列和一个可以发送消息的交换机。不过还需要创建一个绑定

chan.queue_bind(queue=‘po_box‘, exchange=‘sorting_room‘, routing_key=‘jason‘)
# 这个绑定非常直接,任何送到交换机"sorting_room"的具有路由键"jason"的消息都被路由到"po_box" 队列

# 现在有两个方法,从队列中取出消息。
# 第一个是调用 chan.basic_get(), 主动从队列中拉出下一条消息(若没有则返回 None)
# msg = chan.basic_get(queue=‘po_box‘)
# if msg:
#     print msg.body
#     chan.basic_ack(msg.delivery_tag)

# 第二种
def receive_callback(msg):
    print msg.body

chan.basic_consume(queue=‘po_box‘, no_ack=True, callback=receive_callback, consumer_tag=‘testtag‘)
while True:
    chan.wait()
chan.basic_cancel(‘testtag‘)
# chan.wait() 放在无限循环里面,这个函数会等待在队列上,知道下一个消息到达队列。
# chan.basic_cancel() 用来注销该回调函数
# no_ack 这个参数,可以传给 chan.basic_get(), chan.basic_consume。是否等待回馈,

其他的后续再补充吧

时间: 2024-10-11 07:27:30

RabbitMQ 学习的相关文章

RabbitMQ学习及实践2---介绍及简单Java实现

一,基本概念 MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息.MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品. RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统.他遵循Mozilla Public License开源协议. RabbitMQ是流行的开源消息队列系统,用erlang语言开发.RabbitMQ是AMQP(高级消息队列协

RabbitMQ学习及实践3--SpringMVC实现

根据学习的RabbitMQ知识配了一个SpringMVC的实现.这是一个完整的工程,view的部分使用freeMarker,持久化操作是通过mybatis实现. 整个工程的目录结构如下: src下的相关包的解释: controller:控制器: domain:对应数据库的操作对象类: persistence:mybatis的持久化操作: util:相关工具类: msg.bean:消息类,定义及描述消息体: msg.convert:实现如何转化消息体为可接受的消息类: msg.process:处理

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

rabbitmq 学习-9- RpcClient发送消息和同步接收消息原理

RabbitMQ学习系列(四): 几种Exchange 模式

上一篇,讲了RabbitMQ的具体用法,可以看看这篇文章:RabbitMQ学习系列(三): C# 如何使用 RabbitMQ.今天说些理论的东西,Exchange 的几种模式. AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列.生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机.先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储.同理,消费者也是如此.Exchange 就类似于一个交换机,转发各个消息分发到相

RabbitMQ学习之:(六)Direct Exchange (转贴+我的评论)

From: http://lostechies.com/derekgreer/2012/04/02/rabbitmq-for-windows-direct-exchanges/ RabbitMQ for Windows: Direct Exchanges Posted by Derek Greer on April 2, 2012 This is the fifth installment to the series: RabbitMQ for Windows.  In thelast inst

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMQ优先级队列注意点: 1.只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效 2.RabbitMQ3.5以后才支持优先级队列 代码在博客:RabbitMQ学习笔记三:Java实现RabbitMQ之与Spring集成 最后面有下载地址,只是做了少许改变,改变的代码如下: 消费者 spring-config.xml(还需要增加一个QueueListener监听器,代码就不复制到这里了,可以参考项目中的其他监听器) <!-- =========================

RabbitMQ学习系列(三): C# 如何使用 RabbitMQ

上一篇已经讲了Rabbitmq如何在Windows平台安装,还不了解如何安装的朋友,请看我前面几篇文章:RabbitMQ学习系列一:windows下安装RabbitMQ服务 , 今天就来聊聊 C# 实际开发的过程中,怎么调用 用RabbitMQ. 一.客户端 RabbitMQ 有很多客户端API,都非常的好用.我们在一边,一直用的都是 EasyNetQ,所以这里的 demo 只介绍 EasyNetQ 客户端实现.其他的客户端,大家自己去研究吧. EasyNetQ 是一个易于使用的RabbitMQ

RabbitMQ学习和使用

RabbitMQ学习和使用 RabbitMQ介绍 MQ全称Message Queue 消息队列,RabbitMQ是基于AMQP(高级消息队列协议)实现的.消息队列通常用以应用之间相互通信,解决同步问题.MQ是典型的生产者消费者模型,RabbitMQ最常用的三种模式是点对点模式.发布订阅模式.广播模式. RabbitMQ is a message-queueing software called a message broker or queue manager. Simply said; It

RabbitMQ学习(三)订阅/发布

RabbitMQ学习(三)订阅/发布 1.RabbitMQ模型 前面所学都只用到了生产者.队列.消费者.如上图所示,其实生产者并不直接将信息传输到队列中,在生产者和队列中间有一个交换机(Exchange),我们之前没有使用到交换机是应为我们没有配置交换机,使用了默认的交换机. 有几个可供选择的交换机类型:直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout) 这里我们使用扇形交换机做一个简单的广播模型:一个生产者和多个消费者接受相同消息

RabbitMQ学习之旅(一)

RabbitMQ学习总结(一) RabbitMQ简介 RabbitMQ是一个消息代理,其接收并转发消息.类似于现实生活中的邮局:你把信件投入邮箱的过程,相当于往队列中添加信息,因为所有邮箱中的信件最终都会汇集到邮局中:当邮递员把你的新建发送给收件人的时候,相当于消息的转发. RabbitMQ中的常见术语 生产者(Provider):生产者负责生产消息,并将其发送到消息队列中 队列(Queue):消息代理(Proxy)角色,从生产者那里接收消息,并将其转发到消费者进行消费.队列主要受限于主机的内存