如何优雅的使用RabbitMQ

RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具。消息队列的使用场景大概有3种:

1、系统集成,分布式系统的设计。各种子系统通过消息来对接,这种解决方案也逐步发展成一种架构风格,即“通过消息传递的架构”。

2、当系统中的同步处理方式严重影响了吞吐量,比如日志记录。假如需要记录系统中所有的用户行为日志,如果通过同步的方式记录日志势必会影响系统的响应速度,当我们将日志消息发送到消息队列,记录日志的子系统就会通过异步的方式去消费日志消息。

3、系统的高可用性,比如电商的秒杀场景。当某一时刻应用服务器或数据库服务器收到大量请求,将会出现系统宕机。如果能够将请求转发到消息队列,再由服务器去消费这些消息将会使得请求变得平稳,提高系统的可用性。

一、开始使用RabbitMQ

RabbitMQ官网提供了详细的安装步骤,另外官网还提供了RabbitMQ在六种场景的使用教程。其中教程1、3、6将覆盖99%的使用场景,所以正常来说只需要搞清楚这3个教程即可快速上手。

二、简单分析

我们以官方提供的教程1做个简单梳理:该教程展示了Producer如何向一个消息队列(message queue)发送一个消息(message),消息消费者(Consumer)收到该消息后消费该消息。

1、producer端:


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

var factory = new ConnectionFactory() { HostName = "localhost" };

 using (var connection = factory.CreateConnection())

 {

     while (Console.ReadLine() != null)

     {

         using (var channel = connection.CreateModel())

         {

             //创建一个名叫"hello"的消息队列

             channel.QueueDeclare(queue: "hello",

                 durable: false,

                 exclusive: false,

                 autoDelete: false,

                 arguments: null);

             var message = "Hello World!";

             var body = Encoding.UTF8.GetBytes(message);

             //向该消息队列发送消息message

             channel.BasicPublish(exchange: "",

                 routingKey: "hello",

                 basicProperties: null,

                 body: body);

             Console.WriteLine(" [x] Sent {0}", message);

         }

     }

 }

该段代码非常简单,几乎到了无法精简的地步:创建了一个信道(channel)->创建一个队列->向该队列发送消息。

2、Consumer端


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

var factory = new ConnectionFactory() { HostName = "localhost" };

 using (var connection = factory.CreateConnection())

 {

     using (var channel = connection.CreateModel())

     {

         //创建一个名为"hello"的队列,防止producer端没有创建该队列

         channel.QueueDeclare(queue: "hello",

                              durable: false,

                              exclusive: false,

                              autoDelete: false,

                              arguments: null);

         //回调,当consumer收到消息后会执行该函数

         var consumer = new EventingBasicConsumer(channel);

         consumer.Received += (model, ea) =>

         {

             var body = ea.Body;

             var message = Encoding.UTF8.GetString(body);

             Console.WriteLine(" [x] Received {0}", message);

         };

         //消费队列"hello"中的消息

         channel.BasicConsume(queue: "hello",

                              noAck: true,

                              consumer: consumer);

         Console.WriteLine(" Press [enter] to exit.");

         Console.ReadLine();

     }

 }

该段代码可以理解为:创建信道->创建队列->定义回调函数->消费消息。

该实例描述了Send/Receive模式,可以简单理解为1(producer) VS 1(consumer)的场景;

实例3则描述了Publish/Subscriber模式,即1(producer) VS 多个(consumer);

在以上两个示例中,producer只需要发送消息即可,并不关心consumer的返回结果。实例6则描述了一个RPC调用场景,producer发送消息后还要接收consumer的返回结果,这一场景看起来跟使用消息队列的目的有点相悖。因为使用消息队列的目的之一就是要异步,但是这一场景似乎又将异步变成了同步,不过这一场景也很有用,比如一个用户操作产生了一个消息,应用服务收到该消息后执行了一些逻辑并使得数据库发生了变化,UI会一直等待应用服务的返回结果才刷新页面。

三、 发现抽象

我桌子上放着一本RabbitMQ in Action,另外官网提供的文档也很详细,我感觉在一个月内我就能精通RabbitMQ,到时候简历上又可以写上“精通…”,感觉有点小得意呢... ,但是我知道这并不是使用RabbitMQ的最佳方式。

我们知道合理的抽象可以帮我们隐藏掉一些技术细节,让我们将重心放在核心业务上,比如一个人问你:“大雁塔如何走?”你的回答可能是“小寨往东,一直走两站,右手边”,如果你回答:“右转45度,向前走100米,再转90度…”,对方就会迷失在这些细节中。

消息队列的使用过程中实际隐藏着一种抽象——服务总线(Service Bus)。

我们在回头看第一个例子,这个例子隐含的业务是:ClientA发送一个指令,ClientB收到该指令后做出反应。如果是这样,我们为什么要关心如何创建channel,如何创建一个queue? 我仅仅是要发送一个消息而已。另外这个例子写的其实不够健壮:

没有重试机制:如果ClientB第一次没有执行成功如何对该消息处理?

没有错误处理机制:如果ClientB在重试了N次之后还是异常如何处理该消息?

没有熔断机制;

如何对ClientA做一个schedule(计划安排),比如定时发送等;

没有消息审计机制;

无法对消息的各个状态做追踪;

事物处理等。

服务总线正是这种场景的抽象,并且为我们提供了这些机制,让我们赶快来看个究竟吧。

四、初识MassTransit

MassTransit是.NET平台下的一款开源免费的ESB产品,官网:http://masstransit-project.com/,GitHub 700 star,500 Fork,类似的产品还有NServiceBus,之所以要选用MassTransit是因为他要比NServiceBus轻量级,另外在MassTransit开发之初就选用了RabbitMQ作为消息传输组建;同时我想拿他跟NServiceBus做个比较,看看他们到底有哪些侧重点。

1、新建控制台应用程序:Masstransit.RabbitMQ.GreetingClient

使用MassTransit可以从Nuget中安装:


1

Install-Package MassTransit.RabbitMQ

2、创建服务总线,发送一个命令


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

static void Main(string[] args)

{

    Console.WriteLine("Press ‘Enter‘ to send a message.To exit, Ctrl + C");

    var bus = BusCreator.CreateBus();

    var sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}{RabbitMqConstants.GreetingQueue}");

    while (Console.ReadLine()!=null)

    {

        Task.Run(() => SendCommand(bus, sendToUri)).Wait();

    }

    Console.ReadLine();

}

private static async void SendCommand(IBusControl bus,Uri sendToUri)

{

    var endPoint =await bus.GetSendEndpoint(sendToUri);

    var command = new GreetingCommand()

    {

        Id = Guid.NewGuid(),

        DateTime = DateTime.Now

    };

    await endPoint.Send(command);

    Console.WriteLine($"send command:id={command.Id},{command.DateTime}");

}

这一段代码隐藏了众多关于消息队列的细节,将我们的注意力集中在发送消息上,同时ServiceBus提供的API也更接近业务,我们虽然发送的是一个消息,但是在这种场景下体现出来是一个命令,Send(command)这一API描述了我们的意图。

3、服务端接收这一命令

新建一个命令台控制程序:Masstransit.RabbitMQ.GreetingServer


1

2

3

4

5

6

7

8

var bus = BusCreator.CreateBus((cfg, host) =>

{

    cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingQueue, e =>

    {

        e.Consumer<GreetingConsumer>();

    });

});

这一代码可以理解为服务端在监听消息,我们在服务端注册了一个名为“GreetingConsumer”的消费者,GreetingConsumer的定义:


1

2

3

4

5

6

7

8

public class GreetingConsumer :IConsumer<GreetingCommand>

{

    public async Task Consume(ConsumeContext<GreetingCommand> context)

    {

        await Console.Out.WriteLineAsync($"receive greeting commmand: {context.Message.Id},{context.Message.DateTime}");

    }

}

该consumer可以消费类型为GreetingCommand的消息。这一实例几乎隐藏了有关RabbitMQ的技术细节,将代码中心放在了业务中,将这两个控制台应用跑起来试试:

五、实现Publish/Subscribe模式

发布/订阅模式使得基于消息传递的软件架构成为可能,这一能力表现为ClientA发送消息X,ClientB和ClientC都可以订阅消息X。

1、我们在上面的例子中改造一下,当GreetingConsumer收到GreetingCommand后发送一个GreetingEvent:


1

2

3

4

5

6

7

var greetingEvent = new GreetingEvent()

 {

     Id = context.Message.Id,

     DateTime = DateTime.Now

 };

 await context.Publish(greetingEvent);

2、新建控制台程序Masstransit.RabbitMQ.GreetingEvent.SubscriberA用来订阅GreetingEvent消息:


1

2

3

4

5

6

7

8

9

var bus = BusCreator.CreateBus((cfg, host) =>

 {

     cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingEventSubscriberAQueue, e =>

     {

         e.Consumer<GreetingEventConsumer>();

     });

 });

 bus.Start();

定义GreetingEventConsumer:


1

2

3

4

5

6

7

public class GreetingEventConsumer:IConsumer<Greeting.Message.GreetingEvent>

 {

     public async Task Consume(ConsumeContext<Greeting.Message.GreetingEvent> context)

     {

         await Console.Out.WriteLineAsync($"receive greeting event: id {context.Message.Id}");

     }

 }

这一代码跟Masstransit.RabbitMQ.GreetingServer接受一个命令几乎一模一样,唯一的区别在于:

在Send/Receive模式中Client首先要获得对方(Server)的终结点(endpoint),直接向该终结点发送命令。Server方监听自己的终结点并消费命令。

而Publish/Subscribe模式中Client publish一个事件,SubscriberA在自己的终结点(endpointA)监听事件,SubscriberB在自己的终结点(endpointB)监听事件。

3、根据上面的分析再定义一个Masstransit.RabbitMQ.GreetingEvent.SubscriberB

4、将4个控制台应用程序跑起来看看

六、实现RPC模式

这一模式在Masstransit中被称作Request/Response模式,通过IRequestClient<IRequest, IResponse> 接口来实现相关操作。一个相关的例子在官方的github

结束语:本篇文章分析了如何使用Masstransit来抽象业务,避免直接使用具体的消息队列,当然本文提到的众多服务总线机制,如“重试、熔断等”并没有在该文中出现,需要大家进一步去了解该项目。

通过对Masstransit的一些试用和NServiceBus的对比,Masstransit在实际项目中很容易上手并且免费,各种API定义的也非常清晰,但是官方的文档有点过于简单,实际使用中还需要去做深入的研究。作为.NET平台下为数不多的ESB开源产品,其关注程度还是不够,期待大家为开源项目做出贡献。

本文例子提供下载:http://git.oschina.net/richieyangs/RabbitMQ.Practice

来源:http://www.cnblogs.com/richieyang/

时间: 2024-10-21 10:08:08

如何优雅的使用RabbitMQ的相关文章

Masstransit开发基于消息传递的分布式应用

使用Masstransit开发基于消息传递的分布式应用 Masstransit作为.Net平台下的一款优秀的开源产品却没有得到应有的关注,这段时间有机会阅读了Masstransit的源码,我觉得我有必要普及一下这个框架的使用. 值得一提的是Masstransit的源码写的非常优秀,值得每个想提高自己编程能力的.Net选手阅读,整个代码看起来赏心悦目.反之,每次打开自己公司项目的时候心情都异常沉重.所以不是.Net不行,还是咱们水平不行. 学会了Masstransit你再也不用羡慕别人有Dubbo

上周热点回顾(5.16-5.22)

热点随笔: · [无私分享:从入门到精通ASP.NET MVC]从0开始,一起搭框架.做项目(5.2) 登录功能的实现,接口注入.log4net的使用(果冻布丁喜之郎)· [无私分享:从入门到精通ASP.NET MVC]从0开始,一起搭框架.做项目(5.3) 登录功能的实现,丰富数据表.建立关联(果冻布丁喜之郎)· 微软.NET Core RC2正式发布,横跨所有平台(张善友)· [无私分享:从入门到精通ASP.NET MVC]从0开始,一起搭框架.做项目(5.4) 登录功能的实现,创建与登录用

杂项之rabbitmq

杂项之rabbitmq 本节内容 rabbitmq简介 AMQP协议 rabbitmq使用 应用举例 rabbitmq简介 介绍rabbitmq之前,先介绍一下AMQP协议,因为rabbitmq就是基于AMQP协议实现的一个服务程序.(目前为止应该也是唯一实现了AMQP协议的服务) AMQP(高级消息队列协议)是一个网络协议.它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信. arbbitmq使用erlan

消息队列_RabbitMQ-0003.深入RabbitMQ节点/配置/管理及日志实时化?

理解节点: 说明: 节点是指Erlang节点,而且节点之间支持相互通信,RabbitMQ应用跑在Erlang节点之上,应用崩溃,Erlang节点会自动尝试重启应用程序,前提是Erlang本身没有崩溃,节点日志默认位于var/log/rabbitmq/[email protected]log 基本管理: 启动节点: rabbitmq-server -detached 关闭节点: rabbitmqctl stop -n [email protected] 说明: rabbitmq-server启动后

(转)RabbitMQ消息队列(三):任务分发机制

在上篇文章中,我们解决了从发送端(Producer)向接收端(Consumer)发送“Hello World”的问题.在实际的应用场景中,这是远远不够的.从本篇文章开始,我们将结合更加实际的应用场景来讲解更多的高级用法. 当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load.试想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后台的一些工作线程

rabbitmq集群故障恢复详解

RabbitMQ的mirror queue(镜像队列)机制是最简单的队列HA方案,它通过在cluster的基础上增加ha-mode.ha-param等policy选项,可以根据 需求将cluster中的队列镜像到多个节点上,从而实现高可用,消除cluster模式中队列内容单点带来的风险. 在使用镜像队列之前,有几点注意事项必须熟记于心(下文中将混用主节点和master,从节点和slave): 1. 镜像队列不能作为负载均衡使用,因为每个操作在所有节点都要做一遍. 2. ha-mode参数和dur

RabbitMQ消息队列(三):任务分发机制

在上篇文章中,我们解决了从发送端(Producer)向接收端(Consumer)发送“Hello World”的问题.在实际的应用场景中,这是远远不够的.从本篇文章开始,我们将结合更加实际的应用场景来讲解更多的高级用法. 当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load.试想一下,对于web application来说,在一个很多的HTTP request里是没有时间来处理复杂的运算的,只能通过后台的一些工作线程

封装 RabbitMQ.NET

这篇文章内容会很短,主要是想给大家分享下我最近在做一个简单的rabbitmq客户端类库的封装的经验总结,说是简单其实一点都不简单.为了节省时间我主要按照Library的执行顺序来介绍,在你看来这里仅仅是一个简单的经验总结,但是在我看来这些经验只有在你真正的封装rabbitmq客户端库的时候且将你的客户端安全稳定的发布上线后才会真的发现这些问题. 比如你的库只是链接单个Node的时候和链接高可用集群的HAProxy时候是完全两回事.当你未能在你的库里使用反向注入LOG接口的时候一旦在线上发生网络解

RabbitMq学习一入门篇(hello world)

简介  RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java,也是众多消息队列中表现不俗的一员,作用就是提高系统的并发性,将一些不需要及时响应客户端且占用较多资源的操作,放入队列,再由另外一个线程,去异步处理这些队列,可极大的提高系统的并发能力. 安装    安装RabbitMq需要erlang,点击下载 去官网下载Rabbit安装包,点击下载 启用web管理界面,启用方式->打开CMD命令,cd到安装目录sbi