上一节我们实现了向指定的队列发送和接收消息。这一节,我们主要讲工作队列,用于在多个消费者之间分配置实时任务。
工作队列方式主要是为了防止在执行一个耗费资源的任务时,要等待其结束才能处理其它事情。我们将任务的执行延迟,将其封装成一个消息,然后发送给一个列队。后台再运行一个程序从队列里取出消息,然后执行任务。如果有多个消费者,还可以分享任务。
对于Web应用程序来说,这样就可以使用Http的短请求来处理复杂的业务。
准备
我们发送一个字符串来代表复杂的任务,然后用Thread.Sleep()来模拟耗时操作,使用点的个数来代表任务的复杂度,一个点代表一秒种的工作,例如Hello…代表一个耗时三秒的任务。
修改上一节示例中Send.cs的代码,允许从命令输入任意消息,然后发送到工作队列,我们将其命名为NewTask.cs。
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body);
GetMessage方法获取输入的消息
private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }
修改上一节中的Receive.cs,按每个点代表一个秒模拟耗时任务,处理来自RabbitMQ的消息并执行任务,我们将其命名为Worker.cs
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split(‘.‘).Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);
模拟任务执行的时间
int dots = message.Split(‘.‘).Length - 1; Thread.Sleep(dots * 1000);
编译运行
循环调度
使用工作队列的好处是很容易实现平分工作。如果要执行的工作会造成积压,只需要运行多个工作者就可以了。
首先,运行两个控制台程序执行Worker.cs,他们都从队列里取消息。这两个控制台程序就是消息者,分别是C1,C2。
再运行一个控制台程序执行NewTask.cs发布任务。启动消费者之后,在控制输入以下消息内容发到队列:
shell3$ NewTask.exe First message. shell3$ NewTask.exe Second message.. shell3$ NewTask.exe Third message... shell3$ NewTask.exe Fourth message.... shell3$ NewTask.exe Fifth message.....
然后看看工作都的接收内容:
C1
[*] Waiting for messages. To exit press CTRL+C [x] Received ‘First message.‘ [x] Received ‘Third message...‘ [x] Received ‘Fifth message.....‘
C2
[*] Waiting for messages. To exit press CTRL+C [x] Received ‘Second message..‘ [x] Received ‘Fourth message....‘
RabbitMQ默认按顺序发送消息到每一个消费者,每个消费者会平均取到相同数量的消息,这种分布式消息方式称做轮循调度。也可以添加更多的消费者。
消息确认
执行一个任务需要花费一些时间,那么你可能会考虑如果一个消费者正执行一个耗时的任务时突然崩溃了怎么办。我们目前的代码,一旦RabbitMQ发送消息给消费者,消费会立即被删除。在这种情况下,如果中止一个工作者,就会丢失正在处理的消息,还有当前消费者已接收但还没有被处理的消息也会丢失。
但是我们不想丢失任何任务,如果某一个工作者停止了,我们希望任务会被发送到其它工作者。
为了保证消息不会丢失,RabbitMQ提供了消息确认机制(acknowledgments)。当消息被接收并处理之后,消费者发送一个ack到RabbitMQ,告诉它可以自由删除该消息了。
如果消费者停止了,没有发送ack确认,那么RabbitMQ会认为这个消息没有被处理,它会将这个消费重新入队。如果还有其它的消费者的话,它就会立马把这个消息发送给另一个消费者去处理。这样即使消费者偶尔中止了,那么也不会造成消息丢失。
任何消息都不会超时,如果消费者崩溃了,RabbitMQ会重新发送消息,即使消息处理要发花费很长时间也不会有问题。
消息确认机制默认是开着的。前边的例子都通过设置noAck为true来关闭了消息确认。现在设置noAck为false,当任务执行完成后,工作者发送确认消息。
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split(‘.‘).Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
上边的代码可以确保工作者正在处理消息时崩溃的话,也不会丢失任何消息。所有未经确认的消息都会重新发送。
消息持久化
我们已经知道如何确认工作者崩溃时,任务也不会丢失。但是如查RabbitMQ也停止的话,任务同样会丢失。
如果RabbitMQ退出或崩溃了,默认就会清除队列和消息。要保证消息不丢失,需要配置队列和消息都要持久化。
首先要保证RabbitMQ不会丢失队列,这需要声明队列为持久队列
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
从语法上讲,上边的代码没有什么问题,但是这样并不会起作用,因为我们这前已经声明了名为hello的队列,RabbitMQ不允许以不同的参数重新定义已经存在的队列,这样会抛出异常。所以要换个队列名称,如task_queue
channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
queueDeclare 的修改要在生产者和消费者两边都进配置。
设置队列持久化之后,需要再设置消息为持久化。
var properties = channel.CreateBasicProperties(); properties.SetPersistent(true);
平衡调度
你可能注意到,目前的分发机制仍然不理想。例如,如果有两个工作者,当奇数的消息十分复杂,而偶数的消息很简单时,一个工作者就会非常繁忙,而另一个工作者几乎没有任务可做。但RabbitMQ不知道这种情况,仍然会平均分配任务。
因为只要有消息进入队列,RabbitMQ就会分发消息。它不会检查每一个消费者未确认的消息个数。它只是盲目的将第N个消息发送给第N个消费者。
为了防止这种情况,要使用basicQos 方法,并设置prefetchCount 参数的值为1。这样RabbitMQ就不会同时发送多个消息给同一个工作者。也就是说,在工作者处理并确认前一个消息之前,不会分发新的消息给工作者。它会把消息发送给下一个不忙的工作者。
channel.BasicQos(0, 1, false);
完整代码
NewTask.cs代码
using System; using RabbitMQ.Client; using System.Text; class NewTask { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); channel.BasicPublish(exchange: "", routingKey: "task_queue", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); } }
Worker.cs代码
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Threading; class Worker { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split(‘.‘).Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
使用消息确认机制和BasicQos 就可以建立工作队列,用持久化选项可以保证即使RabbitMQ重启也不会丢失任务。