消息队列MQ
维基百科中是这样介绍消息队列的
消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它
58沈剑是这样介绍消息队列的
消息队列,是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用MQ后,消息发送上游只需要依赖MQ,逻辑上和物理上不依赖与下游。
消息队列的应用场景
- 异步处理: 将非核心流程异步化,提高系统响应性能。比如用户注册需要发送邮件和短信确认(虽然产品都不会这样做,仅仅举例),我们可以在注册成功后发送消息给MQ,然后邮件服务和短信服务接受消息后执行。这里,可以直接使用线程完成,但是我们可以使用MQ后,我们不用关心下游(邮件、短信服务等)有多少任务,并且我们可以简单的在下游订阅上游即可动态添加。
- 应用解耦: 将不强依赖于本系统的非核心流程和系统流程进行解耦。比如购买商品后会有积分赠送,这里积分系统时非核心流程,我们可以让订单系统和积分系统解耦。
- 流量削峰与流控控制: 当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。例如,在秒杀系统中我们可以把用户请求写入消息队列中,然后秒杀系统在根据规则依次从消息队列中取然后进行处理,这样就不会发送系统由于并发量过大而崩溃。
- 消息订阅: 这里有点像UDP,上游只关心把消息放入MQ中,下游谁订阅了就会获得消息。
- 消息通讯: MQ都会内置高效的通信机制,例如我们可以使用MQ作为通信工具进行RPC通信。
- 日志处理: 使用MQ解决大量日志传输问题。
日志处理(转自新浪是如何分析处理32亿条实时日志的?)
电商系统(转自大型网站架构系列:分布式消息队列(一))
消息队列的不足
- 系统更加复杂,多了MQ组件
- 消息可靠性和重复性互为矛盾,消息不丢不重难以同时保证
- 上游无法知道下游的执行结果
调用方实时依赖执行结果的业务场景,使用调用,而不是MQ
消息队列的消息协议
常见的MQ有RabbitMQ、ActiveMQ、Kafka、RocketMQ等等,它们支持不同的通信协议.例如AMQP、SMTP、STOMP、HTTP等等,下面是一下协议介绍。
AMQP
AMQP(高级消息队列协议)是应用层网络协议,它支持符合要求的客户端应用和消息中间件代理之间通信。
AMQP协议的各个部分
- AMQP协议中的元素包括:Message(消息体)、Producer(生产者)、Consumer(消费者)、Virtual Host(虚拟节点)、Exchange(交换机)、Queue(队列)等
- 由Producer(消息生产者)和Consumer(消息消费者)构成了AMQP的客户端,他们是发送消息和接收消息的主体。AMQP服务端称为Broker,一个Broker中一定包含完整的Virtual Host(虚拟主机)、 Exchange(交换机)、Queue(队列)定义
- 一个Broker可以创建多个Virtual Host(虚拟主机),我们将讨论的Exchange和Queue都是虚拟机中的工作元素(还有User元素)。注意,如果AMQP是由多个Broker构成的集群提供服务,那么一个Virtual Host也可以由多个Broker共同构成
- Connection是由Producer(消息生产者)和Consumer(消息消费者)创建的连接,连接到Broker物理节点上。但是有了Connection后客户端还不能和服务器通信,在Connection之上客户端会创建Channel,连接到Virtual Host或者Queue上,这样客户端才能向Exchange发送消息或者从Queue接受消息。一个Connection上允许存在多个Channel,只有Channel中能够发送/接受消息
- Exchange元素是AMQP协议中的交换机,Exchange可以绑定多个Queue也可以同时绑定其他Exchange。消息通过Exchange时,会按照Exchange中设置的Routing(路由)规则,将消息发送到符合的Queue或者Exchange中
AMQP中消息的交互
- 在Producer(消息生产者)客户端建立了Channel后,就建立了到Broker上Virtual Host的连接。接下来Producer就可以向这个Virtual Host中的Exchange发送消息了
- Exchange(交换机)能够处理消息的前提是:它至少已经和某个Queue或者另外的Exchange形成了绑定关系,并设置好了到这些Queue和Excahnge的Routing(路由规则)。Exchange中的Routing有三种模式,在Exchange收到消息后,会根据设置的Routing(路由规则),将消息发送到符合要求的Queue或者Exchange中(路由规则还会和Message中的Routing Key属性配合使用)
- Queue收到消息后,可能会进行如下的处理:如果当前没有Consumer的Channel连接到这个Queue,那么Queue将会把这条消息进行存储直到有Channel被创建(AMQP协议的不同实现产品中,存储方式又不尽相同);如果已经有Channel连接到这个Queue,那么消息将会按顺序被发送给这个Channel
- Consumer收到消息后,就可以进行消息的处理了。但是整个消息传递的过程还没有完成:视设置情况,Consumer在完成某一条消息的处理后,将需要手动的发送一条ACK消息给对应的Queue(当然您可以设置为自动发送,或者无需发送)。Queue在收到这条ACK信息后,才会认为这条消息处理成功,并将这条消息从Queue中移除;如果在对应的Channel断开后,Queue都没有这条消息的ACK信息,这条消息将会重新被发送给另外的Channel。当然,您还可以发送NACK信息,这样这条消息将会立即归队,并发送给另外的Channel
AMQP协议中的Message
AMQP协议的消息格式如下:
其中内容在PAYLOAD部分,PAYLOAD部分格式如下:
关于AMQP更加详细的资料见于:AMQP官网、关于AMQP网友的博客
STOMP
STOMP(简单文本定向协议)同样是应用层协议,它提供一个可交互操作的连接格式,容许STOMP客户端和任意STOMP消息代理(Broker)进行交互,常用于消息中间件。STOMP协议基于帧(Frame)进行通信,第一行包含命令,然后紧跟键值对形式的Header内容,第二行空行,第三行开始就是Body内容,末尾都以空字符结尾。
STOMP消息格式和HTTP请求很像,格式如下:
COMMAD
header1:value1
header2:value2
...
Body
常见帧命令有:
- CONNECT
- SEND
- SUBSCRIBE
- UNSUBSCRIBE
- ACK
- NACK
- ...
例如STOMPSEND消息:
SEND
destination:/app/hello
content-length:20
{"message":"World!"}
关于STOMP资料见于:STOMP1.2官方规范、关于STOMP网友的博客
JMS
JMS(Java消息服务)不是网络协议,其是Java关于消息服务提供了一组通用接口,具体实现的协议由使用该接口的应用程序实现。在MQ中ActiveMQ、RocketMQ都实现了该协议。
这里有篇介绍JMS文章:JMS学习一(JMS介绍)
Reference
http://blog.csdn.net/u012758088/article/details/78024581
https://www.cnblogs.com/my_life/articles/7002138.html
https://www.jianshu.com/p/cee941ca0c09
架构师之路工作号《到底什么时候该使用MQ?》(由于微信公众号文章URL有时间限制,所以没有列出来)
原文地址:https://www.cnblogs.com/maying3010/p/8508227.html