RabbitMQ指南(C#)(二)工作队列

上一节我们实现了向指定的队列发送和接收消息。这一节,我们主要讲工作队列,用于在多个消费者之间分配置实时任务。

工作队列方式主要是为了防止在执行一个耗费资源的任务时,要等待其结束才能处理其它事情。我们将任务的执行延迟,将其封装成一个消息,然后发送给一个列队。后台再运行一个程序从队列里取出消息,然后执行任务。如果有多个消费者,还可以分享任务。

对于Web应用程序来说,这样就可以使用Http的短请求来处理复杂的业务。

准备

我们发送一个字符串来代表复杂的任务,然后用Thread.Sleep()来模拟耗时操作,使用点的个数来代表任务的复杂度,一个点代表一秒种的工作,例如Hello…代表一个耗时三秒的任务。

修改上一节示例中Send.cs的代码,允许从命令输入任意消息,然后发送到工作队列,我们将其命名为NewTask.cs。

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

channel.BasicPublish(exchange: "",
                     routingKey: "task_queue",
                     basicProperties: properties,
                     body: body);

GetMessage方法获取输入的消息

private static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

修改上一节中的Receive.cs,按每个点代表一个秒模拟耗时任务,处理来自RabbitMQ的消息并执行任务,我们将其命名为Worker.cs

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);

    int dots = message.Split(‘.‘).Length - 1;
    Thread.Sleep(dots * 1000);

    Console.WriteLine(" [x] Done");
};
channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);

模拟任务执行的时间

int dots = message.Split(‘.‘).Length - 1;
Thread.Sleep(dots * 1000);

编译运行

循环调度

使用工作队列的好处是很容易实现平分工作。如果要执行的工作会造成积压,只需要运行多个工作者就可以了。

首先,运行两个控制台程序执行Worker.cs,他们都从队列里取消息。这两个控制台程序就是消息者,分别是C1,C2。

再运行一个控制台程序执行NewTask.cs发布任务。启动消费者之后,在控制输入以下消息内容发到队列:

shell3$ NewTask.exe First message.
shell3$ NewTask.exe Second message..
shell3$ NewTask.exe Third message...
shell3$ NewTask.exe Fourth message....
shell3$ NewTask.exe Fifth message.....

然后看看工作都的接收内容:

C1

[*] Waiting for messages. To exit press CTRL+C
[x] Received ‘First message.‘
 [x] Received ‘Third message...‘
 [x] Received ‘Fifth message.....‘

C2

[*] Waiting for messages. To exit press CTRL+C
 [x] Received ‘Second message..‘
 [x] Received ‘Fourth message....‘

RabbitMQ默认按顺序发送消息到每一个消费者,每个消费者会平均取到相同数量的消息,这种分布式消息方式称做轮循调度。也可以添加更多的消费者。

消息确认

执行一个任务需要花费一些时间,那么你可能会考虑如果一个消费者正执行一个耗时的任务时突然崩溃了怎么办。我们目前的代码,一旦RabbitMQ发送消息给消费者,消费会立即被删除。在这种情况下,如果中止一个工作者,就会丢失正在处理的消息,还有当前消费者已接收但还没有被处理的消息也会丢失。

但是我们不想丢失任何任务,如果某一个工作者停止了,我们希望任务会被发送到其它工作者。

为了保证消息不会丢失,RabbitMQ提供了消息确认机制(acknowledgments)。当消息被接收并处理之后,消费者发送一个ack到RabbitMQ,告诉它可以自由删除该消息了。

如果消费者停止了,没有发送ack确认,那么RabbitMQ会认为这个消息没有被处理,它会将这个消费重新入队。如果还有其它的消费者的话,它就会立马把这个消息发送给另一个消费者去处理。这样即使消费者偶尔中止了,那么也不会造成消息丢失。

任何消息都不会超时,如果消费者崩溃了,RabbitMQ会重新发送消息,即使消息处理要发花费很长时间也不会有问题。

消息确认机制默认是开着的。前边的例子都通过设置noAck为true来关闭了消息确认。现在设置noAck为false,当任务执行完成后,工作者发送确认消息。

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);

    int dots = message.Split(‘.‘).Length - 1;
    Thread.Sleep(dots * 1000);

    Console.WriteLine(" [x] Done");

    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);

上边的代码可以确保工作者正在处理消息时崩溃的话,也不会丢失任何消息。所有未经确认的消息都会重新发送。

消息持久化

我们已经知道如何确认工作者崩溃时,任务也不会丢失。但是如查RabbitMQ也停止的话,任务同样会丢失。

如果RabbitMQ退出或崩溃了,默认就会清除队列和消息。要保证消息不丢失,需要配置队列和消息都要持久化。

首先要保证RabbitMQ不会丢失队列,这需要声明队列为持久队列

channel.QueueDeclare(queue: "hello",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

从语法上讲,上边的代码没有什么问题,但是这样并不会起作用,因为我们这前已经声明了名为hello的队列,RabbitMQ不允许以不同的参数重新定义已经存在的队列,这样会抛出异常。所以要换个队列名称,如task_queue

channel.QueueDeclare(queue: "task_queue",
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

queueDeclare 的修改要在生产者和消费者两边都进配置。

设置队列持久化之后,需要再设置消息为持久化。

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);

平衡调度

你可能注意到,目前的分发机制仍然不理想。例如,如果有两个工作者,当奇数的消息十分复杂,而偶数的消息很简单时,一个工作者就会非常繁忙,而另一个工作者几乎没有任务可做。但RabbitMQ不知道这种情况,仍然会平均分配任务。

因为只要有消息进入队列,RabbitMQ就会分发消息。它不会检查每一个消费者未确认的消息个数。它只是盲目的将第N个消息发送给第N个消费者。

为了防止这种情况,要使用basicQos 方法,并设置prefetchCount 参数的值为1。这样RabbitMQ就不会同时发送多个消息给同一个工作者。也就是说,在工作者处理并确认前一个消息之前,不会分发新的消息给工作者。它会把消息发送给下一个不忙的工作者。

channel.BasicQos(0, 1, false);

完整代码

NewTask.cs代码

using System;
using RabbitMQ.Client;
using System.Text;

class NewTask
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue",
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.SetPersistent(true);

            channel.BasicPublish(exchange: "",
                                 routingKey: "task_queue",
                                 basicProperties: properties,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
    }
}

Worker.cs代码

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;

class Worker
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "task_queue",
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received {0}", message);

                int dots = message.Split(‘.‘).Length - 1;
                Thread.Sleep(dots * 1000);

                Console.WriteLine(" [x] Done");

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: "task_queue",
                                 noAck: false,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

使用消息确认机制和BasicQos 就可以建立工作队列,用持久化选项可以保证即使RabbitMQ重启也不会丢失任务。

时间: 2024-08-29 04:43:32

RabbitMQ指南(C#)(二)工作队列的相关文章

RabbitMQ指南之二:工作队列(Work Queues)

在上一章的指南中,我们写了一个命名队列:生产者往该命名队列发送消息.消费从从该命名队列中消费消息.在本章中,我们将创建一个工作队列,用于在多个工作者之间分配耗时的任务.工作队列(即任务队列)的主要思想是避免立即执行那些需要等他们执行完成的资源密集型任务.相反,我们将任务安排在稍后完成.我们将任务封装为消息并将其发送到队列,后台运行的工作进程将取出任务并执行完成.如果你启动了多个工作者,这些任务将在多个工作者之间分享. 这个概念也即我们说的异步,在项目中,有时候一个简单的Web请求,后台要做一系统

Redis 小白指南(二)

Redis 小白指南(二) 引言 目录 基础命令 字符串类型 散列类型 列表类型 集合类型 有序集合类型 基础命令 1.获得符合规则的键名列表 KEYS pattern pattern 支持 glob 风格通配符: 2.判断一个键是否存在 EXISTS key 如果键存在则返回整数类型 1,否则返回 0 3.删除键 DEL key [key ...] 可以删除一个或者多个键,返回值是删除的键的个数 4.获得键值的数据类型 TYPE key 字符串类型 1.介绍 字符串类型是 Redis 中最基本

分布式消息系统Jafka入门指南之二

分布式消息系统Jafka入门指南之二 作者:chszs,转载需注明.博客主页:http://blog.csdn.net/chszs 三.Jafka的文件夹结构 1.安装tree命令 $ sudo yum install tree 2.查看文件夹 $ tree -L 1 . ?..? ? bin ? ..?? conf ?..?? data ? ..?? lib ? ..?? LICENSE ?..? ? logs ?..?? VERSION 说明:bin文件夹:命令行脚本conf文件夹:存放配置

Maven入门指南(二)

转载自并发编程网 – ifeve.com本文链接地址: Maven入门指南(二) Maven目录结构 Maven有一个标准的目录结构.如果你在项目中遵循Maven的目录结构,就无需在pom文件中指定源代码.测试代码等目录. Maven的目录结构布局,参考Maven标准目录结构介绍 以下为最重要的目录: - src - main - java - resources - webapp - test - java - resources - target src目录是源代码和测试代码的根目录.mai

转 猫都能学会的Unity3D Shader入门指南(二)

猫都能学会的Unity3D Shader入门指南(二) 关于本系列 这是Unity3D Shader入门指南系列的第二篇,本系列面向的对象是新接触Shader开发的Unity3D使用者,因为我本身自己也是Shader初学者,因此可能会存在错误或者疏漏,如果您在Shader开发上有所心得,很欢迎并恳请您指出文中纰漏,我会尽快改正.在之前的开篇中介绍了一些Shader的基本知识,包括ShaderLab的基本结构和语法,以及简单逐句地讲解了一个基本的shader.在具有这些基础知识后,阅读简单的sha

Android Gradle Plugin指南(二)——基本项目

原文地址:http://tools.android.com/tech-docs/new-build-system/user-guide#TOC-Basic-Project 3.Basic Project(基本项目) 一个Gradle项目的构建过程定义在build.gradle文件中,位于项目的根目录下. 3.1 Simple build files(简单的构建文件) 一个最简单的Gradle纯Java项目的build.gradle文件包含以下内容: apply plugin: 'java' 这里

开发指南专题二:JEECG微云高速开发平台JEECG框架初探

开发指南专题二:JEECG微云高速开发平台JEECG框架初探 2.JEECG框架初探 2.1演示系统 打开浏览器输入JEECG演示环境界址:http://demo.jeecg.org:8090/能够看到如图21所看到的的登录界面., 图21演示系统登录界面 点击[登陆]button,进入演示系统的主界面,如图22所看到的. 图22演示系统主界面 在JEECG演示系统中的功能模块包含系统管理.流程管理.业务申请.业务办理.经常使用功能演示等.当中,用户管理.流程设计器的界面截图如图23和图24所看

WSS(Windows Storage Server)2008R2使用指南(二)安装篇

WSS2008专题内容: WSS(Windows Storage Server)2008R2使用指南(一)下载篇 WSS(Windows Storage Server)2008R2使用指南(二)安装篇 WSS(Windows Storage Server)2008R2使用指南(三)配置及使用篇 PartII 安装篇 准备一台Windows Server 2008 R2作为WSS服务器,安装Windows Storage Server 2008 R2企业版组件,选择下图: 打开软件,弹出对话框,点

Asp.Net MVC4.0 官方教程 入门指南之二--添加一个控制器

Asp.Net MVC4.0 官方教程 入门指南之二--添加一个控制器 MVC概念 MVC的含义是 “模型-视图-控制器”.MVC是一个架构良好并且易于测试和易于维护的开发模式.基于MVC模式的应用程序包含: · Models: 表示该应用程序的数据并使用验证逻辑来强制实施业务规则的数据类. · Views: 应用程序动态生成 HTML所使用的模板文件. · Controllers: 处理浏览器的请求,取得数据模型,然后指定要响应浏览器请求的视图模板. 本系列教程,我们将覆盖所有这些概念,并告诉