【原创】Kafka console consumer源代码分析

上一篇中分析了Scala版的console producer代码,这篇文章来分析一下console consumer的工作原理。其实不论是哪个consumer,大部分的工作原理都是类似的。我们用console consumer作为切入点,既容易理解又不失一般性。

首先需要说明的,我使用的Kafka环境是Kafka0.8.2.1版本,这也是最新的版本。另外我们主要分析consumer的原理,没有过分纠结于console consumer的使用方法——所以我在这里选用了最简单的一条命令作为开始:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-topic

通过查看kafka-console-consumer.sh脚本,可以看到主要的调用方法是:exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer [email protected]

这个shell脚本调用了kafka.tools包下的ConsoleConsumer类,并将命令行参数全部传给了这个类。我们现在来看看这个类的主要逻辑。Console consumer的主要工作原理如下图所示:

从根本上来说,console consumer启动之后会创建一个KafkaStream不停地等待消费新的消息——具体原理是通过LinkedBlockingQueue阻塞队列来实现(后面会分析是怎么做的)。下面我们就从ConsoleConsumer.scala开始说起。

1. 先从ConsoleConsumer开始看

这个类有个main方法,可以看出这个是可执行的Scala类。类的前100多行几乎都在处理命令行参数的解析。其中真正必要的参数只有zookeeper.connect一个,如下面代码所示。

// REQUIRED表示这是一个必须要指定的参数
    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
            "Multiple URLS can be given to allow fail-over.")
            .withRequiredArg
            .describedAs("urls")
            .ofType(classOf[String])

乍一看这和官网上说的似乎不匹配,因为官网中说consumer真正必要的参数实际上有两个:zookeeper.connect和group.id。很显然console consumer会自己生成group.id的值。console consumer本质上也是一个consumer,必然属于一个消费组,也有它自己的consumer id。下面的代码中展示了console consumer如何生成自己的group id: (consumer id的生成后面再说)

// 如果没有显式指定group.id,那么代码就自己合成一个
// 具体格式: console-consumer-[10万以内的一个随机数]
// 10万是一个很大的数,因此只有非常低的几率会碰到多个console consumer的group id相同的情况
if(!consumerProps.containsKey("group.id")) {
      consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))
      groupIdPassed=false
    }

确定了consumer的group.id之后,下面就要开始把这些参数封装进ConsumerConfig类中并把后者传给Consumer的create方法以构造一个ConsumerConnector——即初始化consumer了,具体逻辑见下面的代码:

1 val config = new ConsumerConfig(consumerProps) // 封装ConsumerConfig配置类
2 val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
3 val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))  // 创建消息格式类,用于最后的输出显示
4 val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
5 val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
6 val connector = Consumer.create(config) // 创建ConsumerConnector,Consumer核心接口

上面代码中的最后一句非常重要,因为console consumer在启动的时候必须要创建一个ConsumerConnector接口。该接口实际上是一个Scala的trait类型,类似于Java的interface,里面定义了很多抽象方法,比较重要的方法有createMessageStreams, createMessageStreamsFilter和commitOffsets等。Kakfa官网把这个接口称为high level的consumer API。 对于大多数consumer来说,这个high level的consumer API就已经足够了,但有些程序可能有重设位移这样的需求,此时Kafka推荐它使用low level的consumer API —— SimpleConsumer。okay,扯得有点远了,大家可以参考这篇文章来学习high level API的具体用法。

2. ConsumerConnector与它的实现类ZookeeperConsumerConnector

目前,Kafka代码中只定义了一个实现类,即ZookeeperConsumerConnector。在具体讨论这个类的构造之前,我们先理清一些基本概念。Kafka consumer的位移信息默认是保存在Zookeeper中的,具体的路径是/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)。Kafka 0.8.2版本开始支持在Kafka中保存consumer的位移,因为Kafka团队认为zookeeper并不适合频繁地做更新,因此Kafka用一个特殊topic来保存consumer的位移。我们可以使用offsets.storage属性来指定到底使用哪种存储来保存位移——值得注意的是,目前zookeeper还是默认的保存方式,本文也将以zookeeper的方式进行讨论。

继续说回ZookeeperConsumerConnector类实例的创建,这个类在创建的时候都做了什么呢?如果浏览该类的代码,可以发现这是一个很长的类。我们目前不用去管那些东西,只需要关注下面的几行语句:

 1 // topicRegistry是一个ConcurrentHashMap,key是topic名,value是另一个ConcurrentHashMap,后者的key是分区Id,后者的value是PartitionTopicInfo
 2 // PartitionTopicInfo数据结构很重要,里面包含了要处理的消息队列以及位移信息
 3 // 后面会说到这个变量
 4   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
 5 // 当前这个consumerConnector缓存过的topic分区对应的位移值,后面提交位移的时候需要使用它与要提交的位移进行比较,也是很重要的一个变量
 6   private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
 7 // 定义了哪个topic的哪个消费线程所使用的消息阻塞队列
 8   private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
 9 // Kafka的调度器,不用太管它的具体实现。只需要知道它后面会创建一个定时任务用于定时提交位移到zookeeper中
10   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
11 ...
12 // 这段代码就是要计算consumer.id,可以看出格式就是[group id]_[主机名]_随机UUID的前8位——如果没有指定consumer id的话
13   val consumerIdString = {
14     var consumerUuid : String = null
15     config.consumerId match {
16       case Some(consumerId) // for testing only
17       => consumerUuid = consumerId
18       case None // generate unique consumerId automatically
19       => val uuid = UUID.randomUUID()
20       consumerUuid = "%s-%d-%s".format(
21         InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
22         uuid.getMostSignificantBits().toHexString.substring(0,8))
23     }
24     config.groupId + "_" + consumerUuid
25 ...
26   connectZk()  // 创建一个zookeeper客户端用于操作ZK
27   createFetcher()  // 创建一个Fetcher,后面会展开细说
28   ensureOffsetManagerConnected() //如果是使用zookeeper来保存则没有作用

上面的代码基本覆盖了ZookeeperConsumerConnector启动时候的主要工作,其中值得细说的就是createFetcher方法,以下是它的代码:

1   private def createFetcher() {
2     if (enableFetcher)
3       fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient))
4   }

ConsumerFetcherManager是为consumer使用的获取线程管理器。

未完待续。。。

				
时间: 2024-07-29 19:58:35

【原创】Kafka console consumer源代码分析的相关文章

【原创】Kafka console consumer源代码分析(二)

我们继续讨论console consumer的实现原理,本篇着重探讨ZookeeperConsumerConnector的使用,即后续所有的内容都由下面这条语句而起: val connector = Consumer.create(config) 那么问题来了?这条语句后面执行了什么呢?我们先看create方法的定义 def create(config: ConsumerConfig): ConsumerConnector = { val consumerConnect = new Zookee

【原创】kafka consumer源代码分析

顾名思义,就是kafka的consumer api包. 一.ConsumerConfig.scala Kafka consumer的配置类,除了一些默认值常量及验证参数的方法之外,就是consumer的配置参数了,比如group.id, consumer.id等,详细列表见官网. 二.ConsumerIterator.scala KafkaStream的迭代器类,当stream底层的阻塞队列为空时该iterator会置于阻塞状态.这个迭代器还提供了一个shutdownCommand对象可作为一个

【原创】Kakfa utils源代码分析(三)

Kafka utils包最后一篇~~~ 十五.ShutdownableThread.scala 可关闭的线程抽象类! 继承自Thread同时还接收一个boolean变量isInterruptible表明是否允许中断.既然是可关闭的,因此一定不是守护线程,而是一个用户线程(不会阻塞JVM关闭).提供的方法有: 1. doWork: 抽象方法.子类必须实现这个方法,从名字来说应该是指定线程要完成的操作. 2. initiateShutdown: 发起关闭请求.首先通过CAS的方式判断是否线程在运行中

【原创】Kakfa utils源代码分析(二)

我们继续研究kafka.utils包 八.KafkaScheduler.scala 首先该文件定义了一个trait:Scheduler——它就是运行任务的一个调度器.任务调度的方式支持重复执行的后台任务或是一次性的延时任务.这个trait定义了三个抽象方法: 1. startup: 启动调度器,用于接收调度任务 2. shutdown: 关闭调度器.一旦关闭就不再执行调度任务了,即使是那些晚于关闭时刻的任务. 3. schedule: 调度一个任务的执行.方法接收4个参数 3.1 任务名称 3.

【原创】Kakfa utils源代码分析(一)

Kafka.utils,顾名思义,就是一个工具套件包,里面的类封装了很多常见的功能实现——说到这里,笔者有一个感触:当初为了阅读Kafka源代码而学习了Scala语言,本以为Kafka的实现会用到很多函数编程(Functional Programming, FP),结果目前来看,大部分还是很朴素地以面向对象的方式来实现的,只有很少一部分集合的处理使用诸如map,reduce这样的FP方式.不能不说有点小小的遗憾.——当然也许后面Kafka的核心代码中会看到更多FP的身影. 下图就是kafka.u

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

Jafka源代码分析——随笔

Kafka是一个分布式的消息中间件,可以粗略的将其划分为三部分:Producer.Broker和Consumer.其中,Producer负责产生消息并负责将消息发送给Kafka:Broker可以简单的理解为Kafka集群中的每一台机器,其负责完成消息队列的主要功能(接收消息.消息的持久化存储.为Consumer提供消息.消息清理.....):Consumer从Broker获取消息并进行后续的操作.每个broker会有一个ID标识,该标识由人工在配置文件中配置. Kafka中的消息隶属于topic

Spark SQL 源代码分析之 In-Memory Columnar Storage 之 in-memory query

/** Spark SQL源代码分析系列文章*/ 前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的. 那么基于以上存储结构,我们查询cache在jvm内的数据又是怎样查询的,本文将揭示查询In-Memory Data的方式. 一.引子 本例使用hive console里查询cache后的src表. select value from src 当我们将src表cache到了内存后,再次查询src,能够通过analyzed运行计划来观察内部调

Android init源代码分析(2)init.rc解析

本文描述init.rc脚本解析以及执行过程,读完本章后,读者应能 (1) 了解init.rc解析过程 (2) 定制init.rc init.rc介绍 init.rc是一个文本文件,可认为它是Android系统启动脚本.init.rc文件中定义了环境变量配置.系统进程启动,分区挂载,属性配置等诸多内容.init.rc具有特殊的语法.init源码目录下的readme.txt中详细的描述了init启动脚本的语法规则,是试图定制init.rc的开发者的必读资料. Android启动脚本包括一组文件,包括