Kafka source walkthrough: controller (the topic creation process)

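My wife just gave me an earful tonight. Upset as I am, I still want to keep going with another post in this Kafka source walkthrough series, so please forgive the grumbling up front...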

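The source version is still 0.10.2.1. As we all know, before 0.8 Kafka had no replication mechanism for partitions: once a broker went down, none of its partitions could serve requests, and since the partitions had no backup copies, data availability dropped sharply. From 0.8 on, Kafka provides replication to support broker failover, and the controller is at the core of how the replica mechanism is implemented.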

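To implement replication, the controller depends heavily on ZooKeeper (zk below). In short: the controller registers listeners on zk paths; as soon as a watched path changes, the corresponding listener logic in the controller is triggered, and the controller then distributes the result to the affected brokers or to all brokers, so that every broker learns about the change and its outcome. That may still sound abstract, so let's take topic creation as an example and analyze what happens in zk, in the controller, and on the broker server side.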
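The watch-then-react flow described above can be pictured with a tiny in-memory model. This is only a sketch: WatchSketch, WatchedDir, subscribe and create are hypothetical names invented for illustration, and nothing here uses the real ZooKeeper client API.

```scala
import scala.collection.mutable

object WatchSketch {
  // In-memory stand-in for a watched zk directory (hypothetical, not the ZooKeeper API).
  final class WatchedDir {
    private var children = Set.empty[String]
    private val listeners = mutable.Buffer.empty[Set[String] => Unit]

    // Register a child-change listener, as the controller does for the paths it watches.
    def subscribe(listener: Set[String] => Unit): Unit = {
      listeners += listener
      ()
    }

    // Creating a child node notifies every listener with the full, current child list.
    def create(child: String): Unit = {
      children += child
      listeners.foreach(_(children))
    }
  }
}
```

A listener registered with subscribe is called with the complete child set on every create, which is the same shape of callback the controller's child listeners receive later in this post.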

1. Common terms:

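  • leader: the primary replica of a partition; producers and consumers interact with the leader directly.
  • follower: every replica other than the leader. Followers keep fetching from the leader to stay in sync; if the leader fails, a new leader is elected from the remaining followers, which is what guarantees data reliability.
  • AR: the set of all replicas of a partition. OAR is the previous replica set and RAR is the reassigned replica set.
  • ISR: followers lag somewhat behind the leader while syncing. A replica whose lag stays within the configured threshold belongs to the ISR (informally, it is keeping up), so the ISR consists of the leader plus the followers that are keeping up.
  • OSR: the opposite of ISR; a replica whose lag exceeds the threshold belongs to the OSR (informally, it has fallen behind). Hence AR = ISR + OSR.
  • LEO: the log end offset, i.e. the highest position produced so far. Strictly speaking it is the offset the next message will receive (the last message's offset plus one), and every replica tracks its own LEO.
  • HW: short for high watermark, the highest position a consumer can read from the leader. Producers write to the leader directly, so a message may have been appended to the leader before the followers in the ISR have fetched it; if the leader failed at that moment, the message could be lost and never consumed. The leader's HW therefore advances only after every replica in the ISR has synced the message.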
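To make the relationship between AR, ISR, OSR, LEO and HW concrete, here is a minimal sketch. The Replica case class, the lagMs field and the function names are assumptions made up for this example; Kafka's own bookkeeping is more involved.

```scala
object ReplicaSetSketch {
  // Hypothetical model of one replica: broker id, its own LEO, and how far it lags in ms.
  final case class Replica(brokerId: Int, logEndOffset: Long, lagMs: Long)

  // Split AR into (ISR, OSR) by a lag threshold; the leader always counts as in sync.
  def splitIsrOsr(ar: Seq[Replica], leaderId: Int, maxLagMs: Long): (Seq[Replica], Seq[Replica]) =
    ar.partition(r => r.brokerId == leaderId || r.lagMs <= maxLagMs)

  // The HW is the smallest LEO among the ISR members: everything below it is on every ISR replica.
  def highWatermark(isr: Seq[Replica]): Long = {
    require(isr.nonEmpty, "the ISR always contains at least the leader")
    isr.map(_.logEndOffset).min
  }
}
```

With a leader at LEO 100, one follower at LEO 95 within the threshold and one follower far behind, the ISR holds the first two replicas and the HW is 95, so a consumer cannot yet read offsets 95 to 99.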

Enough talk, let's go straight to the code.

2. Code walkthrough:

The command for creating a topic should look familiar: ~/software/kafka/bin/kafka-topics.sh --topic xx_log --replication-factor 3 --partitions 240 --create --zookeeper 127.0.0.1:2181. We start with how this command is parsed:

  def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
    val topic = opts.options.valueOf(opts.topicOpt)
    val configs = parseTopicConfigsToBeAdded(opts)
    val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
    if (Topic.hasCollisionChars(topic))
      println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
    try {
      if (opts.options.has(opts.tagOpt)) {
        val tag = opts.options.valueOf(opts.tagOpt).toString
        if (zkUtils.checkTagExist(tag)) {
          if (opts.options.has(opts.replicaAssignmentOpt)) {  // the caller states explicitly which brokers host each partition
            val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
            AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false, tag = tag)
          } else {
            CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt, opts.tagOpt)
            val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
            val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
            // rack-aware replica placement stays enabled unless explicitly disabled
            val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
            else RackAwareMode.Enforced
            AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode, tag = tag)
          }
          println("Created topic \"%s\".".format(topic))
        } else {
          println(s"Tag '$tag' not exists.")
        }
      } else {
        if (opts.options.has(opts.replicaAssignmentOpt)) {
          val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
          AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
        } else {
          CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
          val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
          val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
          val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
          else RackAwareMode.Enforced
          AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
        }
        println("Created topic \"%s\".".format(topic))
      }
    } catch  {
      case e: TopicExistsException => if (!ifNotExists) throw e
    }
  }

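As you can see, the command is parsed first. Normally you do not specify at creation time which brokers each topic-partition (tp) lives on; by default that is left to an assignment algorithm. Rack-aware replica placement is also left enabled by default, because spreading replicas across racks is likewise done for high availability. The core is AdminUtils.createTopic; let's step in and see what it does.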

  def createTopic(zkUtils: ZkUtils,
                  topic: String,
                  partitions: Int,
                  replicationFactor: Int,
                  topicConfig: Properties = new Properties,
                  rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
                  tag: String = "") {
    // 1. create the default quota
    createDefaultGroupQuota(zkUtils,topic)

    // 2. then create the topic
    var currentTag = tag
    var currentrReplicationFactor = replicationFactor
    info("createTopic topic=" + topic + ", tag=" + tag + ", partitions=" + partitions + ", replicationFactor=" + replicationFactor)
    if (Topic.GroupMetadataTopicName.equals(topic)) {
      val brokers = zkUtils.getLiveBrokersInTag(Topic.GroupMetadataTagName)
      if (!brokers.isEmpty) {
        currentTag = Topic.GroupMetadataTagName
        currentrReplicationFactor = Math.min(brokers.size, replicationFactor)
      }
    } else {
      if (currentTag != "") {
        if (! zkUtils.checkTagExist(currentTag)) {
          throw new AdminOperationException("Tag " + currentTag + " not exists.")
        }
      }
    }
    val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode, tag = currentTag)
    // assign partitions to brokers, using rack information when it is available
    val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, currentrReplicationFactor)
    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig, tag = currentTag)
  }
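The assignReplicasToBrokers call near the end of the method above spreads replicas over brokers by modular round-robin. The sketch below shows only that basic idea. It is deliberately simplified and is not Kafka's algorithm: the real method also randomizes the start index, shifts follower replicas, and interleaves racks. AssignmentSketch, assign and startIndex are names invented for this example.

```scala
object AssignmentSketch {
  // Simplified round-robin: replica i of partition p lands on brokers((startIndex + p + i) % n).
  // Assumption for this sketch: no randomization, no replica shift, no rack awareness.
  def assign(brokers: IndexedSeq[Int],
             partitions: Int,
             replicationFactor: Int,
             startIndex: Int = 0): Map[Int, Seq[Int]] = {
    require(brokers.nonEmpty, "need at least one broker")
    require(replicationFactor > 0 && replicationFactor <= brokers.size,
      "replication factor must be between 1 and the number of brokers")
    (0 until partitions).map { p =>
      val replicas = (0 until replicationFactor).map(i => brokers((startIndex + p + i) % brokers.size))
      p -> replicas
    }.toMap
  }
}
```

With brokers 0, 1, 2, four partitions and replication factor 2, partition 0 gets brokers (0, 1), partition 1 gets (1, 2), partition 2 gets (2, 0), and partition 3 wraps around to (0, 1) again. The first broker in each list matters later in this post, because it becomes the initial leader.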

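The core is the last two lines: first compute how the new topic's partitions are assigned to brokers, then write the result to zk. The assignment algorithm is not the focus of this post; its basic idea is modulo round-robin, so that partitions and their replicas are spread across as many different brokers and racks as possible. The focus is the zk write, so let's step in.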

  def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
                                                     topic: String,
                                                     partitionReplicaAssignment: Map[Int, Seq[Int]],
                                                     config: Properties = new Properties,
                                                     update: Boolean = false,
                                                     tag: String = "") {
    // validate the create/update request against its arguments
    validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, config, update)

    // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
    if (!update) {
      // write out the config if there is any, this isn't transactional with the partition assignments
      // zk write: /config/topics/<topicName>
      writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config)
    }

    // create the partition assignment
    // zk write: the hierarchy under the topic (/brokers/topics/<topicName>)
    writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update, tag = tag)
  }

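As you can see, once the create request has been validated, the zk writes begin: first the /config/topics/<topicName> path, then the hierarchy under the topic itself (/brokers/topics/<topicName>); readers can dig further into the exact paths. At this point the zk work done by the create-topic command is finished. Because listeners are watching zk, the controller is notified of the change; the listeners themselves are registered during controller election. Now turn to the controller layer and look first at the listener for the topic directory: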

  /**
   * This is the zookeeper listener that triggers all the state transitions for a partition
   */
  class TopicChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {

    protected def logName = "TopicChangeListener"

    // fires once the topic path has been written to zk; this is the controller's entry point for reacting to topic creation
    def doHandleChildChange(parentPath: String, children: Seq[String]) {
      inLock(controllerContext.controllerLock) {
        if (hasStarted.get) {
          try {
            val currentChildren = {
              debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
              children.toSet
            }
            val newTopics = currentChildren -- controllerContext.allTopics
            val deletedTopics = controllerContext.allTopics -- currentChildren
            controllerContext.allTopics = currentChildren

            val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
            // the context must be updated here so that the leader election that follows can run
            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
              !deletedTopics.contains(p._1.topic))
            controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
            info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
              deletedTopics, addedPartitionReplicaAssignment))
            if (newTopics.nonEmpty)
              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
          } catch {
            case e: Throwable => error("Error while handling new topic", e)
          }
        }
      }
    }
  }
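The heart of the listener above is two set differences between the topics the controller already knows and the children zk now reports. Isolated from the controller, the arithmetic looks roughly like this; TopicDiffSketch, TopicDiff and diff are hypothetical names used only for this illustration.

```scala
object TopicDiffSketch {
  final case class TopicDiff(added: Set[String], deleted: Set[String])

  // known: topics cached in the controller context; currentChildren: what zk reports now.
  def diff(known: Set[String], currentChildren: Set[String]): TopicDiff =
    TopicDiff(added = currentChildren -- known, deleted = known -- currentChildren)
}
```

If the controller knows topics a and b and zk now lists b and c, then c is newly created and a was deleted. Only the added set feeds into the creation path discussed in this post.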

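As you can see, this listener watches for topic changes in the cluster. From the change in the zk directory it first computes which topics were newly created and which were deleted; for topic creation we only need to care about the new ones. It then fetches the replica assignment of the new topics' partitions and updates controllerContext.partitionReplicaAssignment. Why this value needs updating is explained later; call it mark1 for now. Finally it calls onNewTopicCreation.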

  def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
    info("New topic creation callback for %s".format(newPartitions.mkString(",")))
    // subscribe to partition changes
    // first register a partition-change listener for each newly created topic
    topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
    onNewPartitionCreation(newPartitions)
  }

Inside, it first registers a partition-change listener for each newly created topic, then runs onNewPartitionCreation on the partitions.

  /**
   * This callback is invoked by the topic change callback with the list of failed brokers as input.
   * It does the following -
   * 1. Move the newly created partitions to the NewPartition state
   * 2. Move the newly created partitions from NewPartition->OnlinePartition state
   */
  def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
    info("New partition creation callback for %s".format(newPartitions.mkString(",")))
    // move every partition to NewPartition
    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
    // move every replica to NewReplica
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
    // bring every partition online (OnlinePartition)
    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
    // bring every replica online (OnlineReplica)
    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
  }

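These four steps are the four major steps of creating a topic's partitions. In short, partitions and replicas each have their own state machine: all partitions are first initialized to NewPartition and all replicas to NewReplica, and then each is brought online in turn. Let's go through each step in detail, starting with partitionStateMachine.handleStateChanges(newPartitions, NewPartition):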

  /**
   * This API is invoked by the partition change zookeeper listener
   * @param partitions   The list of partitions that need to be transitioned to the target state
   * @param targetState  The state that the partitions should be moved to
   */
  def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
                         leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
                         callbacks: Callbacks = (new CallbackBuilder).build) {
    debug("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      partitions.foreach { topicAndPartition =>
        // changes each partition's state, picks leader and isr, and writes zk,
        // setting up the LeaderAndIsr and UpdateMetadata requests sent to the brokers below
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
      }
      // the controller sends LeaderAndIsr and UpdateMetadata requests to the brokers
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    }catch {
      case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
      // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
    }
  }

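Ignore the two inline comments for now: this method is reused for every partition state transition, and the comments describe the transition to OnlinePartition. Initializing each partition's state looks like this: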

  private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                                leaderSelector: PartitionLeaderSelector,
                                callbacks: Callbacks) {
    val topicAndPartition = TopicAndPartition(topic, partition)
    if (!hasStarted.get)
      throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
                                            "the partition state machine has not started")
                                              .format(controllerId, controller.epoch, topicAndPartition, targetState))
    // every partition starts out as NonExistentPartition so that the validation below can run
    val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
    try {
      targetState match {
        case NewPartition =>
          // pre: partition did not exist before this
          assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
          partitionState.put(topicAndPartition, NewPartition)
          val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
                                            assignedReplicas))
          // ... (the remaining target states and the catch block are omitted in this excerpt)

This state change is simple: it merely records the new partition in partitionState with state NewPartition. Back outside, look at the implementation of brokerRequestBatch.sendRequestsToBrokers:

  def sendRequestsToBrokers(controllerEpoch: Int) {
    try {
      leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
        partitionStateInfos.foreach { case (topicPartition, state) =>
          val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
          stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
                                   "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
                                                                   state.leaderIsrAndControllerEpoch, broker,
                                                                   topicPartition.topic, topicPartition.partition))
        }
        val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
        val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
          _.getNode(controller.config.interBrokerListenerName)
        }
        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
          topicPartition -> partitionState
        }
        val leaderAndIsrRequest = new LeaderAndIsrRequest.
            Builder(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
        // the LeaderAndIsr request goes only to the brokers involved
        controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, null)
      }
      leaderAndIsrRequestMap.clear()

      // LeaderAndIsr requests must be handled before UpdateMetadata requests
      updateMetadataRequestPartitionInfoMap.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
        "to brokers %s for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
        updateMetadataRequestBrokerSet.toString(), p._1)))
      val partitionStates = updateMetadataRequestPartitionInfoMap.map { case (topicPartition, partitionStateInfo) =>
        val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
        val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
          leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
          partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
        topicPartition -> partitionState
      }

      val version: Short =
        if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2
        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
        else 0

      val updateMetadataRequest = {
        val liveBrokers = if (version == 0) {
          // Version 0 of UpdateMetadataRequest only supports PLAINTEXT.
          controllerContext.liveOrShuttingDownBrokers.map { broker =>
            val securityProtocol = SecurityProtocol.PLAINTEXT
            val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
            val node = broker.getNode(listenerName)
            val endPoints = Seq(new EndPoint(node.host, node.port, securityProtocol, listenerName))
            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
          }
        } else {
          controllerContext.liveOrShuttingDownBrokers.map { broker =>
            val endPoints = broker.endPoints.map { endPoint =>
              new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName)
            }
            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
          }
        }
        new UpdateMetadataRequest.Builder(
          controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava).
          setVersion(version)
      }

      // send the UpdateMetadata request to all live brokers
      updateMetadataRequestBrokerSet.foreach { broker =>
        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, updateMetadataRequest, null)
      }
      updateMetadataRequestBrokerSet.clear()
      updateMetadataRequestPartitionInfoMap.clear()

      stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
        val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
        val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
        debug("The stop replica request (delete = true) sent to broker %d is %s"
          .format(broker, stopReplicaWithDelete.mkString(",")))
        debug("The stop replica request (delete = false) sent to broker %d is %s"
          .format(broker, stopReplicaWithoutDelete.mkString(",")))

        val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null)

        // Send one StopReplicaRequest for all partitions that require neither delete nor callback. This potentially
        // changes the order in which the requests are sent for the same partitions, but that's OK.
        val stopReplicaRequest = new StopReplicaRequest.Builder(controllerId, controllerEpoch, false,
          replicasToGroup.map(r => new TopicPartition(r.replica.topic, r.replica.partition)).toSet.asJava)
        controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest)

        replicasToNotGroup.foreach { r =>
          val stopReplicaRequest = new StopReplicaRequest.Builder(
              controllerId, controllerEpoch, r.deletePartition,
              Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
          controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest, r.callback)
        }
      }
      stopReplicaRequestMap.clear()
    } catch {
      case e: Throwable =>
        if (leaderAndIsrRequestMap.nonEmpty) {
          error("Haven't been able to send leader and isr requests, current state of " +
              s"the map is $leaderAndIsrRequestMap. Exception message: $e")
        }
        if (updateMetadataRequestBrokerSet.nonEmpty) {
          error(s"Haven't been able to send metadata update requests to brokers $updateMetadataRequestBrokerSet, " +
                s"current state of the partition info is $updateMetadataRequestPartitionInfoMap. Exception message: $e")
        }
        if (stopReplicaRequestMap.nonEmpty) {
          error("Haven't been able to send stop replica requests, current state of " +
              s"the map is $stopReplicaRequestMap. Exception message: $e")
        }
        throw new IllegalStateException(e)
    }
  }
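The method above follows a queue-then-flush pattern: state transitions add requests to per-broker collections, and the send step drains and clears them. The sketch below models only that pattern. RequestBatchSketch, Batch, add and flush are hypothetical names, requests are plain strings, and the real ControllerBrokerRequestBatch keeps separate collections per request type.

```scala
import scala.collection.mutable

object RequestBatchSketch {
  // Hypothetical stand-in for the controller's request batch; `send` plays the role of the network call.
  final class Batch(send: (Int, Seq[String]) => Unit) {
    private val pending = mutable.Map.empty[Int, mutable.Buffer[String]]

    // A new batch may only start once the previous one has been flushed.
    def newBatch(): Unit =
      if (pending.nonEmpty) throw new IllegalStateException(s"previous batch not flushed: $pending")

    // Queue a request for one broker; nothing is sent yet.
    def add(brokerId: Int, request: String): Unit = {
      pending.getOrElseUpdate(brokerId, mutable.Buffer.empty[String]) += request
      ()
    }

    // Send everything queued, clear the queue, and report how many brokers were contacted.
    def flush(): Int = {
      val contacted = pending.size
      pending.foreach { case (broker, requests) => send(broker, requests.toList) }
      pending.clear()
      contacted
    }
  }
}
```

Calling flush on a batch to which nothing was added contacts zero brokers. That is the situation during the NewPartition transition, where the state change only records state and queues no requests.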

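This step mainly operates on leaderAndIsrRequestMap and updateMetadataRequestPartitionInfoMap (plus stopReplicaRequestMap). Note that each collection is cleared once it has been processed, and the NewPartition transition adds nothing to them, so for freshly initialized partitions they are empty and nothing is sent. That completes the partition state machine's transition to NewPartition; next, all replicas are moved to NewReplica.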

  def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                         callbacks: Callbacks = (new CallbackBuilder).build) {
    if(replicas.nonEmpty) {
      debug("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
      try {
        brokerRequestBatch.newBatch()
        replicas.foreach(r => handleStateChange(r, targetState, callbacks))
        // updateMetadataRequestBrokerSet and leaderAndIsrRequestMap were already cleared after the
        // partition state machine's online transition sent its requests, so nothing happens in this round
        brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
      }catch {
        case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
      }
    }
  }

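As with partition initialization, the inline comment in this method describes the online transition rather than this step. Step in to see how the replica state machine handles initialization: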

  def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
                        callbacks: Callbacks) {
    val topic = partitionAndReplica.topic
    val partition = partitionAndReplica.partition
    val replicaId = partitionAndReplica.replica
    val topicAndPartition = TopicAndPartition(topic, partition)
    if (!hasStarted.get)
      throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
                                            "to %s failed because replica state machine has not started")
                                              .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
    val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
    try {
      val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
      targetState match {
        case NewReplica =>
          assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
          // start replica as a follower to the current leader for its partition
          val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
          leaderIsrAndControllerEpochOpt match {
            case Some(leaderIsrAndControllerEpoch) =>
              if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
                throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                  .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
              brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                                  topic, partition, leaderIsrAndControllerEpoch,
                                                                  replicaAssignment)
            case None => // new leader request will be sent to this replica when one gets elected
          }
          replicaState.put(partitionAndReplica, NewReplica)
          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                            targetState))
          // ... (the remaining target states and the catch block are omitted in this excerpt)

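Compared with the partition case, which only records the new state in partitionState, there is one extra step here, brokerRequestBatch.addLeaderAndIsrRequestForBrokers. But no leader has been elected yet, so the lookup returns None and this step is skipped. That completes initialization of the replica state machine.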

Next the partitions are brought online. This step is very important, since the bulk of the processing (about 80%) happens here, so let's step in.

  def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
                         leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
                         callbacks: Callbacks = (new CallbackBuilder).build) {
    debug("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      partitions.foreach { topicAndPartition =>
        // changes each partition's state, picks leader and isr, and writes zk,
        // setting up the LeaderAndIsr and UpdateMetadata requests sent to the brokers below
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
      }
      // the controller sends LeaderAndIsr and UpdateMetadata requests to the brokers
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    }catch {
      case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
      // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
    }
  }

  private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                                leaderSelector: PartitionLeaderSelector,
                                callbacks: Callbacks) {
    val topicAndPartition = TopicAndPartition(topic, partition)
    if (!hasStarted.get)
      throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
                                            "the partition state machine has not started")
                                              .format(controllerId, controller.epoch, topicAndPartition, targetState))
    // every partition starts out as NonExistentPartition so that the validation below can run
    val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
    try {
      targetState match {
        case NewPartition =>
          // pre: partition did not exist before this
          assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
          partitionState.put(topicAndPartition, NewPartition)
          val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
                                            assignedReplicas))
          // post: partition has been assigned replicas
        case OnlinePartition =>
          assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
          partitionState(topicAndPartition) match {
            case NewPartition =>
              // initialize leader and isr path for new partition
              // besides creating the leader/isr zk path, this also fills leaderAndIsrRequestMap and
              // updateMetadataRequestPartitionInfoMap, and updates partitionLeadershipInfo
              initializeLeaderAndIsrForPartition(topicAndPartition)
            case OfflinePartition =>
              electLeaderForPartition(topic, partition, leaderSelector)
            case OnlinePartition => // invoked when the leader needs to be re-elected
              electLeaderForPartition(topic, partition, leaderSelector)
            case _ => // should never come here since illegal previous states are checked above
          }
          partitionState.put(topicAndPartition, OnlinePartition)
          val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
          stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
          // ... (the remaining target states and the catch block are omitted in this excerpt)
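The assertValidPreviousStates calls in the excerpt above act as guards on a small state machine: a partition may become NewPartition only from NonExistentPartition, and OnlinePartition only from NewPartition, OnlinePartition or OfflinePartition. Here is a minimal sketch of those two guards. The object, the shortened state names, and the Either-based result are assumptions for illustration; the other target states are deliberately not modeled because their rules are not shown in this post.

```scala
object PartitionStateSketch {
  sealed trait State
  case object NonExistent extends State
  case object New extends State
  case object Online extends State
  case object Offline extends State

  // Allowed previous states per target, copied from the two guards shown in the excerpt.
  // Targets not shown there are left unmodeled and therefore always rejected by this sketch.
  def validPrevious(target: State): Set[State] = target match {
    case New    => Set(NonExistent)
    case Online => Set(New, Online, Offline)
    case _      => Set.empty
  }

  // Returns the new state, or an error message where the controller would throw.
  def transition(current: State, target: State): Either[String, State] =
    if (validPrevious(target).contains(current)) Right(target)
    else Left(s"illegal transition: $current -> $target")
}
```

Topic creation walks NonExistent to New to Online in that order. Trying to jump straight from NonExistent to Online is rejected, which is why onNewPartitionCreation issues the NewPartition transition first.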

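Back in this reused method: besides changing the partition state from NewPartition to OnlinePartition, the most important call is initializeLeaderAndIsrForPartition. It picks the leader and isr, writes their zk path, and also updates leaderAndIsrRequestMap, updateMetadataRequestPartitionInfoMap, and partitionLeadershipInfo. The details are inside: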

  private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
    val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
    liveAssignedReplicas.size match {
      case 0 =>
        val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " +
                       "live brokers are [%s]. No assigned replica is alive.")
                         .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
        stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
        throw new StateChangeFailedException(failMsg)
      case _ =>
        debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
        // make the first replica in the list of assigned replicas, the leader
        val leader = liveAssignedReplicas.head
        val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
          controller.epoch)
        debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
        try {
          zkUtils.createPersistentPath(
            getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
            zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
          // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
          // took over and initialized this partition. This can happen if the current controller went into a long
          // GC pause
          controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
          // Fill leaderAndIsrRequestMap and updateMetadataRequestPartitionInfoMap; the requests handled on the
          // server side later are built from them. The LeaderAndIsrRequest is queued only for the brokers
          // hosting this tp's replicas
          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
            topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
        } catch {
          case _: ZkNodeExistsException =>
            // read the controller epoch
            val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
              topicAndPartition.partition).get
            val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
                           "exists with value %s and controller epoch %d")
                             .format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)
            stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
            throw new StateChangeFailedException(failMsg)
        }
    }
  }
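
The election rule in the method above is small enough to sketch on its own. This is only an illustration, not Kafka code: the LeaderAndIsr case class here is a cut-down stand-in, and elect is a hypothetical helper that returns None where the real method throws StateChangeFailedException.

```scala
object InitialLeaderElection {
  // Hypothetical stand-in for Kafka's LeaderAndIsr, reduced to the two fields used here.
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  /** Picks the initial leader and ISR for a newly created partition.
    * Returns None when no assigned replica sits on a live broker.
    */
  def elect(assignedReplicas: Seq[Int], liveBrokerIds: Set[Int]): Option[LeaderAndIsr] = {
    // Keep the assignment order: the first live replica becomes the leader,
    // and every live replica starts out in the ISR.
    val live = assignedReplicas.filter(liveBrokerIds.contains).toList
    live.headOption.map(leader => LeaderAndIsr(leader, live))
  }
}
```

For example, with an assignment of brokers 3, 1, 2 and only brokers 1 and 2 alive, broker 1 becomes leader and the ISR is (1, 2).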

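The method first fetches the partition's replica assignment from controllerContext.partitionReplicaAssignment, which ties back to mark1. Leader election at topic creation time is simple: the first live replica in the partition's assignment becomes the leader, and every replica on a live broker goes into the ISR by default. The current controller epoch is recorded along with them; it matters a lot later on, so call it mark2 for now. The leader and ISR data is then written to zk and cached in controllerContext.partitionLeadershipInfo. Finally, a LeaderAndIsrRequest is queued for the brokers hosting the partition's live replicas: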

  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                       leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                       replicas: Seq[Int], callback: AbstractResponse => Unit = null) {
    val topicPartition = new TopicPartition(topic, partition)

    brokerIds.filter(_ >= 0).foreach { brokerId =>
      val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
      result.put(topicPartition, PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
    }

    // Fill updateMetadataRequestPartitionInfoMap with the tps whose metadata must be refreshed;
    // note that this one is queued for all live brokers
    addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
                                       Set(TopicAndPartition(topic, partition)))
  }

  /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
  def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
                                         partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
                                         callback: AbstractResponse => Unit = null) {
    def updateMetadataRequestPartitionInfo(partition: TopicAndPartition, beingDeleted: Boolean) {
      val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
      leaderIsrAndControllerEpochOpt match {
        case Some(leaderIsrAndControllerEpoch) =>
          val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
          val partitionStateInfo = if (beingDeleted) {
            val leaderAndIsr = new LeaderAndIsr(LeaderAndIsr.LeaderDuringDelete, leaderIsrAndControllerEpoch.leaderAndIsr.isr)
            PartitionStateInfo(LeaderIsrAndControllerEpoch(leaderAndIsr, leaderIsrAndControllerEpoch.controllerEpoch), replicas)
          } else {
            PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
          }
          updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
        // the case where a replica goes from non-existent to new
        case None =>
          info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
      }
    }

    val filteredPartitions = {
      val givenPartitions = if (partitions.isEmpty)
        controllerContext.partitionLeadershipInfo.keySet
      else
        partitions
      if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
        givenPartitions
      else
        givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
    }

    // the UpdateMetadataRequest goes to every live broker
    updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
    filteredPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = false))
    controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
  }
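
The difference in who receives what is easy to lose in the listing, so here is a minimal sketch of just the routing. All names (RequestRouting, Batch, add) are made up for illustration, and the payloads are dropped entirely; only the choice of target brokers mirrors the two methods above.

```scala
import scala.collection.mutable

object RequestRouting {
  final case class TopicPartition(topic: String, partition: Int)

  final class Batch {
    // brokerId -> partitions for which that broker will get a LeaderAndIsr request
    val leaderAndIsrTargets = mutable.Map.empty[Int, mutable.Set[TopicPartition]]
    // brokers that will get the UpdateMetadata request
    val updateMetadataTargets = mutable.Set.empty[Int]

    def add(tp: TopicPartition, liveAssignedReplicas: Seq[Int], liveBrokerIds: Set[Int]): Unit = {
      // LeaderAndIsr: only the brokers hosting a replica of this partition.
      liveAssignedReplicas.filter(_ >= 0).foreach { brokerId =>
        leaderAndIsrTargets.getOrElseUpdate(brokerId, mutable.Set.empty[TopicPartition]) += tp
      }
      // UpdateMetadata: every live broker, whether or not it hosts a replica.
      updateMetadataTargets ++= liveBrokerIds.filter(_ >= 0)
    }
  }
}
```

With replicas on brokers 1 and 2 in a three-broker cluster, brokers 1 and 2 end up in leaderAndIsrTargets while all three end up in updateMetadataTargets.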

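Two kinds of request are queued here: LeaderAndIsrRequest and UpdateMetadataRequest. Queuing them really just fills leaderAndIsrRequestMap and updateMetadataRequestPartitionInfoMap, because the requests eventually sent to the server side are built from these two collections. Note also that a LeaderAndIsrRequest is queued only for the brokers hosting replicas of the partition, whereas the UpdateMetadataRequest is queued for every live broker. In other words, LeaderAndIsr requests go to the brokers involved, and UpdateMetadata requests go to all brokers so they can refresh their metadata. Once the maps are filled in per broker, the requests can be sent to the server side. Back to the partition state machine's handleStateChanges: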

  def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
                         leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
                         callbacks: Callbacks = (new CallbackBuilder).build) {
    debug("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      partitions.foreach { topicAndPartition =>
        // for each tp: change its state, pick the leader and ISR, and write zk, which prepares the
        // LeaderAndIsr and UpdateMetadata requests sent to the server side below
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
      }
      // the controller sends the LeaderAndIsr and UpdateMetadata requests to the brokers
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    } catch {
      case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
      // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
    }
  }

brokerRequestBatch.sendRequestsToBrokers(controller.epoch) is what actually ships the requests to the server side. Let's step in:

  def sendRequestsToBrokers(controllerEpoch: Int) {
    try {
      leaderAndIsrRequestMap.foreach { case (broker, partitionStateInfos) =>
        partitionStateInfos.foreach { case (topicPartition, state) =>
          val typeOfRequest = if (broker == state.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
          stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s to broker %d " +
                                   "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
                                                                   state.leaderIsrAndControllerEpoch, broker,
                                                                   topicPartition.topic, topicPartition.partition))
        }
        val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
        val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
          _.getNode(controller.config.interBrokerListenerName)
        }
        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
          topicPartition -> partitionState
        }
        val leaderAndIsrRequest = new LeaderAndIsrRequest.
            Builder(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
        // a LeaderAndIsrRequest is sent only to the brokers involved
        controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, null)
      }
      leaderAndIsrRequestMap.clear()

      // LeaderAndIsrRequests must be handled before UpdateMetadataRequests
      updateMetadataRequestPartitionInfoMap.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
        "to brokers %s for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
        updateMetadataRequestBrokerSet.toString(), p._1)))
      val partitionStates = updateMetadataRequestPartitionInfoMap.map { case (topicPartition, partitionStateInfo) =>
        val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
        val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
          leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
          partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
        topicPartition -> partitionState
      }

      val version: Short =
        if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2
        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
        else 0

      val updateMetadataRequest = {
        val liveBrokers = if (version == 0) {
          // Version 0 of UpdateMetadataRequest only supports PLAINTEXT.
          controllerContext.liveOrShuttingDownBrokers.map { broker =>
            val securityProtocol = SecurityProtocol.PLAINTEXT
            val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
            val node = broker.getNode(listenerName)
            val endPoints = Seq(new EndPoint(node.host, node.port, securityProtocol, listenerName))
            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
          }
        } else {
          controllerContext.liveOrShuttingDownBrokers.map { broker =>
            val endPoints = broker.endPoints.map { endPoint =>
              new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName)
            }
            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
          }
        }
        new UpdateMetadataRequest.Builder(
          controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava).
          setVersion(version)
      }

      // send the UpdateMetadataRequest to every live broker
      updateMetadataRequestBrokerSet.foreach { broker =>
        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, updateMetadataRequest, null)
      }
      updateMetadataRequestBrokerSet.clear()
      updateMetadataRequestPartitionInfoMap.clear()

      stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
        val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
        val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
        debug("The stop replica request (delete = true) sent to broker %d is %s"
          .format(broker, stopReplicaWithDelete.mkString(",")))
        debug("The stop replica request (delete = false) sent to broker %d is %s"
          .format(broker, stopReplicaWithoutDelete.mkString(",")))

        val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null)

        // Send one StopReplicaRequest for all partitions that require neither delete nor callback. This potentially
        // changes the order in which the requests are sent for the same partitions, but that's OK.
        val stopReplicaRequest = new StopReplicaRequest.Builder(controllerId, controllerEpoch, false,
          replicasToGroup.map(r => new TopicPartition(r.replica.topic, r.replica.partition)).toSet.asJava)
        controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest)

        replicasToNotGroup.foreach { r =>
          val stopReplicaRequest = new StopReplicaRequest.Builder(
              controllerId, controllerEpoch, r.deletePartition,
              Set(new TopicPartition(r.replica.topic, r.replica.partition)).asJava)
          controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest, r.callback)
        }
      }
      stopReplicaRequestMap.clear()
    } catch {
      case e: Throwable =>
        if (leaderAndIsrRequestMap.nonEmpty) {
          error("Haven't been able to send leader and isr requests, current state of " +
              s"the map is $leaderAndIsrRequestMap. Exception message: $e")
        }
        if (updateMetadataRequestBrokerSet.nonEmpty) {
          error(s"Haven't been able to send metadata update requests to brokers $updateMetadataRequestBrokerSet, " +
                s"current state of the partition info is $updateMetadataRequestPartitionInfoMap. Exception message: $e")
        }
        if (stopReplicaRequestMap.nonEmpty) {
          error("Haven't been able to send stop replica requests, current state of " +
              s"the map is $stopReplicaRequestMap. Exception message: $e")
        }
        throw new IllegalStateException(e)
    }
  }
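
Stripped of the Kafka types, handleStateChanges and sendRequestsToBrokers follow an accumulate-then-flush pattern: check that the previous batch was drained, queue requests per broker, send them, clear the maps. A rough sketch follows, in which RequestBatch, the String payload and the send callback are all placeholders of mine rather than Kafka APIs:

```scala
import scala.collection.mutable

object BatchLifecycle {
  /** `send` is a placeholder for the real network call: (brokerId, requests) => Unit. */
  final class RequestBatch(send: (Int, List[String]) => Unit) {
    private val pending = mutable.Map.empty[Int, mutable.Buffer[String]]

    // Starting a new batch while the old one still holds requests would silently lose them,
    // so refuse to continue (the real newBatch also throws IllegalStateException here).
    def newBatch(): Unit =
      if (pending.nonEmpty)
        throw new IllegalStateException(s"previous batch was not sent, requests may be lost: $pending")

    def add(brokerId: Int, request: String): Unit =
      pending.getOrElseUpdate(brokerId, mutable.Buffer.empty[String]) += request

    // Drain everything that was queued, one send per broker, then reset for the next batch.
    def sendRequestsToBrokers(): Unit = {
      pending.foreach { case (brokerId, requests) => send(brokerId, requests.toList) }
      pending.clear()
    }
  }
}
```

The point of batching is that a state change touching 240 partitions produces one LeaderAndIsr request per broker instead of one per partition.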

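That is a lot of code, but the core is simply draining leaderAndIsrRequestMap and updateMetadataRequestPartitionInfoMap and sending a request for what they hold. Now jump to the server side and look first at how a LeaderAndIsrRequest is handled: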

  def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
    // ensureTopicExists is only for client facing requests
    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
    // stop serving data to clients for the topic being deleted
    val correlationId = request.header.correlationId
    val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]

    try {
      // For __consumer_offsets, the consumer group data has to be migrated too: both the log data and the cached group metadata.
      // The log data is already taken care of in make leader / make follower, so this part mainly moves the group metadata cache
      def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
        // for each new leader or follower, call coordinator to handle consumer group migration.
        // this callback is invoked under the replica state change lock to ensure proper order of
        // leadership changes
        updatedLeaders.foreach { partition =>
          LimitTopicsManager.checkAndPutData(partition.topic)
          if (partition.topic == Topic.GroupMetadataTopicName)
            coordinator.handleGroupImmigration(partition.partitionId)
        }
        updatedFollowers.foreach { partition =>
          LimitTopicsManager.checkAndPutData(partition.topic)
          if (partition.topic == Topic.GroupMetadataTopicName)
            coordinator.handleGroupEmigration(partition.partitionId)
        }
      }

      val leaderAndIsrResponse =
        if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
          new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
        } else {
          val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
        }

      requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
    } catch {
      case e: KafkaStorageException =>
        fatal("Disk error during leadership change.", e)
        Runtime.getRuntime.halt(1)
    }
  }
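
What the onLeadershipChange callback decides can be shown without the coordinator. In this sketch the Partition case class and the plan helper are hypothetical; the only thing taken from Kafka is the topic name __consumer_offsets. New leaders of that topic get their group metadata loaded (immigration), new followers get it unloaded (emigration), and every other topic is ignored.

```scala
object GroupMigration {
  val GroupMetadataTopicName = "__consumer_offsets"

  // Hypothetical stand-in for kafka.cluster.Partition.
  final case class Partition(topic: String, partitionId: Int)

  /** Returns (offsets partitions to load on this broker, offsets partitions to unload). */
  def plan(updatedLeaders: Iterable[Partition],
           updatedFollowers: Iterable[Partition]): (List[Int], List[Int]) = {
    def offsetsPartitions(ps: Iterable[Partition]): List[Int] =
      ps.filter(_.topic == GroupMetadataTopicName).map(_.partitionId).toList
    (offsetsPartitions(updatedLeaders), offsetsPartitions(updatedFollowers))
  }
}
```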

  def becomeLeaderOrFollower(correlationId: Int, leaderAndISRRequest: LeaderAndIsrRequest,
                             metadataCache: MetadataCache,
                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
    info("becomeLeaderOrFollower correlationId=" + correlationId + " leaderAndISRRequest=" + leaderAndISRRequest)
    leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
      stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                                .format(localBrokerId, stateInfo, correlationId,
                                        leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topicPartition.topic, topicPartition.partition))
    }
    replicaStateChangeLock synchronized {
      val responseMap = new mutable.HashMap[TopicPartition, Short]
      // a controller epoch in the request lower than the one we hold means the request is stale
      if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
        stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
          "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
          correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
        BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
      } else {
        val controllerId = leaderAndISRRequest.controllerId
        controllerEpoch = leaderAndISRRequest.controllerEpoch

        // First check partition's leader epoch
        val partitionState = new mutable.HashMap[Partition, PartitionState]()
        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
          val partition = getOrCreatePartition(topicPartition)
          // the tp's leader epoch currently held in memory; initially 0 - 1 = -1
          val partitionLeaderEpoch = partition.getLeaderEpoch
          // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
          // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
          // the tp leader epoch in the request must be greater than the current one to be valid,
          // and only the tps hosted on this broker are kept
          if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
            if(stateInfo.replicas.contains(localBrokerId))
              partitionState.put(partition, stateInfo)
            else {
              stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
                "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
                .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                  topicPartition.topic, topicPartition.partition, stateInfo.replicas.asScala.mkString(",")))
              responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
            }
          } else {
            // Otherwise record the error code in response
            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
              "epoch %d for partition [%s,%d] since its associated leader epoch %d is not higher than the current leader epoch %d")
              .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
                topicPartition.topic, topicPartition.partition, stateInfo.leaderEpoch, partitionLeaderEpoch))
            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
          }
        }

        val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
          stateInfo.leader == localBrokerId
        }
        val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys

        val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
        else
          Set.empty[Partition]
        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
        else
          Set.empty[Partition]

        // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
        // have been completely populated before starting the checkpointing there by avoiding weird race conditions
        if (!hwThreadInitialized) {
          startHighWaterMarksCheckPointThread()
          hwThreadInitialized = true
        }
        // once leaders and followers are made, clean up the idle fetcher threads
        replicaFetcherManager.shutdownIdleFetcherThreads()

        // handle the data migration
        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
        BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
      }
    }
  }
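
The checks in becomeLeaderOrFollower boil down to a small decision per partition. The sketch below is my own condensation, not Kafka code: PartitionState is cut down to four fields, and the Decision values are invented names for the branches of the method above. One detail worth noticing in the original is that a stale leader epoch is also reported with the STALE_CONTROLLER_EPOCH error code.

```scala
object EpochFencing {
  // Cut-down stand-in for the PartitionState carried in a LeaderAndIsrRequest.
  final case class PartitionState(controllerEpoch: Int, leader: Int, leaderEpoch: Int, replicas: Set[Int])

  sealed trait Decision
  case object BecomeLeader extends Decision
  case object BecomeFollower extends Decision
  case object StaleControllerEpoch extends Decision // whole request rejected
  case object StaleLeaderEpoch extends Decision     // this partition skipped
  case object NotAReplica extends Decision          // this broker is not in the replica list

  def decide(localBrokerId: Int,
             knownControllerEpoch: Int,
             requestControllerEpoch: Int,
             currentLeaderEpoch: Int,
             state: PartitionState): Decision =
    if (requestControllerEpoch < knownControllerEpoch) StaleControllerEpoch
    // the request's leader epoch must be strictly greater than the one held in memory
    else if (currentLeaderEpoch >= state.leaderEpoch) StaleLeaderEpoch
    else if (!state.replicas.contains(localBrokerId)) NotAReplica
    else if (state.leader == localBrokerId) BecomeLeader
    else BecomeFollower
}
```

For a freshly created topic the in-memory leader epoch is -1 and the request carries 0, so every replica broker passes the leader epoch check on the first request.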

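Inside becomeLeaderOrFollower, the request's controllerEpoch is first compared with the value the broker currently holds; this is where mark2 pays off. If the request's value is smaller, the request is discarded: controllerEpoch is the controller's version number and is incremented by 1 every time a new controller is elected, so a value smaller than the one in memory means the sender is a stale controller. If the epoch is valid, the broker updates its controllerEpoch and then does the following for each partition: it adds the tp to allPartitions and reads the tp's leaderEpoch, which plays the same fencing role as the controller epoch; it keeps only the tps that have a replica on this broker; and among those, the tps whose leader id equals this broker's id go through makeLeaders on this broker, while the rest go through makeFollowers. Once both have finished, idle fetcher threads are cleaned up, and if the topic involved is __consumer_offsets, the related group data is migrated as well. Next we look closely at makeLeaders and makeFollowers, starting with makeLeaders: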

  private def makeLeaders(controllerId: Int,
                          epoch: Int,
                          partitionState: Map[Partition, PartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
    info("makeLeaders controllerId=" + controllerId + " epoch=" + epoch + " correlationId=" + correlationId)
    partitionState.keys.foreach { partition =>
      stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
        "starting the become-leader transition for partition %s")
        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
    }

    for (partition <- partitionState.keys)
      responseMap.put(partition.topicPartition, Errors.NONE.code)

    val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()

    try {
      // First stop fetchers for all the partitions
      // to become leader, first stop the follower fetcher threads
      replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
      // Update the partition information to be the leader
      partitionState.foreach{ case (partition, partitionStateInfo) =>
        // the actual make-leader step
        if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
          partitionsToMakeLeaders += partition
        else
          stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
            "controller %d epoch %d for partition %s since it is already the leader for the partition.")
            .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
      }
      partitionsToMakeLeaders.foreach { partition =>
        stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
          "%d epoch %d with correlation id %d for partition %s")
          .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
      }

      // Init partition newOffsetMetaDataMap for Leader
      val metaData = new mutable.HashMap[TopicPartition, NewOffsetMetaData]
      partitionsToMakeLeaders.map{ partition =>
        val leo: Long = partition.leaderReplicaIfLocal.get.logEndOffset.messageOffset
        val lst: Long = partition.logManager.getLog(partition.topicPartition).get.segments.firstEntry().getValue.log.creationTime()
        val let: Long = partition.logManager.getLog(partition.topicPartition).get.segments.lastEntry().getValue.log.file.lastModified()
        val lso: Long = partition.leaderReplicaIfLocal.get.logStartOffset
        metaData.put(partition.topicPartition, new NewOffsetMetaData(partition.leaderReplicaIdOpt.get, leo, lst, let, lso))
      }

      info("makeLeaders updateNewOffsetMetaData broker=" + localBrokerId + " metaData=" + metaData)
      updateNewOffsetMetaData(localBrokerId, metaData)
    } catch {
      case e: Throwable =>
        partitionState.keys.foreach { partition =>
          val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
            " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)
          stateChangeLogger.error(errorMsg, e)
        }
        // Re-throw the exception for it to be caught in KafkaApis
        throw e
    }

    partitionState.keys.foreach { partition =>
      stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
        "for the become-leader transition for partition %s")
        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
    }

    partitionsToMakeLeaders
  }

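There are two steps here. To become leader, the broker first stops the follower fetcher threads for these partitions, and then updates the partition information. The update itself lives in makeLeader, which implements what a partition has to do when it becomes leader: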

  /**
   * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset
   * from the time when this broker was the leader last time) and setting the new leader and ISR.
   * If the leader replica id does not change, return false to indicate the replica manager.
   */
  def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
    val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
      val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
      // to maintain the decision maker controller's epoch in the zookeeper path
      // keep the latest controllerEpoch so the right decision can be made if, in some edge case,
      // more than one controller believes it is the leader
      controllerEpoch = partitionStateInfo.controllerEpoch
      // add replicas that are new
      // create the replicas that do not exist yet and put them in assignedReplicas; a replica on the local
      // broker also gets its log, with its HW initialized from the checkpoint (capped at the LEO)
      allReplicas.foreach(replica => getOrCreateReplica(replica))
      val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
      // remove assigned replicas that have been removed by the controller
      // replicas of this tp that are no longer assigned are removed: (OAR + RAR) - RAR, i.e. the OAR-only ones go
      (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
      inSyncReplicas = newInSyncReplicas
      leaderEpoch = partitionStateInfo.leaderEpoch
      zkVersion = partitionStateInfo.zkVersion
      val isNewLeader =
        if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
          false
        } else {
          // no leader recorded yet, or the recorded leader is not this broker (a follower turning into the leader)
          info("makeLeader true, before leaderReplicaIdOpt=" + leaderReplicaIdOpt +
            " after leaderReplicaIdOpt=" + localBrokerId + " topic="+ topicPartition)
          leaderReplicaIdOpt = Some(localBrokerId)
          true
        }
      val leaderReplica = getReplica().get
      val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
      val curTimeMs = time.milliseconds
      // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
      // reset the last caught-up time of the other replicas
      (assignedReplicas - leaderReplica).foreach { replica =>
        val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
        replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
      }
      // we may need to increment high watermark since ISR could be down to 1
      if (isNewLeader) {
        // construct the high watermark metadata for the new leader replica
        leaderReplica.convertHWToLocalOffsetMetadata()
        // reset log end offset for remote replicas
        // this is a new leader, so the cached offsets of the remote replicas are reset
        assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
      }
      // compare the LEOs of the replicas to decide whether the leader's HW should advance
      (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
    }
    // some delayed operations may be unblocked after HW changed
    if (leaderHWIncremented)
      // once the HW has moved, try to complete delayed fetches and delayed produce responses
      tryCompleteDelayedRequests()
    isNewLeader
  }
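
The last step of makeLeader, deciding whether the leader's HW moves, is worth a tiny illustration. This is a deliberately simplified sketch of the idea, not the real maybeIncrementLeaderHW: it takes plain Long offsets for the ISR members only, whereas the real method works on offset metadata and, as far as I can tell, also counts replicas that caught up recently. The function name and signature are mine.

```scala
object HighWatermark {
  /** Returns the new high watermark, or None when it does not move forward.
    * `isrLogEndOffsets` must include the leader's own LEO.
    */
  def maybeIncrement(currentHw: Long, isrLogEndOffsets: Iterable[Long]): Option[Long] =
    if (isrLogEndOffsets.isEmpty) None
    else {
      // A message is committed once every ISR member has it, so the candidate
      // HW is the smallest LEO in the ISR; the HW never moves backwards.
      val candidate = isrLogEndOffsets.min
      if (candidate > currentHw) Some(candidate) else None
    }
}
```

This also shows why shrinking the ISR down to just the leader can advance the HW immediately: the slowest follower no longer holds the minimum down.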

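controllerEpoch is used as described above. Next, a replica object is created for every replica of the partition. For the replica on this broker (the leader here), the log is created or loaded and the replica's HW is initialized from the high watermark checkpoint file, capped at the log end offset; the replica is put into assignedReplicas together with its log. For the replicas that live on other brokers (the followers), only a remote replica is created. The creation logic looks like this: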

  def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
    assignedReplicaMap.getAndMaybePut(replicaId, {
      if (isReplicaLocal(replicaId)) {
        val config = LogConfig.fromProps(logManager.defaultConfig.originals,
                                         AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
        val log = logManager.createLog(topicPartition, config)
        val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
        val offsetMap = checkpoint.read
        if (!offsetMap.contains(topicPartition))
          info(s"No checkpointed highwatermark is found for partition $topicPartition")
        val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
        new Replica(replicaId, this, time, offset, Some(log))
      } else new Replica(replicaId, this, time) // a remote replica: only a placeholder is created on this broker
    })
  }
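
The one line that computes the starting offset of a local replica deserves a closer look. Below is the same arithmetic in isolation, with a plain Map standing in for the checkpoint file and a String key standing in for TopicPartition (both are simplifications of mine):

```scala
object InitialHighWatermark {
  /** Starting HW for a local replica: the checkpointed value if there is one
    * (0 otherwise), but never beyond what the log actually contains.
    */
  def initialHw(checkpointed: Map[String, Long], topicPartition: String, logEndOffset: Long): Long =
    math.min(checkpointed.getOrElse(topicPartition, 0L), logEndOffset)
}
```

For a brand-new topic there is no checkpoint entry and the log is empty, so the replica starts with an HW of 0. The cap matters after an unclean shutdown, when the checkpoint may point past the end of the recovered log.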

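Back in makeLeader: once all replicas exist, the stale ones are removed. assignedReplicas holds the old OAR plus the new RAR, and allReplicas is the new RAR, so (OAR + RAR) - RAR leaves exactly the old replicas that are no longer assigned, and those are deleted. The tp's ISR and leaderEpoch are then updated; note that leaderEpoch is used the same way as the controller's controllerEpoch. Next comes the check for whether the elected leader is actually new: the tp's leader may already have been on this broker, in which case nothing changed and it is not a newly elected leader. After that, the last caught-up time of every other replica is reset (to the current time for ISR members, to 0 for the rest), and for a new leader the cached offsets of the remote replicas are reset as well. Finally, the LEOs of the replicas are compared to decide whether the leader's HW should advance. If the HW did advance, the broker tries to complete the delayed fetches and the delayed responses to producers that were waiting on it. Readers can dig into these two delayed operations on their own; space is limited, so I won't expand on them here. That completes the partition update in makeLeader and with it makeLeaders: all the leaders on this broker have been handled. Next up is makeFollowers, so let's look at the implementation: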

  private def makeFollowers(controllerId: Int,
                            epoch: Int,
                            partitionState: Map[Partition, PartitionState],
                            correlationId: Int,
                            responseMap: mutable.Map[TopicPartition, Short],
                            metadataCache: MetadataCache) : Set[Partition] = {
    info("makeFollowers controllerId=" + controllerId + " epoch=" + epoch + " correlationId=" + correlationId)
    partitionState.keys.foreach { partition =>
      stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
        "starting the become-follower transition for partition %s")
        .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
    }

    for (partition <- partitionState.keys)
      responseMap.put(partition.topicPartition, Errors.NONE.code)

    val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()

    try {

      // TODO: Delete leaders from LeaderAndIsrRequest
      partitionState.foreach{ case (partition, partitionStateInfo) =>
        val newLeaderBrokerId = partitionStateInfo.leader
        metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
          // Only change partition state when the leader is available
          case Some(_) =>
            if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
              partitionsToMakeFollower += partition
            else
              stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
                "controller %d epoch %d for partition %s since the new leader %d is the same as the old leader")
                .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
                partition.topicPartition, newLeaderBrokerId))
 34           case None =>
 35             // The leader broker should always be present in the metadata cache.
 36             // If not, we should record the error message and abort the transition process for this partition
 37             stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
 38               " %d epoch %d for partition %s but cannot become follower since the new leader %d is unavailable.")
 39               .format(localBrokerId, correlationId, controllerId, partitionStateInfo.controllerEpoch,
 40               partition.topicPartition, newLeaderBrokerId))
 41             // Create the local replica even if the leader is unavailable. This is required to ensure that we include
 42             // the partition's high watermark in the checkpoint file (see KAFKA-1647)
 43             partition.getOrCreateReplica()
 44         }
 45       }
 46
 47       // stop the existing fetchers for these partitions
 48       replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
 49       partitionsToMakeFollower.foreach { partition =>
 50         stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
 51           "%d epoch %d with correlation id %d for partition %s")
 52           .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
 53       }
 54
 55       // truncate the logs to the high watermark
 56       logManager.truncateTo(partitionsToMakeFollower.map { partition =>
 57         (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
 58       }.toMap)
 59
 60       // this broker may have been the leader before; complete the delayed requests left over from that role
 61       partitionsToMakeFollower.foreach { partition =>
 62           val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
 63           tryCompleteDelayedProduce(topicPartitionOperationKey)
 64           tryCompleteDelayedFetch(topicPartitionOperationKey)
 65         }
 66
 67       partitionsToMakeFollower.foreach { partition =>
 68         stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " +
 69           "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
 70           partition.topicPartition, correlationId, controllerId, epoch))
 71       }
 72
 73       if (isShuttingDown.get()) {
 74         partitionsToMakeFollower.foreach { partition =>
 75           stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
 76             "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
 77             controllerId, epoch, partition.topicPartition))
 78         }
 79       } else {
 80         debug("makefollowers config.smartExtendEnable=" + config.smartExtendEnable)
 81         if (config.smartExtendEnable) {
 82           try {
 83             // 1. clear NewOffsetMetaData.
 84             clearNewOffsetMetaData(partitionsToMakeFollower)
 85
 86             // 2. get offset from leader.
 87             val ResponseOffsetMap = getBestOffset(partitionsToMakeFollower, metadataCache, config.getStartOffsetRetries)
 88
 89             if (ResponseOffsetMap.nonEmpty) {
 90               val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
 91                 partition.topicPartition -> BrokerAndInitialOffset(
 92                   metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
 93                   ResponseOffsetMap.get(partition.topicPartition).get.baseOffset)).toMap
 94
 95               // 3. trunc log
 96               val partitionOffsets: mutable.Map[TopicPartition, Long] = mutable.Map[TopicPartition, Long]()
 97               ResponseOffsetMap.map { partition =>
 98                 if (partition._2.error == Errors.OFFSET_HW) {
 99                   partitionOffsets.put(partition._1, partition._2.baseOffset)
100                 }
101               }
102
103               if (partitionOffsets.nonEmpty) {
104                 info("makefollowers trunc log, partitionOffsets size=" + partitionOffsets.size + " ResponseOffsetMap size=" +
105                     ResponseOffsetMap.size + " partitionOffsets=" + partitionOffsets + " ResponseOffsetMap=" + ResponseOffsetMap)
106                 partitionOffsets.map { partitionInfo =>
107                   // truncate according to the offset returned in the new offset metadata
108                   logManager.truncateFullyAndStartAt(partitionInfo._1, partitionInfo._2)
109                 }
110               }
111               replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
112             } else {
113               // fall back to the original strategy
114               val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
115                 partition.topicPartition -> BrokerAndInitialOffset(
116                   metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
117                   partition.getReplica().get.logEndOffset.messageOffset)).toMap
118               error("makefollowers getStartOffset fail, and use old mode partitionsToMakeFollowerWithLeaderAndOffset=" + partitionsToMakeFollowerWithLeaderAndOffset)
119               replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
120             }
121           } catch {
122             case e: Exception =>
123               val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
124                 partition.topicPartition -> BrokerAndInitialOffset(
125                   metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
126                   partition.getReplica().get.logEndOffset.messageOffset)).toMap
127               error("ReplicaManager makefollowers getStartOffset fail, and use old mode partitionsToMakeFollowerWithLeaderAndOffset=" + partitionsToMakeFollowerWithLeaderAndOffset
128                 + " Exception=" + e.getMessage)
129               replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
130           }
131         } else {
132           //we do not need to check if the leader exists again since this has been done at the beginning of this process
133           val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
134             partition.topicPartition -> BrokerAndInitialOffset(
135               metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
136               partition.getReplica().get.logEndOffset.messageOffset)).toMap
137           replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
138         }
139
140         partitionsToMakeFollower.foreach { partition =>
141           stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
142             "%d epoch %d with correlation id %d for partition %s")
143             .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
144         }
145       }
146     } catch {
147       case e: Throwable =>
148         val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
149           "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
150         stateChangeLogger.error(errorMsg, e)
151         // Re-throw the exception for it to be caught in KafkaApis
152         throw e
153     }
154
155     partitionState.keys.foreach { partition =>
156       stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
157         "for the become-follower transition for partition %s")
158         .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
159     }
160
161     partitionsToMakeFollower
162   }

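Just like makeLeader inside makeLeaders, there is a makeFollower step first. It does almost the same thing: create all replicas, delete the stale ones, and update the tp's ISR and leaderEpoch. One thing worth noting: if the tp's leader is the same as before, the replica on this broker skips the remaining steps. Those steps are: stop the existing fetcher threads; truncate the log to the HW; since this replica may have been the leader before, try to complete the delayed produce responses and delayed fetches left over from that role; and finally add a fetcher thread towards the new leader, using the local replica's LEO after truncation as the initial fetch offset (or, when the smartExtendEnable switch in this code base is on, the offset returned by getBestOffset). That ends makeFollowers.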
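
To make the order of those steps easier to see, here is a minimal sketch of the default path (without smartExtendEnable) as a pure function. LocalPartition, FetchStart and plan are hypothetical names invented for this illustration; the real code works on Partition objects and talks to the log manager and fetcher manager instead of returning a plan:

```scala
object BecomeFollowerSketch {
  // Hypothetical, simplified view of a partition hosted on this broker.
  final case class LocalPartition(name: String,
                                  oldLeader: Option[Int],
                                  newLeader: Int,
                                  highWatermark: Long,
                                  logEndOffset: Long)

  // Where the new fetcher should start for a partition.
  final case class FetchStart(name: String, leader: Int, initialOffset: Long)

  def plan(partitions: Seq[LocalPartition], aliveBrokers: Set[Int]): Seq[FetchStart] =
    partitions
      // only change state when the new leader is alive
      .filter(p => aliveBrokers.contains(p.newLeader))
      // skip partitions whose leader did not change
      .filter(p => !p.oldLeader.contains(p.newLeader))
      .map { p =>
        // truncating to the HW leaves the local LEO at min(LEO, HW);
        // the fetcher then starts from that local LEO
        val leoAfterTruncation = math.min(p.logEndOffset, p.highWatermark)
        FetchStart(p.name, p.newLeader, leoAfterTruncation)
      }
}
```

A partition whose leader is unchanged, or whose new leader is not alive, simply does not appear in the result, which matches the "skip the remaining steps" behaviour described above.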

Then, back in becomeLeaderOrFollower, the idle fetcher threads are shut down, and the last step is to handle data migration. The code:

// For __consumer_offsets, consumer group data must be migrated too: both the log data and the cached group metadata.
// The log data is already in place after make leader / make follower, so this part mainly migrates the group metadata cache.
def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
  // for each new leader or follower, call coordinator to handle consumer group migration.
  // this callback is invoked under the replica state change lock to ensure proper order of
  // leadership changes
  updatedLeaders.foreach { partition =>
    LimitTopicsManager.checkAndPutData(partition.topic)
    if (partition.topic == Topic.GroupMetadataTopicName)
      coordinator.handleGroupImmigration(partition.partitionId)
  }
  updatedFollowers.foreach { partition =>
    LimitTopicsManager.checkAndPutData(partition.topic)
    if (partition.topic == Topic.GroupMetadataTopicName)
      coordinator.handleGroupEmigration(partition.partitionId)
  }
}

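In other words, if the topic involved is the internal topic __consumer_offsets, group data has to be migrated as well. For partitions that became leader on this broker, the group metadata must be "immigrated" (loaded in); for partitions that became follower on this broker, the metadata must be "emigrated" (unloaded). Let's step in and see how, starting with handleGroupImmigration: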

def handleGroupImmigration(offsetTopicPartitionId: Int) {
  groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
}

private def onGroupLoaded(group: GroupMetadata) {
  group synchronized {
    info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
    assert(group.is(Stable) || group.is(Empty))
    group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
  }
}

def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
  val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)

  def doLoadGroupsAndOffsets() {
    info(s"Loading offsets and group metadata from $topicPartition")

    inLock(partitionLock) {
      if (loadingPartitions.contains(offsetsPartition)) {
        info(s"Offset load from $topicPartition already in progress.")
        return
      } else {
        loadingPartitions.add(offsetsPartition)
      }
    }

    try {
      loadGroupsAndOffsets(topicPartition, onGroupLoaded)
    } catch {
      case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
    } finally {
      inLock(partitionLock) {
        ownedPartitions.add(offsetsPartition)
        loadingPartitions.remove(offsetsPartition)
      }
    }
  }

  // run on the scheduler thread that loads and cleans up the metadata cache
  scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets)
}
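
The bookkeeping in loadGroupsForPartition boils down to two sets guarded by one lock. Below is a stripped-down sketch of that pattern, assuming an object-level lock in place of partitionLock and a caller-supplied doLoad function in place of loadGroupsAndOffsets (all names are hypothetical). Unlike the listing, the sketch lets an exception from doLoad propagate instead of logging it:

```scala
import scala.collection.mutable

object PartitionLoadSketch {
  private val loading = mutable.Set[Int]()
  private val owned = mutable.Set[Int]()

  // Returns false if a load of this partition is already in progress.
  def load(partition: Int)(doLoad: Int => Unit): Boolean = {
    val started = synchronized {
      if (loading.contains(partition)) false
      else { loading.add(partition); true }
    }
    if (started) {
      // The lock is not held while loading, only while flipping the sets.
      try doLoad(partition)
      finally synchronized {
        // As in the listing, the partition becomes owned even if the load failed.
        owned.add(partition)
        loading.remove(partition)
      }
    }
    started
  }

  def isOwned(partition: Int): Boolean = synchronized(owned.contains(partition))
  def isLoading(partition: Int): Boolean = synchronized(loading.contains(partition))
}
```

While a partition sits in the loading set, the coordinator can tell that its groups are not ready to serve requests yet; once it moves to the owned set, the cache for that partition is usable.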

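As you can see, this step first adds the leader partition on this broker (always a __consumer_offsets partition here) to loadingPartitions and then runs loadGroupsAndOffsets. Once that finishes, the partition is added to ownedPartitions and removed from loadingPartitions, which completes the immigration for the leader partition. loadGroupsAndOffsets itself is implemented as follows: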

 1 private[coordinator] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
 2     def highWaterMark = replicaManager.getHighWatermark(topicPartition).getOrElse(-1L)
 3
 4     val startMs = time.milliseconds()
 5     replicaManager.getLog(topicPartition) match {
 6       case None =>
 7         warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
 8
 9       case Some(log) =>
10         var currOffset = log.logStartOffset
11         val buffer = ByteBuffer.allocate(config.loadBufferSize)
12         // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
13         val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
14         val removedOffsets = mutable.Set[GroupTopicPartition]()
15         val loadedGroups = mutable.Map[String, GroupMetadata]()
16         val removedGroups = mutable.Set[String]()
17
18         while (currOffset < highWaterMark && !shuttingDown.get()) {
19           buffer.clear()
20           val fileRecords = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true)
21             .records.asInstanceOf[FileRecords]
22           val bufferRead = fileRecords.readInto(buffer, 0)
23
24           MemoryRecords.readableRecords(bufferRead).deepEntries.asScala.foreach { entry =>
25             val record = entry.record
26             require(record.hasKey, "Group metadata/offset entry key should not be null")
27
28             // two kinds of records are handled separately: offsets and group metadata
29             GroupMetadataManager.readMessageKey(record.key) match {
30               case offsetKey: OffsetKey =>
31                 // load offset
32                 val key = offsetKey.key
33                 if (record.hasNullValue) {
34                   loadedOffsets.remove(key)
35                   removedOffsets.add(key)
36                 } else {
37                   val value = GroupMetadataManager.readOffsetMessageValue(record.value)
38                   loadedOffsets.put(key, value)
39                   removedOffsets.remove(key)
40                 }
41
42               case groupMetadataKey: GroupMetadataKey =>
43                 // load group metadata
44                 val groupId = groupMetadataKey.key
45                 val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
46                 if (groupMetadata != null) {
47                   trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}")
48                   removedGroups.remove(groupId)
49                   loadedGroups.put(groupId, groupMetadata)
50                 } else {
51                   loadedGroups.remove(groupId)
52                   removedGroups.add(groupId)
53                 }
54
55               case unknownKey =>
56                 throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
57             }
58
59             currOffset = entry.nextOffset
60           }
61         }
62
63         val (groupOffsets, emptyGroupOffsets) = loadedOffsets
64           .groupBy(_._1.group)
65           .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)} )
66           .partition { case (group, _) => loadedGroups.contains(group) }
67
68         loadedGroups.values.foreach { group =>
69           val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata])
70           // actually update the cache: group metadata and offsets
71           loadGroup(group, offsets)
72           // after loading, schedule a heartbeat for every member: the group metadata is readable and the group is stable
73           onGroupLoaded(group)
74         }
75
76         // load groups which store offsets in kafka, but which have no active members and thus no group
77         // metadata stored in the log
78         emptyGroupOffsets.foreach { case (groupId, offsets) =>
79           val group = new GroupMetadata(groupId)
80           loadGroup(group, offsets)
81           onGroupLoaded(group)
82         }
83
84         removedGroups.foreach { groupId =>
85           // if the cache already contains a group which should be removed, raise an error. Note that it
86           // is possible (however unlikely) for a consumer group to be removed, and then to be used only for
87           // offset storage (i.e. by "simple" consumers)
88           if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
89             throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
90               s"loading partition $topicPartition")
91         }
92
93         if (!shuttingDown.get())
94           info("Finished loading offsets from %s in %d milliseconds."
95             .format(topicPartition, time.milliseconds() - startMs))
96     }
97   }
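
The core of the replay loop above is the tombstone rule: a record with a null value deletes whatever was loaded for that key, and a later record for the same key brings it back. A minimal sketch of just that rule, with a hypothetical Entry type where None stands for a null value and a plain String stands for the GroupTopicPartition key:

```scala
import scala.collection.mutable

object OffsetReplaySketch {
  // Hypothetical simplified log record; value = None models a tombstone.
  final case class Entry(key: String, value: Option[Long])

  // Replays entries in log order and returns (live offsets, keys that ended up deleted).
  def replay(entries: Seq[Entry]): (Map[String, Long], Set[String]) = {
    val loaded = mutable.Map[String, Long]()
    val removed = mutable.Set[String]()
    entries.foreach {
      case Entry(key, Some(offset)) =>
        // a newer value wins and cancels an earlier tombstone
        loaded.put(key, offset)
        removed.remove(key)
      case Entry(key, None) =>
        // a tombstone drops the key and remembers that it was removed
        loaded.remove(key)
        removed.add(key)
    }
    (loaded.toMap, removed.toSet)
  }
}
```

Because the __consumer_offsets log is replayed in offset order, the last record for a key decides its final state, which is the same reason the listing keeps both loadedOffsets and removedOffsets in sync on every record.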

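Overall, this reads the partition's log and handles the two kinds of records separately: offsets and group metadata. It then calls loadGroup to actually update the group's offsets and metadata in the cache, and after that onGroupLoaded, which completes the current heartbeat and schedules the next heartbeat expiration for every member of the group (covered in detail in the previous post on the coordinator). That finishes the immigration on the leader side. Next is the emigration on the follower side: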

def handleGroupEmigration(offsetTopicPartitionId: Int) {
  groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
}

private def onGroupUnloaded(group: GroupMetadata) {
  group synchronized {
    info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
    val previousState = group.currentState
    group.transitionTo(Dead)
    group.metrics.close()

    previousState match {
      case Empty | Dead =>
      case PreparingRebalance =>
        for (member <- group.allMemberMetadata) {
          if (member.awaitingJoinCallback != null) {
            member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
            member.awaitingJoinCallback = null
          }
        }
        joinPurgatory.checkAndComplete(GroupKey(group.groupId))

      case Stable | AwaitingSync =>
        for (member <- group.allMemberMetadata) {
          if (member.awaitingSyncCallback != null) {
            member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP.code)
            member.awaitingSyncCallback = null
          }
          heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
        }
    }
  }
}

def removeGroupsForPartition(offsetsPartition: Int,
                             onGroupUnloaded: GroupMetadata => Unit) {
  val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
  scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)

  def removeGroupsAndOffsets() {
    var numOffsetsRemoved = 0
    var numGroupsRemoved = 0

    inLock(partitionLock) {
      // we need to guard the group removal in cache in the loading partition lock
      // to prevent coordinator's check-and-get-group race condition
      ownedPartitions.remove(offsetsPartition)

      for (group <- groupMetadataCache.values) {
        if (partitionFor(group.groupId) == offsetsPartition) {
          // move the group to the Dead state
          onGroupUnloaded(group)
          group.metrics.close()
          groupMetadataCache.remove(group.groupId, group)
          numGroupsRemoved += 1
          numOffsetsRemoved += group.numOffsets
        }
      }
    }

    if (numOffsetsRemoved > 0)
      info(s"Removed $numOffsetsRemoved cached offsets for $topicPartition on follower transition.")

    if (numGroupsRemoved > 0)
      info(s"Removed $numGroupsRemoved cached groups for $topicPartition on follower transition.")
  }
}
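
The listing relies on partitionFor to decide which groups belong to the partition being given up, but its body is not shown in this post. The sketch below assumes the usual scheme (a non-negative hash of the group id modulo the number of __consumer_offsets partitions); treat that as an assumption rather than a quote from the source. The cache is modelled as a Map from group id to its number of cached offsets, and emigrate is a hypothetical helper:

```scala
object GroupEmigrationSketch {
  // Assumption: groups are mapped to offsets partitions by a non-negative hash modulo.
  def partitionFor(groupId: String, numPartitions: Int): Int = {
    val h = groupId.hashCode
    // math.abs(Int.MinValue) is still negative, so map that one value to 0
    val nonNegative = if (h == Int.MinValue) 0 else math.abs(h)
    nonNegative % numPartitions
  }

  // cache: group id -> number of cached offsets for that group.
  // Returns (remaining cache, groups removed, offsets removed).
  def emigrate(cache: Map[String, Int],
               offsetsPartition: Int,
               numPartitions: Int): (Map[String, Int], Int, Int) = {
    val (leaving, staying) = cache.partition { case (groupId, _) =>
      partitionFor(groupId, numPartitions) == offsetsPartition
    }
    (staying, leaving.size, leaving.values.sum)
  }
}
```

The two counters in the result correspond to numGroupsRemoved and numOffsetsRemoved in the listing; the state transition to Dead and the pending-callback handling done by onGroupUnloaded are left out of the sketch.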

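So once a __consumer_offsets partition on this broker becomes a follower, it is removed from ownedPartitions, every group belonging to that partition is moved to the Dead state (done by onGroupUnloaded), and the group's metadata is removed from groupMetadataCache. That completes the emigration on the follower side, and with it the handling of the LeaderAndIsr request on the server. Back on the controller side, the next thing to process is the updateMetadataRequest. As mentioned earlier, it is sent to every live broker. Let's go to the server side again and see how it is handled: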

def handleUpdateMetadataRequest(request: RequestChannel.Request) {
  val correlationId = request.header.correlationId
  val updateMetadataRequest = request.body.asInstanceOf[UpdateMetadataRequest]

  val updateMetadataResponse =
    if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
      // this step updates the metadata cache
      val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
      if (deletedPartitions.nonEmpty)
        // deleted partitions must also be removed from the group metadata
        coordinator.handleDeletedPartitions(deletedPartitions)

      // the metadata is now up to date, so delayed topic operations can be completed
      if (adminManager.hasDelayedTopicOperations) {
        updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
          adminManager.tryCompleteDelayedTopicOperations(topic)
        }
      }
      new UpdateMetadataResponse(Errors.NONE.code)
    } else {
      new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code)
    }

  if (config.useHdfsFunction) {
    replicaManager.taskManager.addDiffTask()
  }

  requestChannel.sendResponse(new Response(request, updateMetadataResponse))
}

def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) : Seq[TopicPartition] =  {
  replicaStateChangeLock synchronized {
    if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
      val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
        "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
        correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
        controllerEpoch)
      stateChangeLogger.warn(stateControllerEpochErrorMessage)
      throw new ControllerMovedException(stateControllerEpochErrorMessage)
    } else {
      // update the metadata cache
      val deletedPartitions = metadataCache.updateCache(correlationId, updateMetadataRequest)
      controllerEpoch = updateMetadataRequest.controllerEpoch
      metadataCache.setControllerEpoch(controllerEpoch)
      deletedPartitions
    }
  }
}
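
The epoch comparison at the top of maybeUpdateMetadataCache is a fencing check: a request from a controller with an older epoch is rejected, anything else is applied and the broker remembers the newer epoch. A self-contained sketch of that pattern, with hypothetical Broker and StaleControllerException types that stand in for ReplicaManager and ControllerMovedException:

```scala
object ControllerEpochFenceSketch {
  final class StaleControllerException(msg: String) extends RuntimeException(msg)

  final class Broker {
    private var controllerEpoch = 0

    def currentEpoch: Int = synchronized(controllerEpoch)

    // Applies `update` only if the request does not come from an older controller.
    def handle(requestEpoch: Int)(update: => Unit): Unit = synchronized {
      if (requestEpoch < controllerEpoch)
        throw new StaleControllerException(
          s"request epoch $requestEpoch is older than latest known epoch $controllerEpoch")
      update
      // remember the newest epoch seen so far
      controllerEpoch = requestEpoch
    }
  }
}
```

A request carrying the same epoch as the current one is accepted, which matches the strict less-than comparison in the listing.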

def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
  inWriteLock(partitionMetadataLock) {
    controllerId = updateMetadataRequest.controllerId match {
      case id if id < 0 => None
      case id => Some(id)
    }
    aliveNodes.clear()
    aliveBrokers.clear()
    updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
      // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which
      // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could
      // move to `AnyRefMap`, which has comparable performance.
      val nodes = new java.util.HashMap[ListenerName, Node]
      val endPoints = new mutable.ArrayBuffer[EndPoint]
      broker.endPoints.asScala.foreach { ep =>
        endPoints += EndPoint(ep.host, ep.port, ep.listenerName, ep.securityProtocol)
        nodes.put(ep.listenerName, new Node(broker.id, ep.host, ep.port))
      }
      aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
      aliveNodes(broker.id) = nodes.asScala
    }

    val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
    updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
      val controllerId = updateMetadataRequest.controllerId
      val controllerEpoch = updateMetadataRequest.controllerEpoch
      if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
        removePartitionInfo(tp.topic, tp.partition)
        stateChangeLogger.trace(s"Broker $brokerId deleted partition $tp from metadata cache in response to UpdateMetadata " +
          s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
        deletedPartitions += tp
      } else {
        val partitionInfo = partitionStateToPartitionStateInfo(info)
        // the actual metadata update
        addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo)
        stateChangeLogger.trace(s"Broker $brokerId cached leader info $partitionInfo for partition $tp in response to " +
          s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
      }
    }
    deletedPartitions
  }
}
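
Stripped of logging and broker bookkeeping, the partition loop in updateCache is an upsert with one special marker value for deletion. The sketch below models only that part. TopicPartitionId and Cache are hypothetical types, and the value -2 for LeaderDuringDelete is an assumption about the constant, not something shown in this post:

```scala
import scala.collection.mutable

object MetadataCacheSketch {
  // Assumption: marker leader id meaning "this partition is being deleted".
  val LeaderDuringDelete: Int = -2

  final case class TopicPartitionId(topic: String, partition: Int)

  final class Cache {
    private val leaders = mutable.Map[TopicPartitionId, Int]()

    // Applies (partition -> leader) states and returns the partitions that were deleted.
    def update(states: Seq[(TopicPartitionId, Int)]): Seq[TopicPartitionId] = synchronized {
      val deleted = mutable.ArrayBuffer[TopicPartitionId]()
      states.foreach { case (tp, leader) =>
        if (leader == LeaderDuringDelete) {
          // drop the entry and report it so the caller can clean up group metadata
          leaders.remove(tp)
          deleted += tp
        } else {
          // add or overwrite the cached leader
          leaders.put(tp, leader)
        }
      }
      deleted.toList
    }

    def leaderOf(tp: TopicPartitionId): Option[Int] = synchronized(leaders.get(tp))
  }
}
```

The returned list plays the role of deletedPartitions, which handleUpdateMetadataRequest passes on to coordinator.handleDeletedPartitions.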

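The actual metadata update happens in addOrUpdatePartitionInfo. Along the way, if a partition's leader is marked as being deleted (LeaderAndIsr.LeaderDuringDelete), its entry is removed from the metadata cache, and the deleted partitions are subsequently removed from the group metadata as well. That is all updateMetadataRequest does, which is much simpler than leaderAndIsrRequest. Back on the controller side, after the UpdateMetadata requests comes stopReplicaRequestMap; topic creation never touches it, so this post does not go into it. With that, moving all tps to the online state is done, and the last step is to move all replicas to the online state: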

def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                       callbacks: Callbacks = (new CallbackBuilder).build) {
  if(replicas.nonEmpty) {
    debug("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      replicas.foreach(r => handleStateChange(r, targetState, callbacks))
      // updateMetadataRequestBrokerSet and leaderAndIsrRequestMap were already cleared after the partition
      // state machine sent its requests to the servers, so nothing is sent in this round
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    }catch {
      case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
    }
  }
}

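Same flow as before: handleStateChange first, then sendRequestsToBrokers. Since updateMetadataRequestBrokerSet and leaderAndIsrRequestMap were already cleared after the partition state machine finished sending its requests to the servers, sendRequestsToBrokers does nothing when the replicas go online. Let's look at what handleStateChange does: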

  1 def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
  2                         callbacks: Callbacks) {
  3     val topic = partitionAndReplica.topic
  4     val partition = partitionAndReplica.partition
  5     val replicaId = partitionAndReplica.replica
  6     val topicAndPartition = TopicAndPartition(topic, partition)
  7     if (!hasStarted.get)
  8       throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
  9                                             "to %s failed because replica state machine has not started")
 10                                               .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
 11     val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
 12     try {
 13       val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
 14       targetState match {
 15         case NewReplica =>
 16           assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
 17           // start replica as a follower to the current leader for its partition
 18           val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
 19           leaderIsrAndControllerEpochOpt match {
 20             case Some(leaderIsrAndControllerEpoch) =>
 21               if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
 22                 throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                  .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
              brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                                  topic, partition, leaderIsrAndControllerEpoch,
                                                                  replicaAssignment)
            case None => // new leader request will be sent to this replica when one gets elected
          }
          replicaState.put(partitionAndReplica, NewReplica)
          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                            targetState))
        case ReplicaDeletionStarted =>
          assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
          replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
          // send stop replica command
          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
            callbacks.stopReplicaResponseCallback)
          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
        case ReplicaDeletionIneligible =>
          assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
          replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
        case ReplicaDeletionSuccessful =>
          assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
          replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
        case NonExistentReplica =>
          assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
          // remove this replica from the assigned replicas list for its partition
          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
          replicaState.remove(partitionAndReplica)
          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
        case OnlineReplica =>
          assertValidPreviousStates(partitionAndReplica,
            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
          replicaState(partitionAndReplica) match {
            case NewReplica =>
              // add this replica to the assigned replicas list for its partition
              val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
              if(!currentAssignedReplicas.contains(replicaId))
                controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                        .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                                targetState))
            case _ =>
              // check if the leader for this partition ever existed
              controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
                case Some(leaderIsrAndControllerEpoch) =>
                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                    replicaAssignment)
                  replicaState.put(partitionAndReplica, OnlineReplica)
                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                case None => // that means the partition was never in OnlinePartition state, this means the broker never
                  // started a log for that partition and does not have a high watermark value for this partition
              }
          }
          replicaState.put(partitionAndReplica, OnlineReplica)
        case OfflineReplica =>
          assertValidPreviousStates(partitionAndReplica,
            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
          // send stop replica command to the replica so that it stops fetching from the leader
          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
          // As an optimization, the controller removes dead replicas from the ISR
          val leaderAndIsrIsEmpty: Boolean =
            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
              case Some(_) =>
                controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                  case Some(updatedLeaderIsrAndControllerEpoch) =>
                    // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
                    val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                    if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
                        topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                    }
                    replicaState.put(partitionAndReplica, OfflineReplica)
                    stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                      .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                    false
                  case None =>
                    true
                }
              case None =>
                true
            }
          if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
            throw new StateChangeFailedException(
              "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
              .format(replicaId, topicAndPartition))
      }
    }
    catch {
      case t: Throwable =>
        stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
                                  .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
    }
  }

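As the code shows, when the target state is OnlineReplica and the replica is currently in NewReplica, the controller only updates controllerContext.partitionReplicaAssignment and replicaState. No request is sent to the brokers in this branch, so in effect the replica's state simply changes from NewReplica to OnlineReplica.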
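The transition rules enforced by the assertValidPreviousStates calls above can be collected into a small lookup table. The following is only a minimal sketch for illustration, not Kafka's actual code: the names ReplicaTransitions, Context, isValid and bringOnline are hypothetical, replicas are identified by a bare broker id, and NewReplica as a target state is left out for brevity.

```scala
// Simplified stand-ins for the replica states handled by ReplicaStateMachine.
sealed trait ReplicaState
case object NewReplica extends ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState
case object ReplicaDeletionStarted extends ReplicaState
case object ReplicaDeletionSuccessful extends ReplicaState
case object ReplicaDeletionIneligible extends ReplicaState
case object NonExistentReplica extends ReplicaState

object ReplicaTransitions {
  private def states(s: ReplicaState*): Set[ReplicaState] = s.toSet

  // target state -> allowed previous states, copied from the assertValidPreviousStates calls
  val validPrevious: Map[ReplicaState, Set[ReplicaState]] = Map[ReplicaState, Set[ReplicaState]](
    ReplicaDeletionStarted    -> states(OfflineReplica),
    ReplicaDeletionIneligible -> states(ReplicaDeletionStarted),
    ReplicaDeletionSuccessful -> states(ReplicaDeletionStarted),
    NonExistentReplica        -> states(ReplicaDeletionSuccessful),
    OnlineReplica             -> states(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
    OfflineReplica            -> states(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
  )

  def isValid(from: ReplicaState, to: ReplicaState): Boolean =
    validPrevious.get(to).exists(_.contains(from))

  // Hypothetical, immutable stand-in for the controller's bookkeeping of one partition:
  // the assigned replica list and the state of each replica, keyed by broker id.
  final case class Context(assigned: Seq[Int], states: Map[Int, ReplicaState])

  // Mirrors the bookkeeping of the OnlineReplica case: a NewReplica is appended to the
  // assigned list if it is missing, then the state is recorded. The LeaderAndIsr request
  // sent for the other previous states is not modeled here.
  def bringOnline(ctx: Context, replicaId: Int): Context = {
    val curr = ctx.states.getOrElse(replicaId, NonExistentReplica)
    require(isValid(curr, OnlineReplica), s"replica $replicaId cannot go from $curr to OnlineReplica")
    val assigned =
      if (curr == NewReplica && !ctx.assigned.contains(replicaId)) ctx.assigned :+ replicaId
      else ctx.assigned
    ctx.copy(assigned = assigned, states = ctx.states.updated(replicaId, OnlineReplica))
  }

  def main(args: Array[String]): Unit = {
    val before = Context(Seq(1, 2), Map[Int, ReplicaState](3 -> NewReplica))
    val after = bringOnline(before, 3)
    println(after.assigned)  // List(1, 2, 3)
    println(after.states(3)) // OnlineReplica
  }
}
```

Keeping the rules in a map makes it easy to see at a glance that OnlineReplica can be reached from NewReplica but not from NonExistentReplica, which is why a newly created replica has to pass through NewReplica first.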

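With that, the state transitions of both the partition and replica state machines are complete, and so is the whole topic creation process. Looking back, the most important part is the logic that takes a partition from NewPartition to OnlinePartition.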

Original article: https://www.cnblogs.com/hudeqi/p/12640017.html

Time: 2024-10-17 13:28:23
