如何保证RabbitMQ全链路数据100%不丢失

我们都知道,消息从生产端到消费端消费要经过3个步骤:

生产端发送消息到RabbitMQ;
RabbitMQ发送消息到消费端;
消费端消费这条消息;

这3个步骤中的每一步都有可能导致消息丢失,消息丢失不可怕,可怕的是丢失了我们还不知道,所以要有一些措施来保证系统的可靠性。这里的可靠并不是一定就100%不丢失了,磁盘损坏,机房爆炸等等都能导致数据丢失,当然这种都是极小概率发生,能做到99.999999%消息不丢失,就是可靠的了。下面来具体分析一下问题以及解决方案。

生产端可靠性投递
生产端可靠性投递,即生产端要确保将消息正确投递到RabbitMQ中。生产端投递的消息丢失的原因有很多,比如消息在网络传输的过程中发生网络故障消息丢失,或者消息投递到RabbitMQ时RabbitMQ挂了,那消息也可能丢失,而我们根本不知道发生了什么。针对以上情况,RabbitMQ本身提供了一些机制。

事务消息机制
事务消息机制由于会严重降低性能,所以一般不采用这种方法,我就不介绍了,而采用另一种轻量级的解决方案——confirm消息确认机制。

confirm消息确认机制
什么是confirm消息确认机制?顾名思义,就是生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。

通过下面这句代码来开启确认模式:

channel.confirmSelect();// 开启发送方确认模式

然后异步监听确认和未确认的消息:

channel.addConfirmListener(new ConfirmListener() {
//消息正确到达broker
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("已收到消息");
//做一些其他处理
}

//RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + deliveryTag);
//做一些其他处理,比如消息重发等
}
});

这样就可以让生产端感知到消息是否投递到RabbitMQ中了,当然这样还不够,稍后我会说一下极端情况。

消息持久化
那消息持久化呢?我们知道,RabbitMQ收到消息后将这个消息暂时存在了内存中,那这就会有个问题,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,这样就算RabbitMQ重启后也可以到硬盘中取数据恢复。那如何持久化呢?

message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。

所有需要给exchange、queue和message都进行持久化:

exchange持久化:

//第三个参数true表示这个exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

queue持久化:

//第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

message持久化:

//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

这样,如果RabbitMQ收到消息后挂了,重启后会自行恢复消息。

到此,RabbitMQ提供的几种机制都介绍完了,但这样还不足以保证消息可靠性投递RabbitMQ中,上面我也提到了会有极端情况,比如RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了,这样消息还是丢失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,就不好做接下来的处理。

所以除了RabbitMQ提供的一些机制外,我们自己也要做一些消息补偿机制,以应对一些极端情况。接下来我就介绍其中的一种解决方案——消息入库。

消息入库
消息入库,顾名思义就是将要发送的消息保存到数据库中。

首先发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示生产端将消息发送给了RabbitMQ但还没收到确认;在生产端收到确认后将status设为1,表示RabbitMQ已收到消息。这里有可能会出现上面说的两种情况,所以生产端这边开一个定时器,定时检索消息表,将status=0并且超过固定时间后(可能消息刚发出去还没来得及确认这边定时器刚好检索到这条status=0的消息,所以给个时间)还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性),可能重发还会失败,所以可以做一个最大重发次数,超过就做另外的处理。

这样消息就可以可靠性投递到RabbitMQ中了,而生产端也可以感知到了。

消费端消息不丢失
既然已经可以让生产端100%可靠性投递到RabbitMQ了,那接下来就改看看消费端的了,如何让消费端不丢失消息。

默认情况下,以下3种情况会导致消息丢失:

在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;
在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失;
消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。

其实,上述3中情况导致消息丢失归根结底是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。

所以就需要将自动ack机制改为手动ack机制。

消费端手动确认消息:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
//接收到消息,做处理
//手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
//出错处理,这里可以让消息重回队列重新发送或直接丢弃消息
}
};
//第二个参数autoAck设为false表示关闭自动确认机制,需手动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});

这样,当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费端的消息;一部分是已经投递给消费端,但是还没有收到消费端确认信号的消息。如果RabbitMQ一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机(RabbitMQ会自己感知到),则RabbitMQ会安排该消息重新进入队列(放在队列头部),等待投递给下一个消费者,当然也有能还是原来的那个消费端,当然消费端也需要确保幂等性。

好了,到此从生产端到RabbitMQ再到消费端的全链路,就可以保证数据的不丢失。

由于个人水平有限,有些地方可能理解错了或理解不到位的,请大家多多指出!Thanks

原文地址:https://www.cnblogs.com/Morus-alba/p/10534395.html

时间: 2024-08-30 02:11:26

如何保证RabbitMQ全链路数据100%不丢失的相关文章

互联网面试必杀:如何保证消息中间件全链路数据100%不丢失

背景引入 这篇文章,我们来聊聊在线上生产环境使用消息中间件技术的时候,从前到后的全链路到底如何保证数据不能丢失. 这个问题,在互联网公司面试的时候高频出现,而且也是非常现实的生产环境问题. 如果你的简历中写了自己熟悉MQ技术(RabbitMQ.RocketMQ.Kafka),而且在项目里有使用的经验,那么非常实际的一个生产环境问题就是:投递消息到MQ,然后从MQ消费消息来处理的这个过程,数据到底会不会丢失. 面试官此时会问:如果数据会丢失的话,你们项目生产部署的时候,是通过什么手段保证基于MQ传

大数据理论体系总结--数据仓库管理与全链路数据体系

前言 就这样,大数据领域蓬勃发展了好几年,有很多伙伴执迷于技术,成为了分布式计算与存储的领域专家.也有很多伙伴执迷于数据,成为了行业的数据研发专家.当然还有很多小伙伴,热衷于工具系统开发,成为了数据技术专家.那么我们回过头来考虑,什么是大数据,什么又是数据仓库,什么又是数据技术.大数据其实是个非常笼统的感念,它是由数据仓库演化而来的数据与技术方法论,那么我们先说一下数据仓库的由来: 早在多年以前在Hadoop.Spark.Storm.Kafka等系列分布式计算与存储.消息中间件还没有成熟的时候,

大数据从基础到项目实战(一站式全链路最佳学习路径)

大数据从基础到项目实战(一站式全链路最佳学习路径)课程链接:https://pan.baidu.com/s/1HC9zqxwUFNBJHT9zP1dlvg 密码:xdgd 本课程为就业课程,以完整的实战项目为主线,项目各个环节既深入讲解理论知识,又结合项目业务进行实操,从而达到一站式学习,让你快速达到就业水平. 全真企业项目全流程演示: 大数据生产->采集->存储->处理->计算->分析(离线+实时)->抽取(离线+实时)->Java接口->可视化Web展示

来自滴滴、微博、唯品会、魅族、点评关于全链路压测等问题实践分享

架构师小组交流会:每期选一个时下最热门的技术话题进行实践经验分享. 第二期:因为大家对全链路压测的问题比较感兴趣,因此做了一番探讨. 参与嘉宾:滴滴技术负责人彭令鹏.魅族系统架构师何伟.唯品会应用架构负责人张广平.新浪微博技术专家聂永.大众点评交易平台技术负责人陈一方.七牛云首席架构师李道兵. 本文是对此次交流的整理,欢迎探讨. 第一轮:自由交流 滴滴彭令鹏:大家好,我是彭令鹏,目前在滴滴平台部负责整个专快车业务的稳定性和效率,之前在百度做了5年半的网页搜索部底层架构的工作.现在在滴滴主要在推四

饿了么全链路压测平台的实现与原理

背景 在上篇文章中,我们曾介绍过饿了么的全链路压测的探索与实践,重点是业务模型的梳理与数据模型的构建,在形成脚本之后需要人工触发执行并分析数据和排查问题,整个过程实践下来主要还存在以下问题: 测试成本较高,几乎每个环节都需要人力支撑,费时费力. 由于测试用例较多,涉及的测试机范围较广,手工执行容易犯错,线上测试尤其危险. 记录结果和测试报告极不方便,需要二次加工.填写和上传. 测试过程中靠手工监控,覆盖不全且定位问题困难. 基于这些因素,我们决定推进全链路压测的自动化进程.这篇我们主要介绍全链路

有赞全链路压测实战

一.前言 有赞致力于成为商家服务领域里最被信任的引领者,因为被信任,所有我们更需要为商家保驾护航,保障系统的稳定性.有赞从去年开始通过全链路压测,模拟大促真实流量,串联线上全部系统,让核心系统同时达到流量峰值: 验证大促峰值流量下系统稳定性 容量规划 进行强弱依赖的划分 降级.报警.容灾.限流等演练 …通过全链路压测这一手段,对线上系统进行最真实的大促演练,获取系统在大压力时的表现情况,进而准确评估线上整个系统集群的性能和容量水平,不辜负百万商家的信任. 有赞对于性能测试主要有线下单系统单接口.

双11 背后的全链路可观测性:阿里巴巴鹰眼在“云原生时代”的全面升级

点击下载<不一样的 双11 技术:阿里巴巴经济体云原生实践> 本文节选自<不一样的 双11 技术:阿里巴巴经济体云原生实践>一书,点击上方图片即可下载! 作者: 周小帆(承嗣)? 阿里云中间件技术部高级技术专家 王华锋(水彧)??阿里云中间件技术部技术专家 徐彤(绍宽)??阿里云中间件技术部技术专家 夏明(涯海)??阿里云中间件技术部技术专家 导读:作为一支深耕多年链路追踪技术 (Tracing) 与性能管理服务 (APM) 的团队,阿里巴巴中间件鹰眼团队的工程师们见证了阿里巴巴基

阿里10年分布式技术沉淀:阿里高可用体系核心缔造者、全链路压测创始人告诉你!

原文链接 7月27日,云栖社区.阿里中间件将举办首届阿里巴巴中间件技术峰会,揭秘阿里10年分布式技术干货.目前活动官网已上线:https://yq.aliyun.com/promotion/262, 点击报名. 本次活动看点十足,大咖齐聚.纯正干货,下面给大家做下详解介绍,相信看后定会让你动心! 议题详情 双11核武器全链路压测--张军 / 阿里巴巴中间件高级技术专家 阿里巴巴双11备战期间,保障系统稳定性最大的难题在于容量规划,而容量规划最大的难题在于准确评估从用户登录到完成购买的整个链条中,

全链路设计与实践

背景 随着公司业务的高速发展,公司服务之间的调用关系愈加复杂,如何理清并跟踪它们之间的调用关系就显的比较关键.线上每一个请求会经过多个业务系统,并产生对各种缓存或者 DB 的访问,但是这些分散的数据对于问题排查,或者流程优化提供的帮助有限.在这样复杂的业务场景下,业务流会经过很多个微服务的处理和传递,我们难免会遇到这些问题: 一次请求的流量从哪个服务而来? 最终落到了哪个服务中去? 为什么这个请求这么慢? 到底哪个环节出了问题? 这个操作需要依赖哪些东西? 是数据库还是消息队列? Redis挂了