安装pika模块
linux下安装
pip3.5 install pika
一个简单的消息队列例子
发送端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1")) channel = connection.channel() # 声明队列queue channel.queue_declare(queue="test") # RabbitMQ消息不能直接发送到队列,它总是需要经历一个交换 channel.basic_publish(exchange="", routing_key="test", body="Hello World!") print("[x] sent ‘Hello World!‘") connection.close()
接收端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1")) channel = connection.channel() # 声明队列queue,receive声明队列的原因是不一定是发送端先启动,假如receive端先启动要先声明队列 channel.queue_declare(queue="test") def callback(ch, method, properties, body): print("[x] received %r" % body) channel.basic_consume(callback, queue="test", no_ack=True) print("[*] waiting for messages, to exit press CTRL+C") channel.start_consuming()
这种方式,RabbitMQ会将消息依次发送给接收者,跟负载均衡差不多
上面那种情况是接收端没有回应的,如果没有回应,接收端只要从队列中取走消息,队列中就已经没有这个数据了,有时为了避免这种请求,要求接收端必须接收消息并执行后,可以让接收端发送一个回应,然后RabbitMQ再将这条消息删除
发送端没有更改
接收端
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1")) channel = connection.channel() # 声明队列queue,receive声明队列的原因是不一定是发送端先启动,假如receive端先启动要先声明队列 channel.queue_declare(queue="test") def callback(ch, method, properties, body): print("[x] received %r" % body) time.sleep(30) ch.basic_ack(delivery_tag=method.delivery_tag) # channel.basic_consume(callback, queue="test", no_ack=True) channel.basic_consume(callback, queue="test") print("[*] waiting for messages, to exit press CTRL+C") channel.start_consuming()
RabbitMQ持久化,只修改发送端就可以
发送端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1")) channel = connection.channel() # 声明队列queue # channel.queue_declare(queue="test") channel.queue_declare(queue="test", durable=True) # RabbitMQ消息不能直接发送到队列,它总是需要经历一个交换 # channel.basic_publish(exchange="", # routing_key="test", # body="Hello World!") channel.basic_publish(exchange="", routing_key="test", body="Hello World!", properties=pika.BasicProperties(delivery_mode=2)) print("[x] sent ‘Hello World!‘") connection.close()
公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
这里只需要修改接收端
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1")) channel = connection.channel() # 声明队列queue,receive声明队列的原因是不一定是发送端先启动,假如receive端先启动要先声明队列 channel.queue_declare(queue="test") def callback(ch, method, properties, body): print("ch", ch) print("method", method) print("properties", properties) print("[x] received %r" % body) time.sleep(30) ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue="test") print("[*] waiting for messages, to exit press CTRL+C") channel.start_consuming()
消息发布和订阅
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
headers: 通过headers 来决定把消息发给哪些queue
fanout模式
发送端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1")) channel = connection.channel() channel.exchange_declare(exchange="test", type="fanout") message = "Hello World!" channel.basic_publish(exchange="test", routing_key="", body=message) print("[x] sent %r" % message) connection.close()
接收端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1")) channel = connection.channel() channel.exchange_declare(exchange="test", type="fanout") result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange="test", queue=queue_name) print("[*] waiting for messages, to exit press CTRL+C") def callback(ch, method, properties, body): print("[x] received %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
direct模式
发送端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1")) channel = connection.channel() channel.exchange_declare(exchange="test", type="direct") message = "Hello World!" severity = "test123" channel.basic_publish(exchange="test", routing_key="test123", body=message) print("[x] sent %r:%r" % (severity, message)) connection.close()
接收端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1")) channel = connection.channel() channel.exchange_declare(exchange="test", type="direct") result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severity = "test123" channel.queue_bind(exchange="test", queue=queue_name, routing_key=severity) print("[*] waiting for messages, to exit press CTRL+C") def callback(ch, method, properties, body): print("[x] received %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
topic模式
发送端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1")) channel = connection.channel() channel.exchange_declare(exchange="test", type="topic") message = "Hello World!" routing_key = "Hello" channel.basic_publish(exchange="test", routing_key=routing_key, body=message) print("[x] sent %r:%r" % (routing_key, message)) connection.close()
接收端
import pika connection = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1")) channel = connection.channel() channel.exchange_declare(exchange="test", type="topic") result = channel.queue_declare(exclusive=True) queue_name = result.method.queue routing_key = "Hello" channel.queue_bind(exchange="test", queue=queue_name, routing_key=routing_key) print("[*] waiting for messages, to exit press CTRL+C") def callback(ch, method, properties, body): print("[x] received %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
rpc
发送端
import pika import uuid class FibonacciRpcClient(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish(exchange=‘‘, routing_key=‘rpc_queue‘, properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) while self.response is None: self.connection.process_data_events() return int(self.response) fibonacci_rpc = FibonacciRpcClient() print(" [x] Requesting fib(30)") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response)
接收端
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.queue_declare(queue=‘rpc_queue‘) def fib(n): if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def on_request(ch, method, props, body): n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange=‘‘, routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(on_request, queue=‘rpc_queue‘) print(" [x] Awaiting RPC requests") channel.start_consuming()