RoutingKey
每个项目都需要记录日志,日志则一般会分为多种级别,常见的是
Info、debug、warn、Error
对于前三种日志,在项目运行中会产生大量的消息,但是一般多数情况下是不会用到的即时性不高,而error则不同,对于error级别的消息需要迅速通知开发人员去修改项目中的错误
使用RabbitMq应该怎么设计? 可以分析一下
首先我们需要一个exchange,对于即时性不高的日志速度慢一些是没有关系的,所以可以把它们放到一个队列中,而针对error级别即时性较高的需要放到一个单独的队列中。在之前我们是在消息生产者中去声明exchange、queue以及它们的绑定关系,这显然是不对的。
对于消息发布者而言它只负责把消息发布出去,甚至它也不知道消息是发到哪个queue,消息通过exchange到达queue,exchange的职责非常简单,就是一边接收发布者的消息一边把这些消息推到queue中。
而exchange是怎么知道消息应该推到哪个queue呢,这就要通过绑定queue与exchange时的routingkey了,通过代码进行绑定并且指定routingkey,下面有一张关系图,p(发布者) ---> x(exchange) bindding(绑定关系也就是我们的routingkey) 红色代表着queue
在第一个例子中我们并没有声明exchange而是使用是默认的,在发布的时候routingkey则是使用的队列名,事实上如果没有指定routingkey队列的名称就是routingkey
//声明队列 channel.QueueDeclare("firstTest", true, false, false, null); //发布消息 var msg = Encoding.UTF8.GetBytes("Hello RabbitMQ"); channel.BasicPublish(string.Empty, routingKey: "firstTest", basicProperties: null, body: msg);
这时可以继续回到我们日志记录了,info、warn、debug 我希望放到Log_else队列中,它们的routingkey分别是info、warn、debug,errror级别的日志我希望放到Log_error队列中,它的routingkey是error。这时可以去编写代码了
在发布者中的代码中我把声明exchange、queue与它们的绑定关系移除掉了,放到了consumber中,,方便演示这里的日志级别只有info、debug、error。下面分别生成了对应的消息
//创建返回一个新的频道 using (var channel = RabbitMqHelper.GetConnection().CreateModel()) { //发布一百个消息 for (var i = 0; i < 100; i++) { //对i进行求余来决定日志的级别 var routingkey = i % 2 == 0 ? "info" : i % 3 == 0 ? "debug" : "error"; var msg = Encoding.UTF8.GetBytes($"{i} :{routingkey}Message"); channel.BasicPublish("LogExchange", routingKey: routingkey, basicProperties: null, body: msg); } Console.Write("发布成功!"); }
对于consumber而言,这里根据控制台传入的级别创建不同的queue与它们与exchange的关系
bool flag = true; string level = ""; while (flag) { Console.WriteLine("请指定要接收的消息级别"); level = Console.ReadLine(); if (level == "info" || level == "error" || level == "debug") flag = false; else Console.Write("仅支持info、debug与error级别"); } using (var channel = RabbitMqHelper.GetConnection().CreateModel()) { //声明交换机 direct模式 channel.ExchangeDeclare("LogExchange", "direct", true, false, null); //根据声明使用的队列 var queueName = level == "info" ? "Log_else" : level == "debug" ? "Log_else" : "Log_error"; channel.QueueDeclare(queueName, true, false, false, null); //进行绑定 channel.QueueBind(queueName, "LogExchange", level, null); //创建consumbers var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var msg = Encoding.UTF8.GetString(e.Body); Console.WriteLine(msg); }; //进行消费 channel.BasicConsume(queueName, true, consumer); Console.ReadKey(); }
运行了三个consumber,指定三个消息级别
打开Web工具,可以看到已经有了log_else与log_error两种queue。
还有它们的绑定关系
运行消息发布者,这时候就可以看到三个consumber已经在消费了,error的consumber只消费了log_error队列中的消息、其它两个consumber消费的是log_else中的消息