主要的核心类如下:
controller :
根据相关的上下文,创建KafkaController对象,引入多个监听器监听broker,topic,partition以及副本的状态变化。
ZookeeperLeaderElector:
主要负责选举当前broker为lead的过程,同时,如果出现异常情况转移lead选举权。
ReplicaStateMachine:
主要负责broker的副本状态变化跟踪与重新分配的工作
PartitionStateMachine:
主要负责topic与partition的状态变化跟踪与重新分配的工作
ZookeeperLeaderElector详解
当前broker节点成为控制节点的流程
1.注册controller epoch 的监听者,用来监控epoch的选举
2.增加controller epoch的值
3.初始化 controller的上下文,用来缓存当前的topics,活着的broker与每个online的partitions的leader
4.启动controller的channel manager
5.启动 replica state machine
6.启动 partition state machine
如果在这个过程中出现异常情况,此节点将放弃成为controller。其他的broker将触发选举动作,将接替成为controller。
ReplicaStateMachine详解
将会启动broker副本状态变化监听程序
1.当OnlinePartition状态变化例如(新建partition,offlinepartition),将触发状态变化通知
2.检查是否有需要重新分配的partition给最近启动的broker,从而进行分配调整。
在此过程中,不会更新topic/partition的leader/isr缓存,原因在于PartitionStateMachine会被触发,他会去负责相关工作。
partition state machine详解
1.创建新的topic
a)partitionStateMachine注册topic信息
b)创建新的partition,将partition设置为NewPartition 状态,再讲其NewPartition->OnlinePartition 状态
2.分区调整
partition state machine将注册一个reassigned
partitions监听器,用来调整partition。当admin开始调整分区的时候,会创建zk的/admin
/reassign_partitions,从而会触发zk此目录的监听器,开始进行分区调整1)用OAR + RAR副本组修改并分配副本列表.
2)当处于OAR + RAR时,发送LeaderAndIsr请求给每个副本.
3)副本处于RAR – OAR -> 调用方法NewReplica
4)等待直到新的副本加入isr中
5)副本处于RAR -> 调用方法OnlineReplica
6)设置AR to RAR并写到内存中,将RAR扩容
7)如果当前的lead不再RAR中,从RAR中选举一个新lead,并发送LeaderAndIsr信息。
8)对于处于OAR – RAR的副本 -> Offline (强制这些副本从isr重剔除)
9)将处于OAR – RAR的副本 ->NonExistentReplica ,并在物理硬盘上面删除对应的文件数据
10)在zk上修改AR到RAR中。
11)更新/admin/reassign_partitions下面的信息,将删除对应的partition信息
12)当新的lead选完之后,同时,也会将更新的副本与isr的信息的变化信息,通知给每一个broker
例子:
例如, if OAR = {1, 2, 3} and RAR = {4,5,6}, 在zk上重分配副本和领导者/is这些值可能经历以下转化。
AR leader/isr
{1,2,3} 1/{1,2,3} (初始化状态)
{1,2,3,4,5,6} 1/{1,2,3} (step 2)
{1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4)
{1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7)
{1,2,3,4,5,6} 4/{4,5,6} (step 8)
{4,5,6} 4/{4,5,6} (step 10)