在前一篇文章中我们构建了一个简单的日志系统,我们可以向多个接受者广播消息。
在这篇文章我,我们将要添加一些功能使得针对部分消息的接受成为可能,例如我们只对错误的消息进行磁盘记录,同时又可以把所有的消息打印到屏幕上。
绑定
在之前的案例中,我们已经创建了一个绑定,可以重新调用如下的代码:
channel.QueueBind(queue: queueName,exchange: "logs",routingKey: "");
绑定是交换机和队列之间的关系,可以简单的理解为队列对该交换机上的消息感兴趣。
绑定可以设定参数routingKey,为了避免和BasicPublish 方法的参数混淆,我们暂且称之为binding key,下面是我们创建一个带有指定binding key的绑定:
channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: "black");
Direct exchange
我们之前的日志系统,把接受到的消息广播给所有的接受者,我们将要扩展它使得其能够根据消息的级别来过滤发送消息,例如我们想让记录日志的接受者仅仅接受严重性级别的错误消息,而不用在警告和信息级别的消息上浪费磁盘空间。
fanout没有办法提供给我们这样的灵活性,它只能对接受到的消息进行直接广播,而不去关心 routing key.
这里我们使用direct类型的交换机去代替,direct类型的交换机的实现思想非常简单--消息将会被发送到其Binding key 和消息的routing key 完全匹配的队列上。
为了说明这个问题,考虑下面的设定:
在这个交换机上绑定了两个队列,第一个队列绑定的bingding key 为"orange",第二个队列绑定了两个binding key,一个是"black",另外一个是"green",在这样的配置下,如果一个带有routing key为"orange"的消息,竟会被路由到队列Q1上,而带有routing key为"black" 或 "green"的消息将会被路由到队列Q2上。
多绑定
使用一个routing key 绑定多个队列完全是合法的,在我们案例中我们可以在 路由X 和 队列Q1,Q2中同时添加一个binding key为"black"的绑定,在这种情况下路由 X 将会像 fanout交换机一样帮匹配到的消息发送给所有的接受者:路由会把routing key 为"black"的消息发送给Q1和Q2。
嵌入日志
我们使用这种模式来构建我们的日志系统,代替fanout我们将会把我们的消息发送到direct类型的交换机,我们把日志级别作为路由的rouing key,这样接受者就可以根据日志级别选择接受其感兴趣的日志。
像往常一样,首先我们需要创建一个交换机:
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
然后我们准备发送消息:
var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs",routingKey: severity,basicProperties: null, body: body);
我们简单的假设 severity 分为以下几种:"info","warning","error".
订阅
接受消息和之前的一样,唯一的区别就是我们将会为我们感兴趣的每一个级别的消息新建绑定:
var queueName = channel.QueueDeclare().QueueName; foreach(var severity in args) { channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: severity); }
汇总
EmitLogDirect.cs:
class EmitLogDirect { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var severity = (args.Length > 0) ? args[0] : "info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip( 1 ).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body); Console.WriteLine(" [x] Sent ‘{0}‘:‘{1}‘", severity, message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
ReceiveLogsDirect.cs:
class ReceiveLogsDirect { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var queueName = channel.QueueDeclare().QueueName; if(args.Length < 1) { Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; } foreach(var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); } Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received ‘{0}‘:‘{1}‘", routingKey, message); }; channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
编译执行代码:
例如:如果你仅仅想保存"warning"和"error"的消息,打开控制台输入:ReceiveLogsDirect.exe warning error
发送一个Error消息, 控制台输入 EmitLogDirect.exe error "Run. Run. Or it will explode."