顾名思义,就是kafka的consumer api包。
一、ConsumerConfig.scala
Kafka consumer的配置类,除了一些默认值常量及验证参数的方法之外,就是consumer的配置参数了,比如group.id, consumer.id等,详细列表见官网。
二、ConsumerIterator.scala
KafkaStream的迭代器类,当stream底层的阻塞队列为空时该iterator会置于阻塞状态。这个迭代器还提供了一个shutdownCommand对象可作为一个标识位被加入到队列中从而触发关闭操作。
既然是迭代器,最重要的next方法一定是要提供的。下面我们依次分析下其定义的方法:
1. next:获取下一个元素。具体逻辑就是用父类的next方法获取下一个MessageAndMetadata,然后再更新一下consumer的度量元统计信息
2. makeNext:核心方法,具体逻辑如下:
- 获取当前的迭代器,如果是空,就获取一个。具体做法就是根据超时配置以不同的方式从获取底层的channel中读取一个数据块
- 如果该数据块是关闭命令,直接返回
- 否则,获取当前的topic信息。如果要请求的位移值比当前已消耗的位移大,那么consumer就有可能会丢失数据。
- 之后获取一个iterator,并调用next方法获取下一个元素,并构造新的MessageAndMetadata实例返回
3. clearCurrentChunk:清除当前的数据块,即清空了当前的迭代器引用
三、KafkaStream.scala
定义了一个Kafka consumer stream。每个stream都支持迭代遍历其MessageAndMetadata元素。内部维护了一个迭代器ConsumerIterator。KafkaStream定义的方法如下:
1. iterator:返回内部维护的迭代器
2. clear:在consumer重分布时清除被迭代的队列。主要是为了减少consumer接收到重复消息
四、ConsumerConnector.scala
consumer的主接口。定义了一个trait和一个object。ConsumerConnector trait定义了一些抽象方法:
1. createMessageStreams:为每个topic创建一组KafkaStream
2. createMessageStreams (支持指定KeyDeCoder和ValueDecoder)
3. createMessageStreamsByFilter:也是为给定的所有topic创建一组KafkaStream,只不过这个方法允许传递一个filter,允许黑白名单过滤
4. commitOffsets:向连接此consumer connector的所有broker分区执行提交位移操作
5. shutdown:关闭connector
而Consumer object定义了两个方法:
1. create:创建一个ConsumerConnector
2. createJavaConsumerConnector:创建一个java client使用的consumer connector
五、FetchedDataChunk.scala
表示一段获取到的数据块,封装了一组保存在一个字节缓冲区的消息,分区topic信息以及获取到的位移值
六、PartitionAssignor.scala
为一个consumer group中的consumer做分区分配的。PartitionAssignor trait定义了assign方法,返回分区到consumer线程的映射记录。其中被分配的线程必须要属于给定分区上下文(AssignmentContext)中的某个consumer。
说到分配上下文类——AssignmentContext,它需要接收一个consumer group、一个consumer id以及一个zkClient,并在内部维护了一个map记录topic对应的consumer线程集合(主要由TopicCount类中的方法提供)。其定义的方法还包括:
1. partitionsForTopic:返回topic对应的分区集合
2. consumersForTopic:返回topic对应的consumers线程
3. consumers:返回consumers id的集合
PartitionAssignor object定义了一个工厂方法用于创建不同策略的分区分配器,目前Kafka支持两种再平衡策略(也就是分区分配策略):round robin和range。值得注意的是,这里所说的分区策略其实是指指如何将分区分配给消费组内的不同consumer实例。
假设我们有一个topic:T1,T1有10个分区,分别是[P0, P9],然后我们有2个consumer,C1和C2。C1有一个线程,C2有两个线程。
下面我们来看看默认的range策略是如何分配分区的:
1. Range策略
对于每一个topic,range策略会首先按照数字顺序排序所有可用的分区,并按照字典顺序列出所有的consumer线程。结合我们上面的例子,分区顺序是0,1,2,3,4,5,6,7,8,9,而consumer线程的顺序是c1-0, c2-0, c2-1。然后使用分区数除以线程数以确定每个线程至少获取的分区数。在我们的例子中,10/3不能整除,余数为1,因此c1-0会被额外多分配一个分区。最后的分区分配如下:
c1-0 获得分区 0 1 2 3
c2-0 获得分区 4 5 6
c2-1 获得分区 7 8 9
如果该topic是11个分区,那么分区分配如下:
c1-0 获取分区 0 1 2 3
c2-0 获取分区 4 5 6 7
c2-1 获取分区 8 9 10
2. roundrobin策略——轮询策略
如果是轮询策略,我们上面假设的例子就不适用了,因为该策略要求订阅某个topic的所有consumer都必须有相同数目的线程数,因此我们修改上面的例子,假设每个consumer都有2个线程。round robin策略与range的一个主要的区别就是在再分配之前你是没法预测分配结果的——因为它会使用哈希求模的方式随机化排序顺序。
如果要采用roundrobin策略必须要先满足两个条件:
- 订阅topic的consumer必须有相同数目的线程数
- consumer group内每个consumer实例都必须有相同的被订阅topic集合
当这两个条件满足后,kafka会将topic-partition对根据hashcode进行随机排序以防某个topic的所有分区都被分配给一个consumer。之后所有的topic-partition对按照轮询的方式分配给可用的consumer线程。以我们改进过的例子来说,假设排序之后的topic-分区是这样的:
T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6和T1-9,而consumer线程是c1-0, c1-1, c2-0, c2-1.那么最后的分区结果如下:
T1-5 去 c1-0
T1-3 去 c1-1
T1-0 去 c2-0
T1-8 去 c2-1
此时所有的consumer线程已经分配过了,但还有尚未分配的分区,这时候就从头再次分配线程:
T1-2 去 c1-0
T1-1 去 c1-1
T1-4 去 c2-0
T1-7 去 c2-1
再次从头开始,
T1-6 去 c1-0
T1-9 去 c1-1
此时所有的分区都已经分配过了,每个consumer线程能够分配到几乎相同数目的分区——这就是round robin的方式。
七、TopicCount.scala
该scala定义了很多类,我们一一分析:
1. ConsumerThreadId:封装了consumer id和线程id。因为扩展了Ordered接口,因此支持按照字典顺序排序。主要为分区策略使用。
2. TopicCount trait:提供topic分组统计的主接口,定义了三个方法:
- getConsumerThreadIdsPerTopic——返回topic及其Consumer线程id集合的映射
- getTopicCountMap——返回topic对应consumer stream数的映射
- pattern:目前有三种pattern:static、white_list和black_list。通过对黑白名单的支持,允许consumer订阅多个topic
3. TopicCount object:定义了一些常用方法,比如:
- makeThreadId:consumer thread的命名规则是[consumer id]-thread id
- makeConsumerThreadIdsPerTopic:为给定的一组topic创建出一组ConsumerThreadId来
- constructTopicCount:根据给定的consumer group和consumer id创建一个TopicCount。具体逻辑如下:
- 读取/consumers/[group_id]/ids/[consumer_id]节点下的数据(JSON)
- 解析这个JSON串,提取出各个字段的值
- 如果pattern是static类型,创建一个StaticTopicCount返回;否则创建一个WildcardTopicCount返回
constructTopicCount还有另外两个重载方法,分别创建StaticTopicCount和WildcardTopicCount
4. StaticTopicCount类:实现了TopicCount接口。其pattern类型为static
5. WildcardTopicCount类:实现了TopicCount接口。根据给定的TopicFilter来判断pattern是white_list还是black_list
八、TopicFilter.scala
TopicFilter抽象类,用于解析topic的正则表达式,并提供一个isTopicAllowed方法用于过滤topic。它有两个子类:Whitelist和Blacklist分别实现白名单过滤和黑名单过滤。
九、PartitionTopicInfo.scala
封装了topic的分区信息,包括这个分区的数据块队列,已消费的位移、已获取的位移以及获取大小等信息。另外提供了一些setter和getter方法可以获取并设置这些信息
十、ZookeeperConsumerConnector.scala
该类主要负责处理consumer与zookeeper之间的交互。
与consumer相关的zookeeper目录结构:
1. consumer id注册节点: /consumers/[group_id]/ids/[consumer_id] 每个consumer在consumer group内有个唯一的id号。它会将该id号以临时节点的方式注册到zookeeper的对应目录中,并把它订阅的所有topic都封装到subscription子JSON元素中。因为是临时节点,consumer一结束zookeeper就会删除该节点。值得注意的是,consumer id的命名没有采用顺序节点的方式,而是从配置中选定的——主要是因为顺序生成节点不利于错误恢复
2. broker节点注册:/brokers/ids/[brokerId]. 每个broker节点都会被分配一个逻辑节点号,从0开始。broker启动时会将其自身注册到zookeeper中——即在/brokers/ids下创建一个以逻辑节点号命名的子节点。这个znode的值是一个JSON串包含以下信息:
- version:版本号,固定为1
- host:broker的IP地址或主机名
- port:broker端口
- jmx:若启用了jmx,就是jmx的端口号,否则为-1
- timestamp:broker创建时的时间戳
3. 分区注册信息: /consumers/[group_id]/owners/[topic]/[partitionId]。
4. consumer位移信息:/consumers/[group_id]/offsets/[topic]/[partitionId] -> 位移
这个scala定义了一组伴生对象,其中object中就只有一个变量shutdownCommand用于标识关闭标识。当在队列中看到这个标识的时候就需要结束迭代过程。而ZookeeperConsumerConnector类是这个文件中的核心。它实现了ConsumerConnector trait,因此也就要实现该trait定义的那些抽象方法。
下面先分析一下该类定义的一些重要字段:
1. isShuttingDown:用于标识该connector的状态是否正处理关闭状态
2. fetcher:ConsumerFetcher管理器,用于管理fetcher线程
3. zkClient:用于连接zookeeper的客户端
4. topicRegistry:保存topic下的分区信息
5. checkpointedZkOffsets:保存topic分区对应的位移
6. topicThreadIdAndQueues:保存topic与其消费者线程对应的阻塞队列
7. scheduler:调度器每过auto.commit.interval.ms时间就向zookeeper提交consumer位移
8. messageStreamCreated:标识KafkaStream是否已经创建
9. sessionExpirationListener/topicPartitionChangeListener/loadBalancerListener:三个zk监控器,分别由三个嵌套类实现,后面会提及
10. offsetsChannel:用于发送OffsetFetchRequst的通道
11. wildcardTopicWatcher:ZookeeperTopicEventWatcher类实现的topic事件监听类
12. consumerIdString:定义了如何命名consumer id的规则。如果没有指定consumer.id了,就设置为consumer group_主机名-时间戳-(uuid的一部分)
在构造函数中,该类会首先连接zookeeper,然后创建Fetcher管理器并会以阻塞的方式确认连上副本管理器,最后如果开启了自动提交(auto.commit.enable),那么使用调度器创建一个定时任务。
下面重点说说它提供的一些方法:
1. connectZk:连接zookeeper.connect中指定的zookeeper,就是创建zkClient
2. createFetcher:创建ConsumerFetcherManager
3. ensureOffsetManagerConnected:该方法会一直阻塞知道确认找到可用的副本管理器,其底层的IO通道也已创建。该方法只是针对使用kafka来保存consumer位移的情况——即设置offsets.storage=kafka
4. shutdown:关闭该connector,主要涉及到关闭wildcardTopicWatcher、调度器、fetcher管理器、清除所有队列、提交位移以及关闭zookeeper客户端和位移通道等
5. registerConsumerInZK:在zookeeper中注册给定的consumer——即在zookeeper的/consumers/[groupId]/ids下创建一个临时节点
6. sendShutdownToAllQueues:清除topicThreadIdAndQueues中的队列并向所有队列发送关闭命令
7. autocommit:自动提交位移,主要由方法commitOffsets实现
8. commitOffsetToZooKeeper:向zookeeper提交位移,就是更新指定节点的数据并将offset保存在checkpointedZKOffsets缓存中
9. commitOffsets:提交位移。在具体分析代码之前,先来分析下属性offsets.commit.retries——重试位移的次数。它只对关闭connector时候的位移提交有效,而不计算自动提交线程发起的提交。它也不考虑在提交前的查询位移。比如一个consumer元数据请求基于某种原因失败了,它会被重试但并不计入这个统计之中。commitOffsets貌似参数含义写反了,它现在的参数名是isAutoCommit,但实际实际调用过程中,如果是自动提交反而需要指定false。
具体逻辑如下:
- 根据是否为自动提交来设定重试次数——如果是为1次即不重试;否则为offsets.commit.retries + 1
- 从topicRegistry中构建要提交的位移集合
- 如果该集合是空自然也不需要提交什么,否则判断一下使用何种存储来保存consumer位移
- 如果是zookeeper保存(默认情况),遍历待提交位移集合,为每一个topic分区去zookeeper的对应节点下更新位移
- 如果是kafka来保存位移,
- 首先要创建OffsetCommitRequest请求
- 然后确保能够连上副本管理器
- 发送OffsetCommitRequest请求并得到对应的response
- 找出response中包含的错误码,如果有错误标记为提交位移失败
10. fetchOffsetFromZooKeeper:从Zookeeper中获取给定分区的位移
11. fetchOffsets:获取一组分区的consumer位移,如果是保存在zookeeper中直接调用fetchOffsetFromZooKeeper获取,否则具体逻辑如下:
- 创建OffsetFetchRequest
- 确保连入副本管理器并发送OffsetFetchRequest请求,获取对应的response
- 如果leader发生了变更或位移缓存正在加载中的话则返回的response是空——以便后面重试
- 查看是否启用了双路位移提交(dual.commit.enable)——比如一个consumer group正在从迁移zookeeper中的位移到kafka中,如果没有的话直接返回response,否则就从zookeeper和kafka中选取大的那个返回给response
该类还有一些很重要的方法,但我们先看一下该scala文件中嵌套定义的4个类:
1. ZKSessionExpireListener —— 监听zookeeper会话过期的监听器。因为事先了IZKStateListener接口,因此也必须实现handleStateChanged和handleNewSession两个方法。
- handleStateChanged:什么都不用做,因为zookeeper客户端会重连
- handleNewSession:zookeeper会话过期后调用该方法来创建新的会话。也就是重建临时节点,重新注册consumer。主要逻辑就是
- 首先清空topicRegistry分区信息缓存
- 在zookeeper中重新注册consumer (registerConsumerInZK)
- 在consumer上重新发起负载均衡操作——通过负载均衡监听器的syncRebalance方法。另外由于在负载均衡过程中会重新注册子节点变更和状态变更的监听器,因此handleNewSession方法中就不在重订阅它们了。
2. ZKTopicPartitionChangeListener:也是一个监听器,用于监听zookeeper节点数据的变更。两个方法:
- handleDataChange: topic数据发生变更时调用该方法,应对的方法就是调用relabalanceEventTriggered通知所有监听执行线程继续执行
- handleDataDeleted:抛出警告表明topic数据被意外地删除了
3. ZKRebalancerListener:监听zookeeper子节点变更的监听器,用于触发consumer的负载均衡。在类的内部它会创建一个监控执行线程用于监控给定的consumer,一旦监控到要触发rebalance就调用syncedRebalance开始执行rebalance。因为是zookeeper的子节点监听类,它还必须实现handleChildChange,用于触发rebalacen事件。下面一一分析其定义的方法:
- rebalanceEventTriggered —— 设置isWatcherTriggered为true并唤醒监控线程开始执行rebalance操作
- deletePartitionOwnershipFromZK —— 从zookeeper中删除给定topic分对应的分区znode: /consumers/[groupId]/owners/[topic]/[partition],就是删除这个consumer的注册信息
- releasePartitionOwnership —— 通过循环调用deletePartitionOwnershipFromZK方法, 取消给定所有topic的所有分区的consumer注册信息。并删除对应的统计信息以及清空对应的计数器
- resetState —— 清空该consumer connector上注册的所有topic信息
- clearFetcherQueues —— 清空fetcher相关的所有队列以及当前正在consumer线程中遍历的数据块(data chunk)
- closeFetchersForQueues —— 停止所有fetcher线程并清空所有队列避免数据重复。在清空fetcher之前先要停掉leader发现线程。之后如果启用了自动提交位移还是需要提交位移以防止consumer从当前数据块中再返回消息。由于分区注册信息还在zookeeper中没有被释放,本次提交位移能够保证现在提交的位移会被下一个拥有当前数据块分区的consumer线程所使用。因为fetcher总是要关闭的并且这是consumer遍历的最后一个数据块,迭代器就不会再返回任何新的消息了直到rebalance成功完成且fetcher重启之后获取更多的数据块
- closeFetchers —— 清空consumer"可能"不再消费的topic分区的fetcher队列
- updateFetcher —— 更新fetcher的分区
- reflectPartitionOwnershipDecision —— 判断consumer是否是给定topic分区的owner,即在zookeeper上创建/consumers/[groupId]/owners/[topic]/[partition],如果能创建就是owner
- addPartitionTopicInfo —— 将给定的topic分区信息加入到这个connector的缓存中
- reinitializeConsumer —— 重新初始化consumer,主要就是创建各种监听器,更新各种缓存等
- rebalance —— 根据可用broker重新分配consumer-topic分区的对应记录
- syncedRebalance —— 重新再平衡分配consumer-topic分区的对应记录
4. WildcardStreamsHandler类:用于做topic的通配符过滤之用
十一、ConsumerFetcherManager.scala
consumer fetcher的管理类,其定义的startConnections和stopConnections方法会被反复地调用。该类主要定义了一个嵌套类:
LeaderFinderThread —— 顾名思义,就是leader发现者线程,当leader可用时,将fetcher添加到对应的broker上
十二、ConsumerFetcherThread.scala
consumer获取线程,三个方法:
1. processPartitionData:处理获取到的数据,主要就是将消息集合入队列等待处理
2. handleOffsetOutOfRange:处理一个分区的位移越界的情况,主要根据auto.offset.reset属性设定的值来指定
3. handlePartitionsWithErrors:处理没有leader需要leader选举的分区
十三、ConsumerTopicStats.scala
consumer的统计信息类,就不详细说了
十四、FetchRequestAndResponseStats.scala
统计一个给定的consumer客户端提交给所有broker的所有FetchRequest请求统计信息以及对应的response统计信息
十五、TopicEventHandler.scala
一个处理topic事件的trait,只定义了一个方法:handleTopicEvent
十六、ZookeeperTopicEventWatcher.scala
监控/brokers/topics节点下各个topic子节点的变更
十七、SimpleConsumer.scala
kafka消息的consumer。它会维护一个BlockingChannel用于收发请求/响应,因此也提供了connect和disconnect方法用于开启和关闭底层的blockingchannel。该类的定义核心方法还包括:
1. send,也就是发送TopicMetadataRequest和ConsumerMetadataRequest
2. getOffsetsBefore:获取给定时间之前的一组有效位移
3. commitOffsets:提交一个topic的位移。请求中如果版本是0,提交位移给zookeeper,否则提交位移给Kafka
4. fetchOffsets:获取一个topic的位移。版本0从zookeeper中获取,否则从kafka中获取
5. earliestOrLatestOffset:为给定的topic分区获取最早或最新的位移
6. fetch:从FetchRequest中获取一个topic的一组消息