图解 Kafka 水印备份机制

高可用是很多分布式系统中必备的特征之一,Kafka 日志的高可用是通过基于 leader-follower 的多副本同步实现的,每个分区下有多个副本,其中只有一个是 leader 副本,提供发送和消费消息,其余都是 follower 副本,不断地发送 fetch 请求给 leader 副本以同步消息,如果 leader 在整个集群运行过程中不发生故障,follower 副本不会起到任何作用,问题就在于任何系统都不能保证其稳定运行,当 leader 副本所在的 broker 崩溃之后,其中一个 follower 副本就会成为该分区下新的 leader 副本,那么问题来了,在选为新的 leader 副本时,会导致消息丢失或者离散吗?Kafka 是如何解决 leader 副本变更时消息不会出错?以及 leader 与 follower 副本之间的数据同步是如何进行的?带着这几个问题,我们接着往下看,一起揭开 Kafka 水印备份的神秘面纱。

水印相关概念

在讲解水印备份之前,我们必须要先搞清楚几个关键的术语以及它们的含义,下面我用一张图来示意 Kafka 分区副本的位移信息:

如上图所示,绿色部分表示已完全备份的消息,对消费者可见,紫色部分表示未完全备份的消息,对消费者不可见。

LEO(last end offset):日志末端位移,记录了该副本对象底层日志文件中下一条消息的位移值,副本写入消息的时候,会自动更新 LEO 值。

HW(high watermark):从名字可以知道,该值叫高水印值,HW 一定不会大于 LEO 值,小于 HW 值的消息被认为是“已提交”或“已备份”的消息,并对消费者可见。

leader 会保存两个类型的 LEO 值,一个是自己的 LEO,另一个是 remote LEO 值,remote LEO 值就是 follower 副本的 LEO 值,意味着 follower 副本的 LEO 值会保存两份,一份保存到 leader 副本中,一份保存到自己这里。

remote LEO 值有什么用呢?

它是决定 HW 值大小的关键,当 HW 要更新时,就会对比 LEO 值(也包括 leader LEO),取最小的那个做最新的 HW 值。

以下介绍 LEO 和 HW 值的更新机制:

LEO 更新机制:

  1. leader 副本自身的 LEO 值更新:在 Producer 消息发送过来时,即 leader 副本当前最新存储的消息位移位置 +1;
  2. follower 副本自身的 LEO 值更新:从 leader 副本中 fetch 到消息并写到本地日志文件时,即 follower 副本当前同步 leader 副本最新的消息位移位置 +1;
  3. leader 副本中的 remote LEO 值更新:每次 follower 副本发送 fetch 请求都会包含 follower 当前 LEO 值,leader 拿到该值就会尝试更新 remote LEO 值。

leader HW 更新机制:

leader HW 更新分为故障时更新与正常时更新:

故障时更新:

  1. 副本被选为 leader 副本时:当某个 follower 副本被选为分区的 leader 副本时,kafka 就会尝试更新 HW 值;
  2. 副本被踢出 ISR 时:如果某个副本追不上 leader 副本进度,或者所在 broker 崩溃了,导致被踢出 ISR,leader 也会检查 HW 值是否需要更新,毕竟 HW 值更新只跟处于 ISR 的副本 LEO 有关系。

正常时更新:

  1. producer 向 leader 副本写入消息时:在消息写入时会更新 leader LEO 值,因此需要再检查是否需要更新 HW 值;
  2. leader 处理 follower FETCH 请求时:follower 的 fetch 请求会携带 LEO 值,leader 会根据这个值更新对应的 remote LEO 值,同时也需要检查是否需要更新 HW 值。

follower HW 更新机制:

  1. follower 更新 HW 发生在其更新 LEO 之后,每次 follower Fetch 响应体都会包含 leader 的 HW 值,然后比较当前 LEO 值,取最小的作为新的 HW 值。

图解水印备份过程

在了解了 Kafka 水印备份机制的相关概念之后,下面我用图来帮大家更好地理解 Kafka 的水印备份过程,假设某个分区有两个副本,min.insync.replica=1:

Step 1:leader 和 follower 副本处于初始化值,follower 副本发送 fetch 请求,由于 leader 副本没有数据,因此不会进行同步操作;

Step 2:生产者发送了消息 m1 到分区 leader 副本,写入该条消息后 leader 更新 LEO = 1;

Step 3:follower 发送 fetch 请求,携带当前最新的 offset = 0,leader 处理 fetch 请求时,更新 remote LEO = 0,对比 LEO 值最小为 0,所以 HW = 0,leader 副本响应消息数据及 leader HW = 0 给 follower,follower 写入消息后,更新 LEO 值,同时对比 leader HW 值,取最小的作为新的 HW 值,此时 follower HW = 0,这也意味着,follower HW 是不会超过 leader HW 值的。

Step 4:follower 发送第二轮 fetch 请求,携带当前最新的 offset = 1,leader 处理 fetch 请求时,更新 remote LEO = 1,对比 LEO 值最小为 1,所以 HW = 1,此时 leader 没有新的消息数据,所以直接返回 leader HW = 1 给 follower,follower 对比当前最新的 LEO 值 与 leader HW 值,取最小的作为新的 HW 值,此时 follower HW = 1。

基于水印备份机制的一些缺陷

  public class LogHandler implements InvocationHandler {
  
  private Object targetObj;
  
  public Object newProxyObject(Object targetObj){
  
  this.targetObj = targetObj;
  
  return Proxy.newProxyInstance(
  
  targetObj.getClass().getClassLoader(), //获取委托类的类加载器
  
  targetObj.getClass().getInterfaces(), //获取委托类实现的所有接口
  
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  
  Object ret;
  
  try {
  
  System.out.println("before method");
  
  ret = method.invoke(targetObj, args);
  
  System.out.println("after method");
  
  } catch (IllegalAccessException e) {
  
  e.printStackTrace();
  
  System.out.println("error");
  
  public class TestProxy {
  
  public static void main(String[] args) {
  
  LogHandler logHandler = new LogHandler();
  
  UserManager o = (UserManager)logHandler.newProxyObject(new UserManagerImpl());
  
  o.getName(www.shentuylzc.cn"ls");
  
  运行结果如下:
  
  before method
  
  UserManagerImpl.getName:ls
  
  after method
  
  before method
  
  UserManagerImpl. www.jiuhuaylgw.cn getId:2
  
  after method
  
  */
  
  JDK动态代理其实是在运行时动态生成了一个代理类去实现接口,只是隐藏了这个过程,我们不知道而已。
  
  class $JDKProxy implements UserManager{www.huizhonggjpt.cn}
  
  需要注意的是,实现JDK动态代理的一个前提就是,需要定义一个接口,然后委托类去实现这个接口。那如果我不想定义接口,只定义一个委托类能不能实现呢?这就需要用到cglib代理了。(因为cglib是通过继承方式)
  
  三、cglib动态代理
  
  需要定义一个类实现MethodInterceptor接口(注意,这个类可不是代理类,也不是委托类哦)。
  
  //委托类,不需要实现接口
  
  public class CgTarget {
  
  public class CglibProxy www.qiaoheibpt.com implements MethodInterceptor {
  
  private Object target;
  
  @Override
  
  public Object intercept(Object www.baihua178.cn Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
  
  System.out.println(www.feishenbo.cn"cglib开始");
  
  Object invoke = method.invoke(target, objects);
  
  System.out.println("cglib结束");
  
  return invoke;

从以上步骤可看出,leader 中保存的 remote LEO 值的更新总是需要额外一轮 fetch RPC 请求才能完成,这意味着在 leader 切换过程中,会存在数据丢失以及数据不一致的问题,下面我用图来说明存在的问题:

  • 数据丢失

前面也说过,leader 中的 HW 值是在 follower 下一轮 fetch RPC 请求中完成更新的,如上图所示,有副本 A 和 B,其中 B 为 leader 副本,A 为 follower 副本,在 A 进行第二段 fetch 请求,并接收到响应之后,此时 B 已经将 HW 更新为 2,如果这是 A 还没处理完响应就崩溃了,即 follower 没有及时更新 HW 值,A 重启时,会自动将 LEO 值调整到之前的 HW 值,即会进行日志截断,接着会向 B 发送 fetch 请求,但很不幸的是此时 B 也发生宕机了,Kafka 会将 A 选举为新的分区 Leader。当 B 重启后,会从 向 A 发送 fetch 请求,收到 fetch 响应后,拿到 HW 值,并更新本地 HW 值,此时 HW 被调整为 1(之前是 2),这时 B 会做日志截断,因此,offsets = 1 的消息被永久地删除了。

可能你会问,follower 副本为什么要进行日志截断?

这是由于消息会先记录到 leader,follower 再从 leader 中拉取消息进行同步,这就导致 leader LEO 会比 follower 的要大(ollower之间的offset也不尽相同,虽然最终会一致,但过程中会有差异),假设此时出现 leader 切换,有可能选举了一个 LEO 较小的 follower 成为新的 leader,这时该副本的 LEO 就会成为新的标准,这就会导致 follower LEO 值有可能会比 leader LEO 值要大的情况,因此 follower 在进行同步之前,需要从 leader 获取 LastOffset 的值(该值后面会有解释),如果 LastOffset 小于 当前 LEO,则需要进行日志截断,然后再从 leader 拉取数据实现同步。

可能你还会问,日志截断会不会造成数据丢失?

前面也说过,HW 值以上的消息是没有“已提交”或“已备份”的,因此消息也是对消费者不可见,即这些消息不对用户作承诺,也即是说从 HW 值截断日志,并不会导致数据丢失(承诺用户范围内)。

  • 数据不一致/离散

以上情况,需要满足以下其中一个条件才会发生:

  1. 宕机之前,B 已不在 ISR 列表中,unclean.leader.election.enable=true,即允许非 ISR 中副本成为 leader;
  2. B 消息写入到 pagecache,但尚未 flush 到磁盘。

分区有两个副本,其中 A 为 Leader 副本,B 为 follower 副本,A 已经写入两条消息,且 HW 更新到 2,B 只写了 1条消息,HW 为 1,此时 A 和 B 同时宕机,B 先重启,B 成为了 leader 副本,这时生产者发送了一条消息,保存到 B 中,由于此时分区只有 B,B 在写入消息时把 HW 更新到 2,就在这时候 A 重新启动,发现 leader HW 为 2,跟自己的 HW 一样,因此没有执行日志截断,这就造成了 A 的 offset=1 的日志与 B 的 offset=1 的日志不一样的现象。

leader epoch

为了解决 HW 更新时机是异步延迟的,而 HW 又是决定日志是否备份成功的标志,从而造成数据丢失和数据不一致的现象,Kafka 引入了 leader epoch 机制,在每个副本日志目录下都创建一个 leader-epoch-checkpoint 文件,用于保存 leader 的 epoch 信息,如下,leader epoch 长这样:

它的格式为 (epoch offset),epoch指的是 leader 版本,它是一个单调递增的一个正整数值,每次 leader 变更,epoch 版本都会 +1,offset 是每一代 leader 写入的第一条消息的位移值,比如:

(0, 0)
(1, 300)

以上第二个版本是从位移300开始写入消息,意味着第一个版本写入了 0-299 的消息。

leader epoch 具体的工作机制如下:

1)当副本成为 leader 时:

这时,如果此时生产者有新消息发送过来,会首先新的 leader epoch 以及 LEO 添加到 leader-epoch-checkpoint 文件中。

2)当副本变成 follower 时:

  1. 发送 LeaderEpochRequest 请求给 leader 副本,该请求包括了 follower 中最新的 epoch 版本;
  2. leader 返回给 follower 的相应中包含了一个 LastOffset,如果 follower last epoch = leader last epoch,则 LastOffset = leader LEO,否则取大于 follower last epoch 中最小的 leader epoch 的 start offset 值,举个例子:假设 follower last epoch = 1,此时 leader 有 (1, 20) (2, 80) (3, 120),则 LastOffset = 80;
  3. follower 拿到 LastOffset 之后,会对比当前 LEO 值是否大于 LastOffset,如果当前 LEO 大于 LastOffset,则从 LastOffset 截断日志;
  4. follower 开始发送 fetch 请求给 leader 保持消息同步。

基于 leader epoch 的工作机制,我们接下来看看它是如何解决水印备份缺陷的:

(1)解决数据丢失:

如上图所示,A 重启之后,发送 LeaderEpochRequest 请求给 B,由于 B 还没追加消息,此时 epoch = request epoch = 0,因此返回 LastOffset = leader LEO = 2 给 A,A 拿到 LastOffset 之后,发现等于当前 LEO 值,故不用进行日志截断。就在这时 B 宕机了,A 成为 leader,在 B 启动回来后,会重复 A 的动作,同样不需要进行日志截断,数据没有丢失。

(2)解决数据不一致/离散

如上图所示,A 和 B 同时宕机后,B 先重启回来成为分区 leader,这时候生产者发送了一条消息过来,leader epoch 更新到 1,此时 A 启动回来后,发送 LeaderEpochRequest(follower epoch = 0) 给 B,B 判断 follower epoch 不等于 最新的 epoch,于是找到大于 follower epoch 最小的 epoch = 1,即 LastOffset = epoch start offset = 1,A 拿到 LastOffset 后,判断小于当前 LEO 值,于是从 LastOffset 位置进行日志截断,接着开始发送 fetch 请求给 B 开始同步消息,避免了消息不一致/离散的问题。

原文地址:https://www.cnblogs.com/laobeipai/p/12109950.html

时间: 2024-11-25 17:20:59

图解 Kafka 水印备份机制的相关文章

kafka备份机制——zk选举leader,leader在broker里负责备份

Kafka架构 如上图所示,一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU.memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干consumer group,以及一个Zookeeper集群.Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance.producer使用push模式将消息发布到b

图解 kafka 的高可用机制

对于一个复杂的分布式系统,如果没有丰富的经验和牛逼的架构能力,很难把系统做得简单易维护,我们都知道,一个软件的生命周期中,后期维护占了70%,所以系统的可维护性是极其重要的, kafka 能成为大数据领域的事实标准,很大原因是因为运维起来很方便简单,今天我们来看下 kafka 是怎么来简化运维操作的. kafka 使用多副本来保证消息不丢失,多副本就涉及到kafka的复制机制,在一个超大规模的集群中,时不时地这个点磁盘坏了,那个点cpu负载高了,出现各种各样的问题,多个副本之间的复制,如果想完全

HDFS源码分析(二)-----元数据备份机制

前言 在Hadoop中,所有的元数据的保存都是在namenode节点之中,每次重新启动整个集群,Hadoop都需要从这些持久化了的文件中恢复数据到内存中,然后通过镜像和编辑日志文件进行定期的扫描与合并,ok,这些稍微了解Hadoop的人应该都知道,这不就是SecondNameNode干的事情嘛,但是很多人只是了解此机制的表象,内部的一些实现机理估计不是每个人都又去深究过,你能想象在写入编辑日志的过程中,用到了双缓冲区来加大并发量的写吗,你能想象为了避免操作的一致性性,作者在写入的时候做过多重的验

Kafka文件存储机制那些事

点评一下先:kafka的存储主要有几个特点: 1. 多级索引(名义上是1级索引,但是这级索引依赖了文件列表,相当于文件列表是第一级索引,所以是二级索引),二级索引文件和数据文件一一对应. 相比只有1级索引,这样可以支持更大的数据量,也可以更好的支持删除.如果我来设计这个存储系统,我会这样设计: 第一级索引:只有1个文件,文件名固定,保存数据文件名和这个数据文件保存的第1个消息的id.数据文件名如果采用数字依次编号法,那么无需存储数据文件名,这个索引文件只需要在第1个8字节保存数据文件的起始编号,

转】 Kafka文件存储机制那些事

原博文出自于:http://tech.meituan.com/kafka-fs-design-theory.html    感谢! Kafka是什么 Kafka是最初由Linkedin公司开发,是一个分布式.分区的.多副本的.多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志.访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目. 1.前言 一个商业化消息队列的性能好坏,其文件存储机制设计是衡

NVM区数据备份机制

上一篇主要说明NVM区操作注意事项,本文针对上篇提到的NVM区数据备份方法进行补充讲解.NVM区主要特性是写入数据掉电不丢失,可以永久的保存数据,一般用作存放不经常修改的数据,此功能类似FLASH.向NVM区写入数据可分为3步:第一步,将目标扇区内原有数据读出到RAM中:第二步,擦除NVM目标扇区内数据:第三步,将新数据和RAM中的旧数据写入到该扇区中.基于以上写操作的特点可以看出,若执行写NVM区操作的第二步或第三步时芯片断电了,就会造成NVM区内原有数据丢失,而新数据写入失败,表现出NVM区

Android图解浅析事件拦截机制

当Android系统捕获到用户的各种输入事件后,如何准确的传递给真正的需要这个事件的控件?Android提供了一整套完善的事件传递.处理机制,来帮助开发者完成准确的事件分配与处理,这里我就不分析源码了,简单点,图形化分发过程,便于理解. 当我们点击一个按钮时,通常会产生两个或者三个事件---按下.滑动(可能无).抬起.Android为触摸事件封装了一个类----MotionEvent,其中假如我们重写一个view的onTouchEvent事件中的参数就是一个MotionEvent.由于Andro

图解Android - Android核心机制

图解Android - Zygote, System Server 启动分析 图解Android - Binder 和 Service 图解Android - System Service 概论 和 Android GUI 系统 图解Android - Looper, Handler 和 MessageQueue 图解Android - 如何看Android的UML 图?

图解 Android 事件分发机制

首发原文:http://mp.weixin.qq.com/s?__biz=MzI0MjE3OTYwMg==&mid=2649548149&idx=1&sn=709149df682c7d3a6e453c9ef0626a1f&chksm=f1180e08c66f871eb2e7e39e057a5b090214fd71adcd98aa36b3d7fcecf77ad5d08138c50131#rd 在Android开发中,事件分发机制是一块Android比较重要的知识体系,了解并熟