【原创】Kafka console consumer源代码分析(二)

我们继续讨论console consumer的实现原理,本篇着重探讨ZookeeperConsumerConnector的使用,即后续所有的内容都由下面这条语句而起:

val connector = Consumer.create(config)

那么问题来了?这条语句后面执行了什么呢?我们先看create方法的定义

def create(config: ConsumerConfig): ConsumerConnector = {

val consumerConnect = new ZookeeperConsumerConnector(config)

consumerConnect

}

可以看出它的全部逻辑就是创建一个ZookeeperConsumerConnector实例并调用它的构造函数。现在问题变得简单了,我们必须要弄清楚ZookeeperConsumerConnector在创建实例的时候都做了哪些事情:

1. 创建KafkaScheduler ---- 该调度器的任务是定时地提交位移到zookeeper中

2. 生成consumer.id, 格式是[group.id]_主机名-时间戳-随机UUID前8位;如果在命令行中指定了consumer id,则格式为[group.id]_[consumer id]

3. 创建连接zookeeper客户端

4. 创建ConsumerFetcherManager

5. 生成定时任务,根据auto.commit.interval.ms的配置定时地提交位移,默认是1分钟

提交位移到zookeeper就是要定期将已消费过的消息位移保存到zookeeper上,具体的逻辑也很简单,本文在这里就不赘述了。我们只关心上面步骤中的第四步——创建ConsumerFetcherManager。那么,ConsumerFetcherManager是做什么用的呢?

顾名思义,ConsumerFetcherManager就是消费者获取线程的管理器,它在内存中维护了两个映射关系:

1. 获取者线程与broker的映射,即每个broker上面都有哪些获取者线程

2. topic分区与分区消费信息的映射,这里的分区消费信息包含很多内容,比如底层的消费队列、保存到zk上的已消费位移、获取过的最大位移以及获取大小等信息。

有了这些信息,一个消费者线程管理器就可以很方便地对消费者线程进行动态地重分配。

时间: 2024-12-28 16:05:37

【原创】Kafka console consumer源代码分析(二)的相关文章

【原创】Kafka console consumer源代码分析

上一篇中分析了Scala版的console producer代码,这篇文章来分析一下console consumer的工作原理.其实不论是哪个consumer,大部分的工作原理都是类似的.我们用console consumer作为切入点,既容易理解又不失一般性. 首先需要说明的,我使用的Kafka环境是Kafka0.8.2.1版本,这也是最新的版本.另外我们主要分析consumer的原理,没有过分纠结于console consumer的使用方法——所以我在这里选用了最简单的一条命令作为开始:bi

【原创】Kakfa utils源代码分析(二)

我们继续研究kafka.utils包 八.KafkaScheduler.scala 首先该文件定义了一个trait:Scheduler——它就是运行任务的一个调度器.任务调度的方式支持重复执行的后台任务或是一次性的延时任务.这个trait定义了三个抽象方法: 1. startup: 启动调度器,用于接收调度任务 2. shutdown: 关闭调度器.一旦关闭就不再执行调度任务了,即使是那些晚于关闭时刻的任务. 3. schedule: 调度一个任务的执行.方法接收4个参数 3.1 任务名称 3.

【原创】kafka consumer源代码分析

顾名思义,就是kafka的consumer api包. 一.ConsumerConfig.scala Kafka consumer的配置类,除了一些默认值常量及验证参数的方法之外,就是consumer的配置参数了,比如group.id, consumer.id等,详细列表见官网. 二.ConsumerIterator.scala KafkaStream的迭代器类,当stream底层的阻塞队列为空时该iterator会置于阻塞状态.这个迭代器还提供了一个shutdownCommand对象可作为一个

【原创】kafka server源代码分析(二)

十四.AbstractFetcherManager.scala 该scala定义了两个case类和一个抽象类.两个case类很简单: 1. BrokerAndFectherId:封装了一个broker和一个fetcher的数据结构 2. BrokerAndInitialOffset:封装了broker和初始位移的一个数据结构 该scala中最核心的还是那个抽象类:AbstractFetcherManager.它维护了一个获取线程的map,主要保存broker id + fetcher id对应的

【原创】kafka controller源代码分析(二)

四.TopicDeletionManager.scala 管理topic删除的状态机,具体逻辑如下: TopicCommand发送topic删除命令,在zk的/admin/delete_topics目录下创建topic节点 controller会监听该zk目录下任何节点的变更并为对应的topic开启删除操作 controller开启一个后台线程处理topic的删除.使用该线程主要为了以后能够增加TTL(time to live)的特性.无论何时开启或重启topic删除操作时都会通知该线程.当前,

【原创】Kakfa utils源代码分析(三)

Kafka utils包最后一篇~~~ 十五.ShutdownableThread.scala 可关闭的线程抽象类! 继承自Thread同时还接收一个boolean变量isInterruptible表明是否允许中断.既然是可关闭的,因此一定不是守护线程,而是一个用户线程(不会阻塞JVM关闭).提供的方法有: 1. doWork: 抽象方法.子类必须实现这个方法,从名字来说应该是指定线程要完成的操作. 2. initiateShutdown: 发起关闭请求.首先通过CAS的方式判断是否线程在运行中

【原创】Kakfa utils源代码分析(一)

Kafka.utils,顾名思义,就是一个工具套件包,里面的类封装了很多常见的功能实现——说到这里,笔者有一个感触:当初为了阅读Kafka源代码而学习了Scala语言,本以为Kafka的实现会用到很多函数编程(Functional Programming, FP),结果目前来看,大部分还是很朴素地以面向对象的方式来实现的,只有很少一部分集合的处理使用诸如map,reduce这样的FP方式.不能不说有点小小的遗憾.——当然也许后面Kafka的核心代码中会看到更多FP的身影. 下图就是kafka.u

[Android]Fragment源代码分析(二) 状态

我们上一讲,抛出来一个问题,就是当Activity的onCreateView的时候,是怎样构造Fragment中的View參数.要回答这个问题我们先要了解Fragment的状态,这是Fragment管理中很重要的一环.我们先来看一下FragmentActivity提供的一些核心回调: @Override protected void onCreate(Bundle savedInstanceState) { mFragments.attachActivity(this, mContainer,

Android 中View的绘制机制源代码分析 三

到眼下为止,measure过程已经解说完了,今天開始我们就来学习layout过程.只是在学习layout过程之前.大家有没有发现我换了编辑器,哈哈.最终下定决心从Html编辑器切换为markdown编辑器.这里之所以使用"下定决心"这个词.是由于毕竟Html编辑器使用好几年了.非常多习惯都已经养成了,要改变多年的习惯确实不易.相信这也是还有非常多人坚持使用Html编辑器的原因. 这也反应了一个现象.当人对某一事物非常熟悉时,一旦出现了新的事物想代替老的事物时,人们都有一种抵触的情绪,做