RabbitMQ的几种应用场景

RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。

应用场景1-“Hello Word”

一个P向queue发送一个message,一个C从该queue接收message并打印。

producer,连接至RabbitMQ Server,声明队列,发送message,关闭连接,退出。

consumer,连接至RabbitMQ Server,声明队列,接收消息并进行处理这里为打印出消息,退出。

 1 using System;
 2 using RabbitMQ.Client;
 3 using System.Text;
 4
 5 class Send
 6 {
 7     public static void Main()
 8     {
 9         var factory = new ConnectionFactory() { HostName = "localhost" };
10         using(var connection = factory.CreateConnection())
11         using(var channel = connection.CreateModel())
12         {
13             channel.QueueDeclare(queue: "hello",
14                                  durable: false,
15                                  exclusive: false,
16                                  autoDelete: false,
17                                  arguments: null);
18
19             string message = "Hello World!";
20             var body = Encoding.UTF8.GetBytes(message);
21
22             channel.BasicPublish(exchange: "",
23                                  routingKey: "hello",
24                                  basicProperties: null,
25                                  body: body);
26             Console.WriteLine(" [x] Sent {0}", message);
27         }
28
29         Console.WriteLine(" Press [enter] to exit.");
30         Console.ReadLine();
31     }
32 }
 1 using RabbitMQ.Client;
 2 using RabbitMQ.Client.Events;
 3 using System;
 4 using System.Text;
 5
 6 class Receive
 7 {
 8     public static void Main()
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.QueueDeclare(queue: "hello",
15                                  durable: false,
16                                  exclusive: false,
17                                  autoDelete: false,
18                                  arguments: null);
19
20             var consumer = new EventingBasicConsumer(channel);
21             consumer.Received += (model, ea) =>
22             {
23                 var body = ea.Body;
24                 var message = Encoding.UTF8.GetString(body);
25                 Console.WriteLine(" [x] Received {0}", message);
26             };
27             channel.BasicConsume(queue: "hello",
28                                  noAck: true,
29                                  consumer: consumer);
30
31             Console.WriteLine(" Press [enter] to exit.");
32             Console.ReadLine();
33         }
34     }
35 }
 

应用场景2-work queues

将耗时的消息处理通过队列分配给多个consumer来处理,我们称此处的consumer为worker,我们将此处的queue称为Task Queue,其目的是为了避免资源密集型的task的同步处理,也即立即处理task并等待完成。相反,调度task使其稍后被处理。也即把task封装进message并发送到task queue,worker进程在后台运行,从task queue取出task并执行job,若运行了多个worker,则task可在多个worker间分配。


建立连接,声明队列,发送可以模拟耗时任务的message,断开连接、退出。

建立连接,声明队列,不断的接收message,处理任务,进行确认。

应用场景3-Publish/Subscribe

在应用场景2中一个message(task)仅被传递给了一个comsumer(worker)。现在我们设法将一个message传递给多个consumer。这种模式被称为publish/subscribe。此处以一个简单的日志系统为例进行说明。该系统包含一个log发送程序和一个log接收并打印的程序。由log发送者发送到queue的消息可以被所有运行的log接收者接收。因此,我们可以运行一个log接收者直接在屏幕上显示log,同时运行另一个log接收者将log写入磁盘文件。


日志消息接收者:建立连接,声明exchange,将exchange与queue进行绑定,开始不停的接收log并打印。

日志消息发送者:建立连接,声明fanout类型的exchange,通过exchage向queue发送日志消息,消息被广播给所有接收者,关闭连接,退出。

应用场景4-Routing

应用场景3中构建了简单的log系统,可以将log message广播至多个receiver。现在我们将考虑只把指定的message类型发送给其subscriber,比如,只把error message写到log file而将所有log message显示在控制台。


log message接收者:建立连接,声明direct类型的exchange,声明queue,使用提供的参数作为routing_key将queue绑定到exchange,开始循环接收log message并打印。

log message发送者:建立连接,声明direct类型的exchange,生成并发送log message到exchange,关闭连接,退出。

应用场景5-topic

应用场景4中改进的log系统中用direct类型的exchange替换应用场景3中的fanout类型exchange实现将不同的log message发送给不同的subscriber(也即分别通过不同的routing_key将queue绑定到exchange,这样exchange便可将不同的message根据message内容路由至不同的queue)。但仍然存在限制,不能根据多个规则路由消息,比如接收者要么只能收error类型的log message要么只能收info类型的message。如果我们不仅想根据log的重要级别如info、warning、error等来进行log message路由还想同时根据log message的来源如auth、cron、kern来进行路由。为了达到此目的,需要topic类型的exchange。topic类型的exchange中routing_key中可以包含两个特殊字符:“*”用于替代一个词,“#”用于0个或多个词。

log message接收者:建立连接,声明topic类型的exchange,声明queue,根据程序参数构造routing_key,根据routing_key将queue绑定到exchange,循环接收并处理message。

log message发送者:建立连接、声明topic类型的exchange、根据程序参数构建routing_key和要发送的message,以构建的routing_key将message发送给topic类型的exchange,关闭连接,退出。

应用场景6-PRC

在应用场景2中描述了如何使用work queue将耗时的task分配到不同的worker中。但是,如果我们task是想在远程的计算机上运行一个函数并等待返回结果呢。这根场景2中的描述是一个完全不同的故事。这一模式被称为远程过程调用。现在,我们将构建一个RPC系统,包含一个client和可扩展的RPC server,通过返回斐波那契数来模拟RPC service。

RPC server:建立连接,声明queue,定义了一个返回指定数字的斐波那契数的函数,定义了一个回调函数在接收到包含参数的调用请求后调用自己的返回斐波那契数的函数并将结果发送到与接收到message的queue相关联的queue,并进行确认。开始接收调用请求并用回调函数进行请求处理。

RPC client:远程过程调用发起者:定义了一个类,类中初始化到RabbitMQ Server的连接、声明回调queue、开始在回调queue上等待接收响应、定义了在回调queue上接收到响应后的处理函数on_response根据响应关联的correlation_id属性作出响应、定义了调用函数并在其中向调用queue发送包含correlation_id等属性的调用请求、初始化一个client实例,以30为参数发起远程过程调用。

消息队列的选择:kafka、rabbitmq、zeromq

一、rabbitmq

首先是百科里的一段话,Rabbitmq是流行的开源消息队列系统,使用erlang语言进行开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。可以说从功能上rabbitmq基本上是符号这次项目要求的工具。

它的优点有:

1、完整的消息队列系统,支持多种消息队列模式,包括竞争消费;

2、基于AMQP

3、支持集群模式,扩展集群容量和性能比较方便,同时集成了集群的监控和管理;

4、支持消息的持久化;

缺点是:

1、需要学习比较复杂的接口和协议,比较耗费时间;

2、性能不是特别理想大概在1wqps左右;

3、使用Erlang语言,以前没听说过,出了问题不会排查;

二、zeromq

以前经常在内网中使用,号称是最快的消息队列,由于它支持的模式非常多:tcp、ipc、inproc、multicas,基本已经达到了替代标准socket的地步了,听说linux内核已经准备将zeromq纳入标准内核中了。

zeromq是一个智能传输层,它并不是对socket的封装,而是在其之上有一套自己的协议,可以使用非常丰富的开发模式像扇出(fanout)、发布订阅(pub-sub)、任务分发(task distribution)、请求响应(request-reply)等。

优点:

1、缺省为异步I/O交互,封装了连接的维护操作,消息处理并行化;

2、性能非常不错;

3、编程简单,上手很快;

缺点:

1、消息无法持久化,除非自己在实现一个中间件,否则消息传递完成就删除了;

2、扩展性不是很好,其实是一个消息库,并不算是MQ;

三、kafka

日志团队正在使用的工具,是一个消息发布订阅系统。生产者向某个队列发送一个数据,消费者订阅一个队列,一旦这个队列内产生新的数据了,中间人就会将数据发送给所有订阅队列的消费者。

用术语来说生产者就是producer、消费者就是consumer、中间人就是broker,kafka主要就是这三者之间进行联系的。

优点:

1、高吞吐量率,每秒能处理几十万条消息;

2、分布式架构,能够以集群进行处理;

3、日志团队已经建立了kafka集群,可以蹭一蹭;

缺点:

1、以前没有使用过,需要一定的熟悉时间,和开发工作;

四、结论

日志团队的强烈推荐,和强大的技术支持,最后决定使用kafka了,它提供的特点和优势确实也使人心动,不过这次的调研也让我了解了一些开源软件的设计思路和软件选择的看法,后面在写几篇记录一下。

时间: 2024-08-01 22:45:38

RabbitMQ的几种应用场景的相关文章

Python实现RabbitMQ中6种消息模型

RabbitMQ与Redis对比 ? RabbitMQ是一种比较流行的消息中间件,之前我一直使用redis作为消息中间件,但是生产环境比较推荐RabbitMQ来替代Redis,所以我去查询了一些RabbitMQ的资料.相比于Redis,RabbitMQ优点很多,比如: 具有消息消费确认机制 队列,消息,都可以选择是否持久化,粒度更小.更灵活. 可以实现负载均衡 RabbitMQ应用场景 异步处理:比如用户注册时的确认邮件.短信等交由rabbitMQ进行异步处理 应用解耦:比如收发消息双方可以使用

final修饰符的三种使用场景

final有三种使用场景,各自是修饰变量.方法和类.不管哪种修饰.一旦声明为final类型.你将不能改变这个引用了,编译器会检查代码,假设你试图再次初始化,编译器会报错.以下我来详细说说每一种修饰场景. 1.修饰变量 当final修饰一个变量的时候一般把他作为常量.通常和statickeyword配合使用. 比如: private static final String ISSUCCESS_FLAG = "true";一般static修饰的常量都用大写字母来表示. 在这里我写一段代码:

scala中Either的一种使用场景

用scala有一年多了,对于scala中的Option和Try使用的较为频繁,对其应用场景相对熟悉一些.而对于Either,仔细回想一下却发现几乎(完全)没有使用过,其实并不是没有遇到过Either的使用场景,只是遇到的时候不知道能够使用Either来解决此问题. 昨天在网上偶然看到一篇介绍Either的文章,发现有一种场景可以使用Either来解决,具体是这样的: web系统中,Controller层调用service层方法,根据邮箱查询注册的用户User,如果未取到User,则需要知道是什么

3.5星|《秩序:不法之徒为何比我们想象的更有秩序》:海盗等几种自治场景分析

秩序 英文原版是2014年的,可以看作是作者的<海盗经济学>的续篇,讲一些特定的无政府状态下出现的自治组织的情况. 重点讲了三种自治场景:1:16世纪英格兰苏格兰交战对立期间边境线附近居民自发形成<边境法则> :2:18-19世纪海盗们约定俗成了一套海盗章程:3:索马里政府垮台后的无政府状态. 海盗相关内容有细节有概述有数据,说的比较好,英格兰苏格兰边境次之,文献条文多于实例,更缺乏实际执行效果的信息,索马里无政府状态则只有宏观的经济发展指标,没有当地实际情况的信息. 书中还有不少

RabbitMQ的几种典型使用场景

AMQP AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现.它主要包括以下组件: 1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程. 2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host 3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列.Exch

RabbitMQ 6种应用场景

http://www.rabbitmq.com/getstarted.html官网 最近业务需要使用Rabbitmq工作队列实现任务的负载分发 1.1.什么是RabbitMQ? RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,服务器端用Erlang语言编写,支持多种客户端,如:Python.Ruby..NET.Java.JMS.C.PHP.ActionScript.XMPP.STOMP等,支持AJAX.用于在分布式系统中存储转发消息. 1.2.什么是AMQP? ??AMQP

迭代器模式的一种应用场景以及C#对于迭代器的内置支持

迭代器模式 先放上gof中对于迭代器模式的介绍镇楼 意图 提供一种方法顺序访问一个聚合对象中各个元素, 而又不需暴露该对象的内部表示. 别名 游标(Cursor). 动机 一个聚合对象, 如列表(list), 应该提供一种方法来让别人可以访问它的元素,而又不需暴露它的内部结构. 此外,针对不同的需要,可能要以不同的方式遍历这个列表.但是即使可以预见到所需的那些遍历操作,你可能也不希望列表的接口中充斥着各种不同遍历的操作.有时还可能需要在同一个表列上同时进行多个遍历.迭代器模式都可帮你解决所有这些

jquery.proxy的四种使用场景

作者:zccst 其实只有两种使用方式,只不过每一种又细分是否传参. 先给一段HTML,后面会用来测试: <p><button id="test">Test</button></p> <p id="log"></p> 1,jQuery.proxy(function, context); 使用context作为function运行上下文(即this) 2,jQuery.proxy(function

【RabbitMQ】——5种队列(转)

原文地址:https://blog.csdn.net/u012654963/article/details/76417613 应用RabbitMQ,我们可以根据需求选择5种队列之一. 一.简单队列 P:消息的生产者 C:消息的消费者 红色:队列 简单队列的生产者和消费者关系一对一 但有时我们的需求,需要一个生产者,对应多个消费者,那就可以采用第二种模式 二.Work模式 一个生产者.2个消费者. 但MQ中一个消息只能被一个消费者获取.即消息要么被C1获取,要么被C2获取.这种模式适用于类似集群,