【原创】kafka consumer源代码分析

顾名思义,就是kafka的consumer api包。

一、ConsumerConfig.scala

Kafka consumer的配置类,除了一些默认值常量及验证参数的方法之外,就是consumer的配置参数了,比如group.id, consumer.id等,详细列表见官网。

二、ConsumerIterator.scala

KafkaStream的迭代器类,当stream底层的阻塞队列为空时该iterator会置于阻塞状态。这个迭代器还提供了一个shutdownCommand对象可作为一个标识位被加入到队列中从而触发关闭操作。

既然是迭代器,最重要的next方法一定是要提供的。下面我们依次分析下其定义的方法:

1. next:获取下一个元素。具体逻辑就是用父类的next方法获取下一个MessageAndMetadata,然后再更新一下consumer的度量元统计信息

2. makeNext:核心方法,具体逻辑如下:

  • 获取当前的迭代器,如果是空,就获取一个。具体做法就是根据超时配置以不同的方式从获取底层的channel中读取一个数据块
  • 如果该数据块是关闭命令,直接返回
  • 否则,获取当前的topic信息。如果要请求的位移值比当前已消耗的位移大,那么consumer就有可能会丢失数据。
  • 之后获取一个iterator,并调用next方法获取下一个元素,并构造新的MessageAndMetadata实例返回

3. clearCurrentChunk:清除当前的数据块,即清空了当前的迭代器引用

三、KafkaStream.scala

定义了一个Kafka consumer stream。每个stream都支持迭代遍历其MessageAndMetadata元素。内部维护了一个迭代器ConsumerIterator。KafkaStream定义的方法如下:

1. iterator:返回内部维护的迭代器

2. clear:在consumer重分布时清除被迭代的队列。主要是为了减少consumer接收到重复消息

四、ConsumerConnector.scala

consumer的主接口。定义了一个trait和一个object。ConsumerConnector trait定义了一些抽象方法:

1. createMessageStreams:为每个topic创建一组KafkaStream

2. createMessageStreams (支持指定KeyDeCoder和ValueDecoder)

3. createMessageStreamsByFilter:也是为给定的所有topic创建一组KafkaStream,只不过这个方法允许传递一个filter,允许黑白名单过滤

4. commitOffsets:向连接此consumer connector的所有broker分区执行提交位移操作

5. shutdown:关闭connector

而Consumer object定义了两个方法:

1. create:创建一个ConsumerConnector

2. createJavaConsumerConnector:创建一个java client使用的consumer connector

五、FetchedDataChunk.scala

表示一段获取到的数据块,封装了一组保存在一个字节缓冲区的消息,分区topic信息以及获取到的位移值

六、PartitionAssignor.scala

为一个consumer group中的consumer做分区分配的。PartitionAssignor trait定义了assign方法,返回分区到consumer线程的映射记录。其中被分配的线程必须要属于给定分区上下文(AssignmentContext)中的某个consumer。

说到分配上下文类——AssignmentContext,它需要接收一个consumer group、一个consumer id以及一个zkClient,并在内部维护了一个map记录topic对应的consumer线程集合(主要由TopicCount类中的方法提供)。其定义的方法还包括:

1. partitionsForTopic:返回topic对应的分区集合

2. consumersForTopic:返回topic对应的consumers线程

3. consumers:返回consumers id的集合

PartitionAssignor object定义了一个工厂方法用于创建不同策略的分区分配器,目前Kafka支持两种再平衡策略(也就是分区分配策略):round robin和range。值得注意的是,这里所说的分区策略其实是指指如何将分区分配给消费组内的不同consumer实例。

假设我们有一个topic:T1,T1有10个分区,分别是[P0, P9],然后我们有2个consumer,C1和C2。C1有一个线程,C2有两个线程。

下面我们来看看默认的range策略是如何分配分区的:

1. Range策略

对于每一个topic,range策略会首先按照数字顺序排序所有可用的分区,并按照字典顺序列出所有的consumer线程。结合我们上面的例子,分区顺序是0,1,2,3,4,5,6,7,8,9,而consumer线程的顺序是c1-0, c2-0, c2-1。然后使用分区数除以线程数以确定每个线程至少获取的分区数。在我们的例子中,10/3不能整除,余数为1,因此c1-0会被额外多分配一个分区。最后的分区分配如下:

c1-0 获得分区 0 1 2 3

c2-0 获得分区 4 5 6

c2-1 获得分区 7 8 9

如果该topic是11个分区,那么分区分配如下:

c1-0 获取分区 0 1 2 3

c2-0 获取分区 4 5 6 7

c2-1 获取分区 8 9 10

2. roundrobin策略——轮询策略

如果是轮询策略,我们上面假设的例子就不适用了,因为该策略要求订阅某个topic的所有consumer都必须有相同数目的线程数,因此我们修改上面的例子,假设每个consumer都有2个线程。round robin策略与range的一个主要的区别就是在再分配之前你是没法预测分配结果的——因为它会使用哈希求模的方式随机化排序顺序。

如果要采用roundrobin策略必须要先满足两个条件:

  • 订阅topic的consumer必须有相同数目的线程数
  • consumer group内每个consumer实例都必须有相同的被订阅topic集合

当这两个条件满足后,kafka会将topic-partition对根据hashcode进行随机排序以防某个topic的所有分区都被分配给一个consumer。之后所有的topic-partition对按照轮询的方式分配给可用的consumer线程。以我们改进过的例子来说,假设排序之后的topic-分区是这样的:

T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6和T1-9,而consumer线程是c1-0, c1-1, c2-0, c2-1.那么最后的分区结果如下:

T1-5 去 c1-0

T1-3 去 c1-1

T1-0 去 c2-0

T1-8 去 c2-1

此时所有的consumer线程已经分配过了,但还有尚未分配的分区,这时候就从头再次分配线程:

T1-2 去 c1-0

T1-1 去 c1-1

T1-4 去 c2-0

T1-7 去 c2-1

再次从头开始,

T1-6 去 c1-0

T1-9 去 c1-1

此时所有的分区都已经分配过了,每个consumer线程能够分配到几乎相同数目的分区——这就是round robin的方式。

七、TopicCount.scala

该scala定义了很多类,我们一一分析:

1. ConsumerThreadId:封装了consumer id和线程id。因为扩展了Ordered接口,因此支持按照字典顺序排序。主要为分区策略使用。

2. TopicCount trait:提供topic分组统计的主接口,定义了三个方法:

  • getConsumerThreadIdsPerTopic——返回topic及其Consumer线程id集合的映射
  • getTopicCountMap——返回topic对应consumer stream数的映射
  • pattern:目前有三种pattern:static、white_list和black_list。通过对黑白名单的支持,允许consumer订阅多个topic

3. TopicCount object:定义了一些常用方法,比如:

  • makeThreadId:consumer thread的命名规则是[consumer id]-thread id
  • makeConsumerThreadIdsPerTopic:为给定的一组topic创建出一组ConsumerThreadId来
  • constructTopicCount:根据给定的consumer group和consumer id创建一个TopicCount。具体逻辑如下:
    • 读取/consumers/[group_id]/ids/[consumer_id]节点下的数据(JSON)
    • 解析这个JSON串,提取出各个字段的值
    • 如果pattern是static类型,创建一个StaticTopicCount返回;否则创建一个WildcardTopicCount返回

constructTopicCount还有另外两个重载方法,分别创建StaticTopicCount和WildcardTopicCount

4. StaticTopicCount类:实现了TopicCount接口。其pattern类型为static

5. WildcardTopicCount类:实现了TopicCount接口。根据给定的TopicFilter来判断pattern是white_list还是black_list

八、TopicFilter.scala

TopicFilter抽象类,用于解析topic的正则表达式,并提供一个isTopicAllowed方法用于过滤topic。它有两个子类:Whitelist和Blacklist分别实现白名单过滤和黑名单过滤。

九、PartitionTopicInfo.scala

封装了topic的分区信息,包括这个分区的数据块队列,已消费的位移、已获取的位移以及获取大小等信息。另外提供了一些setter和getter方法可以获取并设置这些信息

十、ZookeeperConsumerConnector.scala

该类主要负责处理consumer与zookeeper之间的交互。

与consumer相关的zookeeper目录结构:

1. consumer id注册节点: /consumers/[group_id]/ids/[consumer_id]  每个consumer在consumer group内有个唯一的id号。它会将该id号以临时节点的方式注册到zookeeper的对应目录中,并把它订阅的所有topic都封装到subscription子JSON元素中。因为是临时节点,consumer一结束zookeeper就会删除该节点。值得注意的是,consumer id的命名没有采用顺序节点的方式,而是从配置中选定的——主要是因为顺序生成节点不利于错误恢复

2. broker节点注册:/brokers/ids/[brokerId]. 每个broker节点都会被分配一个逻辑节点号,从0开始。broker启动时会将其自身注册到zookeeper中——即在/brokers/ids下创建一个以逻辑节点号命名的子节点。这个znode的值是一个JSON串包含以下信息:

  • version:版本号,固定为1
  • host:broker的IP地址或主机名
  • port:broker端口
  • jmx:若启用了jmx,就是jmx的端口号,否则为-1
  • timestamp:broker创建时的时间戳

3. 分区注册信息: /consumers/[group_id]/owners/[topic]/[partitionId]。

4. consumer位移信息:/consumers/[group_id]/offsets/[topic]/[partitionId] -> 位移

这个scala定义了一组伴生对象,其中object中就只有一个变量shutdownCommand用于标识关闭标识。当在队列中看到这个标识的时候就需要结束迭代过程。而ZookeeperConsumerConnector类是这个文件中的核心。它实现了ConsumerConnector trait,因此也就要实现该trait定义的那些抽象方法。

下面先分析一下该类定义的一些重要字段:

1. isShuttingDown:用于标识该connector的状态是否正处理关闭状态

2. fetcher:ConsumerFetcher管理器,用于管理fetcher线程

3. zkClient:用于连接zookeeper的客户端

4. topicRegistry:保存topic下的分区信息

5. checkpointedZkOffsets:保存topic分区对应的位移

6. topicThreadIdAndQueues:保存topic与其消费者线程对应的阻塞队列

7. scheduler:调度器每过auto.commit.interval.ms时间就向zookeeper提交consumer位移

8. messageStreamCreated:标识KafkaStream是否已经创建

9. sessionExpirationListener/topicPartitionChangeListener/loadBalancerListener:三个zk监控器,分别由三个嵌套类实现,后面会提及

10. offsetsChannel:用于发送OffsetFetchRequst的通道

11. wildcardTopicWatcher:ZookeeperTopicEventWatcher类实现的topic事件监听类

12. consumerIdString:定义了如何命名consumer id的规则。如果没有指定consumer.id了,就设置为consumer group_主机名-时间戳-(uuid的一部分)

在构造函数中,该类会首先连接zookeeper,然后创建Fetcher管理器并会以阻塞的方式确认连上副本管理器,最后如果开启了自动提交(auto.commit.enable),那么使用调度器创建一个定时任务。

下面重点说说它提供的一些方法:

1. connectZk:连接zookeeper.connect中指定的zookeeper,就是创建zkClient

2. createFetcher:创建ConsumerFetcherManager

3. ensureOffsetManagerConnected:该方法会一直阻塞知道确认找到可用的副本管理器,其底层的IO通道也已创建。该方法只是针对使用kafka来保存consumer位移的情况——即设置offsets.storage=kafka

4. shutdown:关闭该connector,主要涉及到关闭wildcardTopicWatcher、调度器、fetcher管理器、清除所有队列、提交位移以及关闭zookeeper客户端和位移通道等

5. registerConsumerInZK:在zookeeper中注册给定的consumer——即在zookeeper的/consumers/[groupId]/ids下创建一个临时节点

6. sendShutdownToAllQueues:清除topicThreadIdAndQueues中的队列并向所有队列发送关闭命令

7. autocommit:自动提交位移,主要由方法commitOffsets实现

8. commitOffsetToZooKeeper:向zookeeper提交位移,就是更新指定节点的数据并将offset保存在checkpointedZKOffsets缓存中

9. commitOffsets:提交位移。在具体分析代码之前,先来分析下属性offsets.commit.retries——重试位移的次数。它只对关闭connector时候的位移提交有效,而不计算自动提交线程发起的提交。它也不考虑在提交前的查询位移。比如一个consumer元数据请求基于某种原因失败了,它会被重试但并不计入这个统计之中。commitOffsets貌似参数含义写反了,它现在的参数名是isAutoCommit,但实际实际调用过程中,如果是自动提交反而需要指定false。

具体逻辑如下:

  • 根据是否为自动提交来设定重试次数——如果是为1次即不重试;否则为offsets.commit.retries + 1
  • 从topicRegistry中构建要提交的位移集合
  • 如果该集合是空自然也不需要提交什么,否则判断一下使用何种存储来保存consumer位移
  • 如果是zookeeper保存(默认情况),遍历待提交位移集合,为每一个topic分区去zookeeper的对应节点下更新位移
  • 如果是kafka来保存位移,
    • 首先要创建OffsetCommitRequest请求
    • 然后确保能够连上副本管理器
    • 发送OffsetCommitRequest请求并得到对应的response
    • 找出response中包含的错误码,如果有错误标记为提交位移失败

10. fetchOffsetFromZooKeeper:从Zookeeper中获取给定分区的位移

11. fetchOffsets:获取一组分区的consumer位移,如果是保存在zookeeper中直接调用fetchOffsetFromZooKeeper获取,否则具体逻辑如下:

  • 创建OffsetFetchRequest
  • 确保连入副本管理器并发送OffsetFetchRequest请求,获取对应的response
  • 如果leader发生了变更或位移缓存正在加载中的话则返回的response是空——以便后面重试
  • 查看是否启用了双路位移提交(dual.commit.enable)——比如一个consumer group正在从迁移zookeeper中的位移到kafka中,如果没有的话直接返回response,否则就从zookeeper和kafka中选取大的那个返回给response

该类还有一些很重要的方法,但我们先看一下该scala文件中嵌套定义的4个类:

1. ZKSessionExpireListener —— 监听zookeeper会话过期的监听器。因为事先了IZKStateListener接口,因此也必须实现handleStateChanged和handleNewSession两个方法。

  • handleStateChanged:什么都不用做,因为zookeeper客户端会重连
  • handleNewSession:zookeeper会话过期后调用该方法来创建新的会话。也就是重建临时节点,重新注册consumer。主要逻辑就是
    • 首先清空topicRegistry分区信息缓存
    • 在zookeeper中重新注册consumer (registerConsumerInZK)
    • 在consumer上重新发起负载均衡操作——通过负载均衡监听器的syncRebalance方法。另外由于在负载均衡过程中会重新注册子节点变更和状态变更的监听器,因此handleNewSession方法中就不在重订阅它们了。

2. ZKTopicPartitionChangeListener:也是一个监听器,用于监听zookeeper节点数据的变更。两个方法:

  • handleDataChange:  topic数据发生变更时调用该方法,应对的方法就是调用relabalanceEventTriggered通知所有监听执行线程继续执行
  • handleDataDeleted:抛出警告表明topic数据被意外地删除了

3. ZKRebalancerListener:监听zookeeper子节点变更的监听器,用于触发consumer的负载均衡。在类的内部它会创建一个监控执行线程用于监控给定的consumer,一旦监控到要触发rebalance就调用syncedRebalance开始执行rebalance。因为是zookeeper的子节点监听类,它还必须实现handleChildChange,用于触发rebalacen事件。下面一一分析其定义的方法:

  • rebalanceEventTriggered —— 设置isWatcherTriggered为true并唤醒监控线程开始执行rebalance操作
  • deletePartitionOwnershipFromZK —— 从zookeeper中删除给定topic分对应的分区znode: /consumers/[groupId]/owners/[topic]/[partition],就是删除这个consumer的注册信息
  • releasePartitionOwnership —— 通过循环调用deletePartitionOwnershipFromZK方法, 取消给定所有topic的所有分区的consumer注册信息。并删除对应的统计信息以及清空对应的计数器
  • resetState —— 清空该consumer connector上注册的所有topic信息
  • clearFetcherQueues —— 清空fetcher相关的所有队列以及当前正在consumer线程中遍历的数据块(data chunk)
  • closeFetchersForQueues —— 停止所有fetcher线程并清空所有队列避免数据重复。在清空fetcher之前先要停掉leader发现线程。之后如果启用了自动提交位移还是需要提交位移以防止consumer从当前数据块中再返回消息。由于分区注册信息还在zookeeper中没有被释放,本次提交位移能够保证现在提交的位移会被下一个拥有当前数据块分区的consumer线程所使用。因为fetcher总是要关闭的并且这是consumer遍历的最后一个数据块,迭代器就不会再返回任何新的消息了直到rebalance成功完成且fetcher重启之后获取更多的数据块
  • closeFetchers —— 清空consumer"可能"不再消费的topic分区的fetcher队列
  • updateFetcher —— 更新fetcher的分区
  • reflectPartitionOwnershipDecision —— 判断consumer是否是给定topic分区的owner,即在zookeeper上创建/consumers/[groupId]/owners/[topic]/[partition],如果能创建就是owner
  • addPartitionTopicInfo —— 将给定的topic分区信息加入到这个connector的缓存中
  • reinitializeConsumer —— 重新初始化consumer,主要就是创建各种监听器,更新各种缓存等
  • rebalance —— 根据可用broker重新分配consumer-topic分区的对应记录
  • syncedRebalance —— 重新再平衡分配consumer-topic分区的对应记录

4. WildcardStreamsHandler类:用于做topic的通配符过滤之用

十一、ConsumerFetcherManager.scala

consumer fetcher的管理类,其定义的startConnections和stopConnections方法会被反复地调用。该类主要定义了一个嵌套类:
LeaderFinderThread —— 顾名思义,就是leader发现者线程,当leader可用时,将fetcher添加到对应的broker上

十二、ConsumerFetcherThread.scala

consumer获取线程,三个方法:

1. processPartitionData:处理获取到的数据,主要就是将消息集合入队列等待处理

2. handleOffsetOutOfRange:处理一个分区的位移越界的情况,主要根据auto.offset.reset属性设定的值来指定

3. handlePartitionsWithErrors:处理没有leader需要leader选举的分区

十三、ConsumerTopicStats.scala

consumer的统计信息类,就不详细说了

十四、FetchRequestAndResponseStats.scala

统计一个给定的consumer客户端提交给所有broker的所有FetchRequest请求统计信息以及对应的response统计信息

十五、TopicEventHandler.scala

一个处理topic事件的trait,只定义了一个方法:handleTopicEvent

十六、ZookeeperTopicEventWatcher.scala

监控/brokers/topics节点下各个topic子节点的变更

十七、SimpleConsumer.scala

kafka消息的consumer。它会维护一个BlockingChannel用于收发请求/响应,因此也提供了connect和disconnect方法用于开启和关闭底层的blockingchannel。该类的定义核心方法还包括:

1. send,也就是发送TopicMetadataRequest和ConsumerMetadataRequest

2. getOffsetsBefore:获取给定时间之前的一组有效位移

3. commitOffsets:提交一个topic的位移。请求中如果版本是0,提交位移给zookeeper,否则提交位移给Kafka

4. fetchOffsets:获取一个topic的位移。版本0从zookeeper中获取,否则从kafka中获取

5. earliestOrLatestOffset:为给定的topic分区获取最早或最新的位移

6. fetch:从FetchRequest中获取一个topic的一组消息

时间: 2024-10-19 02:52:16

【原创】kafka consumer源代码分析的相关文章

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

【原创】Kafka console consumer源代码分析

上一篇中分析了Scala版的console producer代码,这篇文章来分析一下console consumer的工作原理.其实不论是哪个consumer,大部分的工作原理都是类似的.我们用console consumer作为切入点,既容易理解又不失一般性. 首先需要说明的,我使用的Kafka环境是Kafka0.8.2.1版本,这也是最新的版本.另外我们主要分析consumer的原理,没有过分纠结于console consumer的使用方法——所以我在这里选用了最简单的一条命令作为开始:bi

【原创】Kafka console consumer源代码分析(二)

我们继续讨论console consumer的实现原理,本篇着重探讨ZookeeperConsumerConnector的使用,即后续所有的内容都由下面这条语句而起: val connector = Consumer.create(config) 那么问题来了?这条语句后面执行了什么呢?我们先看create方法的定义 def create(config: ConsumerConfig): ConsumerConnector = { val consumerConnect = new Zookee

【原创】kafka server源代码分析(一)

这个是Kafka server的核心包,里面的类也很多,我们还是一个一个分析 一.BrokerStates.scala 定义了目前一个kafka broker的7中状态 —— 1. NotRunning:未运行 2. Starting:启动中 3. RecoveringFromUncleanShutdown:从上次异常恢复中 4. RunningAsBroker:已启动 5. RunningAsController:作为Controller运行 6. PendingControlledShutd

【原创】kafka server源代码分析(二)

十四.AbstractFetcherManager.scala 该scala定义了两个case类和一个抽象类.两个case类很简单: 1. BrokerAndFectherId:封装了一个broker和一个fetcher的数据结构 2. BrokerAndInitialOffset:封装了broker和初始位移的一个数据结构 该scala中最核心的还是那个抽象类:AbstractFetcherManager.它维护了一个获取线程的map,主要保存broker id + fetcher id对应的

【原创】kafka controller源代码分析(一)

Kafka集群中的一个broker会被作为controller负责管理分区和副本的状态以及执行类似于重分配分区之类的管理任务.如果当前的controller失败了,会从剩下的broker中选出新的controller. 一.PartitionLeaderSelector.scala 顾名思义就是为分区选举出leader broker,该trait只定义了一个方法selectLeader,接收一个TopicAndPartition对象和一个LeaderAndIsr对象.TopicAndPartit

【原创】kafka controller源代码分析(二)

四.TopicDeletionManager.scala 管理topic删除的状态机,具体逻辑如下: TopicCommand发送topic删除命令,在zk的/admin/delete_topics目录下创建topic节点 controller会监听该zk目录下任何节点的变更并为对应的topic开启删除操作 controller开启一个后台线程处理topic的删除.使用该线程主要为了以后能够增加TTL(time to live)的特性.无论何时开启或重启topic删除操作时都会通知该线程.当前,

【原创】kafka admin源代码分析

admin包定义了命令行的一些实现 一.AdminOperationException.scala 一个异常类,表示执行admin命令时候抛出的异常 二.AdminUtils.scala admin一些常用工具方法: 1. assignReplicasToBrokers:负责分配副本到不同的broker上.主要有两个目标:① 尽可能均匀地在不同的broker上分配副本:② 对于被分配到同一个broker上的分区而言,它们的其他副本会尽可能地分配到不同的broker上.如果要达到这些目标,需要①分

【原创】kafka client源代码分析

该包下只有一个文件:ClientUtils.scala.它是一个object,里面封装了各种client(包括producer,consumer或admin)可能会用到的方法: 1. fetchTopicMetadata(producer版本): producer client会调用该方法来发送一个TopicMetadata请求,最后返回该请求对应的response.具体逻辑如下: 构造一个TopicMetadataRequest请求 将给定的broker列表按照随机顺序打散以防止大量的请求被路