Kafka Source Code Analysis, Part 12: KafkaController (Part 3)

12.6 Listeners inside KafkaController

The KafkaController (the controller leader) manages and maintains topics by registering various listeners on different ZooKeeper paths; the relevant zk paths and their corresponding listeners are shown below.
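
The registration code itself is omitted here; as a rough sketch (zk paths per the 0.8.x layout, the listener variable names are illustrative), the controller subscribes its listeners like this:

// Sketch: which listener watches which zk path (I0Itec ZkClient subscriptions; variable names illustrative)
zkClient.subscribeChildChanges("/brokers/ids", brokerChangeListener)                                  // 12.6.1
zkClient.subscribeChildChanges("/brokers/topics", topicChangeListener)                                // 12.6.2
zkClient.subscribeChildChanges("/admin/delete_topics", deleteTopicsListener)                          // 12.6.3
zkClient.subscribeDataChanges("/admin/preferred_replica_election", preferredReplicaElectionListener)  // 12.6.4
zkClient.subscribeDataChanges("/admin/reassign_partitions", partitionsReassignedListener)             // 12.6.5
zkClient.subscribeDataChanges("/brokers/topics/" + topic, new AddPartitionsListener(topic))           // 12.6.6, one per topic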

12.6.1 brokerChangeListener

/**
 * This is the zookeeper listener that triggers all the state transitions for a replica
 */
class BrokerChangeListener() extends IZkChildListener with Logging {
  this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
  def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
    info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
    inLock(controllerContext.controllerLock) {
      if (hasStarted.get) {
        ControllerStats.leaderElectionTimer.time {
          try {
            val curBrokerIds = currentBrokerList.map(_.toInt).toSet
            val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
            val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _))
            // filter out the newly added brokers
            val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
            // filter out the dead broker ids
            val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
            controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
            info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
              .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
            // open communication channels to the new brokers
            newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
            // close communication channels to the dead brokers
            deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
            if(newBrokerIds.size > 0)
              // move the replicas on the new brokers to the online state and resume any pending topic deletion
              controller.onBrokerStartup(newBrokerIds.toSeq)
            if(deadBrokerIds.size > 0)
              // move the replicas on the dead brokers to the offline state and mark their deletion as failed
              controller.onBrokerFailure(deadBrokerIds.toSeq)
          } catch {
            case e: Throwable => error("Error while handling broker changes", e)
          }
        }
      }
    }
  }
}
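
For reference, ZkUtils.getBrokerInfo above reads each broker's ephemeral registration node under /brokers/ids/[brokerId]; in the 0.8.x format that data looks roughly like the following (all field values here are illustrative):

// Illustrative content of /brokers/ids/1, as parsed by ZkUtils.getBrokerInfo (values are examples only)
val brokerRegistration =
  """{"jmx_port":-1,"timestamp":"1428512949990","host":"broker1.example.com","version":1,"port":9092}"""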

12.6.2 topicChangeListener

class TopicChangeListener extends IZkChildListener with Logging {
  this.logIdent = "[TopicChangeListener on Controller " + controller.config.brokerId + "]: "

  @throws(classOf[Exception])
  def handleChildChange(parentPath : String, children : java.util.List[String]) {
    inLock(controllerContext.controllerLock) {
      if (hasStarted.get) {
        try {
          val currentChildren = {
            import JavaConversions._
            debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
            (children: Buffer[String]).toSet
          }
          // filter out newly created topics
          val newTopics = currentChildren -- controllerContext.allTopics
          // filter out deleted topics
          val deletedTopics = controllerContext.allTopics -- currentChildren
          controllerContext.allTopics = currentChildren
          // fetch the replica assignment of the new topics
          val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
          // drop the replica assignment of the deleted topics
          controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
            !deletedTopics.contains(p._1.topic))
          // add the replica assignment of the new topics
          controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
          info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
            deletedTopics, addedPartitionReplicaAssignment))
          if(newTopics.size > 0) // kick off creation of the new topics
            controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
        } catch {
          case e: Throwable => error("Error while handling new topic", e )
        }
      }
    }
  }
}
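
ZkUtils.getReplicaAssignmentForTopics reads each new topic's node at /brokers/topics/[topic]; the data stored there is the partition-to-replicas assignment, roughly of this shape (topic name and broker ids illustrative):

// Illustrative content of /brokers/topics/my-topic: partition id -> assigned replica (broker) ids
val topicAssignment = """{"version":1,"partitions":{"0":[1,2],"1":[2,3],"2":[3,1]}}"""
// ZkUtils.getReplicaAssignmentForTopics turns this into entries such as TopicAndPartition("my-topic", 0) -> Seq(1, 2)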

12.6.3 deleteTopicsListener

class DeleteTopicsListener() extends IZkChildListener with Logging {
  this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
  val zkClient = controllerContext.zkClient
  /**
   * Invoked when a topic is being deleted
   * @throws Exception On any error.
   */
  @throws(classOf[Exception])
  def handleChildChange(parentPath : String, children : java.util.List[String]) {
    inLock(controllerContext.controllerLock) {
      var topicsToBeDeleted = {
        import JavaConversions._
        (children: Buffer[String]).toSet
      }
      debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
      // filter out topics that do not actually exist
      val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
      if(nonExistentTopics.size > 0) {
        warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
        nonExistentTopics.foreach(topic => ZkUtils.deletePathRecursive(zkClient, ZkUtils.getDeleteTopicPath(topic)))
      }
      // drop the non-existent topics from the set
      topicsToBeDeleted --= nonExistentTopics
      if(topicsToBeDeleted.size > 0) {
        info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
        // mark topic ineligible for deletion if other state changes are in progress
        topicsToBeDeleted.foreach { topic =>
          val preferredReplicaElectionInProgress =
            controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
          val partitionReassignmentInProgress =
            controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
          // if any partition of this topic is being reassigned or undergoing preferred replica election, mark the topic ineligible for deletion
          if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
            controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
        }
        // hand the topics over to the deleteTopicManager
        controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
      }
    }
  }
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
  }
}
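
For context, a deletion request is nothing more than a child node created under /admin/delete_topics, which is what fires this listener. A minimal sketch using the ZkUtils helpers already seen above (topic name illustrative):

// Creating /admin/delete_topics/my-topic asks the controller to delete the topic;
// DeleteTopicsListener.handleChildChange then picks it up.
ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath("my-topic"))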

12.6.4 preferredReplicaElectionListener

class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging {
  this.logIdent = "[PreferredReplicaElectionListener on " + controller.config.brokerId + "]: "
  val zkClient = controller.controllerContext.zkClient
  val controllerContext = controller.controllerContext
  @throws(classOf[Exception])
  def handleDataChange(dataPath: String, data: Object) {
    debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
            .format(dataPath, data.toString))
    inLock(controllerContext.controllerLock) {
      val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
      if(controllerContext.partitionsUndergoingPreferredReplicaElection.size > 0)
        info("These partitions are already undergoing preferred replica election: %s"
          .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
      // exclude partitions that are already undergoing preferred replica election
      val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
      // filter out partitions whose topics are queued up for deletion
      val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
      if(partitionsForTopicsToBeDeleted.size > 0) {
        error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
          .format(partitionsForTopicsToBeDeleted))
      }
      // only the remaining partitions actually need a preferred replica election
      controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
    }
  }
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
  }
}
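
The data written to /admin/preferred_replica_election by the kafka-preferred-replica-election tool, and parsed above by PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData, has roughly this shape (topic and partition values illustrative):

// Illustrative content of /admin/preferred_replica_election
val electionData = """{"version":1,"partitions":[{"topic":"my-topic","partition":0},{"topic":"my-topic","partition":3}]}"""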

12.6.5 partitionReassignedListener

class PartitionsReassignedListener(controller: KafkaController) extends IZkDataListener with Logging {
  this.logIdent = "[PartitionsReassignedListener on " + controller.config.brokerId + "]: "
  val zkClient = controller.controllerContext.zkClient
  val controllerContext = controller.controllerContext
  @throws(classOf[Exception])
  def handleDataChange(dataPath: String, data: Object) {
    debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
      .format(dataPath, data))
    val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
    // exclude partitions that are already being reassigned
    val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
      partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
    }
    // skip partitions whose topics are currently being deleted
    partitionsToBeReassigned.foreach { partitionToBeReassigned =>
      inLock(controllerContext.controllerLock) {
        if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
          error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
            .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
          controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
        } else { // kick off the actual partition reassignment
          val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
          controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
        }
      }
    }
  }
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
  }
}
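
The data at /admin/reassign_partitions, written by the kafka-reassign-partitions tool and parsed above by ZkUtils.parsePartitionReassignmentData, has roughly this shape (topic, partition and broker ids illustrative):

// Illustrative content of /admin/reassign_partitions: move my-topic partition 0 onto brokers 4, 5 and 6
val reassignmentData = """{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[4,5,6]}]}"""
// parsePartitionReassignmentData maps this to TopicAndPartition("my-topic", 0) -> Seq(4, 5, 6)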

Partition reassignment is fairly involved, so it is worth walking through in detail. Read on:

def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
                                      reassignedPartitionContext: ReassignedPartitionsContext) {
  val newReplicas = reassignedPartitionContext.newReplicas
  val topic = topicAndPartition.topic
  val partition = topicAndPartition.partition
  // keep only the new replicas that are hosted on live brokers
  val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
  try {
    val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
    assignedReplicasOpt match {
      case Some(assignedReplicas) =>
        if(assignedReplicas == newReplicas) { // identical to the current assignment, so no reassignment is needed
          throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
            " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
        } else {
          if(aliveNewReplicas == newReplicas) { // all of the new replicas are on live brokers, so perform the reassignment
            info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
            // register a watch on this partition's isr path to detect changes; note the listener is ReassignedPartitionsIsrChangeListener
            watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
            // mark this topic-partition as being reassigned
            controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
            // mark the topic ineligible for deletion so it cannot be removed mid-reassignment
            deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
            // perform the actual reassignment
            onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
          } else { // some of the new replicas are offline, so the reassignment fails
            // some replica in RAR is not alive. Fail partition reassignment
            throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
              " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
              "Failing partition reassignment")
          }
        }
      // the topic-partition does not exist
      case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
        .format(topicAndPartition))
    }
  } catch { // on any exception, remove the partition from the reassigned partitions path
    case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
    // remove the partition from the admin path to unblock the admin client
    removePartitionFromReassignedPartitions(topicAndPartition)
  }
}

The core of the flow is the logic inside onPartitionReassignment:

/*
 * Terminology used below:
 * RAR = Reassigned replicas (the target replica set)
 * OAR = Original list of replicas for the partition (the replica set before reassignment)
 * AR  = current assigned replicas
 */
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
  val reassignedReplicas = reassignedPartitionContext.newReplicas
  areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
    case false => // some new replicas are not yet in the partition's isr, i.e. they have not caught up with the leader, so first let them replicate the partition's data
      info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
        "reassigned not yet caught up with the leader")
      val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
      val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
      // first set the partition's assigned replicas to newAndOldReplicas
      updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
      // send a LeaderAndIsrRequest to the brokers hosting these replicas
      updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
        newAndOldReplicas.toSeq)
      // move newReplicasNotInOldReplicaList to the NewReplica state. How does the flow continue from here?
      // Recall that the KafkaController has registered a ReassignedPartitionsIsrChangeListener on
      // /brokers/topics/[topic]/partitions/[partitionId]/state; once the new replicas catch up with the partition
      // leader and the isr is updated, that listener fires. See its implementation further below.
      startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
      info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
        "reassigned to catch up with the leader")
    case true => // all of the new replicas have caught up with the leader
      // compute the old replicas that are no longer part of the assignment
      val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
      // move the reassigned replicas to the OnlineReplica state
      reassignedReplicas.foreach { replica =>
        replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
          replica)), OnlineReplica)
      }
      // decide whether a new leader is needed: if the current leader is in the new replica set it stays, otherwise elect a new one
      moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
      // stop and delete the old replicas
      stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
      // update the partition's replica assignment in the KafkaController cache and in zk
      updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
      // update /admin/reassign_partitions in zk, removing this topicAndPartition from it
      removePartitionFromReassignedPartitions(topicAndPartition)
      info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
      controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
      // send an UpdateMetadataRequest to the brokers
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
      // resume topic deletion, since this topic may be waiting to be deleted once the reassignment completes
      deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
  }
}
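
A small worked example of the set arithmetic performed above (broker ids are illustrative):

// Worked example of the replica-set transitions during a reassignment (broker ids illustrative)
val OAR = Set(1, 2, 3)            // original assigned replicas
val RAR = Set(4, 5, 6)            // reassigned (target) replicas
val intermediateAR  = OAR ++ RAR  // Set(1,2,3,4,5,6): written first so the new replicas can catch up
val replicasToStart = RAR -- OAR  // Set(4,5,6): moved to the NewReplica state
val replicasToStop  = OAR -- RAR  // Set(1,2,3): stopped and deleted once all of RAR is in the isr
// Final state: AR == RAR == Set(4,5,6), and the partition is removed from /admin/reassign_partitions.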

Once the new replicas have caught up with the partition leader, the partition state at /brokers/topics/[topic]/partitions/[partitionId]/state is updated, which triggers the ReassignedPartitionsIsrChangeListener callback:

class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int,
                                            reassignedReplicas: Set[Int])
  extends IZkDataListener with Logging {
  this.logIdent = "[ReassignedPartitionsIsrChangeListener on controller " + controller.config.brokerId + "]: "
  val zkClient = controller.controllerContext.zkClient
  val controllerContext = controller.controllerContext
  @throws(classOf[Exception])
  def handleDataChange(dataPath: String, data: Object) {
    inLock(controllerContext.controllerLock) {
      debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
      val topicAndPartition = TopicAndPartition(topic, partition)
      try {
        controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
          case Some(reassignedPartitionContext) =>
            val newLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
            newLeaderAndIsrOpt match {
              case Some(leaderAndIsr) =>
                val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
                if(caughtUpReplicas == reassignedReplicas) { // all reassigned replicas are in the isr, i.e. the new replicas have caught up with the partition leader
                  info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
                    .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
                    "Resuming partition reassignment")
                  // re-enter the onPartitionReassignment flow; this time
                  // areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) returns true
                  controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
                }
                else {
                  info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
                    .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
                    "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
                }
              case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
                .format(topicAndPartition, reassignedReplicas.mkString(",")))
            }
          case None =>
        }
      } catch {
        case e: Throwable => error("Error while handling partition reassignment", e)
      }
    }
  }
  @throws(classOf[Exception])
  def handleDataDeleted(dataPath: String) {
  }
}
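
The partition state node this listener watches, /brokers/topics/[topic]/partitions/[partitionId]/state, stores the leader and isr roughly in the following form (values illustrative); ZkUtils.getLeaderAndIsrForPartition parses it:

// Illustrative .../partitions/0/state content while a reassignment to brokers 4, 5 and 6 is catching up
val partitionState = """{"controller_epoch":3,"leader":1,"version":1,"leader_epoch":7,"isr":[1,2,3,4,5,6]}"""
// Once every reassigned replica appears in "isr", the listener re-invokes onPartitionReassignment.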

12.6.6 AddPartitionsListener

class AddPartitionsListener(topic: String) extends IZkDataListener with Logging {
  this.logIdent = "[AddPartitionsListener on " + controller.config.brokerId + "]: "
  @throws(classOf[Exception])
  def handleDataChange(dataPath : String, data: Object) {
    inLock(controllerContext.controllerLock) {
      try {
        info("Add Partition triggered " + data.toString + " for path " + dataPath)
        val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
        // filter out the newly added partitions
        val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
          !controllerContext.partitionReplicaAssignment.contains(p._1))
        // if the topic of the new partitions is being deleted, ignore them; otherwise create the new partitions
        if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
          error("Skipping adding partitions %s for topic %s since it is currently being deleted"
                .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
        else {
          if (partitionsToBeAdded.size > 0) {
            info("New partitions to be added %s".format(partitionsToBeAdded))
            controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
          }
        }
      } catch {
        case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
      }
    }
  }
  @throws(classOf[Exception])
  def handleDataDeleted(parentPath : String) {
    // this is not implemented for partition change
  }
}

12.7 The KafkaController rebalance flow

So what is a rebalance? When partition leaders drift away from their preferred replicas, leadership becomes unevenly distributed across the cluster. A rebalance moves the partition leaders back to their original (preferred) placement so the load is balanced again. The process looks like this:


Topic And Partition    Leader    ISR
[topic] partition 0    1         1,2
[topic] partition 1    2         2,3
[topic] partition 2    3         3,4
[topic] partition 3    4         4,1

Each broker carries exactly one leader. When broker 4 goes offline for a while and then comes back online, the topic partitions change as follows:


Topic And Partition    Leader    ISR
[topic] partition 0    1         1,2
[topic] partition 1    2         2,3
[topic] partition 2    3         3,4
[topic] partition 3    1         4,1

Broker 1 now carries two leaders: both partition 0 and partition 3 are led by broker 1. If broker 2 then goes offline for a while and comes back, the topic partitions change again:


Topic And Partition    Leader    ISR
[topic] partition 0    1         1,2
[topic] partition 1    3         2,3
[topic] partition 2    3         3,4
[topic] partition 3    1         4,1

Now all the leaders are concentrated on broker 1 and broker 3 while the other brokers lead nothing, so producers send all their traffic to broker 1 and broker 3 and those two nodes become heavily loaded. If auto.leader.rebalance.enable=true is configured, i.e. automatic leader rebalancing is enabled, the partition leaders are migrated back towards the initial layout as far as possible:

Topic And Partition    Leader    ISR
[topic] partition 0    1         1,2
[topic] partition 1    2         2,3
[topic] partition 2    3         3,4
[topic] partition 3    4         4,1

 
This is implemented by the periodic task checkAndTriggerPartitionRebalance:
private def checkAndTriggerPartitionRebalance(): Unit = {
  if (isActive()) {
    trace("checking need to trigger partition rebalance")
    // group the partitions by their preferred leader broker (the head of the assigned replica list), excluding topics queued for deletion
    var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
    inLock(controllerContext.controllerLock) {
      preferredReplicasForTopicsByBrokers =
        controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
          case(topicAndPartition, assignedReplicas) => assignedReplicas.head
        }
    }
    debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
    // for each broker, check if a preferred replica election needs to be triggered
    preferredReplicasForTopicsByBrokers.foreach {
      case(leaderBroker, topicAndPartitionsForBroker) => {
        var imbalanceRatio: Double = 0
        var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
        inLock(controllerContext.controllerLock) {
          // keep the partitions whose current leader is not the preferred replica (the head of the assigned replica list)
          topicsNotInPreferredReplica =
            topicAndPartitionsForBroker.filter {
              case(topicPartition, replicas) => {
                controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
                // leaderAndIsr.leader != leaderBroker: the current leader differs from the first broker in the original assignedReplicas
                controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
              }
            }
          debug("topics not in preferred replica " + topicsNotInPreferredReplica)
          val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
          val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
          // compute the imbalance ratio
          imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
          trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
        }
        // if the imbalance ratio exceeds the configured threshold, trigger a rebalance
        if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
          topicsNotInPreferredReplica.foreach {
            case(topicPartition, replicas) => {
              inLock(controllerContext.controllerLock) {
                if (controllerContext.liveBrokerIds.contains(leaderBroker) && // the preferred leader broker must be alive
                    controllerContext.partitionsBeingReassigned.size == 0 && // no partition reassignment is in progress, to avoid extra load
                    controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 && // no preferred replica election is already in progress
                    !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) && // the topic is not queued up for deletion
                    controllerContext.allTopics.contains(topicPartition.topic)) { // the topic still exists
                  onPreferredReplicaElection(Set(topicPartition), true) // trigger a preferred replica election for this partition
                }
              }
            }
          }
        }
      }
    }
  }
}
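
A quick worked example of the threshold check (partition counts are illustrative; leader.imbalance.per.broker.percentage is a broker config that defaults to 10):

// Worked example of the imbalance check for one broker (numbers illustrative)
val totalTopicPartitionsForBroker = 8         // partitions whose preferred (first) replica is this broker
val totalTopicPartitionsNotLedByBroker = 2    // of those, partitions currently led by some other broker
val imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker // 0.25
val leaderImbalancePerBrokerPercentage = 10   // leader.imbalance.per.broker.percentage
// 0.25 > 10 / 100.0, so a preferred replica election is triggered for the two misplaced partitions.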

12.8 The KafkaController topic deletion flow: TopicDeletionManager

In essence this just starts a DeleteTopicsThread and waits for the KafkaController to trigger a deletion:
class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
  val zkClient = controllerContext.zkClient
  override def doWork() {
    awaitTopicDeletionNotification() // wait for the KafkaController to trigger a deletion
    if (!isRunning.get)
      return
    inLock(controllerContext.controllerLock) {
      val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
      if(!topicsQueuedForDeletion.isEmpty)
        info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
      topicsQueuedForDeletion.foreach { topic =>
        // deletion is asynchronous; once every replica of every partition of this topic has been deleted
        if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
          // clear all state about this topic from the KafkaController
          completeDeleteTopic(topic)
          info("Deletion of topic %s successfully completed".format(topic))
        } else { // deletion of this topic has not completed yet
          if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
            // ignore since topic deletion is in progress
            val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
            val replicaIds = replicasInDeletionStartedState.map(_.replica)
            val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
            info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
              partitions.mkString(","), topic))
          } else {
            // some replica deletion failed unexpectedly, so the topic deletion should be retried
            if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
              markTopicForDeletionRetry(topic)
            }
          }
        }
        // if the topic is eligible for deletion, start deleting it; the key step is sending a StopReplicaRequest
        // to every broker hosting the topic, telling each broker to stop fetching and delete the corresponding replica
        if(isTopicEligibleForDeletion(topic)) {
          info("Deletion of topic %s (re)started".format(topic))
          // topic deletion will be kicked off
          onTopicDeletion(Set(topic))
        } else if(isTopicIneligibleForDeletion(topic)) {
          info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
        }
      }
    }
  }
}

12.9 Communication between the KafkaController (leader) and the other brokers: ControllerChannelManager

ControllerChannelManager holds a communication channel to every broker:
class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
  private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
}

Now look at the ControllerBrokerStateInfo class:
case class ControllerBrokerStateInfo(channel: BlockingChannel,
                                     broker: Broker,
                                     messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
                                     requestSendThread: RequestSendThread)

Its messageQueue holds the messages destined for that particular broker, each paired with a callback function; channel is the connection to the broker, and RequestSendThread is the thread that sends over it. Let's look at the RequestSendThread:
class RequestSendThread(val controllerId: Int,
                        val controllerContext: ControllerContext,
                        val toBroker: Broker,
                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
                        val channel: BlockingChannel)
  extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBroker.id)) {
  private val lock = new Object()
  private val stateChangeLogger = KafkaController.stateChangeLogger
  connectToBroker(toBroker, channel)

  override def doWork(): Unit = {
    val queueItem = queue.take() // take the next request from the queue
    val request = queueItem._1
    val callback = queueItem._2
    var receive: Receive = null
    try {
      lock synchronized {
        var isSendSuccessful = false
        while(isRunning.get() && !isSendSuccessful) {
          // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
          // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
          try {
            channel.send(request) // send the request
            receive = channel.receive() // wait for the response
            isSendSuccessful = true
          } catch {
            case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
              warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
                "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                request.toString, toBroker.toString()), e)
              channel.disconnect()
              connectToBroker(toBroker, channel)
              isSendSuccessful = false
              // backoff before retrying the connection and send
              Utils.swallow(Thread.sleep(300))
          }
        }
        var response: RequestOrResponse = null
        request.requestId.get match { // deserialize the response according to the request type
          case RequestKeys.LeaderAndIsrKey =>
            response = LeaderAndIsrResponse.readFrom(receive.buffer)
          case RequestKeys.StopReplicaKey =>
            response = StopReplicaResponse.readFrom(receive.buffer)
          case RequestKeys.UpdateMetadataKey =>
            response = UpdateMetadataResponse.readFrom(receive.buffer)
        }
        stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
                                  .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
        // invoke the callback if one was provided
        if(callback != null) {
          callback(response)
        }
      }
    } catch {
      case e: Throwable =>
        error("Controller %d fails to send a request to broker %s".format(controllerId, toBroker.toString()), e)
        // If there is any socket error (eg, socket timeout), the channel is no longer usable and needs to be recreated.
        channel.disconnect()
    }
  }
}
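
How do requests end up in that per-broker queue? As a sketch (the sendRequest signature here is assumed from the 0.8.x ControllerChannelManager), the controller's send path simply offers the (request, callback) pair onto the target broker's messageQueue, which the RequestSendThread above then drains:

// Sketch of the enqueue side (signature assumed from the 0.8.x ControllerChannelManager)
def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
  brokerStateInfo.get(brokerId) match {
    case Some(stateInfo) => stateInfo.messageQueue.put((request, callback)) // consumed by RequestSendThread.doWork
    case None => warn("Not sending request %s to broker %d, since it is offline".format(request, brokerId))
  }
}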

前面两篇文章,我们分析了Activity的布局文件加载.绘制流程,算是对整个Android系统中界面的显示流程有了一个大概的了解,其实Android系统中所有的显示控件(注意这里是控件,而不是组件)的加载绘制流程都是类似的,包括:Dialog的加载绘制流程,PopupWindow的加载绘制流程,Toast的显示原理等,上一篇文章中,我说在介绍了Activity界面的加载绘制流程之后,就会分析一下剩余几个控件的显示控制流程,这里我打算先分析一下Dialog的加载绘制流程. 可能有的同学问这里为什么