A Partial Source Code Walkthrough of GroupCoordinator

When reposting, please credit the original address: http://www.cnblogs.com/dongxiao-yang/p/7463693.html

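In the new Kafka consumer, all group management on the server side is handled by a new role, the GroupCoordinator. Recent testing showed that consumers exhibit all sorts of waiting behavior during a rebalance, so I studied the relevant source code. GroupCoordinator is the broker-side class that manages every group-related request sent by consumers. The source version examined here is 0.10.2.0.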

First, here is a flowchart that huxihx drew in the post "Kafka消费组(consumer group)" (Kafka consumer groups, see reference 2):

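This diagram and the flowcharts that follow it show clearly that whenever a consumer tries to join a group (whether it is a newly initialized instance or an existing client rebalancing for any reason), its first step is always to send a JoinGroupRequest to the Coordinator. What this request contains can be found in the source of the AbstractCoordinator class: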

    /**
     * Join the group and return the assignment for the next generation. This function handles both
     * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
     * elected leader by the coordinator.
     * @return A request future which wraps the assignment returned from the group leader
     */
    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        if (coordinatorUnknown())
            return RequestFuture.coordinatorNotAvailable();

        // send a join group request to the coordinator
        log.info("(Re-)joining group {}", groupId);
        JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
                groupId,
                this.sessionTimeoutMs,
                this.generation.memberId,
                protocolType(),
                metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);

        log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
        return client.send(coordinator, requestBuilder)
                .compose(new JoinGroupResponseHandler());
    }

  private Generation generation = Generation.NO_GENERATION;

    protected static class Generation {
        public static final Generation NO_GENERATION = new Generation(
                OffsetCommitRequest.DEFAULT_GENERATION_ID,
                JoinGroupRequest.UNKNOWN_MEMBER_ID,
                null);

        public final int generationId;
        public final String memberId;
        public final String protocol;

        public Generation(int generationId, String memberId, String protocol) {
            this.generationId = generationId;
            this.memberId = memberId;
            this.protocol = protocol;
        }
    }  

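The code above shows that the request built by sendJoinGroupRequest carries groupId, sessionTimeout, memberId, rebalanceTimeout and a few other fields. For a newly initialized consumer, generation defaults to NO_GENERATION, so its memberId is JoinGroupRequest.UNKNOWN_MEMBER_ID.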
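To make these defaults concrete, here is a minimal Java sketch mirroring the Generation class above. The values -1 and "" for OffsetCommitRequest.DEFAULT_GENERATION_ID and JoinGroupRequest.UNKNOWN_MEMBER_ID are my assumption about the 0.10.x client, and ClientDefaults and JoinGroupFields are hypothetical helpers that keep only the fields discussed here; this is not the real request class.

```java
// Hypothetical stand-ins for the two client constants; the values are assumed, not copied from Kafka.
final class ClientDefaults {
    static final int DEFAULT_GENERATION_ID = -1;  // assumed value of OffsetCommitRequest.DEFAULT_GENERATION_ID
    static final String UNKNOWN_MEMBER_ID = "";   // assumed value of JoinGroupRequest.UNKNOWN_MEMBER_ID

    private ClientDefaults() {}
}

// Mirrors AbstractCoordinator.Generation: what the client remembers about its last successful join.
final class Generation {
    static final Generation NO_GENERATION =
            new Generation(ClientDefaults.DEFAULT_GENERATION_ID, ClientDefaults.UNKNOWN_MEMBER_ID, null);

    final int generationId;
    final String memberId;
    final String protocol;

    Generation(int generationId, String memberId, String protocol) {
        this.generationId = generationId;
        this.memberId = memberId;
        this.protocol = protocol;
    }
}

// Hypothetical: only the JoinGroup fields discussed in the text, not the real request class.
final class JoinGroupFields {
    final String groupId;
    final int sessionTimeoutMs;
    final int rebalanceTimeoutMs;
    final String memberId;

    JoinGroupFields(String groupId, int sessionTimeoutMs, int rebalanceTimeoutMs, Generation generation) {
        this.groupId = groupId;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
        // As in sendJoinGroupRequest, the member id always comes from the current generation.
        this.memberId = generation.memberId;
    }

    // A fresh consumer (still at NO_GENERATION) asks the coordinator to assign it a member id.
    boolean isNewMember() {
        return ClientDefaults.UNKNOWN_MEMBER_ID.equals(memberId);
    }
}
```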

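Next comes the server-side code that handles this JoinGroupRequest. The request is handed to the handleJoinGroup method of GroupCoordinator, which validates some of the parameters and the group state and then delegates the actual work to doJoinGroup.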

  private def doJoinGroup(group: GroupMetadata,
                          memberId: String,
                          clientId: String,
                          clientHost: String,
                          rebalanceTimeoutMs: Int,
                          sessionTimeoutMs: Int,
                          protocolType: String,
                          protocols: List[(String, Array[Byte])],
                          responseCallback: JoinCallback) {
    group synchronized {
      if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
        // if the new member does not support the group protocol, reject it
        responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
      } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
        // if the member trying to register with a un-recognized id, send the response to let
        // it reset its member id and retry
        responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
      } else {
        group.currentState match {
          case Dead =>
            // if the group is marked as dead, it means some other thread has just removed the group
            // from the coordinator metadata; this is likely that the group has migrated to some other
            // coordinator OR the group is in a transient unstable phase. Let the member retry
            // joining without the specified member id,
            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))

          case PreparingRebalance =>
            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
            } else {
              val member = group.get(memberId)
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            }

          case AwaitingSync =>
            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
            } else {
              val member = group.get(memberId)
              if (member.matches(protocols)) {
                // member is joining with the same metadata (which could be because it failed to
                // receive the initial JoinGroup response), so just return current group information
                // for the current generation.
                responseCallback(JoinGroupResult(
                  members = if (memberId == group.leaderId) {
                    group.currentMemberMetadata
                  } else {
                    Map.empty
                  },
                  memberId = memberId,
                  generationId = group.generationId,
                  subProtocol = group.protocol,
                  leaderId = group.leaderId,
                  errorCode = Errors.NONE.code))
              } else {
                // member has changed metadata, so force a rebalance
                updateMemberAndRebalance(group, member, protocols, responseCallback)
              }
            }

          case Empty | Stable =>
            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              // if the member id is unknown, register the member to the group
              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
            } else {
              val member = group.get(memberId)
              if (memberId == group.leaderId || !member.matches(protocols)) {
                // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
                // The latter allows the leader to trigger rebalances for changes affecting assignment
                // which do not affect the member metadata (such as topic metadata changes for the consumer)
                updateMemberAndRebalance(group, member, protocols, responseCallback)
              } else {
                // for followers with no actual change to their metadata, just return group information
                // for the current generation which will allow them to issue SyncGroup
                responseCallback(JoinGroupResult(
                  members = Map.empty,
                  memberId = memberId,
                  generationId = group.generationId,
                  subProtocol = group.protocol,
                  leaderId = group.leaderId,
                  errorCode = Errors.NONE.code))
              }
            }
        }

        if (group.is(PreparingRebalance))
          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
      }
    }
  }  
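The branching in doJoinGroup can be summarized as a pure decision function. The Java sketch below is a hypothetical model (JoinDecision, State and Action are names I made up): it only reports which branch the Scala code above would take, reduces the protocol checks and member lookups to boolean inputs, and leaves out the final joinPurgatory.checkAndComplete call.

```java
// Hypothetical model of the branches in doJoinGroup; it returns the action the broker takes, it does not perform it.
final class JoinDecision {
    enum State { PREPARING_REBALANCE, AWAITING_SYNC, STABLE, DEAD, EMPTY }

    enum Action {
        REJECT_INCONSISTENT_PROTOCOL,  // Errors.INCONSISTENT_GROUP_PROTOCOL
        REJECT_UNKNOWN_MEMBER,         // Errors.UNKNOWN_MEMBER_ID
        ADD_MEMBER_AND_REBALANCE,      // addMemberAndRebalance
        UPDATE_MEMBER_AND_REBALANCE,   // updateMemberAndRebalance
        RETURN_CURRENT_GENERATION      // JoinGroupResult for the current generation, no new rebalance
    }

    private JoinDecision() {}

    static Action decide(State state,
                         boolean protocolCompatible,  // protocol type matches and a common protocol exists
                         boolean memberIdUnknown,     // memberId == UNKNOWN_MEMBER_ID
                         boolean memberInGroup,       // group.has(memberId)
                         boolean metadataMatches,     // member.matches(protocols)
                         boolean isLeader) {          // memberId == group.leaderId
        // An Empty group accepts any protocol; otherwise the new member must be compatible.
        if (state != State.EMPTY && !protocolCompatible) return Action.REJECT_INCONSISTENT_PROTOCOL;
        if (!memberIdUnknown && !memberInGroup) return Action.REJECT_UNKNOWN_MEMBER;
        // In every live state a member without an id is registered and triggers a rebalance.
        if (memberIdUnknown && state != State.DEAD) return Action.ADD_MEMBER_AND_REBALANCE;
        switch (state) {
            case DEAD:
                return Action.REJECT_UNKNOWN_MEMBER;
            case PREPARING_REBALANCE:
                return Action.UPDATE_MEMBER_AND_REBALANCE;
            case AWAITING_SYNC:
                // A member resending identical metadata (e.g. it lost the first response) gets the current result.
                return metadataMatches ? Action.RETURN_CURRENT_GENERATION : Action.UPDATE_MEMBER_AND_REBALANCE;
            case EMPTY:
            case STABLE:
                // The leader rejoining, or any metadata change, forces a new rebalance.
                return (isLeader || !metadataMatches) ? Action.UPDATE_MEMBER_AND_REBALANCE
                                                      : Action.RETURN_CURRENT_GENERATION;
            default:
                throw new AssertionError(state);
        }
    }
}
```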

A GroupMetadata object is a state machine with the states PreparingRebalance, AwaitingSync, Stable, Dead and Empty; on the server it represents the current state of the group being managed.
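A hedged Java model of this state machine follows. The transition table (which states each state may be entered from) is my reading of GroupMetadata.validPreviousStates in 0.10.2 and should be treated as an assumption; canRebalance and transitionTo imitate the methods of the same names used in the code below, and GroupStateMachine is a hypothetical name.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

enum GroupState { PREPARING_REBALANCE, AWAITING_SYNC, STABLE, DEAD, EMPTY }

// Hypothetical Java model of the GroupMetadata state machine (transition table assumed from 0.10.2).
final class GroupStateMachine {
    // target state -> states it may legally be entered from
    private static final Map<GroupState, Set<GroupState>> VALID_PREVIOUS = new EnumMap<>(GroupState.class);

    static {
        VALID_PREVIOUS.put(GroupState.DEAD, EnumSet.allOf(GroupState.class));
        VALID_PREVIOUS.put(GroupState.AWAITING_SYNC, EnumSet.of(GroupState.PREPARING_REBALANCE));
        VALID_PREVIOUS.put(GroupState.STABLE, EnumSet.of(GroupState.AWAITING_SYNC));
        VALID_PREVIOUS.put(GroupState.PREPARING_REBALANCE,
                EnumSet.of(GroupState.STABLE, GroupState.AWAITING_SYNC, GroupState.EMPTY));
        VALID_PREVIOUS.put(GroupState.EMPTY, EnumSet.of(GroupState.PREPARING_REBALANCE));
    }

    private GroupState state = GroupState.EMPTY; // a newly created group starts out Empty

    GroupState state() {
        return state;
    }

    // Like canRebalance: true only in a state from which PreparingRebalance may be entered.
    boolean canRebalance() {
        return VALID_PREVIOUS.get(GroupState.PREPARING_REBALANCE).contains(state);
    }

    // Like transitionTo: refuse illegal transitions instead of silently changing state.
    void transitionTo(GroupState target) {
        if (!VALID_PREVIOUS.get(target).contains(state)) {
            throw new IllegalStateException("Cannot transition from " + state + " to " + target);
        }
        state = target;
    }
}
```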

I. The first batch of consumers joins the group

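1 As shown above, a newly initialized consumer always starts with the memberId JoinGroupRequest.UNKNOWN_MEMBER_ID, so every new member goes through addMemberAndRebalance, which creates a member object and adds it to the group's member list. Only the first member to join actually gets to call prepareRebalance from inside the synchronized block of maybePrepareRebalance.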

  private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                    sessionTimeoutMs: Int,
                                    clientId: String,
                                    clientHost: String,
                                    protocolType: String,
                                    protocols: List[(String, Array[Byte])],
                                    group: GroupMetadata,
                                    callback: JoinCallback) = {
    // use the client-id with a random id suffix as the member-id
    val memberId = clientId + "-" + group.generateMemberIdSuffix
    val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
      sessionTimeoutMs, protocolType, protocols)
    member.awaitingJoinCallback = callback
    group.add(member)
    maybePrepareRebalance(group)
    member
  }

  private def maybePrepareRebalance(group: GroupMetadata) {
    group synchronized {
      if (group.canRebalance)
        prepareRebalance(group)
    }
  }

  

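prepareRebalance moves the group from the Empty state mentioned above to PreparingRebalance. Later clients that see PreparingRebalance also go through addMemberAndRebalance, but after the first member has left the synchronized block of maybePrepareRebalance, every remaining member finds that group.canRebalance returns false and skips the call.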

  private def prepareRebalance(group: GroupMetadata) {
    // if any members are awaiting sync, cancel their request and have them rejoin
    if (group.is(AwaitingSync))
      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

    group.transitionTo(PreparingRebalance)
    info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))

    val rebalanceTimeout = group.rebalanceTimeoutMs
    val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
    val groupKey = GroupKey(group.groupId)
    joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
  }
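The combined effect of addMemberAndRebalance, maybePrepareRebalance and the state change can be sketched as follows. JoiningGroup is a hypothetical, heavily simplified Java class: a boolean stands in for the PreparingRebalance state, a counter replaces the broker's random member-id suffix, and the real prepareRebalance work (creating the DelayedJoin) is reduced to counting calls. It shows that however many new members join, only the first one starts the rebalance.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, heavily simplified group used only to show why prepareRebalance runs once per round.
final class JoiningGroup {
    private boolean preparingRebalance = false; // false ~ Empty/Stable/AwaitingSync, true ~ PreparingRebalance
    private final Map<String, Runnable> awaitingJoin = new LinkedHashMap<>(); // memberId -> awaitingJoinCallback
    private int nextSuffix = 0;
    private int prepareRebalanceCalls = 0;

    // Like addMemberAndRebalance: register the member with its callback, then maybe start a rebalance.
    synchronized String addMemberAndRebalance(String clientId, Runnable joinCallback) {
        // The broker appends a random suffix (generateMemberIdSuffix); a counter keeps this sketch deterministic.
        String memberId = clientId + "-" + (++nextSuffix);
        awaitingJoin.put(memberId, joinCallback);
        maybePrepareRebalance();
        return memberId;
    }

    // Like maybePrepareRebalance: only a group that is not already preparing may start a rebalance.
    private synchronized void maybePrepareRebalance() {
        if (!preparingRebalance) {         // plays the role of group.canRebalance
            preparingRebalance = true;     // prepareRebalance -> transitionTo(PreparingRebalance)
            prepareRebalanceCalls++;
        }
    }

    synchronized int memberCount() {
        return awaitingJoin.size();
    }

    synchronized int prepareRebalanceCalls() {
        return prepareRebalanceCalls;
    }
}
```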

 

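The code above creates a DelayedJoin. DelayedJoin is one of Kafka's internal TimerTask implementations with a timeout, and it runs its completion logic in one of two cases: either the timeout expires, or some condition is met and the code completes it proactively and cancels the timer. Note that the delay used here is the rebalanceTimeout, not the sessionTimeout.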
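The following Java sketch illustrates the delayed-operation pattern described above; it is not Kafka's DelayedOperation class. DelayedTask is a hypothetical name, and a ScheduledExecutorService stands in for Kafka's internal timer. An AtomicBoolean guarantees that the completion action runs exactly once, whether it is triggered early by tryComplete (the path that joinPurgatory.checkAndComplete eventually takes) or by the timer firing after the delay, which for DelayedJoin would be the rebalanceTimeout.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the delayed-operation idea, not Kafka's class:
// the completion action runs exactly once, either early (condition met) or when the timer fires.
final class DelayedTask implements Runnable {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final BooleanSupplier canComplete;  // for DelayedJoin: "notYetRejoinedMembers is empty"
    private final Runnable onComplete;          // for DelayedJoin: finish the join phase
    private volatile ScheduledFuture<?> timer;
    private volatile boolean expired = false;

    DelayedTask(BooleanSupplier canComplete, Runnable onComplete) {
        this.canComplete = canComplete;
        this.onComplete = onComplete;
    }

    // Like tryCompleteElseWatch: try once now, otherwise arm the timeout.
    boolean tryCompleteElseSchedule(ScheduledExecutorService executor, long delayMs) {
        if (tryComplete()) return true;
        timer = executor.schedule(this, delayMs, TimeUnit.MILLISECONDS);
        // Re-check in case the condition became true while the timer was being armed.
        return tryComplete();
    }

    // Like checkAndComplete -> tryComplete: complete early only if the condition now holds.
    boolean tryComplete() {
        return canComplete.getAsBoolean() && forceComplete();
    }

    // Runs onComplete at most once and cancels a pending timeout.
    boolean forceComplete() {
        if (!completed.compareAndSet(false, true)) return false;
        ScheduledFuture<?> t = timer;
        if (t != null) t.cancel(false);
        onComplete.run();
        return true;
    }

    // Timer callback: the timeout expired before the condition held, so completion is forced.
    @Override
    public void run() {
        if (forceComplete()) expired = true;
    }

    boolean isCompleted() {
        return completed.get();
    }

    boolean expired() {
        return expired;
    }
}
```

Calling run() directly is a convenient way to simulate the timeout firing without waiting on a real executor.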

Let's look at the call chains of joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey)) and joinPurgatory.checkAndComplete(GroupKey(group.groupId)).

joinPurgatory.tryCompleteElseWatch->DelayedJoin.safeTryComplete->DelayedJoin.tryComplete->coordinator.tryCompleteJoin

joinPurgatory.checkAndComplete->DelayedOperation.checkAndComplete->DelayedJoin.safeTryComplete->DelayedJoin.tryComplete->coordinator.tryCompleteJoin

So whether it is the first member finishing prepareRebalance or a later member reaching the end of doJoinGroup, both end up calling coordinator.tryCompleteJoin to try to finish the JoinGroup wait.

  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
    group synchronized {
      if (group.notYetRejoinedMembers.isEmpty)
        forceComplete()
      else false
    }
  }

  // defined in GroupMetadata
  def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList

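The logic of tryCompleteJoin is very simple: it checks whether every member cached in GroupMetadata has a pending join request registered. (The line member.awaitingJoinCallback = callback in addMemberAndRebalance gives a member's awaitingJoinCallback a value; a member whose value is still null is a previous member that has not rejoined yet.) If the notYetRejoinedMembers list is empty, all clients have arrived and the rebalance assignment can proceed; if they never all arrive, the wait is forcibly completed once rebalanceTimeout expires and the rebalance goes ahead anyway.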
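The rejoin check boils down to the small Java sketch below. TrackedMember and RejoinTracker are hypothetical names; awaitingJoinCallback is reduced to an Object that becomes non-null once the member has sent JoinGroup in the current round, and remove() stands in for a member being dropped from the group, for example when its session expires.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical member record: only the field that the rejoin check looks at.
final class TrackedMember {
    final String memberId;
    Object awaitingJoinCallback; // non-null once the member has sent JoinGroup in this round

    TrackedMember(String memberId) {
        this.memberId = memberId;
    }
}

// Hypothetical sketch of notYetRejoinedMembers and the check in tryCompleteJoin.
final class RejoinTracker {
    private final Map<String, TrackedMember> members = new LinkedHashMap<>();

    // A member known from the previous generation that has not rejoined yet.
    synchronized void addKnownMember(String memberId) {
        members.put(memberId, new TrackedMember(memberId));
    }

    // Like member.awaitingJoinCallback = callback when a JoinGroup request arrives.
    synchronized void onJoinGroup(String memberId, Object callback) {
        members.computeIfAbsent(memberId, TrackedMember::new).awaitingJoinCallback = callback;
    }

    // The member is dropped from the group, e.g. because its session expired.
    synchronized void remove(String memberId) {
        members.remove(memberId);
    }

    // Like GroupMetadata.notYetRejoinedMembers: members whose callback is still null.
    synchronized List<String> notYetRejoinedMembers() {
        List<String> missing = new ArrayList<>();
        for (TrackedMember m : members.values()) {
            if (m.awaitingJoinCallback == null) {
                missing.add(m.memberId);
            }
        }
        return missing;
    }

    // Like tryCompleteJoin: the join phase may complete only when nobody is missing.
    synchronized boolean canCompleteJoin() {
        return notYetRejoinedMembers().isEmpty();
    }
}
```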

II. Heartbeat and session timeout

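The source below shows that during a rebalance the delay of the heartbeat check is the session timeout. If an old consumer dies and sends no heartbeat for that whole period, onMemberFailure on the server removes the corresponding memberId from the group and retries joinPurgatory.checkAndComplete; if that removal leaves notYetRejoinedMembers empty, the JoinGroup wait ends as well.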

  /**
   * Complete existing DelayedHeartbeats for the given member and schedule the next one
   */
  private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
    // complete current heartbeat expectation
    member.latestHeartbeat = time.milliseconds()
    val memberKey = MemberKey(member.groupId, member.memberId)
    heartbeatPurgatory.checkAndComplete(memberKey)

    // reschedule the next heartbeat expiration deadline
    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
    heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
  }

  def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
    group synchronized {
      if (!shouldKeepMemberAlive(member, heartbeatDeadline))
        onMemberFailure(group, member)
    }
  }

  private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
    trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
    group.remove(member.memberId)
    group.currentState match {
      case Dead | Empty =>
      case Stable | AwaitingSync => maybePrepareRebalance(group)
      case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
    }
  }
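The deadline arithmetic behind completeAndScheduleNextHeartbeatExpiration and onExpireHeartbeat can be sketched in Java like this. HeartbeatTracker is hypothetical and only models the time-based check: each heartbeat pushes the deadline to latestHeartbeat + sessionTimeoutMs, so an expiration timer that fires for an older deadline finds the member still alive. As far as I can tell, the real shouldKeepMemberAlive in 0.10.2 also keeps a member alive while it has a pending JoinGroup or SyncGroup callback; that part is omitted here.

```java
// Hypothetical sketch of the session-timeout bookkeeping; only the time-based check is modelled.
final class HeartbeatTracker {
    private final long sessionTimeoutMs;
    private long latestHeartbeatMs;

    HeartbeatTracker(long sessionTimeoutMs, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.latestHeartbeatMs = nowMs;
    }

    // Like completeAndScheduleNextHeartbeatExpiration: record the heartbeat, return the new deadline.
    long onHeartbeat(long nowMs) {
        latestHeartbeatMs = nowMs;
        return latestHeartbeatMs + sessionTimeoutMs;
    }

    // Like the time check in shouldKeepMemberAlive: true if a later heartbeat moved the deadline past the one that fired.
    boolean isAliveAt(long firedDeadlineMs) {
        return latestHeartbeatMs + sessionTimeoutMs > firedDeadlineMs;
    }
}
```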

  

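Conclusion: in my tests, most of the stalls seen when restarting consumers are most likely waits caused by the notYetRejoinedMembers list being non-empty, because the sessions of the consumers that had just been shut down had not yet expired.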
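A rough worked estimate makes this conclusion concrete. Assuming every live consumer rejoins immediately and each previous consumer that will never come back is only removed when its session expires, the JoinGroup phase ends at whichever comes first: the last stale session expiry or the rebalance timeout. The Java helper below is a hypothetical back-of-the-envelope calculation that ignores timer granularity; the numbers in the usage note are made up for illustration.

```java
// Hypothetical estimate of when the JoinGroup phase can finish, under the assumptions stated above.
final class RejoinWaitEstimate {
    private RejoinWaitEstimate() {}

    static long joinPhaseEndsAtMs(long rebalanceStartMs, long rebalanceTimeoutMs,
                                  long sessionTimeoutMs, long[] staleLastHeartbeatsMs) {
        // With no stale members the phase can end as soon as it starts.
        long lastExpiry = rebalanceStartMs;
        for (long lastHeartbeat : staleLastHeartbeatsMs) {
            // A stale member is removed only once latestHeartbeat + sessionTimeout has passed.
            lastExpiry = Math.max(lastExpiry, lastHeartbeat + sessionTimeoutMs);
        }
        // The DelayedJoin forces completion at the rebalance timeout in any case.
        return Math.min(lastExpiry, rebalanceStartMs + rebalanceTimeoutMs);
    }
}
```

For example, with a rebalance starting at t = 3 s, a 10 s session timeout, a 300 s rebalance timeout, and two stale consumers whose last heartbeats were at t = 0 s and t = 2 s, the join phase ends at t = 12 s, i.e. about 9 s of apparent stall.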

References

1 http://blog.csdn.net/zhanglh046/article/details/72833073

2 http://www.cnblogs.com/huxi2b/p/6223228.html

Time: 2024-10-21 02:19:36
