消息中间件-消息的可靠性传递

前言

消息中间件的可靠性消息传递,是消息中间件领域非常重要的方案落实问题(在这之前的MQ理论,MQ选型是抽象层次更高的问题,这里不谈)。

并且这个问题与日常开发是存在较大的关联的。可以这么说,凡是使用了MQ的,机会都要考虑这个问题。当然也有一些原始数据采集,日志数据收集等应用场景对此没有过高要求。但是大多数的业务场景,对此还是有着较高要求的。比如订单系统,支付系统,消息系统等,你弄丢一条消息,嘿嘿。

网上对于这方面的博客,大多从单一MQ,或者干脆就是在论述MQ。我不喜欢这样的论述,这样的论述太过局限,也过于拖沓。

这次,主要从理论方面论证消息的可靠性传递的落实。具体技术,都是依据这些理论的,具体实现都差不多。不过为了便于大家理解,我在文中会以RabbitMq,Kafka这两个主流MQ稍作举例。

在日常开发中,我更倾向于在具体开发前,先整理思路,走通理论,再开始编码。毕竟,如果连理论都走不同,还谈什么编码。

另外,我按照消息可靠性层次逐步推进,形成相应的目录,希望大家喜欢(因为我认为,相较网上这方面现有博客的目录,这样的目录更合理,更人性化)。

概述

这里简单谈一些有关消息可靠性传递的理论。

消息传递次数

消息在消息系统(生产者+MQ+消费者),其消费的次数,无非一下三种情况:

  • 最多一次
  • 最少一次
  • 不多不少一次

消息可靠性层次

这也代表着消息系统的消息可靠性的三个层次:

  • 最多一次:上游服务的消息发出了,至于下游能不能收到服务,就不管了。结果就是下游服务,可能根本就没有接收到消息。
  • 最少一次:上游服务的消息发出了,并通过某些机制,确保下游服务一定收到了该消息。但是收到了几次,就不管了。结果就是下游服务,可能多次收到同一条消息。
  • 不多不少一次:上游服务的消息发出了,并确保下游服务一定收到了消息。下游服务通过某些机制,确保多次收到该消息与单次收到该消息,对其系统状态的影响是相同的。

方案落实

实现上述三个层次,需要逐步从三个方面考虑:

  • 最多一次:会用消息队列即可,只要确保消息的连通性即可
  • 最少一次:通过MQ提供的确认机制,确保消息的传递
  • 不多不少一次:通过外部应用程序,确保消息的单次消费与多次消费对系统状态影响是一致的

上述三个层次,对系统的性能损耗,系统复杂度等都是逐步上升的。

当然,我们首先,需要了解这三个层次分别如何实现。
再在实际开发中,根据需要,灵活选取合适方案。

最多一次的消息传递

这个方案是最简单的,只要确保消息系统的正确运作,以及系统的连通性即可。在正常情况下,可以保证绝大部分数据的可靠性传递。但是仍旧存在极小数据的丢失,并且数据的丢失会因为消息队列的选择,以及消息并发量,而受到影响。

优点

  • 实现简单。只要搭建对应的MQ服务器,写出对应的生产者与消费者,以及相应配置,即可正常工作。

缺点

  • 无法保证数据的可靠性,会存在一定的数据丢失情况,尤其是在并发量较大时

实际应用

可以应用于日志上传这样对消息可靠性要求低的应用场景。

总结

如果数据量不大的情况下,推荐使用RabbitMQ,其消息可靠性在地数据量下,是最可靠的。但是在达到万级并发时,会存在消息丢失,丢失的比例可以达到千分之一。

如果数据量较大的情况下,要么采用集群。要么就采用Kafk(Kafka可支持十万级并发)

一般来说,这种消息可靠性多见于项目初建,或类似日志采集,原始数据采集这样的特定场景。

最少一次的消息传递

这个方案开始利用MQ提供的特定机制,来提高消息传递的可靠性。

优点

  • 不错的消息可靠性。确保不会出现消息丢失的情况
  • 实现并不复杂。只需要合理使用MQ的API,设置合理参数(如重试次数)即可

缺点

  • 会出现消息重复消费的情况
  • 参数的设置需要合理。如重试次数,一般设置为5次,也可根据情况,进行调整
  • 资源占用的提升。如带宽(每次消息成功生产,消费都需要返回一条数据进行确认)等

方案落实

该方案的实现组成,由以下三个方面构成:

  • 消息的可靠生产
  • 消息的可靠存储
  • 消息的可靠消费

通过以上三个方面的落实,确保可消息一定被下游服务消费。

消息的可靠生产

消息的可靠生产,是通过回调确认机制,确保消息一定被消息服务器接收。

消息生产,发送给消息服务器后,消息服务器会返回一个确认信息,表示数据正常接收。

如果生产者在一定时间内没有接收到确认信息,就会触发重试机制,进行消息的重发。

如RabbitMq的comfirm机制,Kafka的acks机制等。

RabbitMq的confirm机制存在三个模式:

  • 普通模式:channel.waitForConfirms()
  • 批量模式:channel.waitForConfirmsOrDie()
  • 异步模式:channel.addConfirmListener()

这三个模式,看名称就可以知道具体作用了。

至于Kafka的acks机制,同样存在三个模式:

  • acks = 0 :不需要Kafka的任何Partition确认,即确认发送成功(这个之确保消息发送出去了,并不保证消息服务器是否成功接收)
  • acks = 1 :(默认)需要Kafka的Partition Leader确认,即被Kafka的一个Partition(Leader)接收。但是这样依旧存在极小概率的消息丢失,即Partition Leader获取了对应消息,并给了acks确认回复。但是在其他Partition同步前,Partition Leader宕机,数据丢失。那么这就造成了消息丢失。
  • acks = all :需要Kafka对应ISR中的全部Partition确认,才确认消息发送成功(当然,这里假定Kafka是多节点集群,如果只有一个分区,那就毫无意义了)。

说到这里,简单说一下,上述的操作可能造成消息的重复生产。

最简单的例子,消息成功发送,但是对应的消息确认信息由于网络波动而丢失。那么生产者就会重复发送该消息,所以消息服务器接收到了两条相同消息,故产生了消息的重复生产。

另外,上述的重试,都是存在响应时长判断(超出1min,就认为数据丢失),以及重试次数限制(超过5次,就不进行重试。否则,大量重试数据可能会拖垮整个服务)。

消息的可靠存储

消息的可靠存储,是确保消息在消息服务器经过,或者说堆积时不会因为宕机,网络等状况,丢失消息。

网上很多博客在论述消息的可靠性传递时,常常把这点遗漏。因为他们理所当然地认为消息队列已经通过集群等实现了消息队列服务的可用性,故消息的可靠性存储也就实现了。

但是这里存在两个问题。第一,可靠性不等于可用性。第二,消息的可靠存储,作为消息可靠性传递的一部分,是不可缺失的。

可用性:确保服务的可用。即对应的服务,可以提供服务。

可靠性:确保服务的正确。即对应的服务,提供的是正确的服务。

区别:我浏览淘宝,淘宝页面打不开,这就涉及了可用性问题(可用性计算公式:可用时间/全部时长*100%)。而我浏览淘宝,查询订单,给我显示的是别人的订单,这就涉及了可靠性问题。

另外这里再纠正一点,可靠性并不依赖于可用性。即使我打不开淘宝页面,我也不能说淘宝提供订单查询就有问题(只是如果没有了可用性,谈论可靠性是非常没有意义的。毕竟都用不了了,谁还关心其内容是否正确呢,都看不到)

消息队列的可用性,是通过多个节点构成集群,避免单点故障,从而提升可用性。

消息队列的可靠存储,是通过备份实现(这里不纠结备份如何确保正确)的。如RabbitMq集群的MemNode与DiskNode,又或者Kafka的replication机制等。

消息的可靠消费

消息的可靠消费,就是确保消息被消费者获取,并被成功消费。避免由于消息丢失,或者消费者宕机而造成消息消费不成功,最终造成消息的丢失(因为RabbitMq服务器在认为消息被成功消费后,将对应数据删除或标记为“已消费”)。

至于消息的可靠消费,核心理念还是重试,重试,再重试。不过具体的实现就八仙过海,各显神通了。

这里分别说一下RabbitMq,Kafka,Rocket三者对于可靠消费的处理:

RabbitMq

提供ack机制。默认是auto,直接在拿到消息时,直接ack。确保了消息到达了消费者,但是无法解决消费者消费失败这样的问题。

实际开发中,为了确保消息的可靠消费,一般会设置为munal,只有在程序正确运行后,才会调用对应api,表示消息正确消费。

Kafka

由于Kafka的消息是落地到硬盘文件的,而且Kafka的消息分发方式是pull的,所以消息的拉取是通过offset机制去确认对应位置消息的。

当然,Kafka的offset默认是自动提交的(可通过nable_auto_commit与auto_commit_interval_ms控制)。

所以消费者调用服务失败等原因,可以通过手动offset提交,来实现对数据的重复消费(甚至是历史数据的消费),也就可以在消费失败时对同一消息进行再消费。

如果是消费者宕机等原因,由于Kafka服务器没有收到对应的offset提交,所以认为那条消息没有被消费成功,故返回的依旧是那条消息。

RocketMq

其实RocketMq的处理有些类似Kafka确认机制+RabbitMq死信队列的感觉。

首先,消费者从RocketMq拉取消息,如果成功消费,就返回确认消息。

如果未成功消费,就尝试重新消费。

尝试消费一定次数后(如5次),就会将该消息发送之RocketMq中的重试队列。

如果遇到消费者宕机的情况,RocketMq会认为该消息未成功消费,会被其他消费者继续消费。

其实在RabbitMq的可靠性消费时,我们也会将多次消费失败的数据保存下来,便于后期修复等。不过保存的方式由很多种,日志,数据库,消息队列等。而RocketMq则给出了具体的落实方案。

上述的操作,可能造成消息的重复消费。

最简单的例子,消息成功被消费者消费,但是消费者还没来得及发送确认信息,就宕机了。

消息队列由于没有收到确认消息,认为该条消息尚未被消息,就将该消息交由其他消费者继续消费。

不多不少一次的消息传递

这个方案,就是通过MQ以外的应用程序,来进行扩展,最终达到消息准确消费的目的。

那么为什么不将这个功能,囊括在MQ中呢?

个人认为有四个方面的考虑:

  • 消息中间件,应该明确其功能域,而消息生产与消息消费往往涉及业务,所以避免与业务的耦合。所以消息中间件只完善了可靠存储。
  • 准确消费,往往涉及MQ以外的部分,需要其他部分的配合。就类似与XA接口一样。这样会带来编码的约束,系统的耦合性等。
  • 准确消费的实现可以通过一个工具,模块去实现,但是不该硬编码。毕竟现有的处理方案并不一定就是最优解(尤其是在调控中心,TCC框架展现的现在)。
  • 性能影响。为了一个不通用的功能,会带来消息中间件的性能大幅下降

优势

  • 确保消息被准确消费(不多不少一次)

缺点

  • 实现复杂(生产者与消费者都需要建立对应数据库)
  • 需要建立对应规范(但是通用规范确定后,实现就会变得快速)
  • 资源占用的提升。如带宽(每次消息成功生产,消费都需要返回一条数据进行确认)等

存在的问题

消息存储部分的准确存储,不该我们来操心,所以只阐述消息生产与消息消费两个部分。

消息的重复生产

  • 消息发给了消息队列服务器,消息队列服务器的确认信息由于网络波动等,没有及时到达生产者
  • 消息发送给了消息队列服务器,生产者在接收消息前,宕机
  • 消息发送给了消息队列服务器,生产者在接收消息后,还没来得及进行确认逻辑,宕机

综上来看,就是消息发出后,到生产者消息确认信息的处理之间,出现各种意外,导致重复生产。

消息的重复消费

  • 消息已经被消费,消费者还没来得及发送确认信息,就宕机了
  • 消息已经被消费,消费者发出确认信息,确认信息由于网络波动等,没有及时到达消息队列服务器
  • 消息已经被消费,消费者发出确认信息,消息队列服务器对应实例在接收到确认信息前,宕机
  • 消息已经被消费,消费者发出确认信息,消息队列服务器接受到了确认信息,还没来得及进行确认逻辑,宕机

综上来看,就是消息已经被消费后,到消息队列服务器进行确认消息处理之间,出现各种意外,导致重复消费。

解决方案

解决方案:messageId+幂等

准确来说,解决方案的核心是幂等,而messageId是作为辅助手段的。

幂等

幂等,简单说明一下,就是多次操作与单次操作对系统状态的影响是一致的。

i = 1; 

就是幂等操作,因为无论进行几次,i的值都没有变化。

i++;

则不是幂等操作,因为i的值与执行次数息息相关。

故通过幂等操作来确保同一条消息,不被执行多次。

messageId

但是,消费者如何确定是否为同一条消息呢?

有的消息体存在唯一性字段,如orderId等。但有的消息并没有这样的唯一性字段。

所以需要一个专门的字段,来表示唯一性,并且与业务消息解耦。这就是messageId。

既可以采用消息体的唯一性字段(可以是单一字段,也可以是组合字段),也可以通过特定方式生成对应标识。

具体的生成情况,就不在这里赘述了。

方案落实

先来一张大图(这种事情,图片展示最直观了),展示一下流程:

(图片是绝对清晰的。看不清图片的朋友,请将图片在新页面打开,或下载。说实话,来到新公司,首先提升的就是画图能力。囧)

简单说一下流程,大家可以对照着上图,看一下:

生产者到消息中间件服务器

  1. 生产者根据需要发送的消息,生成对应messageId。并封装对应message至生产者数据库(该操作应该利用事务性,确保生产者事件处理与message保存至数据库的原子性),同时标注message状态为sending(发送中状态)
  2. 将对应message发送给消息队列服务器
  3. 如果没有收到生产确认信息,则重新发送message(如果这个时候遇到生产者实例宕机,也不用担心。因为后续会有补偿程序,进行补偿重发操作)
  4. 当收到消息中间件服务器的消息生产确认消息(即确定消息已经达到消息中间件服务器),将数据库中对应message的状态修改为sended(已发送状态)

上述中提到的补偿机制,其实是类似事务中的一个操作。通过一个定时任务,定时巡检数据库处于sending状态的message,并通过生产者极性发送(所以message一般都保存source,target等信息)。

之所以会有sending状态的message,就是因为存在生产者消息发送出去了,还没收到生产确认信息,结果生产者实例自己宕机的情况。

至于补偿机制的定时任务,是一个非常简单的实现,这里就不再赘述了。

消息中间件到消费者

这里进行的操作是针对非幂等的操作。

如果是幂等操作,则可以直接进行。毕竟多次执行与单次执行对数据库的影响是一致的。

但是注意幂等操作在部分场景下无效的问题(时间影响上),如“余额 = 1k”的操作对于数据库而言是幂等的,但是在两次“余额 = 1k”操作间,有一个“余额 = 2k”的操作,则会发生问题(丢失了“余额 = 2k”操作)。当然,这种类似ABA问题,完全可以引入版本号,来进行解决。

综上,还是推荐采用以下解决方法,流程较为简单:

  1. 消费者获取数据
  2. 消费者判断数据库是否有对应message
  3. 如果存在对应message,则放弃执行(因为这是一个重复操作)
  4. 如果不存在,则进行相关消息处理。并通过事务控制,在消费者数据库中添加message(确保消息的处理与数据库添加message是原子操作)

至此,消息的准确传递就完成了。

总结

消息可靠性传递的发展过程,也体现了人们对消息中间件功能的一步步追求,更是体现了工程师们解决问题的思路。

很多时候,我们会遇到很多问题,甚至令人感到杂乱不堪,无从下手。这个时候,最好的办法就是静下心来,对它们进行划分(按照重要程度,紧迫度,实现难度),再进行一个长期规划,一步步来解决。往往这个时候,动动笔,在笔记上列下清单,会是一个不错的办法。

其中消息的准确传递,涉及一些事务相关的内容。也许有人已经联想到,消息队列是否可以作为分布式事务的一种手段呢?我会在之后的博客中,来阐述分布式事务这一重要主题。

原文地址:https://blog.51cto.com/14230003/2467291

时间: 2024-10-10 11:21:01

消息中间件-消息的可靠性传递的相关文章

Storm 官方文档翻译 --- 消息的可靠性保障

消息的可靠性保障 Storm 能够保证每一个由 Spout 发送的消息都能够得到完整地处理.本文详细解释了 Storm 如何实现这种保障机制,以及作为用户如何使用好 Storm 的可靠性机制. 消息的“完整性处理”是什么意思 一个从 spout 中发送出的 tuple 会产生上千个基于它创建的 tuples.例如,有这样一个 word-count 拓扑: TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sent

IM系统如何保障消息高可靠性

消息传输的高可靠性在即时通讯系统中是最为基础.也是最核心的部分之一,同时也是衡量通讯系统的质量的重要指标.本文主要描述常见通讯系统模型的实现原理,分析影响消息传输可靠性的常见问题,并介绍有度即时通服务体系是如何通过架构及技术细节实现高可靠的消息送达机制.快递式系统模型是常见的即时通讯系统模型,主要结构如下图所示:该模型的消息发送过程不是一个完整事务,以下这些场景将出现丢消息的问题:1. ClientA将发消息送给Server,Server收到后回复ClientA发送成功,后续中转由Server保

线程给主窗口发事件(消息),传递字符串

代码来自安晓辉: #ifndef CUSTOMEVENT_H #define CUSTOMEVENT_H #include <QEvent> #include <QString> class CustomEvent : public QEvent // 自定义事件 { public: CustomEvent(const QString & msg); static QEvent::Type m_eventType; // 此事件的类型 static QEvent::Type

Kafka消息的可靠性测试--针对直播业务的方案选择

转自:http://blog.csdn.net/bailove/article/details/44240303 业务场景 来疯直播互动平台,每天有数百万人上下线,有数十万人同时参与互动直播聊天.用户的登陆.退出及用户间的各种交互行为如聊天.送礼.关注.投票.抢沙发等等事件都会产生大量的消息.这些消息具有瞬间爆发性,比如热门直播间刚开播,直播表演的高潮等等.而用户的礼物.星星.喇叭.沙发等这类消息是不允许丢失,必须100%送达.这就需要有一个高性能,高可靠,稳定可拓展的消息服务平台的支撑.它要求

kafka消息的可靠性

本文来自网易云社区 作者:田宏增 Kafka的高可靠性的保障来源于其健壮的副本(replication)策略.通过调节其副本相关参数,可以使得Kafka在性能和可靠性之间运转的游刃有余.Kafka从0.8.x版本开始提供partition级别的复制,replication的数量可以在$KAFKA_HOME/config/server.properties中配置. Kafka中消息是以topic进行分类的,生产者通过topic向Kafka broker发送消息,消费者通过topic读取数据.然而t

如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题?

面试题 面试官心理分析 这个是肯定的,用 MQ 有个基本原则,就是数据不能多一条,也不能少一条,不能多,就是前面说的重复消费和幂等性问题.不能少,就是说这数据别搞丢了.那这个问题你必须得考虑一下. 如果说你这个是用 MQ 来传递非常核心的消息,比如说计费.扣费的一些消息,那必须确保这个 MQ 传递过程中绝对不会把计费消息给弄丢. 面试题剖析 数据的丢失问题,可能出现在生产者.MQ.消费者中,咱们从 RabbitMQ 和 Kafka 分别来分析一下吧. RabbitMQ 生产者弄丢了数据 生产者将

Android 消息广播Intent传递数据

1.创建布局文件activity_broadcast.xml <RelativeLayout xmlns:android="http://schemas.android.com/apk/res/android" xmlns:tools="http://schemas.android.com/tools" android:layout_width="match_parent" android:layout_height="match

消息中间件(消息队列MQ)简介

一.为什么要使用MQ 1. 异步:快速返回 2. 解耦:解除依赖 3. 削峰填谷 二.MQ的缺点 1. 系统可用性降低,因为MQ可能会挂 2. 系统复杂性提高,要考虑消息重复.丢失.顺序等问题 3. 数据一致性问题,生产者并不知道消费者是否真正消费了 三.怎么保证MQ消息不丢失 1. 生产者丢失数据,confirm机制 2. MQ丢失数据,持久化到磁盘 3. 消费者丢失数据,确认机制 四.怎么保证MQ高可用性 1. 单机模式 2. 普通集群模式,无法做到真正的高可用 3. 镜像集群模式,高可用但

JMS消息的可靠性机制

ActiveMQ消息签收机制: 客户端成功接收一条消息的标志是一条消息被签收,成功应答. 消息的签收请求分为两种: 1.带事务的session 如果session带有事务,并且事务成功提交,则消息被自动签收.如果事务回滚,则消息会被再次传送. 2.不带事务的session 不带事务的session的签收方式,取决于session的配置 ActiveMQ支持以下三种模式: Seesion.AUTO_ACKNOWLEDGE:消息自动签收: Session.CLIENT_ACKNOWLEDGE:客户端