(使用python客户端pika 0.9.8)
在前面的教程中我们构建了一个简单的日志系统。我们可以给许多接收者广播日志消息。
在这个教程中我们将添加一个特性给它-我们将订阅仅仅一种消息子集成为可能。例如,我们可以指挥仅仅错误消息到日志文件(保存到磁盘空间),它任然可以在控制台打印所有的日志消息。
绑定
在前面的例子中我们已经创建了绑定,你可以重新调用像这样的代码:
channel.queue_bind(exchange=exchange_name, queue=queue_name)
绑定是exchange和队列之间的一种关系。这可以简单的读作:队列对来自于这个exchange的消息是感兴趣的。
绑定可以使用一个额外的routing_key参数。为了避免跟一个basic_publish参数混乱我们将调用它一个绑定的键。这就是我们如何使用一个键创建一个绑定:
channel.queue_bind(exchange=exchange_name,queue=queue_name, routing_key=‘black‘)
绑定键意味着以来exchange类型。fanout类型的exchange,我们用了前面的,简单的忽略它的值。
Direct exchange
前面的教程中我们的日志系统给所有的消费者广播所有的消息。我们想扩展它允许基于它们的服务过滤消息。例如我们也许想让写的脚本仅仅只接收严重的错误日志消息,并且对警告和信息日志上不浪费磁盘空间。
我们使用一个fanout类型的exchange,它不会给我们太多的扩展性-它仅仅能无意识的广播。
我们将使用direct类型的exchange代替它。在direct类型的exchange后面的路由算法是简单的-一个消息进入一个正真匹配绑定键的消息routing_key的队列。
为了阐明这个,考虑下面的设置:
在这个设置中,我们看到direct类型的exchange X有两个队列被绑定给它。第一个队列使用绑定键orange绑定,第二个有两个绑定,一个用绑定键black并且另一个用green。
像这样的设置一个消息使用一个路由键orange被发布到exchange将被路由到队列Q1。使用路由键black或者green将进入Q2。所有的消息将被丢弃。
多个绑定
它完美合法的使用相同的绑定键绑定了多个队列。在我们的例子中我们可以在X和Q1之间天剑一个绑定键black。在那个例子中,direct类型的exchange将表现相似于fanout并且将给所有匹配的队列广播消息。一个使用路由键的black消息将被传递给Q1和Q2。
生产日志
我们将给我们的日志系统使用这种模式。单体fanout我们将给一个direct类型的exchange发送消息。我们将提供日志严重程度作为一个路由键。接收脚本用这种方式就能选择它想要接收的严重程度。我们首先聚焦于生产日志。
我们总是需要首先创建exchange:
channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘)
并且我们准备发送一个消息:
channel.basic_publish(exchange=‘direct_logs‘,routing_key=severity,body=message)
对于简化的事情我们建议‘严重程度‘可能是‘info‘,‘warning‘,‘error‘中的其中之一。
订阅
接收消息将仅仅像前面的教程一样生效,带着一个异常-我们将给每个我们感兴趣的严重程度创建一个绑定。
result=channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name, routing_key=severity)
把代码合在一起
emit_log_direct.py的代码:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘direct_logs‘,type=‘direct‘) severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘ message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ channel.basic_publish(exchange=‘direct_logs‘, routing_key=severity, body=message) print "[x] sent %r:%r" %(severity, message) connection.close()
receive_logs_direct.py的代码:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: print >> sys.stderr, "Usage : %s [info] [warning] [error]" %(sys.argv[0],) sys.exit(1) for severity in severities: channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name, routing_key=severity) print ‘[*] waiting for logs. To exit press CTRL+C‘ def callback(ch, method, properties, body): print "[x] %r:%r" %(method, routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()