kafka consumer 分区reblance算法

转载请注明原创地址 http://www.cnblogs.com/dongxiao-yang/p/6238029.html

最近需要详细研究下kafka reblance过程中分区计算的算法细节,网上搜了部分说法,感觉比较晦涩且不太易懂,还是自己抠源码比较简便一点。

kafka reblance计算部分代码如下:

class RangeAssignor() extends PartitionAssignor with Logging {

  def assign(ctx: AssignmentContext) = {
    val valueFactory = (topic: String) => new mutable.HashMap[TopicAndPartition, ConsumerThreadId]
    val partitionAssignment =
      new Pool[String, mutable.Map[TopicAndPartition, ConsumerThreadId]](Some(valueFactory))
    for (topic <- ctx.myTopicThreadIds.keySet) {
      val curConsumers = ctx.consumersForTopic(topic)
      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

      val nPartsPerConsumer = curPartitions.size / curConsumers.size
      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
        " for topic " + topic + " with consumers: " + curConsumers)

      for (consumerThreadId <- curConsumers) {
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
        assert(myConsumerPosition >= 0)
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

        /**
         *   Range-partition the sorted partitions to consumers for better locality.
         *  The first few consumers pick up an extra partition, if any.
         */
        if (nParts <= 0)
          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
        else {
          for (i <- startPart until startPart + nParts) {
            val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
            // record the partition ownership decision
            val assignmentForConsumer = partitionAssignment.getAndMaybePut(consumerThreadId.consumer)
            assignmentForConsumer += (TopicAndPartition(topic, partition) -> consumerThreadId)
          }
        }
      }
    }
  def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
    getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>
      val topic = topicAndPartitionMap._1
      val partitionMap = topicAndPartitionMap._2
      debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
      (topic -> partitionMap.keys.toSeq.sortWith((s,t) => s < t))
    }
  }
  def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean) : mutable.Map[String, List[ConsumerThreadId]] = {
    val dirs = new ZKGroupDirs(group)
    val consumers = getChildrenParentMayNotExist(dirs.consumerRegistryDir)
    val consumersPerTopicMap = new mutable.HashMap[String, List[ConsumerThreadId]]
    for (consumer <- consumers) {
      val topicCount = TopicCount.constructTopicCount(group, consumer, this, excludeInternalTopics)
      for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic) {
        for (consumerThreadId <- consumerThreadIdSet)
          consumersPerTopicMap.get(topic) match {
            case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
            case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
          }
      }
    }
    for ( (topic, consumerList) <- consumersPerTopicMap )
      consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
    consumersPerTopicMap
  }

计算过程主要由上述高亮代码部分实现,举例说明,一个拥有十个分区的topic,相同group拥有三个consumerid为aaa,ccc,bbb的消费者

1 由后两段代码可知,获取consumerid列表和partition分区列表都是已经排好序的,所以

curConsumers=(aaa,bbb,ccc)

curPartitions=(0,1,2,3,4,5,6,7,8,9)

2

nPartsPerConsumer=10/3  =3

nConsumersWithExtraPart=10%3  =1

3 假设当前客户端id为aaa

myConsumerPosition= curConsumers.indexof(aaa) =0

4 计算分区范围

startPart= 3*0+0.min(1) = 0

nParts = 3+(if (0 + 1 > 1) 0 else 1)=3+1=4

所以aaa对应的分区号为[0,4),即0,1,2,3前面四个分区

同理可得bbb对应myConsumerPosition=1,对应分区4,5,6中间三个分区

ccc对应myConsumerPosition=2,对应7,8,9最后三个分区。

时间: 2024-08-24 02:57:31

kafka consumer 分区reblance算法的相关文章

Kafka设计解析(四)- Kafka Consumer设计解析

本文转发自Jason’s Blog,原文链接 http://www.jasongj.com/2015/08/09/KafkaColumn4 摘要 本文主要介绍了Kafka High Level Consumer,Consumer Group,Consumer Rebalance,Low Level Consumer实现的语义,以及适用场景.以及未来版本中对High Level Consumer的重新设计–使用Consumer Coordinator解决Split Brain和Herd等问题. H

[Big Data - Kafka] Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费(广播).因此,Kafka High Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义. Consumer Group High Level Consumer将从某个Partition读取的最后一条消息的offset存

转:Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个 Consumer消费(单播)或被所有Consumer消费(广播).因此,Kafka High Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义. Consumer Group High Level Consumer将从某个Partition读取的最后一条消息的offset

kafka的分区模式?

当别人问这个问题的时候,别人肯定是想你是否看过源码.是否针对不同场景改过kafka的分区模式 这是别人最想知道的是,你的message如何负载均衡的发送给topic的partition 我们用kafka的时候,可以动态指定partition,也可以不指定partition 当我们动态指定了partition的时候,kafka会将消息发送到指定的partition 如果没有指定partition 这就是关键了, 如何让这些消息,均衡的发送给每个partition 先看看发送消息的方式 kafka首

Kafka Consumer应用与高级应用

Kafka Consumer应用与高级应用 PS:本博客仅作学习.总结.交流使用,参考以下博客&资料 1.http://kafka.apache.org/intro.html 2.https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example 3.http://www.cnblogs.com/luotianshuai/p/5206662.html 4.http://www.cnblogs.com/fxj

【原创】Kafka Consumer 多线程 实例

Kafka 0.9版本开始推出了Java版本的consumer,优化了coordinator的设计以及摆脱了对zookeeper的依赖.社区最近也在探讨正式用这套consumer API替换Scala版本的consumer的计划.鉴于目前这方面的资料并不是很多,本文将尝试给出一个利用KafkaConsumer编写的多线程消费者实例,希望对大家有所帮助. 这套API最重要的入口就是KafkaConsumer(o.a.k.clients.consumer.KafkaConsumer),普通的单线程使

Kafka详解五、Kafka Consumer的底层API- SimpleConsumer

Kafka提供了两套API给Consumer The high-level Consumer API The SimpleConsumer API 第一种高度抽象的Consumer API,它使用起来简单.方便,但是对于某些特殊的需求我们可能要用到第二种更底层的API,那么先介绍下第二种API能够帮助我们做哪些事情 一个消息读取多次 在一个处理过程中只消费Partition其中的一部分消息 添加事务管理机制以保证消息被处理且仅被处理一次 使用SimpleConsumer有哪些弊端呢? 必须在程序

【原创】kafka consumer源代码分析

顾名思义,就是kafka的consumer api包. 一.ConsumerConfig.scala Kafka consumer的配置类,除了一些默认值常量及验证参数的方法之外,就是consumer的配置参数了,比如group.id, consumer.id等,详细列表见官网. 二.ConsumerIterator.scala KafkaStream的迭代器类,当stream底层的阻塞队列为空时该iterator会置于阻塞状态.这个迭代器还提供了一个shutdownCommand对象可作为一个

kafka 心跳和 reblance

kafka 的心跳是 kafka consumer 和 broker 之间的健康检查,只有当 broker coordinator 正常时,consumer 才会发送心跳. consumer 和 reblance 相关的 2 个配置参数: 参数名 --> MemberMetadata 字段 session.timeout.ms --> MemberMetadata.sessionTimeoutMs max.poll.interval.ms --> MemberMetadata.rebal