RabbitMQ实战:理解消息通信

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记。

前段时间总结完了「深入浅出MyBatis」系列,对MyBatis有了更全面和深入的了解,在掘金社区也收到了一些博友的喜欢,很高兴。另外,短暂的陪产假就要结束了,小宝也二周了,下周二就要投入工作了,希望自己尽快调整过来,加油努力。

从本篇开始总结「RabbitMQ实战」系列的阅读笔记,RabbitMQ是一个开源的消息代理和队列服务器,可以通过基本协议在完全不同的应用之间共享数据,可以将作业排队以便让分布式服务进行处理。

本篇介绍下消息通信,首先介绍基础概念,将这些概念映射到AMQP协议,然后介绍消息持久化、发送方确认模式等消息可靠性保证。

通过本篇介绍,你会了解到:

  • 消息通信概念:消费者、生产者和代理
  • AMQP元素:队列、交换器、绑定
  • 虚拟主机
  • 消息持久化
  • 发送方确认模式

消息通信概念

此部分的介绍,会牵涉到AMQP的元素,如果之前没接触过的,可以结合下面的「AMQP元素」进行理解。

消息

消息是传输的主体,消息包括两部分:有效载荷(payload)和标签(label);有效载荷是要传输的数据,可以是任何内容,比如JSON串、二进制、自定义的数据协议等;标签描述了有效载荷,并且Rabbit用它来决定谁将获得消息的投递。

可以与HTTP协议类比,HTTP消息头部描述了消息体的类型、大小等,HTTP消息体是要传输的数据,HTTP服务端通过消息头部决定如何处理请求和数据。

生产者和消费者

生产者创建消息,然后发送到代理服务器(RabbitMQ Server),AMQP只会用标签表述这条消息(一个交换器名称和可选的主题标记),Rabbit服务器会根据标签把消息发送给订阅的消费者。

消费者消费消息,它会订阅到队列(queue)上,每当有消息到达RabbitMQ服务器时,会发送给消费者,消费者收到消息时,会进行处理。

注意:消费者收到的消息只包括有效载荷,所有不会知道是从哪里发来的。

连接和信道

要想发布或消费消息,必须先与RabbitMQ Server建立一条TCP连接,建立TCP连接之后,要创建一条信道,信道是建立在真实TCP连接的虚拟连接。

AMQP命令都是通过信道发送出去的,每条信道会被指派一个唯一的ID,为什么不直接通过TCP连接发送AMQP命令呢? 因为操作系统建立和销毁TCP会话是很昂贵的,而且创建的连接数也有限。 通过引入通道,可以在连接上建立通道,而且通道是私密的,相互不受影响。

通道的概念还是有点抽象,后面专门写一篇文章进行分析介绍,这里简单理解下吧。

AMQP元素

AMQP消息路由有三部分组成:队列、交换器和绑定,队列是存放消息的地方,交换器是决定不同的分发策略,绑定是队列和交换器的桥梁,定义匹配规则。

生产者发送消息到交换器,交换器根据自身类型和绑定规则,将消息存放在对应队列中,然后将消息发送到监听队列的消费者。

如上图:P为生产者,X为交换器,交换器类型为direct,根据不同的绑定规则(orange、black、green),分发给不同的队列,C为消费者,从不同的队列介绍消息。

队列

消费者通过两种方式从特定的队列接收消息:

  • basic.consume,这样会将信道置为接收模式,直到取消对队列的订阅;
  • basic.get,主动让消费者接收队列中的下一条消息;

basic.get会影响性能,推荐使用basic.consume来实现高吞吐量,因为其处理过程是先订阅消息,获取单条消息,再取取消订阅。

如果队列拥有多个消费者时,队列收到的消息将以循环的方式发给消费者,即多个消费者平均消费这些消息。

另外,消费者接收到的每一条消息都要进行确认,必须通过basic.ack命令向rabbitmq服务端发送一个确认。 也可以设置auto_ack为true,只要消费者接收到消息,就自动视为确认,不过不建议这样,因为接收到不代表业务逻辑处理成功。 服务端接收到确认后,会从队列中删除对应消息。

还有一种场景,在接收到消息后,如果不想处理,可以通过下面方式处理:

  • 把消费者从RabbitMQ服务器断开连接,,这样RabbitMQ会自动将消息入队并发送给另外一个消费者;
  • 如果不想发送给其他消费者处理,就是想忽略这个消息,可以发送basic.reject命令;

最后来介绍下如何创建队列,首先明确下是生成者还是消费者创建,关键点是:生产者能否承担起丢失消息,因为发出去的消息如果路由到了不存在的队列,Rabbit会忽略它们。所以,建议生成者和消费者都尝试去创建队列,可以通过设置queue.declare的passive选项设置为ture来判断队列是否存在,如果不存在会返回一个错误。

通过queue.declare命令来创建队列,有一些选项说明下:

  • exclusive:如果设置true的化,队列将变成私有的,只有创建队列的应用程序才能够消费队列消息;
  • auto-delete:当最后一个消费者取消订阅的时候,队列会自动移除;
  • durable:是否要持久化;
queueDeclare(String queue,
            boolean durable,
            boolean exclusive,
            Map<String, Object> arguments);
交换器和绑定

交换器有四种类型:direct、fanout、topic、headers,其中headers匹配消息的header而非路由键,不太实用,就不详细介绍了。

第一种:direct交换器

direct交换器比较简单,如果和路由键 完全匹配 的话,就会投递到对应的队列:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

服务器默认包含一个空白字符串名称的默认路由器,当声明一个队列时,会自定绑定到默认交换器,并以队列名称作为路由键。

第二种:fanout交换器

fanout交换器,不处理路由键,只需要简单的将队列绑定到交换机上,为会每个消费者自动生成一个随机队列,所有的消费者都会收到所有消息。

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

第三种:topic交换器

topic交换器,将路由键和某模式进行匹配,此时队列需要绑定要一个模式上。

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

关于模式,符号#匹配一个或多个词,符号匹配一个词,因此kfs.#能够匹配到kfs.session.message,但是audit.只会匹配到audit.session。

虚拟主机

每个RabbitMQ服务器都能创建虚拟消息服务器,称为虚拟主机(vhost),每个RabbitMQ本质上是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定,还有自己的权限机制。

连接时,必须制定vhost,rabbitmq包含了默认的vhost:"/"。当创建一个用户时,会被指派给至少一个vhsot,并且相互隔离。

vhost不能通过AMQP协议创建,需要使用rabbitmqctl工具创建。

消息持久化和发送方确认模式

如果没有持久化,重启rabbitmq后,队列、交换器都会消失,RabbitMQ提供了持久化的功能,需要满足以下三个条件:

  • 交换器设置为持久化,通过durable属性;
  • 队列设置为持久化,通过durable属性;
  • 消息投递模式delivery设置为2;

当发布一条持久化消息到持久化交换器上时,rabbit会在消息提交到日志文件后才会发送响应,所有会损失性能,所以,只对重要数据持久化即可。

考虑这种情况:由于发布消息后,不返回任何信息给生产者,如何只对服务器已经持久化到硬盘了呢,可能在传输过程中丢失,或者持久化前服务器宕机,导致消息丢失。

RabbitMQ通过「发送方确认模式」来解决上面的问题。首先,需要将信道设置成confirm模式,这样所有在信道上发布的消息都会被指派一个唯一的ID号,一旦消息被投递到所有匹配的队列或持久化到磁盘,会发送一个确认消息给生产者。

通过本篇的介绍,对Rabbit的消息模型有了整体了解,下一篇会写个DEMO,并介绍下运行和管理RabbitMQ。

欢迎扫描下方二维码,关注我的个人微信公众号 ~

原文地址:http://blog.51cto.com/13714880/2115088

时间: 2024-10-07 16:22:25

RabbitMQ实战:理解消息通信的相关文章

RabbitMQ(二):理解消息通信RabbitMQ

原文:RabbitMQ(二):理解消息通信RabbitMQ 一.消费者.生产者和信道 生产者(producer):生产者创建消息,然后发布(发送)到代理服务器(RabbitMQ),可以说发送消息的程序就是生产者.什么是消息?消息包含两部分:有效载荷和标签.有效载荷就是传输的数据,可以是任何内容,包括json数据和图片等等.而标签(一个叫交换器名称和可选的主题标记)描述了有效载荷,RabbitMQ用它来决定谁将获得这个消息. 消费者(consumer):消费者就是接收消息并处理消息的程序,他们连接

RabbitMQ实战:消息通信模式和最佳实践

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记. 通过前2篇的介绍,了解了消息通信的主要元素和交互过程,以及如何运行和管理RabbitMQ,这篇将站在开发模式的角度理解「面向消息通信」带来的好处,以及在各种场景下的最佳实践. 通过介绍,你会了解到: 面向消息通信的好处 发后即忘模型 用RabbitMQ实现RPC 面向消息通信的好处 主要从异步状态思维.处理能力扩展性.集成复杂度方面,说明面向消息通信的好处. 异步状态思维 当将消息通信集成到应用程序时,开发模式将从同步模型

RabbitMQ介绍2 - 理解消息AMQP

理解消息AMQP通信.官方解释: http://www.rabbitmq.com/tutorials/amqp-concepts.html 概念:生产者producer,消费者consumer,队列queue,交换器exchange,路由键routing key,绑定键binding key. producer发布消息,消息经过交换器传播放入队列,消费者从队列中得到消息. ConnectionFactory, connection, channel信道.connectionFactory用来建立

.Net RabbitMQ之消息通信

1.消息投递服务 RabbitMQ是一种消息投递服务,怎么理解这句话呢?即RabbitMQ即不是消息的生产者,也是消息的消费者.他就像现实生活中快递模式,消费者在电商网站上下单买了一件商品,此时对应的生产者(商家)则生产了一件货物(概念上的生产,可能已经生产好了),接着生产者(商家)将货物发送给快递公司,因为消费者下单了这个货物,相当于订阅了这件货物,所以快递公司将会把这件货物发送给对应的消费者.RabbitMQ就相当于这里面的快递公司.服务在生产者和消费者之间建立桥梁,即通信. 2.Rabbi

RabbitMQ详解(二)------消息通信的概念

RabbitMQ详解(二)------消息通信的概念 消息通信,有很多种,邮箱 qq 微信 短信等,这些通信方式都有发送者,接受者,还有一个中间存储离线消息的容器.但是这些通信方式和RabbitMQ的通信模型是不一样的,比如邮件,邮件服务器基于POP3/SMTP协议,通信双方需要明确指定,并且发送的邮件内容有固定的结构.而RabbitMQ服务器基于AMQP协议,这个协议是不需要明确指定发送方和接受方的,而且发送的消息也没有固定的结构,甚至可直接存储二进制数据,并且和邮件服务器一样,也能存储离线消

电商项目实战(架构八)——RabbitMQ实现延迟消息

一.前言 RabbitMQ是一个开源的消息队列,轻量级且易于部署,并支持多种消息协议.RabbitMQ可以部署在分布式和联合配置中,以满足高规模.高可用性的需求.本文整合RabbitMQ实现延迟消息的过程,以发送延迟消息取消超时订单为例. 二.RabbitMQ的安装和使用 1.安装Erlang,下载地址:http://erlang.org/download/otp_win_64_21.3.exe 2.安装RabbitMQ,下载地址:https://dl.bintray.com/rabbitmq/

Java SpringBoot集成RabbitMq实战和总结

目录 交换器.队列.绑定的声明 关于消息序列化 同一个队列多消费类型 注解将消息和消息头注入消费者方法 关于消费者确认 关于发送者确认模式 消费消息.死信队列和RetryTemplate RPC模式的消息(不常用) 关于消费模型 关于RabbitMq客户端的线程模型 在公司里一直在用RabbitMQ,由于api已经封装的很简单,关于RabbitMQ本身还有封装的实现没有了解,最近在看RabbitMQ实战这本书,结合网上的一些例子和spring文档,实现了RabbitMQ和spring的集成,对着

RabbitMQ实战:可用性分析和实现

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记. 上一篇介绍了各种场景下的最佳实践,大部分场景可以使用「发后即忘」的模式,不需要响应,如果需要响应,可以使用RabbitMQ的RPC模型. RabbitMQ以异步的方式解耦系统间的关系,调用者将业务请求发送到Rabbit服务器,就可以返回了,Rabbit会确保请求被正确处理,即使遇到网络异常.Rabbit服务器崩溃.整个机房断电等特殊场景,针对这些场景,Rabbit提供了各种机制确保其可用性. 本篇通过总结可能出现的特殊场景

RabbitMQ实战:界面管理和监控

本系列是「RabbitMQ实战:高效部署分布式消息队列」书籍的总结笔记. 上一篇总结了可能出现的异常场景,并对RabbitMQ提供的可用性保证进行了分析,在出现服务器宕机后,仍然可以正常服务.另外,需要尽快恢复异常的服务器,重新加入集群,推送未消费的消息,通过监控可第一时间接收到错误并进行处理. 另外,我们想主动了解消息堆积和消费的情况,以及服务器节点的压力,RabbitMQ提供了几种方式便捷.直观的了解,包括Web管理插件.REST API.rabbitmqadmin脚本. 通过介绍,你会了解