kafka源码解析之十二KafkaController(中篇)

12.3 KafkaController PartitionStateMachine

它实现了topic的分区状态切换功能,Partition存在的状态如下:


状态名


状态存在的时间


有效的前置状态


NonExistentPartition


1.partition重来没有被创建

2.partition创建之后被删除

OfflinePartition
NewPartition

1.partition创建之后,被分配了replicas,但是还没有leader/isr

NonExistentPartition
OnlinePartition

1.partition在replicas中选举某个成为leader之后

NewPartition/OfflinePartition
OfflinePartition

1.partition的replicas中的leader下线之后,没有重新选举新的leader之前

2.partition创建之后直接被下线

NewPartition/OnlinePartition

Partition状态切换的过程如下:


状态切换


切换的时机


NonExistentPartition -> NewPartition


1.从zk上加载assigned replicas置kafkaControl内部的缓存中


NewPartition-> OnlinePartition

1.分配第一个live replica作为leader,其它libe replicas作为isr,并把信息写入到zk

OnlinePartition,OfflinePartition -> OnlinePartition

1.为partition重新选举新的leader和isr,并把信息写入到zk

NewPartition,OnlinePartition,OfflinePartition -> OfflinePartition


1.仅仅是在kafkaControl中标记该状态为OfflinePartition


OfflinePartition -> NonExistentPartition


1.仅仅是在kafkaControl中标记该状态为NonExistentPartition

因此重点关注PartitionStateMachine的handleStateChange函数
private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                              leaderSelector: PartitionLeaderSelector,
                              callbacks: Callbacks) {
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get)
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
                                          "the partition state machine has not started")
                                            .format(controllerId, controller.epoch, topicAndPartition, targetState))
  val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
  try {
    targetState match {
      case NewPartition =>
//检查前置状态
        assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
        //更新controllerContext中的partitionReplicaAssignment
assignReplicasToPartitions(topic, partition)
//修改partition的状态
        partitionState.put(topicAndPartition, NewPartition)
        val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
                                          assignedReplicas))
      case OnlinePartition =>
//检查前置状态
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
        partitionState(topicAndPartition) match {
          case NewPartition =>// NewPartition-> OnlinePartition
            /* 1.根据partitionReplicaAssignment中信息选择第一个live的replica为leader,其余为isr
     *2.将leader和isr持久化到zk
             *3.更新controllerContext中的partitionLeadershipInfo
*4.封装发送给这些replica所在的broker的LeaderAndIsrRequest请求,交由ControllerBrokerRequestBatch处理
*/
            initializeLeaderAndIsrForPartition(topicAndPartition)
          case OfflinePartition =>// OfflinePartition-> OnlinePartition
/* 1.根据不同的leaderSelector选举新的leader,这里一般调用的是OfflinePartitionLeaderSelector
     *2.将leader和isr持久化到zk
     *3.更新controllerContext中的partitionLeadershipInfo
*4.封装发送给这些replica所在的broker的LeaderAndIsrRequest请求,交由ControllerBrokerRequestBatch处理
*/
            electLeaderForPartition(topic, partition, leaderSelector)
          case OnlinePartition =>// OnlinePartition -> OnlinePartition
/* 1.根据不同的leaderSelector选举新的leader,这里一般调用的是ReassignedPartitionLeaderSelector
     *2.将leader和isr持久化到zk
     *3.更新controllerContext中的partitionLeadershipInfo
*4.封装发送给这些replica所在的broker的LeaderAndIsrRequest请求,交由ControllerBrokerRequestBatch处理
*/
            electLeaderForPartition(topic, partition, leaderSelector)
          case _ => // should never come here since illegal previous states are checked above
        }
//更新partition的状态
        partitionState.put(topicAndPartition, OnlinePartition)
        val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
      case OfflinePartition =>
        //检查前置状态
        assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
//更新partition的状态
        partitionState.put(topicAndPartition, OfflinePartition)
      case NonExistentPartition =>
        //检查前置状态
        assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
        stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
//更新partition的状态
        partitionState.put(topicAndPartition, NonExistentPartition)
        // post: partition state is deleted from all brokers and zookeeper
    }
  } catch {
    case t: Throwable =>
      stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
        .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
  }
}

12.4 KafkaController PartitionLeaderSelector

当partition的状态发生切换时,特别发生如下切换:OfflinePartition-> OnlinePartition和OnlinePartition -> OnlinePartition时需要调用不同的PartitionLeaderSelector来确定leader和isr,当前一共支持5种PartitionLeaderSelector,分别为:NoOpLeaderSelector,OfflinePartitionLeaderSelector,ReassignedPartitionLeaderSelector,PreferredReplicaPartitionLeaderSelector,ControlledShutdownLeaderSelector。

12.4.1 NoOpLeaderSelector

/**
 * Essentially does nothing. Returns the current leader and ISR, and the current
 * set of replicas assigned to a given topic/partition.
 */
class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {

  this.logIdent = "[NoOpLeaderSelector]: "

  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
  }
}
基本上啥也没做,就是把currentLeaderAndIsr和set of replicas assigned to a given topic/partition

12.4.2 OfflinePartitionLeaderSelector

class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
  extends PartitionLeaderSelector with Logging {
  this.logIdent = "[OfflinePartitionLeaderSelector]: "
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
      case Some(assignedReplicas) =>
        val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
        val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
        val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
        val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
          case true =>//isr中的broker都离线了,则需要从asr中选择leader
            if (!LogConfig.fromProps(config.props.props, AdminUtils.fetchTopicConfig(controllerContext.zkClient,
              topicAndPartition.topic)).uncleanLeaderElectionEnable) {
              throw new NoReplicaOnlineException(("No broker in ISR for partition " +
                "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                " ISR brokers are: [%s]".format(currentLeaderAndIsr.isr.mkString(",")))
            }
            debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
              .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
            liveAssignedReplicas.isEmpty match {
              case true =>//如果asr中的broker也都已经离线了,则这个topic/partition挂了
                throw new NoReplicaOnlineException(("No replica for partition " +
                  "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                  " Assigned replicas are: [%s]".format(assignedReplicas))
              case false =>//如果asr中的broker有一些是在线的
                ControllerStats.uncleanLeaderElectionRate.mark()
                val newLeader = liveAssignedReplicas.head//取第一个为leader
                warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
                     .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
                new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
            }
          case false =>//isr中的broker有一些是在线的
            val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
            val newLeader = liveReplicasInIsr.head//选择第一个live的replica
            debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
                  .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
            new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
        }
        info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
        (newLeaderAndIsr, liveAssignedReplicas)
      case None =>
        throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
    }
  }
}

12.4.3 ReassignedPartitionLeaderSelector

class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
  this.logIdent = "[ReassignedPartitionLeaderSelector]: "
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
//patition被重新分配的replicas
    val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
//在reassignedInSyncReplicas中筛选replica其所在的broker是live的和当前的replica是位于isr中的
val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
                                                                             currentLeaderAndIsr.isr.contains(r))
    val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
    newLeaderOpt match {//存在满足以上条件的replica,则筛选为leader
      case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
        currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
      case None =>//否则reassigned失败
        reassignedInSyncReplicas.size match {
          case 0 =>
            throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
              " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
          case _ =>
            throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
              "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
        }
    }
  }
}

12.4.4 PreferredReplicaPartitionLeaderSelector

class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
with Logging {
  this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
//默认选举第一个replica作为leader
    val preferredReplica = assignedReplicas.head
    // check if preferred replica is the current leader
    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
    if (currentLeader == preferredReplica) {//如果已经实现,则退出
      throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
                                                   .format(preferredReplica, topicAndPartition))
    } else {
      info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
        " Trigerring preferred replica leader election")
      // 检查这个replica是否位于isr和其所在的broker是否live,如果是的话,则其恢复成leader,此场景主要用于负载均衡的情况
  if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
        (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
          currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
      } else {
        throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
          "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
      }
    }
  }
}

12.4.5 ControlledShutdownLeaderSelector

class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
        extends PartitionLeaderSelector
        with Logging {
  this.logIdent = "[ControlledShutdownLeaderSelector]: "
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
    val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
    val currentLeader = currentLeaderAndIsr.leader
    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
//筛选出live状态的replica
    val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
//筛选出live状态的isr
    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
    val newLeaderOpt = newIsr.headOption
    newLeaderOpt match {
      case Some(newLeader) =>//如果存在newLeader,选择其作为leader
        debug("Partition %s : current leader = %d, new leader = %d"
              .format(topicAndPartition, currentLeader, newLeader))
        (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
         liveAssignedReplicas)
      case None =>
        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
    }
  }
}

12.5 KafkaController ReplicaStateMachine

它实现了topic的partition的replica状态切换功能,replica存在的状态如下:


状态名


状态存在的时间


有效的前置状态

NewReplica

1.replica被分配的时候,此时该replica还没有工作,其角色只能是follower

NonExistentReplica
OnlineReplica

1.replica开始工作,可能作为leader或者follower

NewReplica/OnlineReplica/ OfflineReplica
OfflineReplica

1.该replica挂了,比如说该replica所在的broker离线了

NewReplica, OnlineReplica
ReplicaDeletionStarted

1.开始删除该replica的时候

OfflineReplica
ReplicaDeletionSuccessful

1.replica成功响应删除该副本的请求的时候
,此时kafkaControl内存中还保留此replica的信息

ReplicaDeletionStarted
ReplicaDeletionIneligible

1.如果该replica删除失败

ReplicaDeletionStarted
NonExistentReplica

1.
replica信息被从KafkaControl内存中删除的时候

ReplicaDeletionSuccessful

replica状态切换的过程如下:

状态切换 切换的时机
NonExistentReplica->
NewReplica
1.KafkaControl 发送LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
NewReplica -> OnlineReplica 1.当KafkaControl按需把new replica加入到asr中的时候,实际上NewReplica转化为OnlineReplica是一个很快的过程,中间存在的时间很短,其转化出现在onNewPartitionCreation
OnlineReplica,OfflineReplica->
OnlineReplica
1. KafkaControl 发送 LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible
-> OfflineReplica
1. kafkaControl发送StopReplicaRequest to the replica (w/o deletion)
2.kafkaControl 清除 this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and UpdateMetadata request for the partition to every
live broker.
OfflineReplica->ReplicaDeletionStarted 1.kafkaControl发送StopReplicaRequest to the replica
ReplicaDeletionStarted->ReplicaDeletionSuccessful 1.kafkaControl mark the state of the replica in the state
machine
ReplicaDeletionStarted->ReplicaDeletionIneligible 1.kafkaControl mark the state of the replica in the state machine
ReplicaDeletionSuccessful-> NonExistentReplica 1.kafkaControl remove the replica from
the in memory partition replica assignment cache

因此重点关注ReplicaStateMachine的handleStateChange函数

def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
                      callbacks: Callbacks) {
  val topic = partitionAndReplica.topic
  val partition = partitionAndReplica.partition
  val replicaId = partitionAndReplica.replica
  val topicAndPartition = TopicAndPartition(topic, partition)
  if (!hasStarted.get)
    throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
                                          "to %s failed because replica state machine has not started")
                                            .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
  val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
  try {
    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
    targetState match {
      case NewReplica =>//当客户端刚创建topic的时候,触发KafkaControl内部的回调onNewPartitionCreation
//判断前置状态
        assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
        val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
        leaderIsrAndControllerEpochOpt match {
          case Some(leaderIsrAndControllerEpoch) =>
            if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) //NewReplica不可能是该Partition的leader,只有online状态才有leader
              throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
                .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
//封装发送给这些replica所在的broker的LeaderAndIsrRequest请求,交由ControllerBrokerRequestBatch处理
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                                topic, partition, leaderIsrAndControllerEpoch,
                                                                replicaAssignment)
          case None => // new leader request will be sent to this replica when one gets elected
        }
//置状态为NewReplica
        replicaState.put(partitionAndReplica, NewReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                          targetState))
      case ReplicaDeletionStarted =>
//判断前置状态
        assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
//置状态为ReplicaDeletionStarted
        replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
//封装发送给这些replica所在的broker的StopReplicaRequest请求,交由ControllerBrokerRequestBatch处理,并且在收到reponse的时候回调TopicDeletionManager中的deleteTopicStopReplicaCallback,将那些成功删除的replica状态切换为ReplicaDeletionSuccessful,将那些删除失败的replica状态切换为ReplicaDeletionIneligible
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
          callbacks.stopReplicaResponseCallback)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case ReplicaDeletionIneligible =>
//判断前置状态
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
//置状态为ReplicaDeletionIneligible
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case ReplicaDeletionSuccessful =>
//判断前置状态
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
//置状态为ReplicaDeletionIneligible
        replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case NonExistentReplica =>
//判断前置状态
        assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
        val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
//更新partition的分布请求
        controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
//删除该replica的状态
        replicaState.remove(partitionAndReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case OnlineReplica =>
//判断前置状态
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
        replicaState(partitionAndReplica) match {
          case NewReplica =>//基本上啥也没做
            val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
            if(!currentAssignedReplicas.contains(replicaId))//按需添加replica
              controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
            stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                                      .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                              targetState))
          case _ =>//可能之前已经存在,则向其发送leader和isr的request
            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
              case Some(leaderIsrAndControllerEpoch) =>
                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                  replicaAssignment)
//置状态为OnlineReplica,感觉有点多余
                replicaState.put(partitionAndReplica, OnlineReplica)
                stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
              case None => 

            }
        }
//置状态为OnlineReplica
        replicaState.put(partitionAndReplica, OnlineReplica)
      case OfflineReplica =>
//判断前置状态
        assertValidPreviousStates(partitionAndReplica,
          List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
//封装发送给这些replica所在的broker的StopReplicaRequest请求,交由ControllerBrokerRequestBatch处理
        brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
        val leaderAndIsrIsEmpty: Boolean =
          controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
            case Some(currLeaderIsrAndControllerEpoch) =>
//删除该replica
              controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                case Some(updatedLeaderIsrAndControllerEpoch) =>
//此topic的partition的replicas发生了shrink(缩减),需要通知其它的replica
                  val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
                  if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
                      topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                  }
//置状态为OfflineReplica
                  replicaState.put(partitionAndReplica, OfflineReplica)
                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                  false
                case None =>
                  true
              }
            case None =>
              true
          }
        if (leaderAndIsrIsEmpty)//不能没有leader
          throw new StateChangeFailedException(
            "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
            .format(replicaId, topicAndPartition))
    }
  }
  catch {
    case t: Throwable =>
      stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
                                .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
  }
}
时间: 2024-07-29 18:38:20

kafka源码解析之十二KafkaController(中篇)的相关文章

kafka源码解析之十二KafkaController(下篇)

12.6 KafkaController内部的listener KafkaControler(leader)通过在zk的不同目录建立各种listener来达到对topic的管理和维护,其在zk的目录结构和对应的listener如下: 12.6.1 brokerChangeListener /** * This is the zookeeper listener that triggers all the state transitions for a replica */ class Broke

Mybaits 源码解析 (十二)----- Mybatis的事务如何被Spring管理?Mybatis和Spring事务中用的Connection是同一个吗?

不知道一些同学有没有这种疑问,为什么Mybtis中要配置dataSource,Spring的事务中也要配置dataSource?那么Mybatis和Spring事务中用的Connection是同一个吗?我们常用配置如下 <!--会话工厂 --> <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> <property name=&qu

hbase源码系列(十二)Get、Scan在服务端是如何处理?

继上一篇讲了Put和Delete之后,这一篇我们讲Get和Scan, 因为我发现这两个操作几乎是一样的过程,就像之前的Put和Delete一样,上一篇我本来只打算写Put的,结果发现Delete也可以走这个过程,所以就一起写了. Get 我们打开HRegionServer找到get方法.Get的方法处理分两种,设置了ClosestRowBefore和没有设置的,一般来讲,我们都是知道了明确的rowkey,不太会设置这个参数,它默认是false的. if (get.hasClosestRowBef

【原】Android热更新开源项目Tinker源码解析系列之二:资源文件热更新

上一篇文章介绍了Dex文件的热更新流程,本文将会分析Tinker中对资源文件的热更新流程. 同Dex,资源文件的热更新同样包括三个部分:资源补丁生成,资源补丁合成及资源补丁加载. 本系列将从以下三个方面对Tinker进行源码解析: Android热更新开源项目Tinker源码解析系列之一:Dex热更新 Android热更新开源项目Tinker源码解析系列之二:资源热更新 Android热更新开源项目Tinker源码解析系类之三:so热更新 转载请标明本文来源:http://www.cnblogs

libevent源码深度剖析十二

libevent源码深度剖析十二 --让libevent支持多线程张亮 Libevent本身不是多线程安全的,在多核的时代,如何能充分利用CPU的能力呢,这一节来说说如何在多线程环境中使用libevent,跟源代码并没有太大的关系,纯粹是使用上的技巧. 1 错误使用示例 在多核的CPU上只使用一个线程始终是对不起CPU的处理能力啊,那好吧,那就多创建几个线程,比如下面的简单服务器场景.1 主线程创建工作线程1:2 接着主线程监听在端口上,等待新的连接:3 在线程1中执行event事件循环,等待事

JDK8源码解析 -- HashMap(二)

在上一篇JDK8源码解析 -- HashMap(一)的博客中关于HashMap的重要知识点已经讲了差不多了,还有一些内容我会在今天这篇博客中说说,同时我也会把一些我不懂的问题抛出来,希望看到我这篇博客的大神帮忙解答困扰我的问题,让我明白一个所以然来.彼此互相进步,互相成长.HashMap从jdk7到jdk8版本改变大,1.新增加的节点在链表末尾进行添加  2.使用了红黑树. 1. HashMap容量大小求值方法 // 返回2的幂次 static final int tableSizeFor(in

android源码解析之(二)--&gt;异步任务AsyncTask

android的异步任务体系中还有一个非常重要的操作类:AsyncTask,其内部主要使用的是java的线程池和Handler来实现异步任务以及与UI线程的交互.本文主要解析AsyncTask的的使用与源码. 首先我们来看一下AsyncTask的基本使用: class MAsyncTask extends AsyncTask<Integer, Integer, Integer> { @Override protected void onPreExecute() { super.onPreExe

Redis源码解析(十五)--- aof-append only file解析

继续学习redis源码下的Data数据相关文件的代码分析,今天我看的是一个叫aof的文件,这个字母是append ONLY file的简称,意味只进行追加文件操作.这里的文件追加记录时为了记录数据操作的改变记录,用以异常情况的数据恢复的.类似于之前我说的redo,undo日志的作用.我们都知道,redis作为一个内存数据库,数据的每次操作改变是先放在内存中,等到内存数据满了,在刷新到磁盘文件中,达到持久化的目的.所以aof的操作模式,也是采用了这样的方式.这里引入了一个block块的概念,其实就

android源码解析(十九)--&gt;Dialog加载绘制流程

前面两篇文章,我们分析了Activity的布局文件加载.绘制流程,算是对整个Android系统中界面的显示流程有了一个大概的了解,其实Android系统中所有的显示控件(注意这里是控件,而不是组件)的加载绘制流程都是类似的,包括:Dialog的加载绘制流程,PopupWindow的加载绘制流程,Toast的显示原理等,上一篇文章中,我说在介绍了Activity界面的加载绘制流程之后,就会分析一下剩余几个控件的显示控制流程,这里我打算先分析一下Dialog的加载绘制流程. 可能有的同学问这里为什么