RabbitMQ消息可靠性分析

消息中间件的可靠性是指对消息不丢失的保障程度;而消息中间件的可用性是指无故障运行的时间百分比,通常用几个 9 来衡量。不存在绝对的可靠性只能尽量趋向完美。并且通常可靠性也意味着影响性能和付出更大的成本,因此实际应用时还要根据业务需求,对真正关键的信息来做可靠性保证,并要从生产者、消息队列、消费者三个维度来努力。

1、生产者发送信息的可靠性 

生产者客户端发送出去之后可以发生网络丢包、网络故障等造成消息丢失。一般情况下如果不采取措施,生产者无法感知消息是否已经正确无误的发送到交换器中。如果消息在传输到Exchange的过程中发生失败而可以让生产者感知的话,生产者可以进行进一步的处理动作,比如重新投递相关消息以确保消息的可靠性。为此AMQP协议在建立之初就考虑到这种情况而提供了事务机制。

事务确实能够解决消息发送方和RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功,否则我们便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制的话会“吸干”RabbitMQ的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?从AMQP协议层面来看并没有更好的办法,但是RabbitMQ提供了一个改进方案,即发送方确认机制(publisher confirm)。事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitMQ的回应,之后才能继续发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该nack命令。事务机制和publisher confirm机制两者是互斥的,不能共存。

 2、消息路由到队列的可靠性

消息发送到交换器,如果没有和它匹配队列的话,消息也会丢失,mandatory或者AE可以让消息在路由到队列之前得到极大的可靠性保障。当mandatory参数设为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列的话,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形的话,消息直接被丢弃。

备份交换器,英文名称Alternate Exchange,简称AE,或者更直白的可以称之为“备胎交换器”。生产者在发送消息的时候如果不设置mandatory参数,那么消息在未被路由的情况下将会丢失,如果设置了mandatory参数,那么需要添加ReturnListener的编程逻辑,生产者的代码将变得复杂化。如果你不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。 可以通过在声明交换器(调用channel.exchangeDeclare方法)的时候添加alternate-exchange参数来实现,也可以通过策略的方式实现。如果两者同时使用的话,前者的优先级更高,会覆盖掉Policy的设置。

3、消息在队列中持久化

持久化可以提高队列的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。队列的持久化是通过在声明队列时将durable参数置为true实现的,如果队列不设置持久化,那么在RabbitMQ服务重启之后,相关队列的元数据将会丢失,此时数据也会丢失。

队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。通过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2即可实现消息的持久化。

如果在发送消息时采用了事务机制或者publisher confirm机制的话,服务端的返回是在消息落盘之后执行的,这样可以进一步的提高了消息的可靠性。但是即便如此也无法避免单机故障且无法修复(比如磁盘损毁)而引起的消息丢失,这里就需要引入镜像队列。镜像队列相当于配置了副本,绝大多数分布式的东西都有多副本的概念来确保HA。在镜像队列中,如果主节点(master)在此特殊时间内挂掉,可以自动切换到从节点(slave),这样有效的保证了高可用性,除非整个集群都挂掉。虽然这样也不能完全的保证RabbitMQ消息不丢失(比如机房被炸。。。),但是配置了镜像队列要比没有配置镜像队列的可靠性要高很多,在实际生产环境中的关键业务队列一般都会设置镜像队列。

4、消息消费阶段的可靠性

为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的消费到了这些消息。RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开,这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

如果消息消费失败,也可以调用Basic.Reject或者Basic.Nack来拒绝当前消息而不是确认,如果只是简单的拒绝那么消息会丢失,需要将相应的requeue参数设置为true,那么RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者。如果requeue参数设置为false的话,RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。

还有一种情况需要考虑: requeue的消息是存入队列头部的,即可以快速的又被发送给消费,如果此时消费者又不能正确的消费而又requeue的话就会进入一个无尽的循环之中。对于这种情况,笔者的建议是在出现无法正确消费的消息时不要采用requeue的方式来确保消息可靠性,而是重新投递到新的队列中,比如设定的死信队列中,以此可以避免前面所说的死循环而又可以确保相应的消息不丢失。对于死信队列中的消息可以用另外的方式来消费分析,以便找出问题的根本。

RabbitMQ消息可靠性分析

原文地址:https://www.cnblogs.com/doit8791/p/10228269.html

时间: 2024-08-30 13:21:22

RabbitMQ消息可靠性分析的相关文章

RabbitMQ 消息可靠性的机制

RabbitMQ 消息可靠性 一.发布确认机制. 生成者发送消息,Exchange路由消息到队列,RabbitMQ就会给生产者发送确认Ack.(注意:发布确认机制不能和事务机制一起使用) 注意:多消息发布确认机制情况下,倘若要发送 100 条消息,发送 90 条后,突然网络故障,后面的消息发送失败了,那么 isAllPublished 返回的是 false,而前面 90 条消息已经发送到消息队列了.我们还不知道哪些消息是发送失败的,所以很多条消息发布确认,建议分几次发送或多通道发送. 二.持久化

解决RabbitMQ消息丢失问题和保证消息可靠性(一)

原文链接(作者一个人):https://juejin.im/post/5d468591f265da03b810427e 工作中经常用到消息中间件来解决系统间的解耦问题或者高并发消峰问题,但是消息的可靠性如何保证一直是个很大的问题,什么情况下消息就不见了?如何防止消息丢失?下面通过这篇文章,我们就聊聊RabbitMQ 消息可靠性如何解决的? 本文分三部分说明 RabbitMQ 消息丢失场景有哪些? 如何避免消息丢失? 如何设计部署消息中间件保证消息可靠性? RabbitMQ 消息丢失场景有哪些?

RabbitMQ消息丢失问题和保证消息可靠性-消费端不丢消息和HA(二)

继续上篇文章解决RabbitMQ消息丢失问题和保证消息可靠性(一) 未完成部分,我们聊聊MQ Server端的高可用和消费端如何保证消息不丢的问题? 回归上篇的内容,我们知道消息从生产端到服务端,为了保证消息不丢,我们必须做哪些事情? 发送端采用Confirm模式,注意Server端没成功通知发送端,需要重发操作需要额外处理 消息的持久化处理 上面两个操作保证消息到服务端不丢,但是非高可用状态,如果节点挂掉,服务暂时不可用,需要重启后,消息恢复,消息不会丢失,因为有磁盘存储. 本文先从消费端讲起

apache Storm学习之三-消息可靠性

4.1 简介 storm可以确保spout发送出来的每个消息都会被完整的处理.本章将会描述storm体系是如何达到这个目标的,并将会详述开发者应该如何使用storm的这些机制来实现数据的可靠处理. 4.2 理解消息被完整处理 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com", 22

Storm消息可靠性的保障机制

参考[并发编程网]的Storm官方教程翻译 以WordCountToPology为例: // 构造Topology TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SPOUT_ID,new SentenceSpout(), 2)// 指定 Spout ,2 指的是使用2个executor来运行spout .setNumTasks(4);//指定tasks的数量 // 指定 SentenceSpout 向Split

RabbitMQ消息模型概览(简明教程)

小菜最近用到RabbitMQ,由于之前了解过其他消息中间件,算是有些基础,所以随手从网上搜了几篇文章,准备大概了解下RabbitMQ的消息模型,没想到网上文章千篇一律,写一大堆内容,就是说不明白到底怎么回事,真是逼小菜写博客… 首先说明本文只适合有消息中间件基础的读者,本文不会讲解基础概念,而是一针见血的指明RabbitMQ该怎么用,告诉读者RabbitMQ能做什么,而不是像网络上其他文章那样花里胡哨抓不住重点. 好了,直入正题. simple简单队列 这种队列,纯属RabbitMQ搞的一个花样

Storm系列三: Storm消息可靠性保障

Storm系列三: Storm消息可靠性保障 在上一篇 Storm系列二: Storm拓扑设计 中我们已经设计了一个稍微复杂一点的拓扑. 而本篇就是在上一篇的基础上再做出一定的调整. 在这里先大概提一下上一篇的业务逻辑, 我们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其他信息, 我们需要做的事情是当接收到消息之后,根据SessionId判断是否属于同一消息, 如果是的话将内容拼接, 如果结束标识为 true, 表示会话已结束,则存

Openstack中RabbitMQ RPC代码分析

在Openstack中,RPC调用是通过RabbitMQ进行的. 任何一个RPC调用,都有Client/Server两部分,分别在rpcapi.py和manager.py中实现. 这里以nova-scheduler调用nova-compute为例子. nova/compute/rpcapi.py中有ComputeAPI nova/compute/manager.py中有ComputeManager 两个类有名字相同的方法,nova-scheduler调用ComputeAPI中的方法,通过底层的R

Rabbitmq消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢--消息持久化.?为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化. queue的持久化 queue的持久化是通过durable=true来实现的.?一般程序中这么使用: /** * amqp_queue_declare * * @param [in] state connection state – TCP连接 * @param