副本和分区状态机

副本和分区状态机

1 前言

Controller 是从Kafka集群中选取一个的broker,负责管理topic分区和副本的状态的变化,通过上篇我们知道了controller的启动流程,这篇我们学习一下分区和副本状态机。

分区状态机记录着当前集群所有 Partition 的状态信息以及如何对 Partition 状态转移进行相应的处理;副本状态机则是记录着当前集群所有 Replica 的状态信息以及如何对 Replica 状态转变进行相应的处理。

2 PartitionStateMachine

PartitionStateMachine 记录着集群所有 Partition 的状态信息,它决定着一个 Partition 处在什么状态以及它在什么状态下可以转变为什么状态,Kafka 中 Partition 的状态总共有以下四种类型:

  • l  NonExistentPartition:这个代表着这个 Partition 之前没有被创建过或者之前创建了现在又被删除了,它有效的前置状态是 OfflinePartition;
  • l  NewPartition:Partition 创建后,它将处于这个状态,这个状态的 Partition 还没有 leader 和 isr,它有效的前置状态是 NonExistentPartition;
  • l  OnlinePartition:一旦这个 Partition 的 leader 被选举出来了,它将处于这个状态,它有效的前置状态是 NewPartition、OnlinePartition、OfflinePartition;
  • l  OfflinePartition:如果这个 Partition 的 leader 掉线,这个 Partition 将被转移到这个状态,它有效的前置状态是 NewPartition、OnlinePartition、OfflinePartition。

分区状态机转移图如下所示:

先看下 KafkaController 在启动时,调用 PartitionStateMachine 的 startup() 方法初始化的处理过程。

PartitionStateMachine 的初始化方法如下所示:

在这个方法中,PartitionStateMachine 先调用 initializePartitionState() 方法初始化集群中所有 Partition 的状态信息:

  • l  如果该 Partition 有 LeaderAndIsr 信息,那么如果 Partition leader 所在的机器是 alive 的,那么将其状态设置为 OnlinePartition,否则设置为 OfflinePartition 状态;
  • l  如果该 Partition 没有 LeaderAndIsr 信息,那么将其状态设置为 NewPartition。

这里只是将 Partition 的状态信息更新分区状态机的缓存 partitionState 中,并没有真正进行状态的转移。

在初始化的第二步,将会调用 triggerOnlinePartitionStateChange() 方法,为所有的状态为 NewPartition/OnlinePartition 的 Partition 进行 leader 选举,选举成功后的话,其状态将会设置为 OnlinePartition。

上面方法的目的是为尝试将所有的状态为 NewPartition/OnlinePartition 的 Partition 状态转移到 OnlinePartition,这个方法主要是做了两件事:

  • l  状态转移(这个在下面详细讲述);
  • l  发送相应的请求。

2.1分区的状态转移

这里以要转移的 TargetState 区分做详细详细讲解,当 TargetState 分别是 NewPartition、OfflinePartition、NonExistentPartition 或者 OnlinePartition 时,副本状态机所做的事情。

TargetState: NewPartition

NewPartition 是 Partition 刚创建时的一个状态,其处理逻辑如下:

实现逻辑:

l  校验其前置状态,它有效的前置状态为 NonExistentPartition;

l  将该 Partition 的状态转移为 NewPartition 状态,并且更新到缓存中。

TargetState: OnlinePartition

OnlinePartition 是一个 Partition 正常工作时的状态,这个状态下的 Partition 已经成功选举出了 leader 和 isr 信息,其实现逻辑如下:

实现逻辑:

  • l  校验这个 Partition 的前置状态,有效的前置状态是:NewPartition、OnlinePartition 或者 OfflinePartition;
  • l  如果前置状态是 NewPartition,那么为该 Partition 选举 leader 和 isr,更新到 zk 和 controller 的缓存中,如果副本没有处于 alive 状态的话,就抛出异常;
  • l  如果前置状态是 OnlinePartition,那么只是触发 leader 选举,在 OnlinePartition –> OnlinePartition 这种状态转移时,需要传入 leader 选举的方法,触发该 Partition 的 leader 选举;
  • l  如果前置状态是 OfflinePartition,同上,也是触发 leader 选举。
  • l  更新 Partition 的状态为 OnlinePartition。

对于以上这几种情况,无论前置状态是什么,最后都会触发这个 Partition 的 leader 选举,leader 成功后,都会触发向这个 Partition 的所有 replica 发送 LeaderAndIsr 请求。

TargetState: OfflinePartition

OfflinePartition 是这个 Partition 的 leader 挂掉时转移的一个状态,如果 Partition 转移到这个状态,那么就意味着这个 Partition 没有了可用 leader。

实现逻辑:

  • l  校验其前置状态,它有效的前置状态为 NewPartition、OnlinePartition 或者 OfflinePartition;
  • l  将该 Partition 的状态转移为 OfflinePartition 状态,并且更新到缓存中。

TargetState: NonExistentPartition

NonExistentPartition 代表了已经处于 OfflinePartition 状态的 Partition 已经从 metadata 和 zk 中删除后进入的状态。

实现逻辑:

  • l  校验其前置状态,它有效的前置状态为 OfflinePartition;
  • l  将该 Partition 的状态转移为 NonExistentPartition 状态,并且更新到缓存中。

2.2状态转移触发的条件

这里主要是看一下上面 Partition 各种转移的触发的条件,整理的结果如下表所示,部分内容会在后续文章讲解。


TargetState


触发方法


作用


OnlinePartition


Controller 的 shutdownBroker()


优雅关闭 Broker 时调用,因为要下线的节点是 leader,所以需要触发 leader 选举


OnlinePartition


Controller 的 onNewPartitionCreation()


Partition 新建时,这个是在 Replica 已经变为 NewPartition 状态后进行的,为新建的 Partition 初始化 leader 和 isr


OnlinePartition


controller 的 onPreferredReplicaElection()


对 Partition 进行最优 leader 选举,目的是触发 leader 选举


OnlinePartition


controller 的 moveReassignedPartitionLeaderIfRequired()


分区副本迁移完成后,1. 当前的 leader 不在 RAR 中,需要触发 leader 选举;2. 当前 leader 在 RAR 但是掉线了,也需要触发 leader 选举


OnlinePartition


PartitionStateMachine 的 triggerOnlinePartitionStateChange()


当 Controller 重新选举出来或 broker 有变化时,目的为了那些状态为 NewPartition/OfflinePartition 的 Partition 重新选举 leader,选举成功后状态变为 OnlinePartition


OnlinePartition


PartitionStateMachine 的 initializePartitionState()


Controller 初始化时,遍历 zk 的所有的分区,如果有 LeaderAndIsr 信息并且 leader 在 alive broker 上,那么就将状态转为 OnlinePartition。


OfflinePartition


controller 的 onBrokerFailure()


当有 broker 掉线时,将 leader 在这个机器上的 Partition 设置为 OfflinePartition


OfflinePartition


TopicDeletionManager 的 completeDeleteTopic()


Topic 删除成功后,中间会将该 Partition 的状态先转变为 OfflinePartition


NonExistentPartition


TopicDeletionManager 的 completeDeleteTopic()


Topic 删除成功后,最后会将该 Partition 的状态转移为 NonExistentPartition


NewPartition


Controller 的 onNewPartitionCreation()


Partition 刚创建时的一个中间状态 ,此时还没选举 leader 和设置 isr 信息

3 ReplicaStateMachine

ReplicaStateMachine 记录着集群所有 Replica 的状态信息,它决定着一个 replica 处在什么状态以及它在什么状态下可以转变为什么状态,Kafka 中副本的状态总共有以下七种类型:

  • l  NewReplica:这种状态下 Controller 可以创建这个 Replica,这种状态下该 Replica 只能作为 follower,它可以是当创建topic或分区重新分配期间副本被创建,也可以是 Replica 删除后的一个临时状态,它有效的前置状态是 NonExistentReplica;
  • l  OnlineReplica:一旦这个 Replica 被分配到指定的 Partition 上,并且 Replica 创建完成,那么它将会被置为这个状态,在这个状态下,这个 Replica 既可以作为 leader 也可以作为 follower,它有效的前置状态是 NewReplica、OnlineReplica 或 OfflineReplica;
  • l  OfflineReplica:如果一个 Replica 挂掉(所在的节点宕机或者其他情况),该 Replica 将会被转换到这个状态,它有的效前置状态是 NewReplica、OfflineReplica 或者 OnlineReplica;
  • l  ReplicaDeletionStarted:Replica 开始删除时被置为的状态,它有效的前置状态是 OfflineReplica;
  • l  ReplicaDeletionSuccessful:如果 Replica 在删除时没有遇到任何错误信息,它将被置为这个状态,这个状态代表该 Replica 的数据已经从节点上清除了,它有效的前置状态是 ReplicaDeletionStarted;
  • l  ReplicaDeletionIneligible:如果 Replica 删除失败,它将会转移到这个状态,这个状态意思是非法删除,也就是删除是无法成功的,它有效的前置状态是 ReplicaDeletionStarted;
  • l  NonExistentReplica:如果 Replica 删除成功,它将被转移到这个状态,它有效的前置状态是:ReplicaDeletionSuccessful。

上面的状态中其中后面4是专门为 Replica 删除而服务的,副本状态机转移图如下所示:

在之前介绍KafkaController 在启动时,会调用 ReplicaStateMachine 的 startup() 方法初始化的处理过程。

在这个方法中,ReplicaStateMachine 先调用 initializeReplicaState() 方法初始化集群中所有 Replica 的状态信息,如果 Replica 所在机器是 alive 的,那么将其状态设置为 OnlineReplica,否则设置为 ReplicaDeletionIneligible 状态,这里只是将 Replica 的状态信息更新副本状态机的缓存 replicaState中,并没有真正进行状态转移的操作。

接着第二步调用 handleStateChanges() 将所有存活的副本状态转移为 OnlineReplica 状态,这里才是真正进行状态转移的地方,其具体实现如下:

这里是副本状态机 startup() 方法的最后一步,它的目的是将所有 alive 的 Replica 状态转移到 OnlineReplica 状态,由于前面已经这些 alive replica 的状态设置成了 OnlineReplica,所以这里 Replica 的状态转移情况是:OnlineReplica –> OnlineReplica,这个方法主要是做了两件事:

  • l  状态转移(这个在下面详细讲述);
  • l  发送相应的请求。

3.1 副本的状态转移

TargetState: NewReplica

NewReplica 这个状态是 Replica 准备开始创建是的一个状态,其实现逻辑如下:

当想要把 Replica 的状态转移为 NewReplica 时,副本状态机的处理逻辑如下:

  • l  校验 Replica 的前置状态,只有处于 NonExistentReplica 状态的副本才能转移到 NewReplica 状态;
  • l  从 zk 中获取该 Topic-Partition 的 LeaderIsrAndControllerEpoch 信息;
  • l  如果获取不到上述信息,直接将该 Replica 的状态转移成 NewReplica,然后结束流程(对与新建的 Partition,处于这个状态时,该 Partition 是没有相应的 LeaderAndIsr 信息的);
  • l  获取到 Partition 的 LeaderIsrAndControllerEpoch 信息,如果发现该 Partition 的 leader 是当前副本,那么就抛出 StateChangeFailedException 异常,因为处在这个状态的 Replica 是不能被选举为 leader 的;
  • l  获取到了 Partition 的 LeaderIsrAndControllerEpoch 信息,并且该 Partition 的 leader 不是当前 replica,那么向该 Partition 的所有 Replica 添加一个 LeaderAndIsr 请求(添加 LeaderAndIsr 请求时,实际上也会向所有的 Broker 都添加一个 Update-Metadata 请求);
  • l  最后将该 Replica 的状态转移成 NewReplica,然后结束流程。

TargetState: ReplicaDeletionStarted

这是 Replica 开始删除时的状态,Replica 转移到这种状态的处理实现如下:

这部分的实现逻辑:

  • l  校验其前置状态,Replica 只能是在 OfflineReplica 的情况下才能转移到这种状态;
  • l  更新向该 Replica 的状态为 ReplicaDeletionStarted;
  • l  向该 replica 发送 StopReplica 请求(deletePartition = true),收到这请求后,broker 会从物理存储上删除这个 Replica 的数据内容;
  • l  如果请求返回的话会触发其回调函数(这部分会在 topic 删除部分讲解)。

TargetState: ReplicaDeletionIneligible

ReplicaDeletionIneligible 是副本删除失败时的状态,Replica 转移到这种状态的处理实现如下:

实现逻辑:

  • l  校验其前置状态,Replica 只能是在 ReplicaDeletionStarted 下才能转移这种状态;
  • l  更新该 Replica 的状态为 ReplicaDeletionIneligible。

TargetState: ReplicaDeletionSuccessful

ReplicaDeletionSuccessful 是副本删除成功时的状态,Replica 转移到这种状态的处理实现如下:

实现逻辑:

  • l  检验其前置状态,Replica 只能是在 ReplicaDeletionStarted 下才能转移这种状态;
  • l  更新该 Replica 的状态为 ReplicaDeletionSuccessful。

TargetState: NonExistentReplica

NonExistentReplica 是副本完全删除、不存在这个副本的状态,Replica 转移到这种状态的处理实现如下:

实现逻辑:

  • l  检验其前置状态,Replica 只能是在 ReplicaDeletionSuccessful 下才能转移这种状态;
  • l  在 controller 的 partitionReplicaAssignment 删除这个 Partition 对应的 replica 信息;
  • l  从 Controller 和副本状态机中将这个 Topic 从缓存中删除。

TargetState: OnlineReplica

OnlineReplica 是副本正常工作时的状态,此时的 Replica 既可以作为 leader 也可以作为 follower,Replica 转移到这种状态的处理实现如下:

从前面的状态转移图中可以看出,当 Replica 处在 NewReplica、OnlineReplica、OfflineReplica 或者 ReplicaDeletionIneligible 状态时,Replica 是可以转移到 OnlineReplica 状态的,下面分两种情况讲述:

NewReplica –> OnlineReplica 的处理逻辑如下:

  • l  从 Controller 的 partitionReplicaAssignment 中获取这个 Partition 的 AR;
  • l  如果 Replica 不在 AR 中的话,那么就将其添加到 Partition 的 AR 中;
  • l  最后将 Replica 的状态设置为 OnlineReplica 状态。

OnlineReplica/OfflineReplica/ReplicaDeletionIneligible –> OnlineReplica 的处理逻辑如下:

  • l  从 Controller 的 partitionLeadershipInfo 中获取 Partition 的 LeaderAndIsr 信息;
  • l  如果该信息存在,那么就向这个 Replica 所在 broker 添加这个 Partition 的 LeaderAndIsr 请求,并将 Replica 的状态设置为 OnlineReplica 状态;
  • l  否则不做任务处理;
  • l  最后更新AR Replica 的状态为 OnlineReplica。

TargetState: OfflineReplica

OfflineReplica 是 Replica 所在 Broker 掉线时 Replica 的状态,转移到这种状态的处理逻辑如下:

处理逻辑如下:

  • l  校验其前置状态,只有 Replica 在 NewReplica、OnlineReplica、OfflineReplica 或者 ReplicaDeletionIneligible 状态时,才能转移到这种状态;
  • l  向该 Replica 所在节点发送 StopReplica 请求(deletePartition = false);
  • l  调用 Controller 的 removeReplicaFromIsr() 方法将该 replica 从 Partition 的 isr 移除这个 replica(前提 isr 中还有其他有效副本),然后向该 Partition 的其他副本发送 LeaderAndIsr 请求;
  • l  更新这个 Replica 的状态为 OfflineReplica。

3.2状态转移触发的条件

这里主要是看一下上面 Replica 各种转移的触发的条件,整理的结果如下表所示


TargetState


触发方法


作用


OnlineReplica


KafkaController 的 onBrokerStartup()


Broker 启动时,目的是将在该节点的 Replica 状态设置为 OnlineReplica


OnlineReplica


KafkaController 的 onNewPartitionCreation()


新建 Partition 时,Replica 初始化及 Partition 状态变成 OnlinePartition 后,新创建的 Replica 状态也变为 OnlineReplica;


OnlineReplica


KafkaController 的 onPartitionReassignment()


副本迁移完成后,RAR 中的副本设置为 OnlineReplica 状态


OnlineReplica


ReplicaStateMachine 的 startup()


副本状态机刚初始化启动时,将存活的副本状态设置为 OnlineReplica


OfflineReplica


TopicDeletionManager 的 markTopicForDeletionRetry()


将删除失败的 Replica 设置为 OfflineReplica,重新进行删除


OfflineReplica


TopicDeletionManager 的 startReplicaDeletion()


开始副本删除时,先将副本设置为 OfflineReplica


OfflineReplica


KafkaController 的 shutdownBroker() 方法


优雅关闭 broker 时,目的是把下线节点上的副本状态设置为 OfflineReplica


OfflineReplica


KafkaController 的 onBrokerFailure()


broker 掉线时,目的是把下线节点上的副本状态设置为 OfflineReplica


NewReplica


KafkaController 的 onNewPartitionCreation()


Partition 新建时,当 Partition 状态变为 NewPartition 后,副本的状态变为 NewReplica


NewReplica


KafkaController 的 startNewReplicasForReassignedPartition()


Partition 副本迁移时,将新分配的副本状态设置为 NewReplica;


ReplicaDeletionStarted


TopicDeletionManager 的 startReplicaDeletion()


下线副本时,将成功设置为 OfflineReplica 的 Replica 设置为 ReplicaDeletionStarted 状态,开始物理上删除副本数据(也是发送 StopReplica)


ReplicaDeletionStarted


KafkaController 的 stopOldReplicasOfReassignedPartition()


Partition 的副本迁移时,目的是下线那些 old replica,新的 replica 已经迁移到新分配的副本上了


ReplicaDeletionSuccessful


TopicDeletionManager 的 completeReplicaDeletion()


物理将数据成功删除的 Replica 状态会变为这个


ReplicaDeletionSuccessful


KafkaController 的 stopOldReplicasOfReassignedPartition()


Partition 的副本迁移时,在下线那些旧 Replica 时的一个状态,删除成功


ReplicaDeletionIneligible


TopicDeletionManager 的 startReplicaDeletion()


开始副本删除时,删除失败的副本会设置成这个状态


ReplicaDeletionIneligible


KafkaController 的 stopOldReplicasOfReassignedPartition()


Partition 副本迁移时,在下线那些旧的 Replica 时的一个状态,删除失败


NonExistentReplica


TopicDeletionManager 的 completeReplicaDeletion()


副本删除成功后(状态为 ReplicaDeletionSuccessful),从状态机和 Controller 的缓存中清除该副本的记录;


NonExistentReplica


KafkaController 的 stopOldReplicasOfReassignedPartition()


Partition 的副本成功迁移、旧副本成功删除后,从状态机和 Controller 的缓存中清除旧副本的记录

参考资料:

https://blog.csdn.net/lizhitao/article/details/28108919

https://www.maiyewang.com/?p=7855

http://matt33.com/2018/06/16/controller-state-machine/

原文地址:https://www.cnblogs.com/zhy-heaven/p/10994193.html

时间: 2024-10-11 23:41:45

副本和分区状态机的相关文章

mongodb2.6部署副本集+分区

部署规划 操作系统:redhat6.4 64位 Config Route 分片1 分片2 分片3 使用端口 28000 27017 27018 27019 27020 IP地址 192.168.1.30 /etc/config.conf /etc/route.conf /etc/sd1.conf(主) /etc/sd2.conf(仲裁) /etc/sd3.conf(备) 192.168.1.52 /etc/config.conf /etc/route.conf /etc/sd1.conf(备)

Kafka(五)Kafka分区与副本

Kafka分区和副本都是由副本管理器所管理的,引入副本就是为了提高可用性,整个集群中如何判断代理是否存活? 一个存活的代理必须与Zookeeper保持连接,通过Zookeeper的心跳机制来实现的 作为一个Follower副本,该副本不能落后Leader副本太久(怎么算太久?)replica.lag.max.messages配置项确定的,默认为10秒. 满足上面2个条件则认为该副本或者节点处于同步中(in sync).Leader副本会追中所有同步中的节点,一旦一个节点宕机或者落后太久,Lead

NoSQL数据库介绍(4)

4 键/值存储 讨论了经常使用的概念.技术和模式后.第一类NoSQL数据存储会在本章进行研究. 键/值存储通常有一个简单的数据模型:一个map/dictionary,同意客户按键来存放和请求数值. 除了数据模型和API.现代键/值存储倾向于高扩展性而非一致性,因此它们中的大多数也省略了富ad-hoc查询和分析功能(尤其是联接和聚合操作被取消).通常,可存储的键的长度被限制为一定的字节数,而在值上的限制较少([ Ipp09 ],[ Nor09 ]). 键/值存储已经存在了非常长一段时间(如Berk

【原创】kafka controller源代码分析(一)

Kafka集群中的一个broker会被作为controller负责管理分区和副本的状态以及执行类似于重分配分区之类的管理任务.如果当前的controller失败了,会从剩下的broker中选出新的controller. 一.PartitionLeaderSelector.scala 顾名思义就是为分区选举出leader broker,该trait只定义了一个方法selectLeader,接收一个TopicAndPartition对象和一个LeaderAndIsr对象.TopicAndPartit

【原创】kafka controller源代码分析(二)

四.TopicDeletionManager.scala 管理topic删除的状态机,具体逻辑如下: TopicCommand发送topic删除命令,在zk的/admin/delete_topics目录下创建topic节点 controller会监听该zk目录下任何节点的变更并为对应的topic开启删除操作 controller开启一个后台线程处理topic的删除.使用该线程主要为了以后能够增加TTL(time to live)的特性.无论何时开启或重启topic删除操作时都会通知该线程.当前,

Controller机制

Controller机制 1 前言 Controller 是从Kafka集群中选取一个的broker,负责管理topic分区和副本的状态的变化,以及执行重分配分区之类的管理任务. 第一个启动的broker会成为一个controller,它会在Zookeeper上创建一个临时节点(ephemeral):/controller.其他后启动的broker也尝试去创建这样一个临时节点,但会报错,此时这些broker会在该zookeeper的/controller节点上创建一个监控(Watch),这样当该

kafka 学习之初体验

学习问题: 1.kafka是否需要zookeeper?2.kafka是什么?3.kafka包含哪些概念?4.如何模拟客户端发送.接受消息初步测试?(kafka安装步骤)5.kafka cluster怎么同zookeeper交互的? 1.kafka是否需要zoopkeeper kafka应用需要zookeeper,可以使用kafka安装包提供的zookeeper,也可以单独下载zookeeper 2.kafka是什么. kafka是一个分布式消息系统.Kafka是一个 分布式的.可分区的.可复制的

Linux磁盘管理基础

Linux磁盘管理基础 硬盘结构 文件系统与MBR.GTP 磁盘管理三步骤:分区.格式化.挂载 mount 硬盘结构 硬盘的基本组成材质是盘片,不同容量硬盘的盘片数不等.每个盘片有两面,都可记录信息.盘片表面上以盘片中心为圆心,不同半径的同心圆称为磁道,不同盘片相同半径的磁道所组成的圆柱称为柱面,每个磁道被分成许多扇形的区域,每个区域叫一个扇区,每个扇区可存储128×2^N 次方(N=0.1.2.3)字节信息.在DOS中每扇区是128×2^2 次方=512字节. 硬盘存储相关术语:CHS hea

kafka教程

一.理论介绍(一)相关资料1.官方资料,非常详细:?? http://kafka.apache.org/documentation.html#quickstart2.有一篇翻译版,基本一致,有些细节不同,建议入门时先读此文,再读官方文档.若自认英语很强,请忽视:?? http://www.linuxidc.com/Linux/2014-07/104470.htm3.还有一文也可以:http://www.sxt.cn/info-2871-u-324.html其主要内容来源于以下三篇文章:日志:每个