【原创】kafka controller源代码分析(二)

四、TopicDeletionManager.scala

管理topic删除的状态机,具体逻辑如下:

  • TopicCommand发送topic删除命令,在zk的/admin/delete_topics目录下创建topic节点
  • controller会监听该zk目录下任何节点的变更并为对应的topic开启删除操作
  • controller开启一个后台线程处理topic的删除。使用该线程主要为了以后能够增加TTL(time to live)的特性。无论何时开启或重启topic删除操作时都会通知该线程。当前,topic删除操作只能由onPartitionDeletion回调方法来开启。在后续版本中,Kafka会考虑基于在topic上配置TTL来触发。下列情况下topic并不能被删除:
    • 保存待删除topic副本(哪怕只有一个)的broker宕机
    • 正在重新分配topic的分区副本
    • 正在进行topic的preferred分区副本选举——所谓的preferred副本,比如一个分区的副本列表是[1, 5, 9],那么节点1就是preferred副本,因为它在副本列表中最早出现
  • 以下情况下恢复topic删除操作:
    • 保存topic副本的broker重新启动起来
    • topic的preferred副本选举完成
    • topic的分区副本重分配完成
  • 每个正在被删除的副本都处于以下三种状态之一:
    • TopicDeletionStarted —— 调用onPartitionDeletion回调方法之后副本进入到删除topic阶段,通常都是由controller上监听zk中/admin/delete_topics的监听器触发。作为状态转换的一部分,controller会发送StopReplicaRequests请求给所有的副本。如果deletePartition设置为true,controller会为StopReplicaResponse注册一个回调方法并在接收删除副本的响应时调用它
    • TopicDeletionSuccessful —— 若响应中无错误,deleteTopicStopReplicaCallback会变更副本状态从TopicDeletionStarted到TopicDeletionSuccessful
    • TopicDeletionFailed —— 若响应中有错误,deleteTopicStopReplicaCallback会变更副本状态从TopicDeletionStarted到TopicDeletionFailed。如果一个broker宕机了但其上保存了正在被删除的topic的副本,kafka会标记相应的副本状态为TopicDeletionFailed。这样做的原因是——如果一个broker在停止副本请求发出之前但在副本进入到TopicDeletionStarted之后宕机,那么就有可能出现副本被错误地定格在TopicDeletionStarted状态,这样broker重新启动之后也不会重试topic删除操作了。
  • 只有当所有的副本都处于TopicDeletionSuccessful状态后,删除topic的线程会标记topic为"已成功删除"。之后线程会开启topic删除销毁模式,即从controller的上下文信息以及zookeeper中删除所有的topic状态。这是zk中路径/brokers/topics/[topic]被删除的唯一条件。但如果所有副本虽已不是TopicDeletionStarted状态但至少有一个是TopicDeletionFailed状态,那么线程就会标记topic需要进行重试删除操作。

具体到代码而言,这个scala只是简单地定义了一个类: TopicDeletionManager,其构造函数接收三个参数:

1. controller:表示一个controller对象

2. initialTopicsToBeDeleted:待删除的topic集合

3. initialTopicsIneligibleForDeletion:目前不能被删除的topic集合

主要的类成员字段如下:

controllerContext:controller上下文信息,主要缓存了很多broker、topic、分区及副本信息

partitionStateMachine:分区状态机实例

replicaStateMachine:副本状态机实例

topicsToBeDeleted:动态维护的可变待删除topic集合

partitionsToBeDeleted:动态维护的可变待删除分区集合

topicsIneligibleForDeletion:动态维护的可变不可删除的topic集合

deleteTopicsStateChanged:标记topic删除操作是否开始

deleteTopicsThread:删除topic的线程实例

isDeleteTopicEnabled:由属性delete.topic.enable指定是否支持删除topic,如果关闭该项,执行删除topic的命令无任何效果

在开始介绍类方法之前我们先看一下在这个类内部定义的嵌套类: DeleteTopicsThread——顾名思义,就是负责删除topic的专有线程。该类继承了ShutdownableThread,主要的逻辑如下:

  • 等待触发、重启或终止topic删除的事件到来
  • 如果线程当前没有运行,直接退出
  • 对于每一个要被删除的topic来说,如果所有副本都已被成功删除(都处于ReplicaDeletionSuccessful),那么直接调用completeDeleteTopic方法完成topic删除,主要是清理zookeeper以及controller缓存中的topic信息
  • 如果还有副本没有被删除,忽略此次操作,只是简单地打印那些未被删除的副本
  • 如果尚有副本在删除过程中失败了,那么就调用markTopicForDeletionRetry方法重试副本删除
  • 处理完所有副本之后需要判断一下topic当前是否可以被删除,如果是,直接调用onTopicDeletion方法开启删除操作

主要的类方法定义如下:

start:新controller初始化后调用该方法开启topic删除管理器。具体逻辑就是:首先判断是否开启了delete.topic.enable,否的话一切免谈!后面的很多操作也是基于开启它的前提展开的。之后创建删除topic线程并启动线程

resumeTopicDeletionThread:唤醒删除topic线程处理topic删除

shutdown:关闭topic删除线程,然后清空所有相关缓存(包括待删除topic集合、待删除分区集合以及不能被删除的topic集合)

enqueueTopicsForDeletion:将要删除的topic入队到topicsToBeDeleted集合中,在删除成功完成之后才会被移除

resumeDeletionForTopics:一旦有恢复topic删除操作的事件到来就会调用该方法。这些事件包括:1. 新broker启动,然后待删除topic队列中topic的任意副本都可以被删除;2. 分区重分配完成;3. preferred副本选举完成。具体做法就是找出给定topic集合与待删除topic集合的交集,如果这个交集不为空,将它们从不可删除topic集合中删除并调用resumeTopicDeletionThread方法唤醒线程开始处理topic删除

markTopicIneligibleForDeletion:以下条件发生时会终止topic的删除:1. 副本挂掉了;2. topic的某些分区正在进行重新分配;3. topic的某些分区正在进行preferred副本选举。该方法会将一些topic标记为不可删除(具体做法就是更新不可删除topic集合),如果给定的topic不满足条件则什么都不做

isTopicIneligibleForDeletion:判断给定的topic是否处于不可删除状态

isTopicDeletionInProgress:判断给定的topic是否处于正在被删除的状态,具体方法就是判断这个topic是否有尚未被删除的副本

isPartitionToBeDeleted:判断是否要删除给定的topic分区——具体做法就是判断是否在待删除分区集合中

isTopicQueuedUpForDeletion:判断是否可以删除给定的topic——具体做法就是判断其是否在待删除topic集合中

awaitTopicDeletionNotification:删除topic线程会调用该方法等待触发、重启或终止topic删除的事件发生。——具体做法就是只要线程在运行并且删除topic状态没有发生变更就一直等待

completeReplicaDeletion:停止副本请求的响应中如果没有错误,那么其回调方法会调用该方法。该方法会将那些被删除的副本置于ReplicaDeletionSuccessful状态并唤醒topic删除线程删除topic——当然前提是topic的所有副本都已被成功删除。具体逻辑就是从给定的副本集合中挑出那些已经做好删除准备的副本来,然后设置它们的状态,最后唤醒删除线程来执行删除操作

isTopicEligibleForDeletion:当出现以下状态时允许重试topic删除:1. topic删除未完成;2. topic删除当前未开始;3. topic当前被标记为不可删除

markTopicForDeletionRetry:如果topic在待删除队列中但尚未开始,那么就会重试删除操作。为了确保重试成功,设置相应的副本状态从ReplicaDeletionIneligible到OfflineReplica状态

completeDeleteTopic:完成具体的删除topic操作。逻辑如下:

  • 取消给定的topic之上的分区变更监听器以防止被删除的topic自动重建出来
  • 找出那些已被成功删除的副本集合
  • controller会将它们从状态机中的缓存以及分区副本分配缓存中移除
  • 从controller处获取到待删除topic的所有分区,并依次设置这些分区状态为OfflinePartition->NonExistentPartition
  • 将给定的topic从待删除topic集合中移除,同时更新待删除分区集合
  • 删除zookeeper上的topic相关的节点路径,包括/brokers/topics/[topic],/config/topics/[topic]以及/admin/delete_topics/[topic]
  • 最后,把topic信息从controller缓存中移除

failReplicaDeletion:如果保存有待删除topic副本的broker宕机或停止副本请求的响应中有错误的时候会调用该方法。该方法会将给定的replica列表中的所有副本设置为ReplicaDeletionIneligible状态,所属的topic也会加入到待删除topic集合中。如果topic所有副本都成功地响应了删除副本的请求并给出响应,那么就唤醒topic删除线程来重试topic删除

onTopicDeletion:topic删除线程会调用该方法并传递一组待删除的topic。该方法调用onPartitionDeletion回调方法来处理所有待删除的分区。具体逻辑如下:

  • 获取待删除topic集合的所有分区
  • controller发送更新元数据请求将这些分区的leader信息发给broker以便让broker不要再处理与待删除topic相关的数据
  • 按照topic将分区副本分配记录分组,然后为每个分区调用onPartitionDeletion来删除分区

startReplicaDeletion:onPartitionDeletion回调方法会调用该方法。它是删除topic的第二步,第一步是给所有broker发送UpdateMetadata请求让它们停止为那些topic提供服务。该方法会将待删除topic放入一个处理列表中。只要该列表不为空,就不会尝试重试列表中的topic。当以下情况发生时会从该列表中移除topic:

  • topic被成功删除
  • topic存在ReplicaDeletionIneligible状态的
  • 如果topic已入队列但尚未在进行中会重试topic删除,之后topic的所有副本都被置于ReplicaDeletionStarted状态。随后controller发送停止副本(并删除)请求。

具体逻辑如下:

  • 先按照topic将所有副本进行分组
  • 为每个topic找出当前可用的副本集合以及"已死"的副本集合
  • 从副本状态集合中找出topic的所有已成功删除的副本,根据刚才计算出来的可用副本找出需要重试删除的副本集合
  • 将"已死"副本状态置于不可删除,将需要重试删除的副本集合置于Offline状态并发送StepReplica请求给所有不是Offline的follower副本使它们停止向leader索取消息
  • 开始发送请求删除副本,如果有删除失败,标记该topic为不可删除

onPartitionDeletion:删除topic的回调方法会调用这个分区级别的删除方法。该方法首先会发送UpdateMetadata请求给所有当前活跃的broker让它们不要在处理相关的客户请求——抛出UnknownTopicPartitionException,然后将分区所有副本状态置于OfflineReplica状态,并发送StopReplica请求给对应副本以及LeaderAndIsr请求给leader以缩减ISR。如果leader副本本身就处于OfflineReplica状态,它就不会发送LeaderAndIsr请求因为leader项本身会被更新为-1,最后将所有副本置于ReplicaDeletionStarted状态——这将会发送StopReplica请求并连同副本一起删除,最后删除对应分布的所有副本的所有持久化数据

deleteTopicStopReplicaCallback:删除副本的回调方法,具体逻辑如下:

  • 获取StopReplica请求的响应
  • 找出那些删除失败的副本,如果不是全部副本都失败,就能获得那些被成功删除的副本,调用completeReplicaDeletion方法完成对这些副本的删除

五、ControllerChannelManager.scala

这个scala文件很复杂,里面定义了很多类和object。先说Callbacks这组伴生对象。Callbacks,顾名思义就是定义了回调方法类,目前Kafka定义了三种请求响应的回调方法: LeaderAndIsrResponse、UpdateMetadataResponse以及StopReplicaResponse。而Callbacks object充当了工厂类的方法定义了build方法用于构建三种回调方法。

下面再来说两个case类:StopReplicaRequestInfo和ControllerBrokerStateInfo类。ControllerBrokerStateInfo就是保存controller状态信息的,包括用于发送请求的通道channel,broker对象,请求队列,里面保存了请求及对应的回调方法,以及一个请求发送线程(后面会说到)。而StopReplicaRequestInfo类表示了该请求的相关信息,包括副本信息、是否删除分区的标识位以及一个回调方法用于在接收到响应后执行。

再来两个类:RequestSendThread——请求发送线程。定义的方法如下:

  • connectToBroker —— 打开底层的通道并连接上已发送状态变更请求
  • doWork —— 线程的核心方法,具体逻辑如下:
    • 从队列中获取第一项,并获得对应的请求及回调
    • 尝试发送请求(broker宕机一段时间了,那么controller的zk监听器ChannelConfig保存了channel的所有配置且可以动态修改,如果发送不成功,重连broker并休息300ms然后重试
    • 获取对应类型的响应,即是LeaderAndIsr、StopReplica还是UpdateMetadata的请求
    • 调用特定请求的回调

第二个类是ControllerBrokerRequestBatch。Kafka会缓存一批请求一起发送,因此该类主要定义了三类请求的批次队列: leaderAndIsrRequestMap、stopReplicaRequestMap和updateMetadataRequestMap。该类定义的方法如下:

1. newBatch: 一个检测方法,确保上述三类请求批次为空,如果不为空的话抛出异常

2. addUpdateMetadataRequestForBrokers:发送UpdateMeatadataRequest请求给指定的broker,更新给定的分区已经那些要被删除的分区。该方法中定义了一个嵌套方法:updateMetadataRequestMapFor,主要用于更新给定分区在updateMeataRequestMap中的记录信息。具体逻辑为:首先试图找出controller中保存的该分区的leader;如果不存在则直接退出,否则获取到对应的AR,如果该分区是要被删除的,则新创建一个LeaderAndIsr对象,并将leader状态设置为LeaderDuringDelete,否则直接传递给PartitionStateInfo构建新的实例,最后为每个broker都增加一条对应的分区->分区状态信息的映射记录

3. addLeaderAndIsrRequestForBrokers:增加broker与对应分区状态信息的映射到controller的缓存中。具体逻辑如下:

  • 构造一个TopicAndPartition实例
  • 为每个传入的broker取得对应的分区状态信息,如果没有对应记录直接创建一个新的map来保存brokerId -> ((topic, 分区号) -> 分区状态信息)的映射
  • 为每个broker增加一条映射记录,如果以前存在对应记录直接覆盖
  • 调用addUpdateMetadataRequestForBrokers方法将元数据更新请求发给每个broker

4. addStopReplicaRequestForBrokers:增加brokerId -> 一组StopReplica请求信息的映射。具体逻辑如下:

  • 为每个传入的broker找出对应的StopReplicaRequestInfo集合
  • 创建StopReplicaRequestInfo实例(根据回调方法是否为空创建特定版本的)然后加入到对应brokerId的StopReplicaRequestInfo集合中

5. sendRequestsToBrokers:发送请求给broker。具体逻辑如下:

  • 遍历LeaderAndIsr请求队列,获取到每个请求的目标broker以及当前broker列表中是leader的broker集合,根据它们构建新的LeaderAndIsr请求。
  • 调用controller的sendRequest发送LeaderAndIsr请求,指定不需要response
  • 清空LeaderAndIsr请求队列
  • UpdateMetadata请求队列也是类似。遍历UpdateMetadata请求队列,找出每条请求要发给的目标broker以及请求中分区状态信息
  • 使用这些信息创建新的UpdateMetadata请求,然后发送给对应的broker
  • 清空UpdateMetadata请求队列
  • 遍历StopReplica请求队列,为每个目标broker找出那些要删除的副本以及不要删除的副本来打印出来
  • 遍历每个目标broker对应的副本信息列表,为每个StopReplicaInfo创建一个StopReplicaRequest并发送出去
  • 清空StopReplicaRequestMap请求队列

现在可以说说ControllerChannelManager类了。它定义了一个Hashmap来保存broker状态信息,并在类构造时会将所有broker都装进这个map中。下面来看它定义的方法。

startRequestSendThread —— 启动对应broker的请求发送线程

startup —— 启动controller的通道管理器,就是启动所有当前可用broker的请求发送线程

removeExistingBroker —— 关闭对应broker的底层通道,清空请求/响应队列,关闭请求发送线程,并从broker状态信息从缓存中清除

shutdown —— 循环调用removeExistingBroker方法移除缓存中所有broker

sendRequest —— 发送请求,具体做法就是根据给定brokerId,找出缓存中的broker状态信息,如果不存在报警broker不可用退出,否则将request及回调方法装进对应的状态信息的消息队列中

addNewBroker —— 创建一个新的消息队列、一个阻塞队列以及一个线程,然后将这些信息封装进ControllerBrokerStateInfo中与对应的broker建立起映射

addBroker —— 如果在broker状态信息缓存中找不到给定的broker信息直接调用addNewBroker创建一个,另外开启对应broker的请求发送线程

removeBroker —— 把broker从broker状态信息缓存移除

六、KafkaController.scala

这个scala文件是controller包的核心文件,里面定义了10个class和object。我们从简单地开始说:

ControllerStats object

一个统计信息object,继承了KafkaMetricsGroup,里面定义了两种统计度量元:unclean leader选举率(每秒进行了多少次unclean leader选举)和leader选举计时器

LeaderIsrAndControllerEpoch case类

封装了一个LeaderAndIsr对象和一个int值表示controllere的epoch值,就是保存leader、ISR以及controller_epoch信息的一个数据结构

PartitionAndReplica case类

表示一个topic某个分区的一个副本的数据结构

PartitionsReassignedListener类

对分区重新分配副本,除非:1. 分区曾经存在过;2. 新的副本与已存在的副本相同;3. 新副本集合中任意一个副本已挂掉

如果上述条件中任意一个条件满足,kafka都会记录一个错误并将该分区从待重分配分区列表中移除。该类定义了两个方法:

1. handleDataChange:通过命令行发起的分区重分配时调用,具体逻辑如下:

  • 从zookeeper的/admin/reassign_partitions节点下读取分区重分配信息
  • 从该部分信息中过滤出尚未开始重分配的分区列表(与controller中缓存的正在进行重分配分区比较)
  • 遍历每个需要进行重分配副本的分区,先判断其所属的topic是否需要进行删除。如果topic需要进行删除,则跳过分区重分配操作,并将其从重分配副本分区列表中移除。如果topic不需要进行删除,构造一个ReassignedPartitionsContext传递给controller发起重分配副本的操作

2. handleDataDeleted:如果zookeeper中的leader信息被删除时会调用该方法,不过貌似这个方法没有具体的实现代码

ReassignedPartitionsIsrChangeListener类

也是一个zookeeper监听类,负责监听重分配分区、ISR信息的变化。同样地,它定义了两个方法:

1. handleDataChange:当有分区需要变更leader为preferred副本,具体逻辑如下:

  • 检查目标分区是否正在进行重分配,如果不是的话直接退出方法
  • 否则,从zookeeper中再次读取leader和ISR信息(因为zookeeper客户端回调方法并不会返回Stat对象)
  • 如果不存在leader和ISR信息,直接抛出异常说明分区不存在,否则检查下ISR中是否有加入新的副本
  • 如果没有加入新的副本则继续分区重分配操作;否则记录下这种情况即可

2. handleDataDeleted: 空方法

PreferredReplicaElectionListener类

为zookeeper中/admin/preferred_replica_election下给定的分区列表开始preferred副本leader选举。定义的两个方法如下:

1. handleDataChange:命令行命令发起的分区重分配时会调用这个方法,具体逻辑如下:

  • 找出那些要进行preferred副本leader选举的分区列表
  • 把controller缓存的当前正在进行的preferred副本选举的分区从上一步获得的分区列表中去掉,并过滤出那些所属topic要被删除的分区
  • 如果得到的分区集合不为空记录一个错误,并从最后要进行选举的分区集合中去除掉
  • 调用onPreferredReplicaElection方法进行preferred副本选举

2. handleDataDeleted:空方法

ReassignedPartitionsContext case类

重分配分区的一个上下文信息类,封装了新分配的副本集合以及注册一个分区ISR变更监听器

ControllerContext类

Kafka controller的上下文信息类,构造函数接收两个用于连接zookeeper的字段。该类定义的类字段如下:

1. controllerChannelManager —— controller的通道管理器

2. controllerLock —— 一个重入锁,用于同步操作

3. shuttingDownBrokerIds —— 正在处于关闭状态的Broker ID集合

4. brokerShutdownLock —— broker关闭时的同步锁

5. epoch —— controller的epoch信息,初始为0

6. epochZkVersion —— zk版本的epoch信息,初始为0

7. correlationId —— 关联请求/响应的correlationId

8. allTopics —— 总的topic集合

9. partitionReplicaAssignment —— 保存分区集合中每个分区的AR(assigned replicas)

10. partitionLeadershipInfo —— 保存分区集合中每个分区的leader、ISR以及controller_epoch信息

11. partitionBeingReassigned —— 正在进行副本重分配的分区集合

12. partitionsUndergoingPreferredReplicaElection —— 正在进行preferred副本leader选举的分区集合

13. liveBrokersUnderlying —— 当前可用的broker实例集合

14. liveBrokerIdsUnderlying —— 当前可用的broker ID集合

Controller上下文类定义的方法如下:

1. liveBrokers_ —— 设置当前可用的broker集合,即给liveBrokersUnderlying和liveBrokerIdsUnderlying赋值

2. liveBrokers/liveBrokerIds —— 过滤掉那些正在关闭中的broker

3. liveOrShuttingDownBrokersIds/liveOrShuttingDownBrokers —— 获取当前可用的以及正处于关闭状态的broker集合

4. partitionsOnBroker —— 返回所有在给定broker上保存有副本的分区集合

5. replicasOnBrokers —— 返回给定Broker集合中所有broker上保存的所有副本记录,封装成PartitionAndReplica集合返回

6. replicasForTopic —— 返回给定topic的所有副本,封装成PartitionAndReplica集合返回

7. partitionsForTopic —— 返回给定topic的所有分区

8. allLiveReplicas —— 返回所有可用的副本记录

9. replicasForPartition —— 返回给定分区的副本集合,封装成PartitionAndReplica集合返回

10. removeTopic —— 把topic从controller中移除,包括partitionLeadershipInfo、partitionReplicaAssignment和allTopics

KafkaController object

该object主要定义了一个方法:

parseControllerId —— 解析controller id,给定的json串是从zookeeper的/controller节点的数据,类似于{"version":1,"brokerid":0,"timestamp":"1431655574471"}

KafkaController class

kafka controller的核心类,有1000多行代码。我们先来看其定义的一些主要字段:

1. isRunning —— 标志位,判断该controller是否处于运行状态

2. controllerContext —— 创建的controller的上下文类实例

3. partitionStateMachine —— 分区状态机

4. replicaStateMachine —— 副本状态机

5. controllerElector —— controller的leader选举器,主要由server包的ZookeeperLeaderElector实现(server包时我们再说)

6. autoRebalanceScheduler —— KafkaScheduler实现的单独的controller调度器,可以独立于Kafka server启停

7. deleteTopicManager —— Topic删除管理器

8. offlinePartitionSelector/reassignedPartitionLeaderSelector/preferredReplicaPartitionLeaderSelector/controlledShutdownPartitionLeaderSelector —— 创建了各种分区leader选举器

9. brokerRequestBatch —— broker缓存的请求批次

10. partitionReassignedListener —— 分区重分配副本的监听器

11. preferredReplicaElectionListener —— 分区进行preferred副本leader选举的监听器

除了这些主要的类字段,该类还提供了很多重要的controller方法,比如onNewTopicCreation、onNewPartitionCreation、onBrokerFailure、onBrokerStartup、onPartitionReassignment、onPreferredReplicaElection和shutdownBroker等。我们还是一个一个说:

1. onNewPartitionCreation:这个回调方法在topic创建回调方法中被调用,具体逻辑如下:

  • 将给定分区设置为NewPartition状态
  • 将分区下所有副本都设定为NewReplica状态
  • 再次设定所有给定分区状态为OnlinePartition
  • 再次设定给定分区下所有副本状态设定为OnlineReplica

2. onNewTopicCreation:分区状态机的topic变更监听器会调用这个方法。具体逻辑如下:

  • 对于每一个topic都注册一个分区变更监听器
  • 为传入的新分区调用onNewPartitionCreation方法来创建这些新的分区
  • 将包含新topic信息的元数据请求发送给所有broker以使他们能够处理新topic的请求

3. isActive:判断当前broker是否是controller。如果其包含的上下文中的通道管理器不为空说明就是当前controller

4. shutdownBroker:正常关闭controller。controller会首先确定关闭broker上的leader replica,然后将这些leader转移到其他broker上。具体逻辑如下:

  • 首先判断是否是当前controller,如果不是抛出异常——毕竟不是controller何谈关闭!
  • 判断一下该broker是否存在,如果不存在自然也抛出异常
  • 将brokerId加入到待关闭broker集合中
  • 找出在该broker上所有topic分区以及副本数
  • 遍历每一个topic分区,如果其leader副本就在该broker上重新选举leader,更新zookeeper中的信息然后发送UpdateMetadata请求通知所有受影响的broker
  • 如果leader副本不在该broker上,把该broker加入到StopReplica请求队列中(但不删除分区)然后发送之
  • 最后将待关闭broker上所有副本都设置为OfflineReplica,更新zookeeper的ISR信息通知同志请求给leader

5. watchIsrChangesForReassignedPartition:注册ISR变更监听器监听zookeeper上/brokers/topics/[topic]/partitions/[partitionId]/state节点的变更以获取leader及ISR的变化

6. initiateReassignReplicasForTopicPartition:为给定topic分区重新分配副本,具体逻辑如下:

  • 根据给定的待重分配副本的分区上下文信息获取出要进行重新进行分配的副本集合,并从中获取出当前可用的副本集合
  • 获取controller中保存的该topic分区的所有副本。如果当前已分配的副本和要进行分配的新副本完全相同,则抛出异常表明不用分配了
  • 否则,先判断一下待分配的新副本集合中是否有不可用副本,如果没有则注册一下ISR监听器并标记topic不能被删除,最后调用onPartitionReassignment方法重新分配分区的副本
  • 如果有不可用副本,抛出异常说明无法开始此次副本重分配操作

7. sendUpdateMetadataRequest:将包含给定分区leader信息的UpdateMetadata请求发送到给定的broker上通知它们

8. maybTriggerPartitionReassignment:可能会触发分区进行副本重分配。做法就是遍历controller上下文信息保存的所有待副本重分配的分区列表

9. startChannelManager:根据KafkaConfig创建一个新的ControllerChannelManager并启动它

10. updateLeaderAndIsrCache:从zookeeper中更新controller保存的分区leader、isr和controller_epoch缓存

11. areReplicasInIsr:判断一组副本是否在某个topic分区的ISR中

12. startNewReplicasForReassignedPartition:为已分配副本的分区增加新的副本——发送StartReplica请求给新副本所在的broker

13. updateLeaderEpoch:不会修改leader或ISR,只是递增leader的epoch值

14. removeReplicaFromIsr:将给定的分区副本从ISR中移除。前提是它不是leader且ISR有足够多的副本。具体逻辑如下:

  • 不断地去刷新获取zookeeper中该分区的leader和ISR信息
  • 如果信息不存在则退出方法,否则判断一下epoch值是否比当前controller的epoch值大,如果是说明存在别的controller,那么抛出异常;如果否的话还需要判断一下给定副本是否在ISR中
  • 若存在,如果该副本还是leader副本的话就设置新的leader是-1,然后构建一个新的ISR(去掉了该副本)
  • 如果这个新的ISR为空说明要去除的副本已是ISR中最后一个了,如果没有开启unclean leader选举,那么就必须保存这个副本以便让它以后可以成为leader
  • 使用新的leader、ISR和controller_epoch值更新zookeeper然后返回更新结果
  • 如果第二步最后检查步骤中发现该副本不在ISR中那么就使用原来的信息更新controller的缓存
  • 最后返回更新过的leader、ISR和controller_epoch值

15. removePartitionsFromPreferredReplicaElection:把给定的多个分区从preferred leader副本选举缓存中移除

16. updateAssignedReplicasForPartition:根据传入的副本分配信息更新zookeeper中对应topic的副本分配记录

17. removePartitionFromReassignedPartitions:将给定topic分区的副本分配信息从zookeer和controller缓存中删除

18. readControllerEpochFromZookeeper:从zookeeper的/controller_epoch中读取controller_epoch信息

19. registerReassignedPartitionsListener/deregisterReassignedPartitionsListener:注册/取消zookeer上对/admin/reassign_partitions的监听

20. registerPreferredReplicaElectionListener/deregisterPreferredReplicaElectionListener:注册/取消zookeeper上对/admin/preferred_replica_election的监听

21. deregisterReassignedPartitionsIsrChangeListeners:取消所有正在进行副本分配的分区的ISR变更监听器

22. updateLeaderEpochAndSendRequest:更新给定分区的leader的epoch值并发送LeaderAndIsr请求给leader。具体逻辑如下:

  • 更新给定分区的leader_epoch值。如果更新失败记录一个错误表明无法发送LeaderAndIsr请求然后退出
  • 否则构造一个新的LeaderAndIsr请求并加入到LeaderAndIsr发送批次中
  • 发送批次中的LeaderAndIsr请求,将新分配的副本发给分区的leader

23. stopOldReplicasOfReassignedPartition:将给定分区中原先已存在的副本都停掉。具体逻辑如下:

  • 首先将这些旧的副本都设置为OfflineReplica状态,并从ISR中移除
  • 然后设置它们状态为ReplicaDeletionStarted,发送停止副本的命令给这些副本
  • 最后设置它们状态分别为ReplicaDeletionSuccessful和NonExistentReplica把它们从分区的AR中移除

24. moveReassignedPartitionLeaderIfRequired:如果必须的话重新选举某个分区的leader副本,具体逻辑如下:

  • 获得要进行分配的新的副本集合以及给定分区当前的leader
  • 保存缓存中该分区的副本分配记录,然后更新为上一步中新副本集合——这样做可以让LeaderAndIsr请求封装这个集合发送到当前或新选的leader,从而阻止它往ISR中添加旧的副本
  • 查看一下待分配的副本集合中是否包含leader副本:如果不包含的话则需要从ISR中重新选举leader
  • 否则检查一下leader是否可用,如果不可用也需要重新选举leader,否则更新zookeeper中leader的epoch值,以供后续的LeaderAndIsr请求使用

25. initializeTopicDeletion:就是找出那些需要删除的topic集合以及不能被删除的topic集合,然后使用它们创建一个topic删除管理器

26. initalizePartitionReassignment:主要就是找出哪些分区需要进行副本的重分配。具体逻辑如下:

  • 从zookeeper的/admin/reassign_partitions下读取正在进行副本分配操作的分区集合
  • 检查这些分区,找出那些已经完成的或是topic已被删除的分区
  • 遍历上一步中得到的分区集合,删除缓存中对应分区的记录
  • 之后构建准备进行副本分配的分区集合然后把它们加入到controller的对应缓存中

27. initializePreferredReplicaElection:主要就是要让controller缓存那些要进行preferred副本选举的分区。具体逻辑如下:

  • 从zookeeper的/admin/preferred_replica_election节点下读取处那些正在进行的preferred副本选举的分区集合
  • 检查它们是否已经完成或所属topic是否已被删除
  • 更新controller的缓存把那些分区集合加入到缓存中(但不包括上一步中的那些分区)

28. initalizeControllerContext:初始化controller上下文,具体逻辑如下:

  • 更新controller缓存的可用broker列表、所有topic列表、topic分区的副本分配记录
  • 在zookeeper中更新所有分区的leader、ISR信息
  • 开启controller的通道管理器
  • 初始化preferred副本选举分区、重分配副本的分区
  • 创建topic删除管理器

29. SessionExpirationListener嵌套类:监控zookeeper中会话过期的监听器。当前的实现中什么都不做,因为zookeeper客户端会重连

30. registerSessionExpirationListener:注册会话过期监听器

31. incrementControllerEpoch:递增controller的epoch值

32. sendRequest:调用通道管理器的sendRequest方法给给定的broker发送给定的request

33. startup:启动controller模块的方法。该方法并不会假设当前的broker就是controller,它仅仅是注册一个zookeeper会话过期监听器并启动controller的leader选举

34. onBrokerFailure:分区副本状态机的broker监听器会调用该回调方法,用于监听failed的broker。该方法主要做了一下三件事情:1. 标记failed leader的分区为Offline状态;2. 为所有New/Offline状态的分区触发OnlinePartition状态转换;3.每个failed 的broker的副本都设置为OfflineReplica。需要注意的是,该方法并没有刷新Leader和ISR缓存,主要是因为分区状态机在稍后为New/Offline分区变为Online执行leader选举时会刷新缓存。该方法具体逻辑如下:

  • 将正在被删除的broker从给定的dead broker列表中移除
  • 找出那些leader副本在dead broker中的所有分区,触发这些分区的状态变更,将它们设置为OfflinePartition
  • 再次触发分区状态变更,置为OnlinePartition
  • 找出dead broker上所有副本以及这些broker之上的当前可用副本,并将这些副本置于OfflineReplica状态
  • 检查是否存在待删除topic的副本,如果存在的话,将这些副本置于ReplicaDeletionIneligible状态

35. onBrokerStartup:副本状态机的broker监听器会调用此方法,传入一组新创建的broker。大致流程如下:1. 为所有NewPartition/OfflinePartition的分区触发OnlinePartition状态变更;2. 检查是否存在重分配副本分配给新添加的broker。如果存在的话,它会为每个topic分区都执行重分配的操作。该方法中不需要刷新leader和ISR缓存。具体逻辑如下:

  • 发送关于所有分区的UpdateMetadata请求到新添加的broker
  • 将新添加的broker上的副本置于OnlineReplica状态
  • 将OfflinePartition和NewPartition状态的分区都设置为OnlinePartition(使用OfflinePartitionLeaderSelector)
  • 对于那些存在于新添加broker的副本的分区,调用onPartitionReassignment来完成分区副本重分配

36. onControllerFailover:zookeeper leader选举器调用该方法选举当前broker为新的controller。具体逻辑如下:

  • 从zookeeper中读取controller_epoch值,并增加它的值
  • 注册controller_epoch变更监听器
  • 初始化controller的context对象以缓存当前topic,可用broker和所有分区leader
  • 启动controller的通道管理器、副本状态机和分区状态机
  • 为所有topic注册分区变更监听器,并将broker置于controller
  • 尝试进行分区副本重分配以及preferred副本leader选举,并将结果发送UpdateMetadata请求给所有可用的broker
  • 最后启动删除topic管理器

37. onControllerResignation:当前broker放弃controller身份时zk leader选举器会调用该方法,主要用于清除内部的controller。具体逻辑如下:

  • 取消所有zk上的监听器,包括reassignedPartitionListener和preferredReplicaElectionListener
  • 关闭topic删除管理器
  • 关闭分区状态机和副本状态机,关闭通道管理器
  • 重设controller context,并重置broker状态

38. onPreferredReplicaElection:为指定分区进行preferred副本leader选举。做法就是将所有目标分区状态置于OnlinePartition,并使用preferredReplicaPartitionLeaderSelector进行选举。

39. onPartitionReassignment——controller中最重要的方法!当命令行发起一个分区重分配操作时,它会在zookeeper的/admin/reassign_partitions路径下创建节点以触发zookeeper的监听器。对一个分区重新分配其副本涉及几个步骤。首先先来明确一下其中的几个名词:

RAR —— 重分配过的副本列表

OAR —— 重分配前的副本列表

AR —— 当前已分配的副本列表

大体的思想就是:

  • 使用OAR和RAR更新zookeeper中的AR记录
  • 为OAR+RAR中的每个副本发送LeaderAndIsr请求,主要是通过强制更新zookeeper中的leader_epoch值来完成
  • 启动新的RAR-OAR中的副本,就是将它们置于NewReplica状态
  • 等待RAR中所有副本都与leader同步
  • 将RAR中所有副本都设置为OnlineReplica转啊柜台
  • 更新缓存,设置AR为RAR
  • 如果leader副本不在RAR中,从RAR中选举一个新的leader。如果是从RAR中选出的leader则直接发送LeaderAndIsr,否则更新zookeeper中的leader_epoch然后再发送LeaderAndIsr请求。不管怎样,LeaderAndIsr请求中的AR都将被赋值为RAR,这可以阻止leader将(RAR - OAR)的副本重新添加会ISR中
  • 将所有(OAR-RAR)中的副本都置于OfflineReplica状态,在zookeeper中更新ISR使之去掉(OAR-RAR),然后只向leader发送LeaderAndIsr请求告知它ISR的更新。之后发送StopReplica请求给所有OAR-RAR中的副本
  • 发送StopReplica请求将所有OAR-RAR中的副本都移动到NonExistentReplica状态,并删除磁盘上的副本数据
  • 更新Zookeeper中的AR为RAR
  • 更新/admin/reassign_partitions数据移除该分区数据
  • 选举leader之后,副本和ISR信息都变化了,因此需要重新发送UpdateMetadata请求给每个broker
时间: 2024-09-28 17:01:31

【原创】kafka controller源代码分析(二)的相关文章

【原创】kafka controller源代码分析(一)

Kafka集群中的一个broker会被作为controller负责管理分区和副本的状态以及执行类似于重分配分区之类的管理任务.如果当前的controller失败了,会从剩下的broker中选出新的controller. 一.PartitionLeaderSelector.scala 顾名思义就是为分区选举出leader broker,该trait只定义了一个方法selectLeader,接收一个TopicAndPartition对象和一个LeaderAndIsr对象.TopicAndPartit

【原创】kafka server源代码分析(二)

十四.AbstractFetcherManager.scala 该scala定义了两个case类和一个抽象类.两个case类很简单: 1. BrokerAndFectherId:封装了一个broker和一个fetcher的数据结构 2. BrokerAndInitialOffset:封装了broker和初始位移的一个数据结构 该scala中最核心的还是那个抽象类:AbstractFetcherManager.它维护了一个获取线程的map,主要保存broker id + fetcher id对应的

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

【原创】kafka server源代码分析(一)

这个是Kafka server的核心包,里面的类也很多,我们还是一个一个分析 一.BrokerStates.scala 定义了目前一个kafka broker的7中状态 —— 1. NotRunning:未运行 2. Starting:启动中 3. RecoveringFromUncleanShutdown:从上次异常恢复中 4. RunningAsBroker:已启动 5. RunningAsController:作为Controller运行 6. PendingControlledShutd

【原创】Kafka console consumer源代码分析(二)

我们继续讨论console consumer的实现原理,本篇着重探讨ZookeeperConsumerConnector的使用,即后续所有的内容都由下面这条语句而起: val connector = Consumer.create(config) 那么问题来了?这条语句后面执行了什么呢?我们先看create方法的定义 def create(config: ConsumerConfig): ConsumerConnector = { val consumerConnect = new Zookee

【原创】Kakfa utils源代码分析(二)

我们继续研究kafka.utils包 八.KafkaScheduler.scala 首先该文件定义了一个trait:Scheduler——它就是运行任务的一个调度器.任务调度的方式支持重复执行的后台任务或是一次性的延时任务.这个trait定义了三个抽象方法: 1. startup: 启动调度器,用于接收调度任务 2. shutdown: 关闭调度器.一旦关闭就不再执行调度任务了,即使是那些晚于关闭时刻的任务. 3. schedule: 调度一个任务的执行.方法接收4个参数 3.1 任务名称 3.

【原创】kafka admin源代码分析

admin包定义了命令行的一些实现 一.AdminOperationException.scala 一个异常类,表示执行admin命令时候抛出的异常 二.AdminUtils.scala admin一些常用工具方法: 1. assignReplicasToBrokers:负责分配副本到不同的broker上.主要有两个目标:① 尽可能均匀地在不同的broker上分配副本:② 对于被分配到同一个broker上的分区而言,它们的其他副本会尽可能地分配到不同的broker上.如果要达到这些目标,需要①分

【原创】kafka consumer源代码分析

顾名思义,就是kafka的consumer api包. 一.ConsumerConfig.scala Kafka consumer的配置类,除了一些默认值常量及验证参数的方法之外,就是consumer的配置参数了,比如group.id, consumer.id等,详细列表见官网. 二.ConsumerIterator.scala KafkaStream的迭代器类,当stream底层的阻塞队列为空时该iterator会置于阻塞状态.这个迭代器还提供了一个shutdownCommand对象可作为一个

【原创】kafka client源代码分析

该包下只有一个文件:ClientUtils.scala.它是一个object,里面封装了各种client(包括producer,consumer或admin)可能会用到的方法: 1. fetchTopicMetadata(producer版本): producer client会调用该方法来发送一个TopicMetadata请求,最后返回该请求对应的response.具体逻辑如下: 构造一个TopicMetadataRequest请求 将给定的broker列表按照随机顺序打散以防止大量的请求被路