[转帖]kafka 如何保证数据不丢失

https://www.cnblogs.com/MrRightZhao/p/11498952.html

一般我们在用到这种消息中件的时候,肯定会考虑要怎样才能保证数据不丢失,在面试中也会问到相关的问题。但凡遇到这种问题,是指3个方面的数据不丢失,即:producer consumer 端数据不丢失  broker端数据不丢失下面我们分别从这三个方面来学习,kafka是如何保证数据不丢失的

一.producer 生产端是如何保证数据不丢失的

  1.ack的配置策略

  acks = 0    生产者发送消息之后 不需要等待服务端的任何响应,它不管消息有没有发送成功,如果发送过程中遇到了异常,导致broker端没有收到消息,消息也就丢失了。实际上它只是把消息发送到了socketBuffer(缓存)中,而socketBuffer什么时候被提交到broker端并不关心,它不担保broker端是否收到了消息,但是这样的配置对retry是不起作用的,因为producer端都不知道是否发生了错误,而且对于offset的获取永远都是-1,因为broker端可能还没有开始写数据。这样不保险的操作为什么还有这样的配置?kafka对于收集海量数据,如果在收集某一项日志时是允许数据量有一定丢失的话,是可以用这种配置来收集日志。    acks = 1(默认值)    生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。其实就是消息只发给了leader leader收到消息后会返回ack到producer端。如果消息无法写入leader时(选举、宕机等情况时),生产都会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息,如果消息成功写入,在被其它副本同步数据时leader  崩溃,那么此条数据还是会丢失,因为新选举的leader是没有收到这条消息,ack设置为1是消息可靠性和吞吐量折中的方案。

  acks = all (或-1)    生产者在发送消息之后,需要等待ISR中所有的副本都成功写入消息之后才能够收到来自服务端的成功响应,在配置环境相同的情况下此种配置可以达到最强的可靠性。即:在发送消息时,需要leader 向fllow 同步完数据之后,也就是ISR队列中所有的broker全部保存完这条消息后,才会向ack发送消息,表示发送成功。 

2.retries的配置策略

  在kafka中错误分为2种,一种是可恢复的,另一种是不可恢复的。  可恢复性的错误:      如遇到在leader的选举、网络的抖动等这些异常时,如果我们在这个时候配置的retries大于0的,也就是可以进行重试操作,那么等到leader选举完成后、网络稳定后,这些异常就会消息,错误也就可以恢复,数据再次重发时就会正常发送到broker端。需要注意retries(重试)之间的时间间隔,以确保在重试时可恢复性错误都已恢复。  不可恢复性的错误:      如:超过了发送消息的最大值(max.request.size)时,这种错误是不可恢复的,如果不做处理,那么数据就会丢失,因此我们需要注意在发生异常时把这些消息写入到DB、缓存本地文件中等等,把这些不成功的数据记录下来,等错误修复后,再把这些数据发送到broker端。

  我们上面讲了2个配置项的作用,下面结合实际场景如何使用

3.如何选取

1.高可用型  配置:acks = all,retries > 0 retry.backoff.ms=100(毫秒) (并根据实际情况设置retry可能恢复的间隔时间)  优点:这样保证了producer端每发送一条消息都要成功,如果不成功并将消息缓存起来,等异常恢复后再次发送。  缺点:这样保证了高可用,但是这会导致集群的吞吐量不是很高,因为数据发送到broker之后,leader要将数据同步到fllower上,如果网络带宽、不稳定等情况时,ack响应时间会更长2.折中型  配置:acks = 1  retries > 0 retries 时间间隔设置 (并根据实际情况设置retries可能恢复的间隔时间)  优点:保证了消息的可靠性和吞吐量,是个折中的方案  缺点:性能处于2者中间

3.高吞吐型   配置:acks = 0  优点:可以相对容忍一些数据的丢失,吞吐量大,可以接收大量请求  缺点:不知道发送的消息是 否成功

二.consumer端是如何保证数据不丢失的

  1.consumer端的配置项

group.id: consumer group 分组的一个id  消费者隶属的消费组名称。在kafka中只允许消息只能被某个组里面的一个consumer端消费,如果为空,则会报异常。  对于一个新的consumer加入到消费时,肯定会隶属于哪个组,只有这样才能消费数据
auto.offset.reset = earliest(最早) /latest(最晚)  从何处开始进行消费  当一个新加入的consumer要进行消费数据,如果这个consumer是做数据分析工作的,是需要以前的历史数据那就需要从最早的位置消费数据,如果仅仅是查看消费情况,那可以从最晚位置开始消费数据
enable.auto.commit = true/false(默认true)  是否开启自动提交消费位移的功能,默认开启.  当设置为true时,意味着由kafka的consumer端自己间隔一定的时间会自动提交offset,如果设置成了fasle,也就是由客户端(自己写代码)来提交,那就还得控制提交的时间间隔auto.commit.interval.msauto.commit.interval.ms  当enable.auto.commit设置为true时才生效,表示开启自动提交消费位移功能时自动提交消费位移的时间间隔。

  2.consumer端的配置策略

在consumer消费阶段,对offset的处理,关系到是否丢失数据,是否重复消费数据,因此,我们把处理好offset就可以做到exactly-once && at-least-once(只消费一次)数据。当enable.auto.commit=true时    表示由kafka的consumer端自动提交offset,当你在pull(拉取)30条数据,在处理到第20条时自动提交了offset,但是在处理21条的时候出现了异常,当你再次pull数据时,由于之前是自动提交的offset,所以是从30条之后开始拉取数据,这也就意味着21-30条的数据发生了丢失。当enable.auto.commit=false时    由于上面的情况可知自动提交offset时,如果处理数据失败就会发生数据丢失的情况。那我们设置成手动提交。    当设置成false时,由于是手动提交的,可以处理一条提交一条,也可以处理一批,提交一批,由于consumer在消费数据时是按一个batch来的,当pull了30条数据时,如果我们处理一条,提交一个offset,这样会严重影响消费的能力,那就需要我们来按一批来处理,或者设置一个累加器,处理一条加1,如果在处理数据时发生了异常,那就把当前处理失败的offset进行提交(放在finally代码块中)注意一定要确保offset的正确性,当下次再次消费的时候就可以从提交的offset处进行再次消费。

  3.comsumer 的应用场景

  1.一直commit offset的处理    假如poll了100条数据,每处理1条,commit offset一次,这样会严重影响性能,在处理的时候设置1个计数器(或累加器),按一批来提交,但要确保提交offset的准确性    2.rebalance的影响    在处理数据时,有2种情况会发生,一种情况是处理了一半的时候,发生了rebalance,但是offset还没有来得及提交,另一种情况是rebalance发生后,重新分配了offset,在这种情况时会发生错误。    3.消息处理错误时的处理     假如consumer在处理数据的时候失败了,那么可以把这条数据给缓存起来,可以是redis、DB、file等,也可以把这条消息存入专门用于存储失败消息的topic中,让其它的consumer专门处理失败的消息。  4.处理消息的时间过长    假如poll一批100条消息的时间是1秒钟,但是在每处理1条需要花费1秒钟,这样来说极其影响消费能力,那我们可以把100条消息放到1个线程池中处理。这里特别特别注意,由于线程池的处理行为是并行的,所以要做对offset的判断。这里先说正常情况,如果消息都能被正常处理,那么会提交1个offset,并把这个offset存起来,假如此时又提交了1个offset,把2个offset相对比,哪个大把哪个存起来并做提交。如果消息处理发生了错误,我们在前面讲过,把这个错误消息发送到专门处理错误的topic中,让专门的consumer来处理。

  4.consumer 保证确保消息只被处理一次处理,同时确保幂等性

exactly-once & at-least-once
如何保证消息只获取一次并且确定被处理呢?这就需要我们在处理消息的时候要添加一个unique key
    假如pull 一个batch 100条的消息,在处理到第80条的时候,由于网络延迟、或者crash的原因没有来得及提交offset,被处理的80条数据都添加了unique key, 可以存到到DB中或者redis中(推荐,因为这样更快),当consumer端会再次poll消费数据时,因为没有提交offset,所以会从0开始消费数据,如果对之前已经消息过的数据没有做unique key的处理,那么会造成重复消息之前的80条数据,但是如果把每条对应的消息都添加了unique key,那就只需要对被处理的消息进行判断,有没有unique key 就可以做到不重复消费数据的问题,这样也同时保证了幂等性。

三.broker端是如何保证数据不丢失的

  1.broker端的配置项

  以下参数都是在创建topic时进行设置

1.replication-factor 3    在创建topic时会通过replication-factor来创建副本的个数,它提高了kafka的高可用性,同时,它允许n-1台broker挂掉,设置好合理的副本因子对kafka整体性能是非常有帮助的,通常是3个,极限是5个,如果多了也会影响开销。

2.min.insync.replicas = 2     分区ISR队列集合中最少有多少个副本,默认值是1    3.unclean.leander.election.enable = false     是否允许从ISR队列中选举leader副本,默认值是false,如果设置成true,则可能会造成数据丢失。

 2.leader选举造成的数据丢失

  3个replica分别为0 1 2,0为leader,数据都能完全同步到100,在某一时刻,分别有2个fllow挂掉了,此时有producer往0 的replica上发送50条数据完后,此时的leader挂掉了,而此时刚好的1个fllow起来了,它没有向leader上feach数据,因为leader已经不存在了,此时有2种处理方法:重新起来的fllow可以成为1个leader,需要通过 unclean.leader.election.enable=true,这样做保证了高可用,但是这样做的弊端是:新起来的fllow成为了leader,但是它会丢失部分数据,虽然这样保证了高可用。另一种情况是设置为false,不让fllow竞选leader,但是这样也会造成数据的丢失。假如在ISR的队列里面,只有0 1,但此时replica 1 没有来得及向leader feach数据leader挂掉了,这样也会造成数据的丢失。

  3.broker端的配置策略

  min.insync.replica    在一个topic中,1个分区 有3个副本,在创建时设置了min.insync.replica=2,假如此时在ISR中只有leader副本(1个)存在,在producer端生产数据时,此时的acks=all,这也就意味着在producer向broker端写数据时,必须保证ISR中指定数量的副本(包含leader、fllow副本)全部同步完成才算写成功,这个数量就是由min.insync.replica来控制的,这样producer端向broker端写数据是不成功,因为ISR中只有leader副本,min.insync.replica要求2个副本,此时的producer生产数据失败(异常),当然consumer端是可以消费数据的,只不过是没有新数据产生而已.这样保证了数据的一致性,但这样会导致高可用性降低了。一般的配置是按: n/2 +1 来配置min.insync.replicas 的数量的,同时也要将unclean.leader.election.enable=false    unclean.leader.election.enable    假如现在有leader 0 fllow 1 fllow 2 三个副本,存储的数据量分别是10 9 8,此时的broker的配置是:min.insync.replica=2 acks=all,leader的数据更新到了15,在没有同步到fllow 1 fllow 2时挂掉了,此时的ISR队列中是有fllow 1 和fllow 2的,如果unclean.leader.election.enable设置的是true,表示在ISR中的副本是可以竞选leader这样就会造成9-15或8-15之间的数据丢失,所以unclean.leader.election.enable必须设置成成false,这样整个kafka cluster都不读写了,这样就保证了数据的高度一致性.

   我们通过producer consumer broker 三个方面来讲述怎样保证数据在生产过程中不丢失,在发到broker(服务端)不丢失,在消费时不消费重复数据,其中通过学习kafka就是了解各种配置项控制的功能,后续我会总结梳理这三块的服务参数。

原文地址:https://www.cnblogs.com/jinanxiaolaohu/p/11867269.html

时间: 2024-10-08 11:08:35

[转帖]kafka 如何保证数据不丢失的相关文章

Spark Streaming和Kafka整合保证数据零丢失

当我们正确地部署好Spark Streaming,我们就可以使用Spark Streaming提供的零数据丢失机制.为了体验这个关键的特性,你需要满足以下几个先决条件: 1.输入的数据来自可靠的数据源和可靠的接收器: 2.应用程序的metadata被application的driver持久化了(checkpointed ); 3.启用了WAL特性(Write ahead log). 下面我将简单地介绍这些先决条件. 可靠的数据源和可靠的接收器 对于一些输入数据源(比如Kafka),Spark S

Spark Streaming使用Kafka保证数据零丢失

来自: https://community.qingcloud.com/topic/344/spark-streaming使用kafka保证数据零丢失 spark streaming从1.2开始提供了数据的零丢失,想享受这个特性,需要满足如下条件: 数据输入需要可靠的sources和可靠的receivers 应用metadata必须通过应用driver checkpoint WAL(write ahead log) 可靠的sources和receivers spark streaming可以通过

kafka文件存储结构和如何保证数据不丢失

一: kafka文件组成 基本组成: 1- broker 节点,多个broker构成一个集群 2- topic 对消息进行归类 3- producer 生产者 4- comsumer 消费者 5- consumerGroup 消费组 topic的组成: 1- partition  物理上数据存储的概念,一个topic包含多个partition,每个partition内部是有序的:每个partition是一个目录: 2- segment  一个partition包含多个segment,包含两种文件

mysql是怎么保证数据不丢失的

一:binlog写入机制. 先write ,把日志写入文件系统的的page cache ,然后fsync 将数据持久化到磁盘的操作. binlog是每个线程一个binlogCache,binlogCache中包含tmpfile和memery. 二:redo log 写入机制 首先日志写道redologbuffer中,然后写入pagecache ,最后写入磁盘. 有个后台线程每一秒钟轮询redobuffer写入磁盘,一个没有提交事务的redolog也可以持久化到磁盘. 1,redolog buff

Kafka在高并发的情况下,如何避免消息丢失和消息重复?kafka消费怎么保证数据消费一次?数据的一致性和统一性?数据的完整性?

1.kafka在高并发的情况下,如何避免消息丢失和消息重复? 消息丢失解决方案: 首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功 消息重复解决方案: 消息可以使用唯一id标识 生产者(ack=all 代表至少成功发送一次) 消费者 (offset手动提交,业务逻辑成功处理后,提交offset) 落表(主键或者唯一索引的方式,避免重复数据) 业务逻辑处理(选择唯一主键存储到R

160728、Spark Streaming kafka 实现数据零丢失的几种方式

定义 问题开始之前先解释下流处理中的一些概念: At most once - 每条数据最多被处理一次(0次或1次) At least once - 每条数据最少被处理一次 (1次或更多) Exactly once - 每条数据只会被处理一次(没有数据会丢失,并且没有数据会被多次处理) High Level API   如果不做容错,将会带来数据丢失因为receiver一直在接收数据,在其没有处理的时候(已通知zk数据接收到),executor突然挂掉(或是driver挂掉通知executor关闭

Kafka消息保证不丢失和重复消费问题

使用同步模式的时候,有3种状态保证消息被安全生产,在配置为1(只保证写入leader成功)的话,如果刚好leader partition挂了,数据就会丢失.还有一种情况可能会丢失消息,就是使用异步模式的时候,当缓冲区满了,如果配置为0(还没有收到确认的情况下,缓冲池一满,就清空缓冲池里的消息),数据就会被立即丢弃掉. 在数据生产时避免数据丢失的方法: 只要能避免上述两种情况,那么就可以保证消息不会被丢失.就是说在同步模式的时候,确认机制设置为-1,也就是让消息写入leader和所有的副本.还有,

Spark Streaming的容错和数据无丢失机制

实时的流式处理系统必须是7*24运行的,同时可以从各种各样的系统错误中恢复,在设计之处,Spark Streaing就支持driver和worker节点的错误恢复.然后,在使用某些数据源的时候,错误恢复时输入数据可能会丢失.在spark 1.2中,加入write ahead logs(日志)这个初步方案用来改进恢复机制,保证数据的无丢失. 背景 spark和rdd的设计保证了集群中worker节点的容错性.spark streaming构建在spark之上,所以它的worker节点也是同样的容错

关于MQ的几件小事(四)如何保证消息不丢失

1.mq原则 数据不能多,也不能少,不能多是说消息不能重复消费,这个我们上一节已解决:不能少,就是说不能丢失数据.如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的. 2.丢失数据场景 丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了.下面从rabbitmq和kafka分别说一下,丢失数据的场景, (1)rabbitmq A:生产者弄丢了数据 生产者将数据发送到rabbitmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了. B:rabbit