AMQP协议与RabbitMQ

什么是AMQP?

  在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。

AMQP 中包含的主要元素

生产者(Producer):向Exchange发布消息的应用。

消费者(Consumer):从消息队列queue中消费消息的应用。

消息队列(Message Queue):服务器组件,用于保存消息,直到发送给消费者。

Queue:消息载体;每个消息都会被投入到一个或多个队列。

消息(Message):传输的内容。

交换器(exchange):路由组件,接收Producer发送的消息,并根据Routing Key转发给消息队列queue。

Routing Key:路由关键字,exchange根据这个Routing Key进行消息投递到队列queue。

虚拟主机(Virtual Host): 用作不同用户的权限分离;一批交换器,消息队列和相关对象。虚拟主机是共享相同身份认证和加密环境的独立服务器域。

Broker :AMQP的服务端称为Broker。

连接(Connection):一个网络连接,比如TCP/IP套接字连接。

信道(Channel):消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务;多路复用连接中的一条独立的双向数据流通道,为会话提供物理传输介质。

绑定器(Binding):把exchange和queue按照路由规则绑定起来。

exchange 与 Queue 的路由机制

生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType,然后转发到对应的Queue中,所以发消息不需要指定Queue,似乎消费者可以指定Queue绑定到某个Exchange、某个RoutingKey,也可以不指定Queue,就只根据某个Exchange和某个RoutingKey接受到消息


exchange 将消息发送到哪一个queue是由exchange type 和 Binding绑定规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange 。


Direct exchange 直接转发路由,其实现原理是会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。


Fanout exchange 复制分发路由,该路由不需要RoutingKey,会将消息发送给所有与该 Exchange 定义过Binding的所有Queues中去,其实是一种广播行为。


topic exchange 通配路由,是direct exchange的通配符模式,消息中的RoutingKey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配正则表达式的Queue。


需要注意的一点只有queue具有保存消息的功能,exchange不能保存消息。

AMQP 如何实现通信的

(1)建立连接Connection。由producer和consumer创建连接,连接到broker的物理节点上。

(2)建立消息Channel。Channel是建立在Connection之上的,一个Connection可以建立多个Channel。producer连接Virtual Host 建立Channel,Consumer连接到相应的queue上建立Channel。

(3)发送消息。由Producer发送消息到Broker中的exchange中。

(4)路由转发。exchange收到消息后,根据一定的路由策略routing key,将消息转发到相应的queue中去。

(5)消息接收。Consumer会监听相应的queue,一旦queue中有可以消费的消息,queue就将消息发送给Consumer端。

(6)消息确认。当Consumer完成某一条消息的处理之后,需要发送一条ACK消息给对应的Queue。Queue收到ACK信息后,才会认为消息处理成功,并将消息从Queue中移除;如果在对应的Channel断开后,Queue没有收到这条消息的ACK信息,该消息将被发送给另外的Channel。 至此一个消息的发送接收流程走完了。消息的确认机制提高了通信的可靠性。

消息队列的使用大概过程

(1)客户端连接Connection到消息队列服务器Broker,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。

 

RabbitMQ中 exchange、route、queue的关系

MessageQueue、Exchange和Binding构成了AMQP协议的核心。

  声明MessageQueue 

  在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:

  a)消费者是无法订阅或者获取不存在的MessageQueue中信息。

  b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。

  在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。

  这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的。

如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel去声明其他队列的。Rabbit MQ中,可以通过queue.declare命令声明一个队列,可以设置该队列以下属性:

a) Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。

b)   Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

c)   Durable:持久化,这个会在后面作为专门一个章节讨论。

d)  其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。

  

exchange 与 Queue 的路由机制

生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType,然后转发到对应的Queue中,所以发消息不需要指定Queue,似乎消费者可以指定Queue绑定到某个Exchange、某个RoutingKey,也可以不指定Queue,就只根据某个Exchange和某个RoutingKey接受到消息


exchange 将消息发送到哪一个queue是由exchange type 和 Binding绑定规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange 。


Direct exchange 直接转发路由,其实现原理是会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。


Fanout exchange 复制分发路由,该路由不需要RoutingKey,会将消息发送给所有与该 Exchange 定义过Binding的所有Queues中去,其实是一种广播行为。


topic exchange 通配路由,是direct exchange的通配符模式,消息中的RoutingKey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配正则表达式的Queue。


需要注意的一点只有queue具有保存消息的功能,exchange不能保存消息。

 

AMQP的应用场景


AMQP是实现消息机制的一种协议,消息队列主要有以下几种应用场景:


异步处理


比如公司新入职一个员工,需要开通系统账号,有几件事情要做,开通系统账号,发短信通知用户,发邮件给员工,在公司内部通讯系统中发送消息给员工。其中发短信,发邮件,发内部通讯系统消息,这三件事情可以串行也可以并行,并行的好处就是可以提高效率,这时可以应用MQ来实现并行。


应用解耦

在公司内部系统中,有人事系统,OA系统,财务系统,外围应用系统等等,当人事发生变动的时候(离职入职调岗),人事系统需要将这些变动通知给其他系统,这时只需人事系统发送一条消息,各个外围系统订阅该消息,就可得知人事变动,与实时服务调用相比,如果人事系统挂掉,各个外围系统不会受到影响,继续运行;如果是实时服务调用,比如人事系统被各个服务调用,人事系统挂了,调用人事系统的服务都会受到影响。


流量缓冲


在有些流量会瞬间暴增的场景下,如秒杀,为了防止流量突然增大而使得应用挂掉,可以引入MQ,将请求存入MQ中,如果超过了MQ的长度,就把请求丢弃掉,这样来限制流量。


日志处理


将消息队列引入到日志处理中,如kafka的应用,解决了大量日志的传输问题。日志客户端负责采集日志数据,并定期写入kafka队列,kafka负责接收,存储和转发日志,日志处理系统订阅并消费kafka中的日志数据。

SpringBoot+RabbitMQ的简单demo

https://www.cnblogs.com/theRhyme/p/10071781.html

来源:

https://blog.csdn.net/letempsar/article/details/52565020

https://blog.csdn.net/ztx114/article/details/78410727

https://www.cnblogs.com/linkenpark/p/5393666.html

原文地址:https://www.cnblogs.com/theRhyme/p/9578675.html

时间: 2024-11-05 18:35:37

AMQP协议与RabbitMQ的相关文章

rabbitMQ的简单实例——amqp协议带数据回写机制

rabbitMQ是一种高性能的消息队列,支持或者说它实现了AMQP协议(advanced message queue protocol高级消息队列协议). 下面简单讲一讲一个小例子.我们首先要部署好rabbitMQ,然后实现一个生产者—消费者,生产者向rabbit中发布一个消息,消费者去rabbit取这个消息,在正确收到这个消息后,消费者会通过返回队列回写通知生产者自己收到了消息. windows下部署rabbit非常简单,先安装erlang运行时,然后安装rabbitMQ安装文件即可,都是ex

Rabbimq必备基础之对高级消息队列协议AMQP分析及Rabbitmq本质介绍

MQ的一个产品... [消息队列] 1. MSMQ windows自带的一个服务... [petshop],message存放在文件系统中. 最原始的消息队列... [集群,消息确认,内存化,高可用,镜像] 2. ActiveMQ,ZeroMQ,RabbitMQ 3. Rabbitmq本质是一个什么东西???? <1> rabbitmq是用什么语言写的 => erlang <2> rabbitmq其实是遵循 amqp协议 的一个erlang代码实现... 4. amqp协议:

RabbitMQ与AMQP协议详解

1. 消息队列的历史 了解一件事情的来龙去脉,将不会对它感到神秘.让我们来看看消息队列(Message Queue)这项技术的发展历史. Message Queue的需求由来已久,80年代最早在金融交易中,高盛等公司采用Teknekron公司的产品,当时的Message queuing软件叫做:the information bus(TIB). TIB被电信和通讯公司采用,路透社收购了Teknekron公司.之后,IBM开发了MQSeries,微软开发了Microsoft Message Que

RabbitMQ之AMQP协议

一. 协议 AMQP协议是分层的,类似于OSI或TCP/IP协议分层.从图中可以看出分三层: 1. Model Layer:规范服务器端Broker的行为. 2. Session Layer:定义客户端 与服务器端Broker的Context. 3. Transport Layer:传输二进制数据流. 二. 模型 AMQP服务器Broker主要由Exchange和Message Queue组成,主要功能是Message的路由Routing和缓存Buffering. Exchange接受Produ

AMQP协议学习

参考这个:http://kb.cnblogs.com/page/73759/ 写的挺好 AMQP协议是一种二进制协议,提供客户端应用与消息中间件之间异步.安全.高效地交互.从整体来看,AMQP协议可划分为三层: 这种分层架构类似于OSI网络协议,可替换各层实现而不影响与其它层的交互.AMQP定义了合适的服务器端域模型,用于规范服务器的行为(AMQP服务器端可称为broker).在这里Model层决定这些基本域模型所产生的行为,这种行为在AMQP中用"command"表示,在后文中会着重

AMQP协议

当前各种应用大量使用异步消息模型,并随之产生众多消息中间件产品及协议,标准的不一致使应用与中间件之间的耦合限制产品的选择,并增加维护成本. AMQP是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制. 当然这种降低耦合的机制是基于与上层产品,语言无关的协议.AMQP协议是一种二进制协议,提供客户端应用与消息中间件之间异步.安全.高效地交互. 从整体来看,AMQP协议可划分为三层:  这种分层架构类似于OSI网络

一:AMQP协议标准简单介绍

一:AMQP协议?--->AMQP 是 Advanced Message Queuing Protocol,即高级消息队列协议.和前面罗列的技术不同,AMQP 是一个标准化的消息中间件协议--->她的理想是让不同语言,不同系统的应用互相通信,并提供一个简单统一的模型和编程接口.这样,人们就可以采用各种语言和平台来实现自己的应用,当需要和其他系统通信时,只要承认 AMQP 协议即可. 二:AMQP协议的形象化例子?--->世界各地的人们由于地理和历史的原因,使用着各种不同的语言,相互交流十

RPC机制之AMQP协议

openstack RPC通信 Openstack 的主要组件有 Nova.Cinder.Neutron.Glance 等,分别负责云平台的计算.存储.网络资源管理.OpenStack 各组件之间是通过 REST 接口进行相互通信,而各组件内部则采用了RPC通信. 什么是RPC RPC即Remote Procedure Call(远程方法调用),是Openstack中一种用来实现跨进程(或者跨机器)的通信机制.Openstack中同项目内(如nova, neutron, cinder-)各服务(

php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)

1.AMQP_EX_TYPE_DIRECT:直连型 直连型又包括: 1对1 和1对N(N对1. N对N) 接收端receive.php代码如下 ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 <!--?php $connect =newAMQPConnection(); $connect--->connect(); $channel =newAMQPChannel($connect); $