【RabbitMQ】5、RabbitMQ任务分发机制

当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load。接下来我们分布讲解。

应用场景就是RabbitMQ Server会将queue的Message分发给不同的Consumer以处理计算密集型的任务:

1. Message acknowledgment 消息确认

每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么 非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完 成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。

如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了(注意是这种情况下)。

为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。

在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。

如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。

这样即使你通过Ctr-C中断了Recieve.cs,那么Message也不会丢失了,它会被分发到下一个Consumer。

如果忘记了ack,那么后果很严重。当Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于 RabbitMQ会长时间运行,因此这个“内存泄漏”是致命的。去调试这种错误,可以通过一下命令打印un-acked Messages.

2. Round-robin dispatching 循环分发

RabbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的。如果现在load加重,那么只需要创建更多的Consumer来进行任务处理即 可。当然了,对于负载还要加大怎么办?我没有遇到过这种情况,那就可以创建多个virtual Host,细化不同的通信类别了。

1、首先开启两个Consumer,即运行两个Recieve.cs。

2、在开启两个Producer,即运行两个Producer.cs。

默认情况下,RabbitMQ 会顺序的分发每个Message。当每个收到ack后,会将该Message删除,然后将下一个Message分发到下一个Consumer。这种分发方式叫做round-robin(优雅分发)。

Producer.cs

class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare("hello", false, false, false, null);
                    var message = GetMessage(args);
                    var body = Encoding.UTF8.GetBytes(message);

                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;//non-persistent (1) or persistent (2)
                    //channel.TxSelect();
                    channel.BasicPublish("", "hello", properties, body);
                    //channel.TxCommit();
                }
            }
        }

        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
        }
    }

Consumer.cs

//#define demo1
#define demo2
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ReceiveDemo2
{
    /// <summary>
    /// 一个Send和多个Receive的例子,
    /// 还加上了ack的例子.
    /// 优雅分发
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare("hello", false, false, false, null);
                    var consumer = new QueueingBasicConsumer(channel);
#if demo1
                    channel.BasicConsume("hello", true, consumer);//自动删除消息
#else
                    channel.BasicConsume("hello", false, consumer);//需要接受方发送ack回执,删除消息
#endif
                    Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//挂起的操作
#if demo2
                        channel.BasicAck(ea.DeliveryTag, false);//与channel.BasicConsume("hello", false, null, consumer);对应
#endif
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);
                        int dots = message.Split(‘.‘).Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine(" [x] Done");
#if demo2
                        //channel.BasicAck(ea.DeliveryTag, false);//与channel.BasicConsume("hello", false, null, consumer);对应,这句话写道40行和49行运行结果就会不一样.写到这里会发生如果输出[x] Received {0}之后,没有输出 [x] Done之前,CTRL+C结束程序,那么message会自动发给另外一个客户端,当另外一个客户端收到message后,执行完49行的命令之后,服务器会删掉这个message
#endif
                    }
                }
            }
        }
    }
}

3. Message durability消息持久化

在上一节中我们知道了即使Consumer异常退出,Message也不会丢失。但是如果RabbitMQ Server退出呢?软件都有bug,即使RabbitMQ Server是完美毫无bug的(当然这是不可能的,是软件就有bug,没有bug的那不叫软件),它还是有可能退出的:被其它软件影响,或者系统重启 了,系统panic了。。。

为了保证在RabbitMQ退出或者crash了数据仍没有丢失,需要将queue和Message都要持久化。queue的持久化需要在声明时指定durable=True,修改Producer和Consumer的channel.QueueDeclare代码,再次强调,Producer和Consumer都应该去创建这个queue,尽管只有一个地方的创建是真正起作用的:

bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);

上述语句执行不会有什么错误,但是确得不到我们想要的结果,原因就是RabbitMQ Server已经维护了一个叫hello的queue,那么上述执行不会有任何的作用,也就是hello的任何属性都不会被影响。这一点在上篇文章也讨论过。

那么workaround也很简单,声明一个另外的名字的queue,比如名字定位task_hello,或者通过监控http://localhost:15672/,删除名为“hello”的Queue。

接下来,还需要持久化Message,即在Producer.cs里面Publish的时候指定一个properties,方式如下:

static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    bool durable = true;
                    channel.QueueDeclare("task_queue", durable, false, false, null);//queue的持久化需要在声明时指定durable=True
                    var message = GetMessage(args);
                    var body = Encoding.UTF8.GetBytes(message);
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);//需要持久化Message,即在Publish的时候指定一个properties,
                    channel.BasicPublish("", "task_hello", properties, body);
                }
            }
        }

关于持久化的进一步讨论:

为了数据不丢失,我们采用了:

  1. 在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。
  2. 持久化queue,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。
  3. 持久化Message,理由同上。

但是这样能保证数据100%不丢失吗?

答案是否定的。问题就在与RabbitMQ需要时间去把这些信息存到磁盘上,这个time window虽然短,但是它的确还是有。在这个时间窗口内如果数据没有保存,数据还会丢失。还有另一个原因就是RabbitMQ并不是为每个Message都做fsync:它可能仅仅是把它保存到Cache里,还没来得及保存到物理磁盘上。

因此这个持久化还是有问题。但是对于大多数应用来说,这已经足够了。当然为了保持一致性,你可以把每次的publish放到一个transaction中。这个transaction的实现需要user defined codes。

那么商业系统会做什么呢?一种可能的方案是在系统panic时或者异常重启时或者断电时,应该给各个应用留出时间去flash cache,保证每个应用都能exit gracefully。

4. Fair dispatch 公平分发

那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却是毫无休息的机会。那么,RabbitMQ是如何处理这种问题呢?

通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。 设置方法如下:

channel.BasicQos(0, 1, false);

注意,这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。

Consumer.cs

static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    bool durable = true;
                    channel.QueueDeclare("task_queue", durable, false, false, null);
                    channel.BasicQos(0, 1, false);//这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume("task_hello", false, null, consumer);//需要接受方发送ack回执,删除消息
                    Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//挂起的操作
                        channel.BasicAck(ea.DeliveryTag, false);//与channel.BasicConsume("task_queue", false, null, consumer);对应
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);
                        int dots = message.Split(‘.‘).Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine(" [x] Done");
                    }
                }
            }
        }

出处:https://www.cnblogs.com/qiyebao/p/4205626.html

时间: 2024-10-09 15:50:10

【RabbitMQ】5、RabbitMQ任务分发机制的相关文章

(转)RabbitMQ消息队列(三):任务分发机制

在上篇文章中,我们解决了从发送端(Producer)向接收端(Consumer)发送“Hello World”的问题.在实际的应用场景中,这是远远不够的.从本篇文章开始,我们将结合更加实际的应用场景来讲解更多的高级用法. 当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load.试想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后台的一些工作线程

RabbitMQ消息队列(三):任务分发机制

在上篇文章中,我们解决了从发送端(Producer)向接收端(Consumer)发送“Hello World”的问题.在实际的应用场景中,这是远远不够的.从本篇文章开始,我们将结合更加实际的应用场景来讲解更多的高级用法. 当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load.试想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后台的一些工作线程

RabbitMQ(三):任务分发机制

 在上篇文章中,我们解决了从发送端(Producer)向接收端(Consumer)发送"Hello World"的问题.在实际的应用场景中,这是远远不够的.从本篇文章开始,我们将结合更加实际的应用场景来讲解更多的高级用法. 当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load.试想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后

RabbitMQ消息队列(三):任务分发机制[转]

在上篇文章中,我们解决了从发送端(Producer)向接收端(Consumer)发送“Hello World”的问题.在实际的应用场景中,这是远远不够的.从本篇文章开始,我们将结合更加实际的应用场景来讲解更多的高级用法. 当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load.接下来我们分布讲解. 应用场景就是RabbitMQ Server会将queue的Message分发给不同的Consumer以处理计算密集型的任务

RabbitMQ 异常与任务分发

RabbitMQ 异常与任务分发 异常情况处理 上篇最后提到了这个问题, consumer异常退出.queue出错.甚至rabbitMQ崩溃.因为它们都是软件 ,软件都会有bug,这是无法避免的.所以RabbitMQ在设计的时候也想到了这一点 在之前,消息分发给consumer后立即就会被标记为已消费,这时候如果consumber接到了一个消息但是还没有来的及处理就异常退出,那么这个消息的状态是已被消费的,于是就会造成消息丢失的问题. 可以看到在进行消费的方法里,第二个参数noAck(不进行确认

rabbitMQ基础知识--消息确认机制

一:确认种类 RabbitMQ的消息确认有两种. 一种是消息发送确认.这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列. 第二种是消费接收确认.这种是确认消费者是否成功消费了队列中的消息. 具体建议参考:https://www.cnblogs.com/nizuimeiabc1/p/9397326.html 这里我们重点研究下接收确认的情况. 为了保证消息从队列可靠的到达消费者,RabbitMQ提供了

从ViewPager嵌套RecyclerView再嵌套RecyclerView看安卓事件分发机制

这两天伟大的PM下了一个需求,在一个竖滑列表里实现一个横向滑动的列表,没错,又是这种常见但是又经常被具有着强烈责任心和职业操守程序员所嗤之以鼻的效果,废话不多说,先上图: 实现的方式很多,因为项目中已经ViewPager+RV实现基本框架,所以现我也选择再添加一个RV实现相应的效果. 不过在写代码之前,先预估一下这个效果所有的坑. VP是横向滑动的,RV是竖向滑动的,那么现在再添加一个横向滑动的RV,肯定会有滑动冲突,主要表现在 VP和横向滑动RV 的冲突,因为两者都是横向滑动的,肯定有冲突,无

Android的Touch事件分发机制简单探析

前言 Android中关于触摸事件的分发传递是一个很值得研究的东西.曾不见你引入了一个ListView的滑动功能,ListView就不听你手指的指唤来滚动了:也不知道为啥Button设置了onClick和onTouch,其中谁会先响应:或许你会问onTouch和onTouchEvent有什么区别,又该如何使用?这里一切的一切,只要你了解了事件分发机制,你会发现,解释这都不是事儿! 相关Touch事件的方法 1.public boolean dispatchTouchEvent(MotionEve

Android事件分发机制详解:史上最全面、最易懂

前言 Android事件分发机制是每个Android开发者必须了解的基础知识 网上有大量关于Android事件分发机制的文章,但存在一些问题:内容不全.思路不清晰.无源码分析.简单问题复杂化等等 今天,我将全面总结Android的事件分发机制,我能保证这是市面上的最全面.最清晰.最易懂的 本文秉着"结论先行.详细分析在后"的原则,即先让大家感性认识,再通过理性分析从而理解问题: 所以,请各位读者先记住结论,再往下继续看分析: 文章较长,阅读需要较长时间,建议收藏等充足时间再进行阅读 目