什么是AMQP?
在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。
AMQP 中包含的主要元素
生产者(Producer):向Exchange发布消息的应用。
消费者(Consumer):从消息队列queue中消费消息的应用。
消息队列(Message Queue):服务器组件,用于保存消息,直到发送给消费者。
Queue:消息载体;每个消息都会被投入到一个或多个队列。
消息(Message):传输的内容。
交换器(exchange):路由组件,接收Producer发送的消息,并根据Routing Key转发给消息队列queue。
Routing Key:路由关键字,exchange根据这个Routing Key进行消息投递到队列queue。
虚拟主机(Virtual Host): 用作不同用户的权限分离;一批交换器,消息队列和相关对象。虚拟主机是共享相同身份认证和加密环境的独立服务器域。
Broker :AMQP的服务端称为Broker。
连接(Connection):一个网络连接,比如TCP/IP套接字连接。
信道(Channel):消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务;多路复用连接中的一条独立的双向数据流通道,为会话提供物理传输介质。
绑定器(Binding):把exchange和queue按照路由规则绑定起来。
exchange 与 Queue 的路由机制
生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType,然后转发到对应的Queue中,所以发消息不需要指定Queue,似乎消费者可以指定Queue绑定到某个Exchange、某个RoutingKey,也可以不指定Queue,就只根据某个Exchange和某个RoutingKey接受到消息。
exchange 将消息发送到哪一个queue是由exchange type 和 Binding绑定规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange 。
Direct exchange 直接转发路由,其实现原理是会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。
Fanout exchange 复制分发路由,该路由不需要RoutingKey,会将消息发送给所有与该 Exchange 定义过Binding的所有Queues中去,其实是一种广播行为。
topic exchange 通配路由,是direct exchange的通配符模式,消息中的RoutingKey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配正则表达式的Queue。
需要注意的一点只有queue具有保存消息的功能,exchange不能保存消息。
AMQP 如何实现通信的
(1)建立连接Connection。由producer和consumer创建连接,连接到broker的物理节点上。
(2)建立消息Channel。Channel是建立在Connection之上的,一个Connection可以建立多个Channel。producer连接Virtual Host 建立Channel,Consumer连接到相应的queue上建立Channel。
(3)发送消息。由Producer发送消息到Broker中的exchange中。
(4)路由转发。exchange收到消息后,根据一定的路由策略routing key,将消息转发到相应的queue中去。
(5)消息接收。Consumer会监听相应的queue,一旦queue中有可以消费的消息,queue就将消息发送给Consumer端。
(6)消息确认。当Consumer完成某一条消息的处理之后,需要发送一条ACK消息给对应的Queue。Queue收到ACK信息后,才会认为消息处理成功,并将消息从Queue中移除;如果在对应的Channel断开后,Queue没有收到这条消息的ACK信息,该消息将被发送给另外的Channel。 至此一个消息的发送接收流程走完了。消息的确认机制提高了通信的可靠性。
消息队列的使用大概过程
(1)客户端连接Connection到消息队列服务器Broker,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
RabbitMQ中 exchange、route、queue的关系
MessageQueue、Exchange和Binding构成了AMQP协议的核心。
声明MessageQueue
在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:
a)消费者是无法订阅或者获取不存在的MessageQueue中信息。
b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。
在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。
这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的。
如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel去声明其他队列的。Rabbit MQ中,可以通过queue.declare命令声明一个队列,可以设置该队列以下属性:
a) Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
b) Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
c) Durable:持久化,这个会在后面作为专门一个章节讨论。
d) 其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。
exchange 与 Queue 的路由机制
生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType,然后转发到对应的Queue中,所以发消息不需要指定Queue,似乎消费者可以指定Queue绑定到某个Exchange、某个RoutingKey,也可以不指定Queue,就只根据某个Exchange和某个RoutingKey接受到消息。
exchange 将消息发送到哪一个queue是由exchange type 和 Binding绑定规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange 。
Direct exchange 直接转发路由,其实现原理是会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。
Fanout exchange 复制分发路由,该路由不需要RoutingKey,会将消息发送给所有与该 Exchange 定义过Binding的所有Queues中去,其实是一种广播行为。
topic exchange 通配路由,是direct exchange的通配符模式,消息中的RoutingKey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配正则表达式的Queue。
需要注意的一点只有queue具有保存消息的功能,exchange不能保存消息。
AMQP的应用场景
AMQP是实现消息机制的一种协议,消息队列主要有以下几种应用场景:
异步处理
比如公司新入职一个员工,需要开通系统账号,有几件事情要做,开通系统账号,发短信通知用户,发邮件给员工,在公司内部通讯系统中发送消息给员工。其中发短信,发邮件,发内部通讯系统消息,这三件事情可以串行也可以并行,并行的好处就是可以提高效率,这时可以应用MQ来实现并行。
应用解耦
在公司内部系统中,有人事系统,OA系统,财务系统,外围应用系统等等,当人事发生变动的时候(离职入职调岗),人事系统需要将这些变动通知给其他系统,这时只需人事系统发送一条消息,各个外围系统订阅该消息,就可得知人事变动,与实时服务调用相比,如果人事系统挂掉,各个外围系统不会受到影响,继续运行;如果是实时服务调用,比如人事系统被各个服务调用,人事系统挂了,调用人事系统的服务都会受到影响。
流量缓冲
在有些流量会瞬间暴增的场景下,如秒杀,为了防止流量突然增大而使得应用挂掉,可以引入MQ,将请求存入MQ中,如果超过了MQ的长度,就把请求丢弃掉,这样来限制流量。
日志处理
将消息队列引入到日志处理中,如kafka的应用,解决了大量日志的传输问题。日志客户端负责采集日志数据,并定期写入kafka队列,kafka负责接收,存储和转发日志,日志处理系统订阅并消费kafka中的日志数据。
SpringBoot+RabbitMQ的简单demo
https://www.cnblogs.com/theRhyme/p/10071781.html
来源:
https://blog.csdn.net/letempsar/article/details/52565020
https://blog.csdn.net/ztx114/article/details/78410727
https://www.cnblogs.com/linkenpark/p/5393666.html
原文地址:https://www.cnblogs.com/theRhyme/p/9578675.html