Kafka partition 副本迁移与broker上下线

Kafka partition 副本迁移与broker上下线

1 前言

Controller 在初始化时,会利用 ZK 的 watch 机制注册很多不同类型的监听器,当监听的事件被触发时,Controller 就会触发相应的操作。

Controller 在初始化时,会注册多种类型的监听器,主要有以下几种:

  • l  监听 /admin/reassign_partitions 节点,用于分区副本迁移的监听;
  • l  监听 /isr_change_notification 节点,用于 Partition Isr 变动的监听,;
  • l  监听 /admin/preferred_replica_election 节点,用于需要进行 Partition 最优 leader 选举的监听;
  • l  监听 /brokers/topics 节点,用于 Topic 新建的监听;
  • l  监听 /brokers/topics/TOPIC_NAME 节点,用于 Topic Partition 扩容的监听;
  • l  监听 /admin/delete_topics 节点,用于 Topic 删除的监听;
  • l  监听 /brokers/ids 节点,用于 Broker 上下线的监听。

2 Partition 副本迁移整体流程

Partition 的副本迁移实际上就是将分区的副本重新分配到不同的代理节点上,如果 zk 中新副本的集合与 Partition 原来的副本集合相同,那么这个副本就不需要重新分配了。

Partition 的副本迁移是通过监听 zk 的 /admin/reassign_partitions 节点触发的,Kafka 也向用户提供相应的脚本工具进行副本迁移,副本迁移的脚本使用方法如下所示:

在调用脚本向 zk 提交 Partition 的迁移计划时,迁移计划更新到 zk 前需要进行一步判断,如果该节点(写入迁移计划的节点)已经存在,即副本迁移还在进行,那么本次副本迁移计划是无法提交的,实现的逻辑如下所示:

2.1 ZK PartitionsReassignedListener 副本迁移处理

在 zk 的 /admin/reassign_partitions 节点数据有变化时,就会触发 PartitionsReassignedListener 的 doHandleDataChange() 方法,实现如下:

如果 Partition 出现下面的情况,将不会进行副本迁移,直接将 Partition 的迁移计划从 ZK 移除:

  • l  这个 Partition 的 reassignment 之前已经存在, 即正在迁移中;
  • l  这个 Partition 新分配的 replica 与之前的 replicas 相同;
  • l  这个 Partition 所有新分配 replica 都已经 dead;
  • l  这个 Partition 已经被设置了删除标志。

对于可以进行副本迁移的 Partition 集合,这里将会调用 Kafka Controller 的 initiateReassignReplicasForTopicPartition() 方法对每个 Partition 进行处理。

2.2 副本迁移初始化

进行了前面的判断后,这个 Partition 满足了可以迁移的条件,Controller 会首先初始化副本迁移的流程,实现如下所示

对于副本迁移流程初始化如下:

  • l  通过 watchIsrChangesForReassignedPartition() 方法监控这个 Partition 的 LeaderAndIsr 变化,如果有新的副本数据同步完成,那么 leader 会将其加到 isr 中更新到 zk 中,这时候 Controller 是可以接收到相关的信息通知的;
  • l  将正在迁移的 Partition 添加到 partitionsBeingReassigned 中,它会记录当前正在迁移的 Partition 列表;
  • l  将要迁移的 Topic 设置为非法删除删除状态,在这个状态的 Topic 是无法进行删除的;
  • l  调用 onPartitionReassignment(),进行副本迁移。

在第一步中,会向这个 Partition 注册一个额外的监听器,监听其 LeaderAndIsr 信息变化,如下所示:

如果该 Partition 的 LeaderAndIsr 信息有变动,那么就会触发这个 listener 的 doHandleDataChange() 方法:

  • l  首先检查这个 Partition 是否在还在迁移中,不在的话直接结束流程,因为这个监听器本来就是为了 Partition 副本迁移而服务的;
  • l  从 zk 获取最新的 leader 和 isr 信息,如果新分配的副本全部都在 isr 中,那么就再次触发 controller 的 onPartitionReassignment() 方法,再次调用时实际上已经证明了这个 Partition 的副本迁移已经完成,否则的话就会不进行任何处理,等待新分配的所有副本迁移完成。

2.3 副本迁移

Partition 副本迁移真正实际处理是在 Controller 的 onPartitionReassignment() 方法完成的,在看这个方法之前,先介绍几个基本的概念(假设一个 Partition 原来的 replica 是 {1、2、3},新分配的副本列表是:{2、3、4}):

  • RAR = Reassigned replicas,即新分配的副本列表,也就是 {2、3、4};
  • OAR = Original list of replicas for partition,即这个 Partition 原来的副本列表,也就是 {1、2、3};
  • AR = current assigned replicas,该 Partition 当前的副本列表,这个会随着阶段的不同而变化;
  • RAR-OAR:需要创建、数据同步的新副本,也就是 {4};
  • OAR-RAR:需要删除的副本,也就是{1}

这个方法的实现如下所示:

这个方法整体分为以下12个步骤:

  • l  把 AR = OAR+RAR ({1、2、3、4})更新到 zk 及本地 Controller 缓存中;
  • l  发送 LeaderAndIsr 给 AR 中每一个副本,并且会强制更新 zk 中 leader 的 epoch;
  • l  创建需要新建的副本(【RAR-OAR】,即 {4}),将其状态设置为 NewReplica;
  • l  等待直到 RAR({2、3、4}) 中的所有副本都在 ISR 中;
  • l  把 RAR({2、3、4}) 中的所有副本设置为 OnReplica 状态;
  • l  将缓存中 AR 更新为 RAR(重新分配的副本列表,即 {2、3、4});
  • l  如果 leader 不在 RAR 中, 就从 RAR 选择对应的 leader, 然后发送 LeaderAndIsr 请求;如果不需要,那么只会更新 leader epoch,然后发送 LeaderAndIsr 请求; 在发送 LeaderAndIsr 请求前设置了 AR=RAR, 这将确保了 leader 在 isr 中不会添加任何 【RAR-OAR】中的副本(old replica,即 {1});
  • l  将【OAR-RAR】({1})中的副本设置为 OfflineReplica 状态,OfflineReplica 状态的变化,将会从 ISR 中删除【OAR-RAR】的副本,更新到 zk 中并发送 LeaderAndIsr 请求给 leader,通知 leader isr 变动。之后再发送 StopReplica 请求(delete=false)给【OAR-RAR】中的副本;
  • l  将【OAR-RAR】中的副本设置为 NonExistentReplica 状态。这将发送 StopReplica 请求(delete=true)给【OAR-RAR】中的副本,这些副本将会从本地上删除数据;
  • l  在 zk 中更新 AR 为 RAR;
  • l  更新 zk 中路径 【/admin/reassign_partitions】信息,移除已经成功迁移的 Partition;
  • l  leader 选举之后,这个 replica 和 isr 信息将会变动,发送 metadata 更新给所有的 broker。

上面的流程简单来说,就是先创建新的 replica,开始同步数据,等待所有新的分配都加入到了 isr 中后,开始进行 leader 选举(需要的情况下),下线不需要的副本(OAR-RAR),下线完成后将 Partition 的最新 AR (即 RAR)信息更新到 zk 中,最后发送相应的请求给 broker,到这里一个 Partition 的副本迁移算是完成了。

3 Broker上下线

每台 Broker 在上线时,都会与 ZK 建立一个建立一个 session,并在 /brokers/ids 下注册一个节点,节点名字就是 broker id,这个节点是临时节点,该节点内部会有这个 Broker 的详细节点信息。Controller 会监听 /brokers/ids 这个路径下的所有子节点,如果有新的节点出现,那么就代表有新的 Broker 上线,如果有节点消失,就代表有 broker 下线,Controller 会进行相应的处理,Kafka 就是利用 ZK 的这种 watch 机制及临时节点的特性来完成集群 Broker 的上下线。

3.1 ZK 回调处理

BrokerChangeListener 是监听 /brokers/ids 节点的监听器,当该节点有变化时会触发 doHandleChildChange() 方法,具体实现如下:

这里需要重点关注 doHandleChildChange() 方法的实现,该方法处理逻辑如下:

  • l  从 ZK 获取当前的 Broker 列表(curBrokers)及 broker id 的列表(curBrokerIds);
  • l  获取当前 Controller 中缓存的 broker id 列表(liveOrShuttingDownBrokerIds);
  • l  获取新上线 broker id 列表:newBrokerIds = curBrokerIds – liveOrShuttingDownBrokerIds;
  • l  获取掉线的 broker id 列表:deadBrokerIds = liveOrShuttingDownBrokerIds – curBrokerIds;
  • l  对于新上线的 broker,先在 ControllerChannelManager 中添加该 broker(即建立与该 Broker 的连接、初始化相应的发送线程和请求队列),最后 Controller 调用 onBrokerStartup() 上线该 Broker;
  • l  对于掉线的 broker,先在 ControllerChannelManager 中移除该 broker(即关闭与 Broker 的连接、关闭相应的发送线程和清空请求队列),最后 Controller 调用 onBrokerFailure() 下线该 Broker。

3.2 broker上线

一台 Broker 上线主要有以下两步:

  • l  在 Controller Channel Manager 中添加该 Broker 节点,主要的内容是:Controller 建立与该 Broker 的连接、初始化相应的请求发送线程与请求队列;
  • l  调用 Controller 的 onBrokerStartup() 方法上线该节点。

ontroller Channel Manager 添加 Broker 的实现如下,这里就不重复讲述了,前面讲述 Controller 服务初始化的文章已经讲述过这部分的内容。下面再看下 Controller 如何在 onBrokerStartup() 方法中实现 Broker 上线操作的,具体实现如下所示:

onBrokerStartup() 方法在实现的逻辑上分为以下几步:

  • l  调用 sendUpdateMetadataRequest() 方法向当前集群所有存活的 Broker 发送 Update Metadata 请求,这样的话其他的节点就会知道当前的 Broker 已经上线了;
  • l  获取当前节点分配的所有的 Replica 列表,并将其状态转移为 OnlineReplica 状态;
  • l  触发 PartitionStateMachine 的 triggerOnlinePartitionStateChange() 方法,为所有处于 NewPartition/OfflinePartition 状态的 Partition 进行 leader 选举,如果 leader 选举成功,那么该 Partition 的状态就会转移到 OnlinePartition 状态,否则状态转移失败;
  • l  如果副本迁移中有新的 Replica 落在这台新上线的节点上,那么开始执行副本迁移操作;
  • l  如果之前由于这个 Topic 设置为删除标志,但是由于其中有 Replica 掉线而导致无法删除,这里在节点启动后,尝试重新执行删除操作。

到此为止,一台 Broker 算是真正加入到了 Kafka 的集群中,在上述过程中,涉及到 leader 选举的操作,都会触发 LeaderAndIsr 请求及 Metadata 请求的发送。

3.3 broker下线

一台 Broker 掉线后主要有以下两步:

l  首先在 Controller Channel Manager 中移除该 Broker 节点,主要的内容是:关闭 Controller 与 Broker 的连接和相应的请求发送线程,并清空请求队列;

l  调用 Controller 的 onBrokerFailure() 方法下线该节点。

Controller Channel Manager 下线 Broker 的处理如下所示:

在 Controller Channel Manager 处理完掉线的 Broker 节点后,下面 KafkaController 将会调用 onBrokerFailure() 进行相应的处理,其实现如下:

Controller 对于掉线 Broker 的处理过程主要有以下几步:

  • l  首先找到 Leader 在该 Broker 上所有 Partition 列表,然后将这些 Partition 的状态全部转移为 OfflinePartition 状态;
  • l  触发 PartitionStateMachine 的 triggerOnlinePartitionStateChange() 方法,为所有处于 NewPartition/OfflinePartition 状态的 Partition 进行 Leader 选举,如果 Leader 选举成功,那么该 Partition 的状态就会迁移到 OnlinePartition 状态,否则状态转移失败(Broker 上线/掉线、Controller 初始化时都会触发这个方法);
  • l  获取在该 Broker 上的所有 Replica 列表,将其状态转移成 OfflineReplica 状态;
  • l  过滤出设置为删除、并且有副本在该节点上的 Topic 列表,先将该 Replica 的转移成 ReplicaDeletionIneligible 状态,然后再将该 Topic 标记为非法删除,即因为有 Replica 掉线导致该 Topic 无法删除;
  • l  如果 leader 在该 Broker 上所有 Partition 列表不为空,证明有 Partition 的 leader 需要选举,在最后一步会触发全局 metadata 信息的更新。

到这里,一台掉线的 Broker 算是真正下线完成了。

3.4 主动关闭broker

Controller 在接收这个关闭服务的请求,通过 shutdownBroker() 方法进行处理,实现如下所示:

上述方法的处理逻辑如下:

  • l  先将要下线的 Broker 添加到 shuttingDownBrokerIds 集合中,该集合记录了当前正在进行关闭的 broker 列表;
  • l  获取副本在该节点上的所有 Partition 的列表集合;
  • l  遍历上述 Partition 列表进行处理:如果该 Partition 的 leader 是要下线的节点,那么通过 PartitionStateMachine 进行状态转移(OnlinePartition –> OnlinePartition)触发 leader 选举,使用的 leader 选举方法是 ControlledShutdownLeaderSelector,它会选举 isr 中第一个没有正在关闭的 Replica 作为 leader,否则抛出 StateChangeFailedException 异常;
  • l  否则的话,即要下线的节点不是 leader,那么就向要下线的节点发送 StopReplica 请求停止副本同步,并将该副本设置为 OfflineReplica 状态,这里对 Replica 进行处理的原因是为了让要下线的机器关闭副本同步流程,这样 Kafka 服务才能正常关闭。

参考资料:

http://matt33.com/2018/06/16/partition-reassignment/

http://matt33.com/2018/06/17/broker-online-offline/

原文地址:https://www.cnblogs.com/zhy-heaven/p/10994229.html

时间: 2024-08-14 10:00:49

Kafka partition 副本迁移与broker上下线的相关文章

[bigdata] kafka基本命令 -- 迁移topic partition到指定的broker

版本 0.9.2 创建topic bin/kafka-topics.sh --create --topic topic_name --partition 6 --replication-factor 1 -zookeeper 10.27.100.207:2181,10.27.100.144:2181,10.27.100.145:2181 开启console consumer查看消息 bin/kafka-console-consumer.sh --topic rt_live_pcweb -zook

kafka消息存储与partition副本原理

消息的存储原理: 消息的文件存储机制: 前面我们知道了一个 topic 的多个 partition 在物理磁盘上的保存路径,那么我们再来分析日志的存储方式.通过 ll /tmp/kafka-logs/testTopic-0/ 命令找到对应 partition 下的日志内容: kafka 是通过分段的方式将 Log 分为多个 LogSegment,LogSegment 是一个逻辑上的概念,一个 LogSegment 对应磁盘上的一个日志文件和一个索引文件,其中日志文件是用来记录消息的.索引文件是用

阿里云消息队列Kafka商业化:支持消息无缝迁移到云上

列Kafka彻底解决了开源产品稳定性不足的痛点,可用性达99.9%,数据可靠性99.999999%,并且支持消息无缝迁移到云上. 7月25日,阿里云宣布正式推出消息队列Kafka,全面融合开源生态.在兼容Apache生态的基础上,阿里云消息队列Kafka彻底解决了开源产品稳定性不足的痛点,可用性达99.9%,数据可靠性99.999999%,并且支持消息无缝迁移到云上. Kafka是一个分布式.高吞吐量.高可扩展性的消息队列服务,广泛用于日志收集.监控数据聚合.流式数据处理.在线和离线分析等大数据

5.zookeeper应用案例之分布式服务器动态上下线感知

zookeeper应用案例之分布式服务器动态上下线感知,当服务器上线和下线时候客户端都能感知到,还有哪些机器在线.并对zookeeper管理的服务器进行节点的监听; 代码实现:客户端 每当服务端有服务器上线或下线 在客户端都能通过监听感知到 package org.zookeeper.anli; import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.WatchedEvent; import o

如何利用mono把.net windows service程序迁移到linux上

How to migrate a .NET Windows Service application to Linux using mono? 写在最前:之所以用要把windows程序迁移到Linux上,主要是由于一些成本问题,这个就不多解释了. 如何把之前用.net写的windows服务程序迁移到linux上运行.答案是有很多种的,今天我只提一下mono(我只实验了mono,呵呵). 如何在Linux部署mono,并成功的运行.net程序,还请大家多多查询吧,我在这方面也只是搭建成功了,遇到的问

xmpp 消息和好友上下线(3)

原始地址:XMPPFrameWork IOS 开发(四) 消息 //收到消息 - (void)xmppStream:(XMPPStream *)sender didReceiveMessage:(XMPPMessage *)message{ // NSLog(@"message = %@", message); //消息的内容 NSString *msg = [[message elementForName:@"body"] stringValue]; //消息发送

聊天软件中的窗口上滑和下滑提示上下线

聊天软件中右下角窗口上滑提示有好友上线,窗口下滑提示有好友下线. 在 Qt 下实现此功能,用到的类有 QPoint  QTimer mainwindow.h 1 #ifndef MAINWINDOW_H 2 #define MAINWINDOW_H 3 4 #include <QMainWindow> 5 #include <QPoint> 6 #include <QTimer> 7 8 namespace Ui { 9 class MainWindow; 10 } 1

基于Nginx dyups模块的站点动态上下线

简介 今天主要讨论一下,对于分布式服务,站点如何平滑的上下线问题. 分布式服务 在分布式服务下,我们会用nginx做负载均衡, 业务站点访问某服务站点的时候, 统一走nginx, 然后nginx根据一定的轮询策略,将请求路由到后端一台指定的服务器上. 这样的架构是没有问题的, 但是我们这里考虑几个问题, 1. 网站上下线问题:我们网站平时更新站点的时候是直接覆盖文件,然后重启, 那这样会造成一些请求中断,如果是非核心逻辑那还好, 如果是核心逻辑,那请求中断,会影响一些数据一致性,比如资金, 交易

呕心沥血的java复杂项目(包括自定义应用层协议、CS多线程、多客户端登录、上下线提醒等等)

建议大家先下源代码,导入到Eclipse,然后运行服务器和多个客户端,这样有个不错的体会.下载地址:http://download.csdn.net/detail/woshiwanghao_hi/7320927. 首先来看下整个系统的文件架构图: 系统是个基于UDP的聊天室,因为不能保持所有用户和聊天室的持续连接.同时为了保持数据传输的可靠性,就需要自定义应用层协议了. 程序大概的一个流程如下: 1.启动服务器,点击"start service",之后服务器及开始监听指定端口. 2.启