Python之路48-RabbitMQ

安装pika模块

linux下安装

pip3.5 install pika

一个简单的消息队列例子

发送端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

# 声明队列queue
channel.queue_declare(queue="test")

# RabbitMQ消息不能直接发送到队列,它总是需要经历一个交换
channel.basic_publish(exchange="", routing_key="test", body="Hello World!")
print("[x] sent ‘Hello World!‘")
connection.close()

接收端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

# 声明队列queue,receive声明队列的原因是不一定是发送端先启动,假如receive端先启动要先声明队列
channel.queue_declare(queue="test")

def callback(ch, method, properties, body):
    print("[x] received %r" % body)

channel.basic_consume(callback, queue="test", no_ack=True)

print("[*] waiting for messages, to exit press CTRL+C")
channel.start_consuming()

这种方式,RabbitMQ会将消息依次发送给接收者,跟负载均衡差不多

上面那种情况是接收端没有回应的,如果没有回应,接收端只要从队列中取走消息,队列中就已经没有这个数据了,有时为了避免这种请求,要求接收端必须接收消息并执行后,可以让接收端发送一个回应,然后RabbitMQ再将这条消息删除

发送端没有更改

接收端

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

# 声明队列queue,receive声明队列的原因是不一定是发送端先启动,假如receive端先启动要先声明队列
channel.queue_declare(queue="test")

def callback(ch, method, properties, body):
    print("[x] received %r" % body)
    time.sleep(30)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# channel.basic_consume(callback, queue="test", no_ack=True)
channel.basic_consume(callback, queue="test")

print("[*] waiting for messages, to exit press CTRL+C")
channel.start_consuming()

RabbitMQ持久化,只修改发送端就可以

发送端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

# 声明队列queue
# channel.queue_declare(queue="test")
channel.queue_declare(queue="test", durable=True)

# RabbitMQ消息不能直接发送到队列,它总是需要经历一个交换
# channel.basic_publish(exchange="",
#                       routing_key="test",
#                       body="Hello World!")
channel.basic_publish(exchange="",
                      routing_key="test",
                      body="Hello World!",
                      properties=pika.BasicProperties(delivery_mode=2))
print("[x] sent ‘Hello World!‘")
connection.close()

公平分发

如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

这里只需要修改接收端

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

# 声明队列queue,receive声明队列的原因是不一定是发送端先启动,假如receive端先启动要先声明队列
channel.queue_declare(queue="test")

def callback(ch, method, properties, body):
    print("ch", ch)
    print("method", method)
    print("properties", properties)
    print("[x] received %r" % body)
    time.sleep(30)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue="test")

print("[*] waiting for messages, to exit press CTRL+C")
channel.start_consuming()

消息发布和订阅

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了

Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息

fanout: 所有bind到此exchange的queue都可以接收消息

direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息

topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

   表达式符号说明:#代表一个或多个字符,*代表任何字符

例:#.a会匹配a.a,aa.a,aaa.a等

*.a会匹配a.a,b.a,c.a等

注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 

headers: 通过headers 来决定把消息发给哪些queue

fanout模式

发送端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

channel.exchange_declare(exchange="test", type="fanout")

message = "Hello World!"

channel.basic_publish(exchange="test",
                      routing_key="",
                      body=message)
print("[x] sent %r" % message)
connection.close()

接收端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

channel.exchange_declare(exchange="test", type="fanout")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange="test", queue=queue_name)

print("[*] waiting for messages, to exit press CTRL+C")

def callback(ch, method, properties, body):
    print("[x] received %r" % body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

direct模式

发送端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

channel.exchange_declare(exchange="test", type="direct")

message = "Hello World!"
severity = "test123"
channel.basic_publish(exchange="test",
                      routing_key="test123",
                      body=message)
print("[x] sent %r:%r" % (severity, message))
connection.close()

接收端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

channel.exchange_declare(exchange="test", type="direct")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severity = "test123"
channel.queue_bind(exchange="test", queue=queue_name, routing_key=severity)

print("[*] waiting for messages, to exit press CTRL+C")

def callback(ch, method, properties, body):
    print("[x] received %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

topic模式

发送端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

channel.exchange_declare(exchange="test", type="topic")

message = "Hello World!"
routing_key = "Hello"
channel.basic_publish(exchange="test",
                      routing_key=routing_key,
                      body=message)
print("[x] sent %r:%r" % (routing_key, message))
connection.close()

接收端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
channel = connection.channel()

channel.exchange_declare(exchange="test", type="topic")
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
routing_key = "Hello"
channel.queue_bind(exchange="test", queue=queue_name, routing_key=routing_key)

print("[*] waiting for messages, to exit press CTRL+C")

def callback(ch, method, properties, body):
    print("[x] received %r:%r" % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

rpc

发送端

import pika
import uuid
 
class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=‘localhost‘))
 
        self.channel = self.connection.channel()
 
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
 
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
 
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
 
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange=‘‘,
                                   routing_key=‘rpc_queue‘,
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
 
fibonacci_rpc = FibonacciRpcClient()
 
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

接收端

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
 
channel = connection.channel()
 
channel.queue_declare(queue=‘rpc_queue‘)
 
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
 
def on_request(ch, method, props, body):
    n = int(body)
 
    print(" [.] fib(%s)" % n)
    response = fib(n)
 
    ch.basic_publish(exchange=‘‘,
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id =                                                          props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue=‘rpc_queue‘)
 
print(" [x] Awaiting RPC requests")
channel.start_consuming()
时间: 2024-10-15 20:14:27

Python之路48-RabbitMQ的相关文章

python 之路12 RabbitMQ Python 操作mysql

1. RabbitMQ简介 rabbitmq服务类似于mysql.apache服务,只是提供的功能不一样.rabbimq是用来提供发送消息的服务,可以用在不同的应用程序之间进行通信. 2.安装RabbitMQ Ubuntu 14.04 sudo apt-get install rabbitmq-server 安装好后,rabbitmq服务就已经启动好了.接下来看下python编写Hello World!的实例.实例的内容就是从send.py发送“Hello World!”到rabbitmq,re

【python之路48】生成器表达式、推导式

一.生成器表达式 1. 生成器表达式为: (结果 for 变量 in 可迭代的对象 if 条件)-->if条件可以省略 ge = (i for i in range(1, 11)) print(ge) # <generator object <genexpr> at 0x0000023EA176B5C8> for elment in ge: print(elment) # 结果从1打印到10 # # 实际相当于 # def ge(): # for i in range(1,

Python之路【第九篇】:Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Python之路[第九篇]:Python操作 RabbitMQ.Redis.Memcache.SQLAlchemy Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. Memc

Python之路【第十七篇】:Django【进阶篇 】

Python之路[第十七篇]:Django[进阶篇 ] Model 到目前为止,当我们的程序涉及到数据库相关操作时,我们一般都会这么搞: 创建数据库,设计表结构和字段 使用 MySQLdb 来连接数据库,并编写数据访问层代码 业务逻辑层去调用数据访问层执行数据库操作 import MySQLdb def GetList(sql): db = MySQLdb.connect(user='root', db='wupeiqidb', passwd='1234', host='localhost')

Python之路【第三篇】:Python基础(二)

Python之路[第三篇]:Python基础(二) 内置函数 一 详细见python文档,猛击这里 文件操作 操作文件时,一般需要经历如下步骤: 打开文件 操作文件 一.打开文件 1 文件句柄 = file('文件路径', '模式') 注:python中打开文件有两种方式,即:open(...) 和  file(...) ,本质上前者在内部会调用后者来进行文件操作,推荐使用 open. 打开文件时,需要指定文件路径和以何等方式打开文件,打开后,即可获取该文件句柄,日后通过此文件句柄对该文件操作.

Python之路【第十九篇】:爬虫

Python之路[第十九篇]:爬虫 网络爬虫(又被称为网页蜘蛛,网络机器人,在FOAF社区中间,更经常的称为网页追逐者),是一种按照一定的规则,自动地抓取万维网信息的程序或者脚本.另外一些不常使用的名字还有蚂蚁.自动索引.模拟程序或者蠕虫. Requests Python标准库中提供了:urllib.urllib2.httplib等模块以供Http请求,但是,它的 API 太渣了.它是为另一个时代.另一个互联网所创建的.它需要巨量的工作,甚至包括各种方法覆盖,来完成最简单的任务. import

Python之路【第十五篇】:Web框架

Python之路[第十五篇]:Web框架 Web框架本质 众所周知,对于所有的Web应用,本质上其实就是一个socket服务端,用户的浏览器其实就是一个socket客户端. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 #!/usr/bin/env python #coding:utf-8   import socket   def handle_request(client):     buf = client.recv(10

Python之路【第十七篇】:Django之【进阶篇】

Python之路[第十七篇]:Django[进阶篇 ] Model 到目前为止,当我们的程序涉及到数据库相关操作时,我们一般都会这么搞: 创建数据库,设计表结构和字段 使用 MySQLdb 来连接数据库,并编写数据访问层代码 业务逻辑层去调用数据访问层执行数据库操作 import MySQLdb def GetList(sql): db = MySQLdb.connect(user='root', db='wupeiqidb', passwd='1234', host='localhost')

Python之路【第十八篇】:Web框架们

Python之路[第十八篇]:Web框架们 Python的WEB框架 Bottle Bottle是一个快速.简洁.轻量级的基于WSIG的微型Web框架,此框架只由一个 .py 文件,除了Python的标准库外,其不依赖任何其他模块. 1 2 3 4 pip install bottle easy_install bottle apt-get install python-bottle wget http://bottlepy.org/bottle.py Bottle框架大致可以分为以下部分: 路

Python之路【第十七篇】:Django【进阶篇】

Python之路[第十七篇]:Django[进阶篇 ] Model 到目前为止,当我们的程序涉及到数据库相关操作时,我们一般都会这么搞: 创建数据库,设计表结构和字段 使用 MySQLdb 来连接数据库,并编写数据访问层代码 业务逻辑层去调用数据访问层执行数据库操作 import MySQLdb def GetList(sql): db = MySQLdb.connect(user='root', db='wupeiqidb', passwd='1234', host='localhost')