目录
RabbitMQ --- Hello Mr.Tua
RabbitMQ --- Work Queues(工作队列)
RabbitMQ --- Publish/Subscribe(发布/订阅)
前言
在上一章中介绍了 Publish/Subscribe(发布/订阅),它是把每个消息发送给多个 Consumer,也就是说每个 Consumer 都是接收所有的消息,辣么问题来了,如果 Consumer 只接收它想要的某一部分消息,那该怎么办呢?可以通过 Routing(路由)的机制来实现。
Direct交换机(Direct exchange)
在上一章的示例中通过 Fanout exchange 把所有消息广播到多个 Consumer,这样是无法满足 Consumer 自定义接收消息的需求,为了解决这个问题就需要使用 Direct exchange ,它会使消息转发到 Routing key 和 Binding key 完全匹配的队列,而这两者不匹配的消息都会被丢弃。
也可以用相同的 Binding key 绑定多个队列,看上去和 Fanout exchange 的作用一样。
完整示例
现在修改上一章的示例代码,使 Consumer 可以自定义接收奇数(odd)或偶数(even)消息:
using RabbitMQ.Client; using System; using System.Text; namespace Producer { class Program { static void Main(string[] args) { var factory = new ConnectionFactory { HostName = "10.202.228.107", UserName = "Tua", Password = "Tua", Port = 5672 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare ( exchange: "Tua", type: ExchangeType.Direct//Direct交换机 ); for (int m = 0; m < 10; m++) { string marks = string.Empty; for (int n = 0; n <= m; n++) { marks += ">"; } string routingKey = string.Empty; if(marks.Length % 2 != 0) { routingKey = "odd";//奇数 } else { routingKey = "even";//偶数 } string msg = "Mr.Tua" + marks + marks.Length + "s"; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish ( exchange: "Tua", routingKey: routingKey,//路由键 basicProperties: null, body: body ); Console.WriteLine("Producer sent message: {0}", msg); } Console.ReadLine(); } } } } }
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Linq; using System.Text; using System.Threading; namespace Consumer { class Program { static void Main(string[] args) { var factory = new ConnectionFactory { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare ( exchange: "Tua", type: ExchangeType.Direct//Direct交换机 ); string queueName = channel.QueueDeclare().QueueName; string[] bindingKeys = { "odd", "even" }; Random random = new Random(); int index = random.Next(2); string bindingKey = bindingKeys[index];//随机生成绑定键 channel.QueueBind ( queue: queueName, exchange: "Tua", routingKey: bindingKey//绑定键 ); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var body = e.Body; var msg = Encoding.UTF8.GetString(body); int marks = msg.ToCharArray().Where(c => c.ToString() == ">").Count(); Console.WriteLine("Consumer received {0} message: {1}", bindingKey, msg); Thread.Sleep(marks * 1000); Console.WriteLine("OK"); }; channel.BasicConsume ( queue: queueName, noAck: true, consumer: consumer ); Console.ReadLine(); } } } } }
时间: 2024-10-12 12:01:36