RabbitMQ --- Work Queues(工作队列)

目录

RabbitMQ --- Hello Mr.Tua

前言

Work Queues 即工作队列,它表示一个 Producer 对应多个 Consumer,包括两种分发模式:轮循分发(Round-robin)和公平分发(Fair dispatch)。旨在为了避免立即执行任务时出现占用很多资源和时间却又必须等待完成的现象。

原理分析: Producer 把工作任务转化为消息发送给队列,当后台有一个 Consumer 进程在运行时,它会不间断地从队列中取出消息来执行;当后台有多个 Consumer 进程在运行时,它们会不间断地从队列中取出消息采取并行执行的方式以提高效率。

轮循分发(Round-robin)

我修改了第一篇文章中的代码,用线程来模拟处理消息耗时的场景,分别在10个消息的末尾增加符号“>”,每个符号“>”表示该消息在线程中执行需要耗时1秒,每个消息处理完毕时以“OK”表示结束。

Producer 代码片段:

for (int m = 0; m < 10; m++)
{
    string marks = string.Empty;
    for (int n = 0; n <= m; n++)
    {
        marks += ">";
    }
    string msg = "Mr.Tua" + marks + marks.Length + "s";
    var body = Encoding.UTF8.GetBytes(msg);
    channel.BasicPublish
    (
        exchange: string.Empty,
        routingKey: "Tua",
        basicProperties: null,
        body: body
    );
    Console.WriteLine("Producer sent message: {0}", msg);
}

Consumer  代码片段:

consumer.Received += (sender, e) =>
{
    var body = e.Body;
    var msg = Encoding.UTF8.GetString(body);
    int marks = msg.ToCharArray().Where(c => c.ToString() == ">").Count();
    Console.WriteLine("Consumer received message: {0}", msg);    Thread.Sleep(marks * 1000);
    Console.WriteLine("OK");
};

Producer 控制台(后启动运行):

Consumer 控制台(先启动运行 x 2 ):

可以看到 Producer 按照顺序将每个消息发送给下一个 Consumer (一次性平均分配),每个 Consumer 得到相等数量的消息,当中不用考虑处理消息时需要耗费多少时间,也就是说不关心 Consumer 是否繁忙或空闲,这种默认的分发模式称为轮循分发(Round-robin)。

消息应答(Message acknowledgment)

如果某个 Consumer 在处理消息时由于各种原因挂了导致 Producer 没有收到消息处理完成时的应答,那么就会丢失 Consumer 正在处理和没有处理的消息。

两个 Consumer 同时运行的过程中我关闭了其中一个,可以看到下面的 Consumer 完成了第2个消息,丢失了第4(未处理完毕)、6、8、10个消息。

在这种情况下如何保证消息不丢失呢?

消息应答(Message acknowledgment):如果 Consumer 挂了没有发送应答,Producer 会重新转发给其它的 Consumer 以保证不丢失消息。

修改 Consumer 代码:

var body = e.Body;
var msg = Encoding.UTF8.GetString(body);
int marks = msg.ToCharArray().Where(c => c.ToString() == ">").Count();
Console.WriteLine("Consumer received message: {0}", msg);
Thread.Sleep(marks * 1000);
Console.WriteLine("OK");
//每个消息处理完毕时手动发送消息应答
channel.BasicAck
(
   deliveryTag: e.DeliveryTag, //该消息的Index
   multiple: false//是否批量应答,true:批量应答所有小于该deliveryTag的消息
);
channel.BasicConsume
(
   queue: "Tua",
   noAck: false,//手动应答
   consumer: consumer
);

可以看到虽然下面的 Consumer 挂了,但是 Producer 重新把消息发给了上面的 Consumer 去处理。

消息持久化(Message durability)

现在已经知道当 Consumer 挂了不丢失消息的解决方案,可是 RabbitMQ 服务要是挂了会导致所有的队列和消息丢失,这种情况该怎么办呢?

消息持久化(Message durability):让所有的队列和消息都开启持久化功能,将队列和消息都保存在磁盘上以达到持久化的目的。

另外还有一种为了解决事务机制性能开销大(导致吞吐量下降)而提出的更强大的消息持久化的方式叫做 Publisher Confirm,这里不作讨论。

修改 Producer 代码:

channel.QueueDeclare
(
   queue: "Tua",
   durable: true,//开启队列持久化
   exclusive: false,
   autoDelete: false,
   arguments: null
);
var basicProperties = channel.CreateBasicProperties();
basicProperties.Persistent = true;//开启消息持久化
channel.BasicPublish
(
   exchange: string.Empty,
   routingKey: "Tua",
   basicProperties: basicProperties,
   body: body
);

修改 Consumer 代码:

channel.QueueDeclare
(
   queue: "Tua",
   durable: true,//开启队列持久化
   exclusive: false,
   autoDelete: false,
   arguments: null
);

运行程序时报出一个异常错误:

这是因为修改了代码 durable: true,开启了队列的持久化,然而 RabbitMQ 是不允许使用不同的参数重新定义一个已有的同名队列。

两种方法可以解决:

1.重新定义一个不同名的队列;

2.删除已有的同名队列。

第一种方法没有什么好说的,这里说第二种方法,打开 RabbitMQ 管理平台删除已有的同名队列:

测试步骤:首先启动 Producer ---> 关闭 RabbitMQ 服务 ---> 启动 RabbitMQ 服务 ---> 最后启动 Consumer。

可以看到队列和消息都木有丢失,这里就不再上图了。

公平分发(Fair dispatch)

在介绍轮循分发(Round-robin)时有提到它是不关心 Consumer 是否繁忙或空闲的,但是这样很可能就会出现有的 Consumer 劳累过度赶脚身体被掏空,而有的 Consumer 悠闲自得赶脚无用武之地的问题,那该怎么办呢?

公平分发(Fair dispatch):不会同时给一个 Consumer 发送多个新消息,只有在 Consumer 空闲的时候才会给它发送一个新消息。

修改 Consumer 代码:

//请求服务的特殊设置
channel.BasicQos
(
   prefetchSize: 0,//服务传送消息的最大容量,0表示无限制
   prefetchCount: 1,//服务传送消息的最大数量,0表示无限制
   global: false//false:将以上的设置应用于Consumer级别,true:将以上的设置应用于Channel级别
);

为了便于演示,我把 Producer 发送消息的顺序改为从10到1。

可以看到当 Consumer 空闲的时候才会给它发送一个新消息,而且在公平分发(Fair dispatch)模式下支持动态增加 Consumer ,使得新加的 Consumer 可以立即处理还没有发送出去的消息。

反观在默认的轮循分发(Round-robin)模式下已经将消息一次性平均分配完毕,就算是动态增加了 Consumer 也然并卵。。。

示例代码

using RabbitMQ.Client;
using System;
using System.Text;

namespace WorkQueuesProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                HostName = "192.168.31.212",
                UserName = "Tua",
                Password = "Tua",
                Port = 5672
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare
                    (
                        queue: "Tua",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null
                    );
                    for (int m = 0; m < 10; m++)
                    {
                        string marks = string.Empty;
                        for (int n = 0; n <= m; n++)
                        {
                            marks += ">";
                        }
                        string msg = "Mr.Tua" + marks + marks.Length + "s";
                        var body = Encoding.UTF8.GetBytes(msg);
                        var basicProperties = channel.CreateBasicProperties();
                        basicProperties.Persistent = true;
                        channel.BasicPublish
                        (
                            exchange: string.Empty,
                            routingKey: "Tua",
                            basicProperties: basicProperties,
                            body: body
                        );
                        Console.WriteLine("Producer sent message: {0}", msg);
                    }
                    Console.ReadLine();
                }
            }
        }
    }
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Linq;
using System.Text;
using System.Threading;

namespace WorkQueueConsumer
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory
            {
                HostName = "localhost"
            };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare
                    (
                        queue: "Tua",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null
                    );
                    channel.BasicQos
                    (
                        prefetchSize: 0,
                        prefetchCount: 1,
                        global: false
                    );
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (sender, e) =>
                    {
                        var body = e.Body;
                        var msg = Encoding.UTF8.GetString(body);
                        int marks = msg.ToCharArray().Where(c => c.ToString() == ">").Count();
                        Console.WriteLine("Consumer received message: {0}", msg);
                        Thread.Sleep(marks * 1000);
                        Console.WriteLine("OK");
                        channel.BasicAck
                        (
                            deliveryTag: e.DeliveryTag,
                            multiple: false
                        );
                    };
                    channel.BasicConsume
                    (
                        queue: "Tua",
                        noAck: false,
                        consumer: consumer
                    );
                    Console.ReadLine();
                }
            }
        }
    }
}
时间: 2024-10-30 20:34:43

RabbitMQ --- Work Queues(工作队列)的相关文章

RabbitMQ Work Queues(工作队列)

RabbitMQ Work Queues(工作队列) 工作队列模式为一个生产者对应多个消费者,但是只有一个消费者获得消息,即一个队列被多个消费者监听,但一条消息只能被其中的一个消费者获取 代码如下: 生产者代码: public class WorkSend { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //获取连接

RabbitMQ (消息队列)专题学习03 Work Queues(工作队列)

一.概述 工作队列(Work queues) (使用Java客户端) 在前面的专题学习中,我们使用Java语言实现了一个简单的从名为"hello"的队列中发送和接收消息的程序,在这部内容中我们将创建一个工作队列被用来分配定时消息任务,而且通过多个接收者(工作者)实现. 工作队列(又名任务队列),主要的思想是为了避免立即做一个资源密集型的任务(多消息同时密集发送),不必等待它完成,当运行许多工作者的让任务都在她们之间共享. 它在web应用中是非常有用的,因为在很短的时间内http请求窗口

2. RabbitMQ 之Work Queues (工作队列)

在上一篇揭开RabbitMQ的神秘面纱一文中,我们编写了程序来发送和接收来自命名队列的消息. 本篇我们将会创建一个 Work Queue(工作队列) 来使用分发任务在多个任务中. 前提:本教程假定RabbitMQ 已在标准端口(15672)上的localhost上安装并运行.如果您使用不同的主机,端口或凭据,则需要调整连接设置. 1. Work Queue 工作队列 工作队列(又称:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成. 相反,我们安排任务稍后完成.我们将任务

我的RabbitMQ学习2(工作队列)

创建一个工作队列 1.建立一个生成者  //初始化一个连接 生产者 -> (消费者) var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { //对应的队列 channel.QueueDeclare(que

RabbitMQ学习第二记:工作队列的两种分发方式,轮询分发(Round-robin)和 公平分发(Fair dispatch)

1.什么是RabbitMQ工作队列 我们在应用程序使用消息系统时,一般情况下生产者往队列里插入数据时速度是比较快的,但是消费者消费数据往往涉及到一些业务逻辑处理导致速度跟不上生产者生产数据.因此如果一个生产者对应一个消费者的话,很容易导致很多消息堆积在队列里.这时,就得使用工作队列了.一个队列有多个消费者同时消费数据. 下图取自于官方网站(RabbitMQ)的工作队列的图例 P:消息的生产者 C1:消息的消费者1 C2:消息的消费者2 红色:队列 生产者将消息发送到队列,多个消费者同时从队列中获

RabbitMQ入门(二)工作队列

??在文章RabbitMQ入门(一)之Hello World,我们编写程序通过指定的队列来发送和接受消息.在本文中,我们将会创建工作队列(Work Queue),通过多个workers来分配耗时任务. ??工作队列(Work Queue,也被成为Task Queue,任务队列)的中心思想是,避免立即执行一个资源消耗巨大且必须等待其完成的任务.相反地,我们调度好队列可以安排该任务稍后执行.我们将一个任务(task)封装成一个消息,将它发送至队列.一个在后台运行的work进程将会抛出该任务,并最终执

RabbitMQ学习三

Work Queues 在上一篇文章中,send.py程序向名为hello的队列发送消息,receive.py程序向名为hello的队列接收消息.这一节中,我们将创建一个Work Queue用于将那些比较耗时的任务分布到多个worker上. Work Queues工作队列或者叫做Task Queues任务队列的主要概念就是为了避免立刻执行一个耗费资源的任务并且不得不等待它执行完成.取而代之的是,我们将这个任务调度到以后去执行. 我们封装一个任务为一个消息并发送这个消息到队列.一个work pro

十一天 python操作rabbitmq、redis

1.启动rabbimq.mysql 在""运行""里输入services.msc,找到rabbimq.mysql启动即可 2.启动redis 管理员进入cmd,进入redis所在目录,执行redis-server.exe redis.windows.conf --maxmemory 200M  启动redis  server 执行redis-cli.exe启动客户端 一.python系列之 RabbitMQ - work queues 本节我们创建一个工作队列( w

RabbitMQ --- Routing(路由)

目录 RabbitMQ --- Hello Mr.Tua RabbitMQ --- Work Queues(工作队列) RabbitMQ --- Publish/Subscribe(发布/订阅) 前言 在上一章中介绍了 Publish/Subscribe(发布/订阅),它是把每个消息发送给多个 Consumer,也就是说每个 Consumer 都是接收所有的消息,辣么问题来了,如果 Consumer 只接收它想要的某一部分消息,那该怎么办呢?可以通过 Routing(路由)的机制来实现. Dir