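In the figure below, X is the exchange.
P is the producer, and the red boxes are queues.
X filters the tasks coming from P and decides how each one is handled. For example:
(1) discard the task
(2) send the task to one particular queue
(3) send the task to all queues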
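The three outcomes above can be modelled without a broker. What follows is a minimal sketch in plain Python, not RabbitMQ's implementation: `ToyExchange`, `bind` and `publish` are hypothetical names, queues are ordinary lists, and each binding is assumed to carry a simple predicate on the routing key.

```python
class ToyExchange:
    """In-memory stand-in for an exchange (hypothetical, for illustration only)."""

    def __init__(self):
        # Each binding pairs a queue (a plain list) with a predicate on the routing key.
        self.bindings = []

    def bind(self, queue, accepts):
        self.bindings.append((queue, accepts))

    def publish(self, routing_key, body):
        """Copy body into every queue whose predicate accepts routing_key."""
        delivered = 0
        for queue, accepts in self.bindings:
            if accepts(routing_key):
                queue.append(body)
                delivered += 1
        return delivered  # 0 means the task was discarded


errors, everything = [], []
exchange = ToyExchange()
exchange.bind(errors, lambda key: key == "error")  # accepts only "error" tasks
exchange.bind(everything, lambda key: True)        # accepts every task

unbound_exchange = ToyExchange()                   # no queue is bound to this one

n_discarded = unbound_exchange.publish("info", "task 0")  # (1) discarded
n_info = exchange.publish("info", "task 1")               # (2) reaches one queue
n_error = exchange.publish("error", "task 2")             # (3) reaches all queues
```

The return value of `publish` is only a convenience for this sketch; it makes the discarded case visible as a delivery count of zero.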
There are four exchange types: direct, topic, headers, and fanout.
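The four types differ in how a binding is matched against a message. The sketch below is a simplified model of those rules in plain Python, assuming dot-separated routing keys; `routes_to` and `topic_matches` are hypothetical helper names, and the headers case only models the "all headers must match" behaviour.

```python
def topic_matches(pattern, routing_key):
    """Match a dot-separated routing key against a topic pattern.

    '*' stands for exactly one word, '#' for zero or more words.
    """
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb anywhere from 0 to len(k) words.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


def routes_to(exchange_type, binding_key, routing_key,
              binding_headers=None, message_headers=None):
    """Return True if a message would reach a queue bound with binding_key."""
    if exchange_type == "fanout":
        return True  # the routing key is ignored
    if exchange_type == "direct":
        return binding_key == routing_key
    if exchange_type == "topic":
        return topic_matches(binding_key, routing_key)
    if exchange_type == "headers":
        # Simplified assumption: every header in the binding must match.
        wanted = binding_headers or {}
        actual = message_headers or {}
        return all(k in actual and actual[k] == v for k, v in wanted.items())
    raise ValueError("unknown exchange type: " + exchange_type)


fanout_hit = routes_to("fanout", "", "anything")
direct_miss = routes_to("direct", "error", "info")
topic_hit = routes_to("topic", "kern.*", "kern.critical")
```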
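This example mainly uses fanout, which broadcasts every message to all queues bound to the exchange.
emit_log.py: tasks are published to the exchange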
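    # -*- coding: UTF-8 -*-
    # Written for Python 3 and pika 1.x (exchange_type=...).
    import pika

    if __name__ == '__main__':
        connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
        channel = connection.channel()
        # Declare a fanout exchange named "logs"
        channel.exchange_declare(exchange="logs", exchange_type="fanout")
        message = "You are awesome!"
        for i in range(0, 100):  # send the message 100 times
            body = message + " " + str(i)
            # A fanout exchange ignores routing_key, so it is left empty
            channel.basic_publish(exchange="logs", routing_key='', body=body)
            print("sending", body)
        connection.close()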
receive_log.py
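    # -*- coding: UTF-8 -*-
    # Written for Python 3 and pika 1.x (exchange_type, on_message_callback, auto_ack).
    import pika

    __author__ = 'Yue'


    def callback(ch, method, properties, body):
        # body arrives as bytes
        print(body.decode())


    if __name__ == '__main__':
        connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
        channel = connection.channel()
        channel.exchange_declare(exchange="logs", exchange_type="fanout")
        # Let the broker generate a random queue name;
        # exclusive=True removes the queue when this connection closes
        result = channel.queue_declare(queue='', exclusive=True)
        # Get the name of the generated queue
        queue_name = result.method.queue
        print("queue_name", queue_name)
        channel.queue_bind(exchange="logs", queue=queue_name)
        channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
        channel.start_consuming()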
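Because every receive_log.py process declares its own randomly named queue and binds it to "logs", each running receiver gets a full copy of the stream, while anything published before a queue is bound is lost. The sketch below imitates that behaviour in memory so it can be tried without a broker; `FanoutSim` and `declare_and_bind` are hypothetical names, and the generated queue names only mimic the idea of broker-generated names.

```python
import uuid


class FanoutSim:
    """In-memory imitation of a fanout exchange with temporary queues."""

    def __init__(self):
        self.queues = {}  # queue name -> list of delivered messages

    def declare_and_bind(self):
        # Stands in for queue_declare(exclusive=True) followed by queue_bind.
        name = "sim.gen-" + uuid.uuid4().hex
        self.queues[name] = []
        return name

    def publish(self, body):
        # Fanout: one copy per bound queue; with no queues the message vanishes.
        for messages in self.queues.values():
            messages.append(body)


sim = FanoutSim()
sim.publish("You are awesome! early")  # no receiver yet, so this one is lost

first = sim.declare_and_bind()         # first receiver starts
second = sim.declare_and_bind()        # second receiver starts

for i in range(100):
    sim.publish("You are awesome! " + str(i))
```

In practice this means the receivers should be started before emit_log.py if none of the 100 messages should be missed.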
Time: 2024-10-08 22:45:30