【译】RabbitMQ:工作队列(Work Queue)

在第一篇我们写了两个程序通过一个命名的队列分别发送和接收消息。在这一篇,我们将创建一个工作队列在多个工作线程间分发耗时的工作任务。

工作队列的核心思想是避免立刻处理资源密集型任务导致必须等待其执行完成。相反的,我们安排这些任务在稍晚的时间完成。我们将一个任务封装为一个消息并把它发送到队列中。一个后台的工作线程将从队列中取出任务并最终执行。当你运行多个工作线程,这些任务将在这些工作线程间共享。

这个概念对于在一个HTTP请求中处理复杂任务的Web应用尤其有用。

准备工作

在前一篇中,我们发送了一条内容为“Hello World!”的消息。现在,我们将要发送一些代表复杂任务的字符串。我们并没有诸如改变图片大小或者渲染PDF文件这样的真实的任务,所以假设任务会导致系统的繁忙--通过使用Threed.Sleep()函数。我们会采用许多的点(.)在字符串中来表达他的复杂性,每一个点将消耗一秒钟的工作时间。例如,假设有一个任务“Hello...”将消耗3秒钟。

我们会把上一个例子中的Send.cs文件中的代码稍微调整一下,使得对任意的消息都能通过命令行发送。这个程序将调度任务到我们的工作队列中,所以让我们将它命名为NewTask.cs:

 1 var message = GetMessage(args);
 2 var body = Encoding.UTF8.GetBytes(message);
 3
 4 var properties = channel.CreateBasicProperties();
 5 properties.SetPersistent(true);
 6
 7 channel.BasicPublish(exchange: "",
 8                      routingKey: "task_queue",
 9                      basicProperties: properties,
10                      body: body);

获取命令行消息的帮助方法:

1 private static string GetMessage(string[] args)
2 {
3     return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
4 }

旧有的Receive.cs代码同样需要稍作修改:需要一个为消息中每一个点模拟一秒的时间消耗。它将会处理RabbitMQ发布的消息,执行任务,所以我们称之为Worker.cs。

 1 var consumer = new EventingBasicConsumer(channel);
 2 consumer.Received += (model, ea) =>
 3 {
 4     var body = ea.Body;
 5     var message = Encoding.UTF8.GetString(body);
 6     Console.WriteLine(" [x] Received {0}", message);
 7
 8     int dots = message.Split(‘.‘).Length - 1;
 9     Thread.Sleep(dots * 1000);
10
11     Console.WriteLine(" [x] Done");
12
13     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
14 };
15 channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);

模拟虚拟任务的执行时间:

1 int dots = message.Split(‘.‘).Length - 1;
2 Thread.Sleep(dots * 1000);

像第一篇中那样编译程序:

1 $ csc /r:"RabbitMQ.Client.dll" NewTask.cs
2 $ csc /r:"RabbitMQ.Client.dll" Worker.cs

轮转调度

使用工作队列的好处之一是能够很轻松的并行任务。如果我们要加强对积压工作的处理,只需要按照上面的方法添加更多的Worker,非常容易扩展。

首先,我们同时运行两个Worker。它们都会从队列中获取消息,但是究竟是怎样做到的呢?让我们看看。

你需要打开三个控制台。两个运行Worker,这两个控制台程序将充当消费者--C1与C2。

1 shell1$ Worker.exe
2 Worker
3  [*] Waiting for messages. To exit press CTRL+C
1 shell2$ Worker.exe
2 Worker
3  [*] Waiting for messages. To exit press CTRL+C

在第三个控制台程序中,我们将发布一些新的任务。一旦你已经运行了消费者,你就可以发布新消息了:

1 shell3$ NewTask.exe First message.
2 shell3$ NewTask.exe Second message..
3 shell3$ NewTask.exe Third message...
4 shell3$ NewTask.exe Fourth message....
5 shell3$ NewTask.exe Fifth message.....

让我们看看有什么发送到了Worker端:

1 shell1$ Worker.exe
2  [*] Waiting for messages. To exit press CTRL+C
3  [x] Received ‘First message.‘
4  [x] Received ‘Third message...‘
5  [x] Received ‘Fifth message.....‘
1 shell2$ Worker.exe
2  [*] Waiting for messages. To exit press CTRL+C
3  [x] Received ‘Second message..‘
4  [x] Received ‘Fourth message....‘

默认情况下,RabbitMQ会按顺序将消息逐个发送到消费者。平均情况下,每一个消费者将会获得相同数量的消息。这种分发消息的方式成为轮转调度。可以使用三个以上的Worker试一试。

消息确认

处理一个任务可能花费数秒钟。你可能会担心消费者开始一个较长的任务,但是在完成部分之后就出错了。在我们现在的代码中,一旦RabbitMQ分发了一条消息给消费者它就会马上在队列中删除这条消息。在这样的情况下,如果你中止某一个Worker,因为消息正在执行中,我们将丢失该消息。我们也将丢失所有分发到该Worker但是未被处理的消息。

但是我们不想丢失任何一个任务。如果一个Worker中止了,我们希望这个任务能被分发给其他Worker。

为了确保消息绝不丢失,RabbitMQ提供了消息确认机制。消费者回发一个确认给RabbitMQ,告知某个消息已经被接收、处理,然后RabbitMQ就可以随心所欲的删除它了。

如果一个消费者在没有回发确认就中止了,RabbitMQ会认为该消息没有被完全的处理,并会将该消息重新分发给其他的消费者。通过这种方式,你可以确定没有消息会丢失,即使有Worker会不可意料的中止。

没有消息会超时,RabbitMQ仅仅会在Worker的连接中止的时候重新分发消息。即使处理一个消息花费的时间很长很长也不会有什么关系。

消息确认在默认情况下是开启的。在前面的示例中我们通过将noAck参数设置为true显示的关闭了消息确认。现在是时候移除该标记了,使完成一个任务时发回一个恰当的确认。

 1 var consumer = new EventingBasicConsumer(channel);
 2 consumer.Received += (model, ea) =>
 3 {
 4     var body = ea.Body;
 5     var message = Encoding.UTF8.GetString(body);
 6     Console.WriteLine(" [x] Received {0}", message);
 7
 8     int dots = message.Split(‘.‘).Length - 1;
 9     Thread.Sleep(dots * 1000);
10
11     Console.WriteLine(" [x] Done");
12
13     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
14 };
15 channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);

使用这段代码,我们可以确保及时在消费者正在执行时你用CTRL+C强制中断了程序,也不会丢失任何消息。在消费者中止后不久,所有为收到确认的消息都将被重新分发。

被遗忘的确认

缺少BasicAck是一个非常常见的错误。这是一个简单的错误,但是后果却相当严重。当客户端退出的时候,消息会被重新分发(看起来像是随机分发的),但是RabbitMQ会占用越来越多的内存因为它不能释放未确认的消息。

为了调试这种错误,你可以使用rabbitmqctl打印messages_unacknowledged字段:

1 $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
2 Listing queues ...
3 hello    0       0
4 ...done.

消息持久化

我们已经学习了如何确保即使消费者中止,任务也不会丢失。但是如果RabbitMQ服务中止的时候,我们的任务还是会丢失。

当RabbitMQ退出或者崩溃,它会忘记存在的队列和队列中的消息,除非你告诉它不要这样。确保消息不丢失,有两件事情是必须的:我们必须同时把队列和消息标记为持久的(durable)。

首先,我们需要确保RabbitMQ永远不会丢失队列。为了做到这件事,我们需要将队列申明为持久的:

1 channel.QueueDeclare(queue: "hello",
2                      durable: true,
3                      exclusive: false,
4                      autoDelete: false,
5                      arguments: null);

尽管此命令本身是正确的,但是在当前设置下它不会起作用。因为我们已经定义过一个叫做hello的队列。RabbitMQ不允许使用不同的参数重定义一个已经存在的队列,任何尝试做这样的事情的程序都将返回一个错误。但是有一个变通的方法--让我们用不同的名称申明一个队列,例如task_queue:

1 channel.QueueDeclare(queue: "task_queue",
2                      durable: true,
3                      exclusive: false,
4                      autoDelete: false,
5                      arguments: null);

这个队列申明的改变需要被应用于生产者和消费者。

这个时候,我们可以确定即使是RabbitMQ重启了,task_queue也不会丢失。现在我们需要把我们的消息标记为持久的(persistent),通过把IBasicProperties.SetPersistent设置为true。

1 var properties = channel.CreateBasicProperties();
2 properties.SetPersistent(true);

消息持久注记

将消息标记为持久的并不能完全的保证消息不会丢失。尽管告知了RabbitMQ将消息保存在磁盘上,仍旧有很短的时间里RabbitMQ接收到一个消息并且还没有保存。所以RabbitMQ不会对每条消息做fsync--它可能仅仅被存放在Cache中而不是实际写入到磁盘里面。消息的持久化保证并不健壮,但是对于简单的任务队列已经足够。如果你需要一个更加健壮的保证,你可以使用发布者确认

公平调度

你可能已经注意到调度依旧不能完全按照我们期望的方式工作。设想一个有两个Worker的应用场景,当所有奇数消息都很庞大而偶数消息很轻量的时候,一个Worker总是非常的繁忙而另一个几乎不做什么事情。嗯,RabbitMQ并不会知道这事儿,它依然会平均的分发消息。

出现这种情况是因为RabbitMQ只是在消息进入队列后就将其分发。它并不会去检查每个消费者所拥有的未确定消息的数量。它只是不假思索的将第N个消息调度到第N个消费者。

为了应对这种情况,我们可以使用basicQos方法并且把参数prefetchCount设置为1。这将告诉RabbitMQ不要同一时间调度给同一个消费者超过一条消息。或者,当一个消费者正在处理或确认前一个消息时不要将新消息调度给它。相反的,它会把这个消息调度给下一个不忙碌的消费者。

1 channel.BasicQos(0, 1, false);

队列大小注记

如果所有的消费者都很忙碌,你的队列可能被填满。你希望能盯着这个问题,并且添加更多的消费者,或者使用其他策略。

组合在一起

NewTask.cs类的最终代码如下:

 1 using System;
 2 using RabbitMQ.Client;
 3 using System.Text;
 4
 5 class NewTask
 6 {
 7     public static void Main(string[] args)
 8     {
 9         var factory = new ConnectionFactory() { HostName = "localhost" };
10         using(var connection = factory.CreateConnection())
11         using(var channel = connection.CreateModel())
12         {
13             channel.QueueDeclare(queue: "task_queue",
14                                  durable: true,
15                                  exclusive: false,
16                                  autoDelete: false,
17                                  arguments: null);
18
19             var message = GetMessage(args);
20             var body = Encoding.UTF8.GetBytes(message);
21
22             var properties = channel.CreateBasicProperties();
23             properties.SetPersistent(true);
24
25             channel.BasicPublish(exchange: "",
26                                  routingKey: "task_queue",
27                                  basicProperties: properties,
28                                  body: body);
29             Console.WriteLine(" [x] Sent {0}", message);
30         }
31
32         Console.WriteLine(" Press [enter] to exit.");
33         Console.ReadLine();
34     }
35
36     private static string GetMessage(string[] args)
37     {
38         return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
39     }
40 }

Worker.cs类的最终代码如下:

 1 using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5 using System.Threading;
 6
 7 class Worker
 8 {
 9     public static void Main()
10     {
11         var factory = new ConnectionFactory() { HostName = "localhost" };
12         using(var connection = factory.CreateConnection())
13         using(var channel = connection.CreateModel())
14         {
15             channel.QueueDeclare(queue: "task_queue",
16                                  durable: true,
17                                  exclusive: false,
18                                  autoDelete: false,
19                                  arguments: null);
20
21             channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
22
23             Console.WriteLine(" [*] Waiting for messages.");
24
25             var consumer = new EventingBasicConsumer(channel);
26             consumer.Received += (model, ea) =>
27             {
28                 var body = ea.Body;
29                 var message = Encoding.UTF8.GetString(body);
30                 Console.WriteLine(" [x] Received {0}", message);
31
32                 int dots = message.Split(‘.‘).Length - 1;
33                 Thread.Sleep(dots * 1000);
34
35                 Console.WriteLine(" [x] Done");
36
37                 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
38             };
39             channel.BasicConsume(queue: "task_queue",
40                                  noAck: false,
41                                  consumer: consumer);
42
43             Console.WriteLine(" Press [enter] to exit.");
44             Console.ReadLine();
45         }
46     }
47 }

你可以使用消息确定和BasicQos设置一个工作队列。持久化选项使得消息RabbitMQ重启的时候也得以保全。

要了解关于IModel和IBasicProperties的更多信息,你可以浏览在线的RabbitMQ .NET客户端API引用

现在我们可以前进道教程三并了解如何向多个消费者发送相同的消息。

原文链接:http://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

时间: 2024-10-03 06:25:48

【译】RabbitMQ:工作队列(Work Queue)的相关文章

RabbitMQ系列 第三篇:工作队列Work Queue

在上篇中我们实现了程序来从一个已经命名的队列里发送和接收消息.本篇博文中我们将要创建工作队列用来在多个执行角色间,使用定时器来分散执行任务. 工作队列的主要思想就是避开立刻处理某个资源消耗交大的任务并且需要等待它执行完成.取而代之的是我们可以将它加入计划列表,并在后边执行这些任务.我们将任务分装成一个消息,并发送到队列中.后台的工作程序在接收到消息后将会立刻执行任务.当运行多个执行器时,任务将会在他们之间共享. 这个概念在web应用程序中是比较实用的,对于一些在一个短的http请求里无法完成的复

RabbitMQ的work queue(1)

http://www.rabbitmq.com/tutorials/tutorial-two-java.html 在第一个教程中,我们通过一个命名队列来发送消息和接受消息.在这一节,我们将创建一个工作队列,在多个工作者之间,分发比较耗时的任务 工作队列主要是为了避免资源密集型任务的立即执行,然后一直等待它执行结束.相反,我们可以安排好任务,然后在执行.我们可以将一个任务封装成一个消息,发送到队列中.由工作者在后台取出任务然后执行.当有多个工作者时,他们共同处理这些任务. 在web应用中,当一次h

rabbitmq method之queue.declare

queue.declare即申请队列,首先对队列名作处理,若未指定队列名则随机生成一个,然后查询数据库队列是否已经创建,若创建完成则会申请队列返回 handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, durable = DurableDeclare, exclusive = ExclusiveDeclare, auto_delete = AutoDelete, nowait = NoWait, argumen

RabbitMQ (消息队列)专题学习03 Work Queues(工作队列)

一.概述 工作队列(Work queues) (使用Java客户端) 在前面的专题学习中,我们使用Java语言实现了一个简单的从名为"hello"的队列中发送和接收消息的程序,在这部内容中我们将创建一个工作队列被用来分配定时消息任务,而且通过多个接收者(工作者)实现. 工作队列(又名任务队列),主要的思想是为了避免立即做一个资源密集型的任务(多消息同时密集发送),不必等待它完成,当运行许多工作者的让任务都在她们之间共享. 它在web应用中是非常有用的,因为在很短的时间内http请求窗口

RabbitMQ学习第二记:工作队列的两种分发方式,轮询分发(Round-robin)和 公平分发(Fair dispatch)

1.什么是RabbitMQ工作队列 我们在应用程序使用消息系统时,一般情况下生产者往队列里插入数据时速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据.因此如果一个生产者对应一个消费者的话,很容易导致很多消息堆积在队列里.这时,就得使用工作队列了.一个队列有多个消费者同时消费数据. 下图取自于官方网站(RabbitMQ)的工作队列的图例 P:消息的生产者 C1:消息的消费者1 C2:消息的消费者2 红色:队列 生产者将消息发送到队列,多个消费者同时从队列中获

RabbitMQ(一)

转载请标明出处:http://blog.csdn.net/lmj623565791/article/details/37607165 本系列教程主要来自于官网入门教程的翻译,然后自己进行了部分的修改与实验,内容仅供参考. "Hello world" of RabbitMQ 1.Windows下RabbitMQ的安装 下载Erlang,地址:http://www.erlang.org/download/otp_win32_R15B.exe ,双击安装即可(首先装) 下载RabbitMQ,

十一天 python操作rabbitmq、redis

1.启动rabbimq.mysql 在""运行""里输入services.msc,找到rabbimq.mysql启动即可 2.启动redis 管理员进入cmd,进入redis所在目录,执行redis-server.exe redis.windows.conf --maxmemory 200M  启动redis  server 执行redis-cli.exe启动客户端 一.python系列之 RabbitMQ - work queues 本节我们创建一个工作队列( w

RabbitMQ 6种应用场景

http://www.rabbitmq.com/getstarted.html官网 最近业务需要使用Rabbitmq工作队列实现任务的负载分发 1.1.什么是RabbitMQ? RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.ActionScript.XMPP.STOMP等,支持AJAX.用于在分布式系统中存储转发消息. 1.2.什么是AMQP? ??AMQP

Python一路走来 RabbitMQ

一:介绍:(induction) Rabbitmq 是一个消息中间件.他的思想就是:接收和发送消息.你可以把它想成一个邮政局.当你把你的邮件发送到邮箱的,首先你需要确认的是:邮政员先生能把你的邮件发送给你想发送的地方.在这个比喻中,rabbitmq就是一个邮箱.一个邮政局.一个邮递员. 在这里rabbitmq和传统邮政局的区别:rabbitmq不处理信纸.取而代之的是:接收.储存.发送二进制数的消息. rabbitmq和消息用如下专业术语: 生产者意思发送.A程序发送消息被称为:producer

RabbitMQ教程

1.引言 RabbitMQ--Rabbit Message Queue的简写,但不能仅仅理解其为消息队列,消息代理更合适.RabbitMQ 是一个由 Erlang 语言开发的AMQP(高级消息队列协议)的开源实现,其内部结构如下: RabbitMQ作为一个消息代理,主要和消息打交道,负责接收并转发消息.RabbitMQ提供了可靠的消息机制.跟踪机制和灵活的消息路由,支持消息集群和分布式部署.适用于排队算法.秒杀活动.消息分发.异步处理.数据同步.处理耗时任务.CQRS等应用场景. 下面我们就来学