【原创】Kakfa utils源代码分析(一)

Kafka.utils,顾名思义,就是一个工具套件包,里面的类封装了很多常见的功能实现——说到这里,笔者有一个感触:当初为了阅读Kafka源代码而学习了Scala语言,本以为Kafka的实现会用到很多函数编程(Functional Programming, FP),结果目前来看,大部分还是很朴素地以面向对象的方式来实现的,只有很少一部分集合的处理使用诸如map,reduce这样的FP方式。不能不说有点小小的遗憾。——当然也许后面Kafka的核心代码中会看到更多FP的身影。

下图就是kafka.utils包的所有代码:

因为很难像其他包代码之间有逻辑关系,我们就一个一个说吧:

一、Annotations.scala

这个源代码文件中定义了3个注释类:threadsafe、nonthreadsafe和immutable。它们都继承了StaticAnnotation——Scala提供的StaticAnnotation类似于Java中的@Target(ElementType.TYPE),因此主要的作用域是类和接口。具体到这三个元注解(meta-annotation),很容易知道它们的含义:分别标记线程安全、非线程安全和不可变性。Kafka开发中常用到的SimpleConsumer类就是被标记为@threadsafe的。

二. CommandLineUtils.scala

这个文件使用JOpt Simple库负责解析命令行参数,具体使用用法参见官网:http://pholser.github.io/jopt-simple/

Kafka在这个文件中提供了一个object:CommandLineUtils。具体包含的方法有:

1. printUsageAndDie: 打印命令使用方法并终止程序

2. checkRequiredArgs:使用Jopts Simple的API(以下皆同)检查是否缺少必要参数

3. checkInvalidArgs:检查指定的参数是否存在不兼容情况,即哪些参数不能同时使用

4. parseKeyValueArgs:解析key=value格式的参数对,并返回一个Properties对象

三、Crc32.scala

这个类就是CRC32校验码的实现类,来自于Hadoop提供的PureJavaCrc32类——CRC32校验码的纯Java实现版本。这个类很长,里面有很多位操作,由于CRC32计算不在本次研究范围,所以就了解到这吧。

四、DelayedItem.scala

这个类是个泛型类,实现了java.util.Delayed接口。用于标记那些在给定延迟时间之后执行的对象。该类接收一个泛型T,一个延迟时间以及延迟时间的单位。另外,实现这个接口的话必须要实现一个compareTo和getDelay方法。

1. getDelay: 计算距离触发时间还剩下多长时间

2. compareTo: 比较2个Delayed对象的延迟触发时间

五、FileLock.scala

顾名思义,FileLock就是一个文件锁,它的构造函数接收一个文件对象,并总是先尝试创建这个文件(如果不存在的话),然后创建一个FileChannel对象对该文件进行随机读写操作。同时创建一个java.nio.channel.FlieLock文件锁对象用于实现下面的方法:

1. lock: 对文件加锁,如果该文件上已有锁抛出异常

2. tryLock: 尝试对文件加锁,如果成功返回true,否则返回false

3. unlock: 如果持有锁使用FileLock.release方法释放锁

4. destroy: 先释放锁然后调用FileChannel的close方法销毁该channel

六、IteratorTemplate.scala

这个文件视图定义一个迭代器模板,主要为遍历消息集合使用。迭代器模板有一个状态字段,因此在定义迭代器模板抽象类之前首先定义了一个State状态object,以及一组具体的状态object:完成(DONE),READY(准备就绪),NOT_READY(未准备)和FAILED(失败)。

之后就是定义IteratorTemplate抽象类了,它同时实现了trait Iterator和java Iterator接口——可谓迭代器领域的集大成者:)

如前所述,该类有个字段表明了迭代器的状态:state,还有一个nextItem字段执行遍历中的下一个对象,当然初始化为null——说起null,想到一个题外话。我很怀疑Kafka的开发人员是深度的Java编程人员亦或是强面向对象开发人员,Scala推荐使用Option来代替null的,可Kafka的代码中null还是随处可见,当然可能也是为了更好更自然地与Java集成。

这个抽象类提供很多方法,但似乎只有一个抽象方法:makeNext,其他全是具体方法:

1. next:如果迭代器已遍历完并无法找到下一项或下一项为空,直接抛出异常;否则将状态置为NOT_READY并返回下一项

2. peek:只是探查一下迭代器是否遍历完,如果是抛出异常,否则直接返回下一项,并不做非空判断,也不做状态设置

3. hasNext: 如果状态为FAILED直接抛出异常,如果是DONE返回false,如果是READY返回true,否则调用maybeComputeNext方法

4. makeNext: 返回下一项,这是你需要唯一需要实现的抽象方法。同时你还需要在该方法中对状态字段进行更新

5. maybeComputeNext:调用makeNext获取到下一项,如果状态是DONE返回false,否则返回true并将状态置为READY

6. allDone: 将状态置为DONE并返回null

7. resetStatus:顾名思义,就是重置状态字段为NOT_READY

七、JSON.scala

JSON的一个封装类,用于JSON到String的相互转换,该类不是线程安全的。Scala提供的JSON是将数字型的字符串转化为Double,不过该类创建一个简单函数用于将数字型字符串转为换Integer,并指定其为JSON.globalNumberParser。该类只有2个方法:

1. parseFull: 调用scala JSON的parseFull方法将一个json字符串转化为一个对象,如果出错则抛出异常

2. encode: 讲一个对象编码成json字符串。这个对象只能是null,Boolean,String,Number,Map[String, T],Array[T]或Iterable[T]中的一种,否则会报错

时间: 2024-10-08 02:21:11

【原创】Kakfa utils源代码分析(一)的相关文章

【原创】Kakfa utils源代码分析(三)

Kafka utils包最后一篇~~~ 十五.ShutdownableThread.scala 可关闭的线程抽象类! 继承自Thread同时还接收一个boolean变量isInterruptible表明是否允许中断.既然是可关闭的,因此一定不是守护线程,而是一个用户线程(不会阻塞JVM关闭).提供的方法有: 1. doWork: 抽象方法.子类必须实现这个方法,从名字来说应该是指定线程要完成的操作. 2. initiateShutdown: 发起关闭请求.首先通过CAS的方式判断是否线程在运行中

【原创】Kakfa utils源代码分析(二)

我们继续研究kafka.utils包 八.KafkaScheduler.scala 首先该文件定义了一个trait:Scheduler——它就是运行任务的一个调度器.任务调度的方式支持重复执行的后台任务或是一次性的延时任务.这个trait定义了三个抽象方法: 1. startup: 启动调度器,用于接收调度任务 2. shutdown: 关闭调度器.一旦关闭就不再执行调度任务了,即使是那些晚于关闭时刻的任务. 3. schedule: 调度一个任务的执行.方法接收4个参数 3.1 任务名称 3.

Spark SQL之External DataSource外部数据源(二)源代码分析

上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)演示样例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了Exte

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

pomelo源代码分析(一)

千里之行始于足下,一直说想了解pomelo,对pomelo有兴趣,但一直迟迟没有去碰,尽管对pomelo进行源代码分析,在网络上肯定不止我一个,已经有非常优秀的前辈走在前面,如http://golanger.cn/,在阅读Pomelo代码的时候,已经连载到了11篇了,在我的源代码分析參考了该博客,当然,也会添?我对pomelo的理解,借此希望能提高一下自己对node.js的了解和学习一些优秀的设计. 开发环境:win7 调试环境:webstorm5.0 node.js版本号:v0.8.21 源代

UiAutomator喷射事件的源代码分析

上一篇文章<UiAutomator源代码分析之UiAutomatorBridge框架>中我们把UiAutomatorBridge以及它相关的类进行的描写叙述,往下我们会尝试依据两个实例将这些类给串联起来,我准备做的是用例如以下两个非常有代表性的实例: 注入事件 获取控件 这一篇文章我们会通过分析UiDevice的pressHome这种方法来分析UiAutomator是怎样注入事件的,下一篇文章会描写叙述怎样获取控件,敬请期待. 1. UiObject.pressHome顺序图 首先我们看一下我

Spark SQL 源代码分析之 In-Memory Columnar Storage 之 in-memory query

/** Spark SQL源代码分析系列文章*/ 前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的. 那么基于以上存储结构,我们查询cache在jvm内的数据又是怎样查询的,本文将揭示查询In-Memory Data的方式. 一.引子 本例使用hive console里查询cache后的src表. select value from src 当我们将src表cache到了内存后,再次查询src,能够通过analyzed运行计划来观察内部调

Android init源代码分析(2)init.rc解析

本文描述init.rc脚本解析以及执行过程,读完本章后,读者应能 (1) 了解init.rc解析过程 (2) 定制init.rc init.rc介绍 init.rc是一个文本文件,可认为它是Android系统启动脚本.init.rc文件中定义了环境变量配置.系统进程启动,分区挂载,属性配置等诸多内容.init.rc具有特殊的语法.init源码目录下的readme.txt中详细的描述了init启动脚本的语法规则,是试图定制init.rc的开发者的必读资料. Android启动脚本包括一组文件,包括

STL源代码分析——STL算法sort排序算法

前言 因为在前文的<STL算法剖析>中,源代码剖析许多,不方便学习,也不方便以后复习.这里把这些算法进行归类,对他们单独的源代码剖析进行解说.本文介绍的STL算法中的sort排序算法,SGI STL中的排序算法不是简单的高速排序,而是交叉利用各种排序:堆排序.插入排序和高速排序:这样做的目的是提高效率.针对数据量比較大的採用高速排序,数据量比較小的能够採用堆排序或插入排序. 本文介绍了有关排序的算法random_shuffle.partition.stable_partition.sort.s