Kafka日志包是提供的是日志管理系统。主要的类是LogManager——该类负责处理所有的日志,并根据topic/partition分发日志。它还负责flush策略以及日志保存策略。Kafka日志本身是由多个日志段组成(log segment)。一个日志是一个FileMessageSet,它包含了日志数据以及OffsetIndex对象,该对象使用位移来读取日志数据。
下面我们一个一个地分析,先说最简单的:
一、LogConfig.scala
该scala定义了Defaults object,里面包含了很多默认值:
1. SegmentSize: 对应于log.segment.bytes属性。日志段大小,默认是1GB
2. SegmentMs: 对应于log.roll.hours属性。Kafka强制切分一个日志的时间间隔,默认是7天
3. SegmentJitterMs: 对应于log.roll.jitter.{ms,hours}属性。随机的一个抖动时间,默认是0,如果不为空,则randomSegmentJitter方法返回一个随机的抖动时间(单位是ms)
4. FlushInterval: 对应于log.flush.interval.messages属性。冲刷消息数——即在flush之前写入log的消息数。server.properties默认值是10000条
5. FlushMs: 对应于log.flush.interval.ms属性。在强迫一次flush之前一条信息保存在内存中的最大时间,默认是1秒。通常不推荐设置该值
6. RetentionSize: 对应于log.retention.bytes属性。为每个topic-分区保存的log字节数。该值乘以总分区数就是总的topic日志大小。如果字节数比这个大,该log的segment会被删除。server.properties中设置为1GB,但未启用。默认值是-1表示无限制。
7. RetentionMs: 对应于log.retention.{ms,minutes,hours},表示日志段保存最大时间,默认是7天。
8. MaxMessageSize: 对应于message.max.bytes属性。表示一个服务器能够接收处理的消息的最大字节数,注意这个值不要大于fetch.message.max.bytes属性的值。该值默认是1000012字节,大概900KB+
9. MaxIndexSize: 对应于log.index.size.max.bytes属性。Kafka中的索引文件包含的是位移与文件位置的映射关系。该值代表了一个索引文件的最大字节数,默认是10MB。通常不需要设置该值。
10. IndexInterval: 对应于log.index.interval.bytes属性。索引项之间的字节间隔。该值越大,索引文件也就越大,但需要扫描的项也就越少。通常不需要单独设置该值。
11. FileDeleteDelayMs: 对应于log.cleaner.delete.retention.ms属性。保存已删除日志的时间,默认是1天
12. DeleteRetentionMs: 对应于log.segment.delete.delay.ms属性。删除文件系统上文件的等待时间,默认是1分钟
13. MinCleanableDirtyRatio: 对应于min.cleanable.dirty.ratio属性,默认是50%,表示需要清除的脏日志与所有日志的比率
14. Compact: 对应于log.cleanup.policy属性,该属性有两个值:delete和compact。默认是delete,即删除满足规则的日志;如果设置为compact表示启用日志压缩。
15. UncleanLeaderElectionEnable: 对应于unclean.leader.election.enable属性,默认是true,该属性表示是否允许不在ISR中的副本选举leader,不过这么做有可能会造成数据丢失。
16. MinInSyncReplicas: 对应于min.insync.replicas属性。默认值是1,该属性规定了最小的ISR数
case类LogConfig的构造函数就接收以上这么多参数(当然,有几个直接使用了默认值就没有指定)。最主要的方法就是toProps,将这么多属性封装到一个Properties对象中,当然每个属性的key名称来自于LogConfig object中的定义。LogConfig object中除了定义这些属性名称之外,还定义了一个Set集合保存这些配置属性名称。该object提供的其他方法如下:
1. fromProps: 解析给定的Properties对象封装到一个LogConfig对象中。
2. validateNames: 检查给定的Properties对象中是否存在未知的属性名称——即不在那个属性名称集合中的属性名
3. validateMinInSyncReplicas: 检查最少ISR数值是否非法,比如小于1
4. validate: 先检查属性名称合法性,再检查ISR数值合法性,最后尝试解析这些属性——检查属性完备性和值的合法性
二、CleanerConfig.scala
Kafka除了提供基于日志大小和时间规则的切分,还提供一种日志压缩的特性,只保存每个key的最后值来压缩保存的offset数。——该特性是通过一组log cleaner线程来完成的。Log cleaner的配置case类——log cleaner就是一组后台线程协助完成日志压缩的(log compact)。此配置类中关心的参数有:
1. numThreads: 开启的cleaner线程数,默认是1
2. dedupeBufferSize: log cleaner线程使用的内存空间用于deduplicate,默认是4MB,每个线程不能使用超过2GB。在KafkaConfig中会读取log.cleaner.dedupe.buffer.size属性,并将其值复制给该字段。
3. dedupeBufferLoadFactor: log cleaner使用的哈希表的加载因此(factor load),对应于log.cleaner.io.buffer.load.factor属性,默认是0.9,一般不需要修改此值。
4. ioBufferSize: 所有cleaner线程被允许进行读写的最大字节数,对应于log.cleaner.io.buffer.size属性,默认是512KB。通常不需要修改此值
5. maxMessageSize: 对应于message.max.bytes属性,表示log中能够容纳消息的最大字节数
6. maxIoBytesPerSecond: log cleaner在执行日志压缩操作时的最大IO字节数,对应于log.cleaner.io.max,bytes.per.second属性。主要结合Throttler类使用起到限流作用。
7. backOffMs: 定期检查log是否需要执行log clean的时间间隔,主要给一个CountDownLatch使用。
8. enableCleaner: 对应于log.cleaner.enable属性,标识是否开启日志压缩(log compaction)
9. hashAlgorithm: 在键压缩过程使用的哈希算法,默认值是MD5
三、OffsetPosition.scala
一个非常的简单的case类,保存逻辑offset与log物理实际位置的映射
四、OffsetMap.scala
顾名思义,应该是key=>offset的map映射。定义了OffsetMap trait,里面包含6个抽象方法: slots, put, get, clear, size和utilization。我们在它的实现类SkimpyOffsetMap时才具体分析下这6个方法都是做什么的。下面就说说它的实现类: SkimpyOffsetMap。这是一个哈希表的实现,用于删除日志中的重复数据。不过它不允许删除记录。SkimpyOffsetMap构造函数接收两个参数: 一个整数表示该哈希map占用的ByteBuffer字节数,而另一个字段hashAlgorithm表示所使用的哈希算法,默认是使用MD5。该类还创建了9个私有字段:
- bytes: 一段ByteBuffer,用来表示底层保存的hashmap
- digest: 使用java.security.MessageDigest.getInstance方法根据传入的算法名(默认是MD5)创建的哈希算法实例
- hashSize: 返回数字摘要的字节长度
- hash1/hash2: 根据hashSize创建两个字节数组保存hash的key
- entries: hashmap当前保存的记录数
- lookups: 查询该hashmap次数
- probes: 该hashmap探查次数——哈希探查是解决哈希冲突的一张方式。有兴趣的话读者可以搜搜开放寻址
- bytesPerEntry: hashSize + 8个字节的位移。因为MD5数字摘要通常产生128位,也就是16个字节,因此这个值通常都是24字节——也就是每条记录需要24个字节。如果是long来表示的话,就是需要3个long整型——这点需要记住!
okay,说完了那些成员字段,我们来说说这个类提供的方法,除了实现基类或trait定义的那6个之外,它还定义了很多辅助方法,我们还是一个一个说:
1. size: 直接返回entries保存的值,即由entries保存该hashmap中当前的记录数
2. hashInto: 使用hashmap的某个key更新MessageDigest的摘要,并根据该摘要计算哈希码写入给定的ByteBuffer
3. slots: 总的hashmap槽数,通过构造函数中给定的字节数除以每条记录数计算而成。因为该类只是用于log cleaner的,因此假设我们设置了log.cleaner.dedupe.buffer.size是500MB, cleaner线程数是4的话,那么计算出来的slots数就是500 * 1024 * 1024(500MB) / 4(线程数) / 24 (每条记录占用的字节数) = 5461333,也就是该hashmap只能容纳5461333条记录
4. isEmpty: 判断在给定的位置上是否无记录。还记得刚才说过一条记录需要24个字节吧? 具体逻辑是从给定位置、给定位置+1个long长度以及给定位置+2个long长度的位移处读取3个long型数据,如果都是0,表明该position确实无记录。——不过现在在代码中写死了是测试3个long长度,但如果使用的是SHA-256算法,估计就要有问题了,因为SHA-256本身就需要32个字节,再加上offset8个字节,一条记录是40个字节。只测试3个long长度显然不够吧。当然了,这只是笔者的猜测而已。
5. positionOf: 主要作用是获得第i次探查的位置,每次探查都将probes加1。
6. put: 将给定的key与给定的offset关联。每次关联前都将lookups数+1,然后循环调用isEmpty和positionOf方法:如果当前的位置处不为空,那么比较key与bytes中的数据,如果一致,说明该key以前在hashmap中保存过,那么使用给定的offset直接覆盖;如果不匹配,那么就继续寻找空槽。一旦找到空槽,直接定位到该位置然后将key字节数组写入到hashmap上,然后将offset加到bytes,然后为记录数+1返回。
7. get: 从Hashmap中获取指定key的offset。类似地,先将key打入到hash1(前面可能忘了说了,hash1和hash2是两个大小为16的字节数组,hash1固定保存key,而hash2用于保存从bytes中读取到的16字节以判断是否和hash1相同,如果相同表示hashmap中已保存过hash1表示的key)。具体方法就是从位置0来时不断探查,如果找到和key相同的即返回其offset;如果就找到一个空槽说明不存在该key直接返回-1——一个无效的offset
8. clear: 清除hashmap的记录数、查询数以及探查数,另外把底层的字节数组全部填0处理
9. collisionRate: hash冲突比率,不过貌似没有用。
五、OffsetIndex.scala
OffsetIndex,顾名思义,就是位移索引,作用是使用位移来读取日志中的数据。所谓的一个索引就是将位移映射为某个日志segment的物理文件位置。保存索引的文件是预先分配好的,文件中每一项都是8个字节(4个字节的相对位移+该位移标识的消息所在的文件位置,也是4个字节)。具体能保存多少项由属性segment.index.bytes控制,默认是10MB。索引查询的物理文件实际上一个MappedByteBuffer,使用一个二分查找的方式来定位小于等于目标offset的最大offset所对应的物理文件位置。
索引文件可以被追加写入索引也可以作为一个只读索引文件进行查询。后面在该类中提供了makeReadOnly方法就是将可变的索引文件转换成一个不可变的索引文件——这对于实现文件切分功能非常有用。另外,因为是索引文件,不对该文件的校验码做验证。
前面提到了文件中的每一项都是8个字节,4个字节的相对位移加上4个字节的文件位置。这里的位移是相对于基础位移(base offset)的。举个例子,如果基础位移是50,那么位移55保存的值就是5。使用相对位移就将位移信息压缩至4个字节,不需要使用8个字节来保存。但既然是相对位移,就需要将相对位移转换成绝对位移,但使用该类的用户不需要去关心这些。
下面开始分析代码,首先从构造函数开始,该构造函数接收三个变量:一个表示索引文件的文件变量;一个基础offset和一个变量表示最大的索引文件字节数。该类还定义了一些类成员变量和很多方法,我们一个一个分析:
1. lock: 私有字段,使用ReentrantLock实现,用于同步访问MappedByteBuffer。ReentrantLock提供与synchronized相同的内存和并发性语义,另外性能也更好。
2. roundToExactMultiple: 私有方法,就是计算小于第一个参数的第二个参数的最大整数倍,比如roundToExactMultiple(67, 8)返回64
3. mmap: 私有字段,负责初始化包含该索引的内存映射对象。首先检查给定的File对象,如果不存在的话预先创建出来并设定长度为maxIndexSize,并设定好开始的位置之后返回。
4. size: 私有字段,索引文件中当前保存的索引项(每项都是8字节)
5. maxEntries: 成员变量,索引文件能包含的最大索引项个数
6. relativeOffset: 返回根据base offset的第n个位移。假设n是5,每项是8个字节,那么相对位移的值(使用4个字节)必然是保存在buffer的第40个字节到第43个字节。
7. physical: 获取第n个位移对应的物理文件位置(依然是4个字节)——还是假设n=5, 那么返回的值就是从44字节~47字节处保存的值。
8. readLastEntry: 读取索引文件中最后一项对应的OffsetPosition
9. lastOffset: 返回索引文件中最后一个索引项的位移
10. maybeLock: 在一个锁保护的情况下执行给定的方法
11. indexSlotFor: 以二分查找的方式寻找比给定offset小的最大offset。当然了,如果最小的位移都比给定的offset大或者索引文件干脆就是空的话直接返回-1
12. lookup: 计算比给定offset小的最大位移,找到后返回offset-对应物理文件位置的映射对
13. entry: 返回索引文件中的第n个位移映射对
14. append: 从给定的offset-position处插入一索引项。既然叫append,该项给定的offset必须比现有的所有索引项都要大
15. isFull: 判断该索引文件是否已满
16. truncate: 删除所有索引项
17. truncateToEntries: 删除索引项到给定的数目
18. truncateTo: 删除那些位移不小于给定offset的所有索引项
19. resize: 重设索引文件的大小——主要用于新的日志segment切分时候调用。需要注意的是代码中区分了操作系统,因为Windows平台不允许调整内存映射文件的长度
20. forceUnmap: 主要为Windows平台上使用。因为在Windows平台上修改文件长度时需要先释放内存映射对象
21. trimToValidSize: 调整为当前索引文件的真实占用字节大小
22. flush: 调用MappedByteBuffer的force方法将对buffer的修改写入底层的文件
23. delete: 删除该索引文件
24. entries: 返回索引文件中的索引项数
25. sizeInBytes: 索引文件当前使用的索引项字节总数
26. close: 调用trimToValidSize方法关闭索引
27. renameTo: 重命名索引文件名称
28. sanityCheck: 对索引文件进行完整性检查,包括索引文件字节数是否为8的整数倍、当前最大位移是否小于基础位移等
六、FileMessageSet.scala
非线程安全的文件消息集合,继承了MessageSet抽象类。FileMessageSet类有一个起始和结束的指针标识消息集合的起始位置和结束位置——这样就能实现从整个消息集合中切片的功能。该类有5个构造函数参数:
1. file: 日志文件
2. channel: 底层使用到的文件通道(file channel)
3. start/end: 消息集合在文件中的绝对起始位置/绝对结束位置
4. isSlice: 是否从整个消息集合中切分处一个切片
除了主构造函数之外还提供了很多便利的辅助构造函数。另外FileMessageSet类定义了一个_size变量,用于保存消息集合的字节数(同时考虑了是否支持切片)。如果不是一个切片,则将底层文件通道的指针移到最后一个字节。该类定义的方法如下:
1. read: 从日志文件中的指定位置读取指定大小的buffer并封装到一个FileMessageSet对象返回。
2. sizeInBytes: 该文件消息集合字节数
3. searchFor: 从给定位置处开始向后寻找不小于targetOffset的位移,并返回实际的物理文件位置。如果没有找到的话,直接返回null
4. writeTo: 写入这个消息集合到指定的channel,允许从指定的位置写入指定大小的字节数,并返回真实写入的字节数
5. iterator: 获取遍历该消息集合的迭代器,只做一层迭代
6. append: 将保存在一个ByteBuffer中的一组消息追加到指定的该消息集合所在的channel尾部并增加总的消息集合字节数
7. flush: 提交所有已写数据到物理磁盘
8. close: 先调用flush存磁盘,然后关闭channel
9. delete: 从文件系统中删除消息集合
10. truncateTo: 将文件消息集合截断成指定的字节大小
11. readInto: 将底层的文件从给定的位置开始读取内容到一个ByteBuffer中
12. renameTo: 更名消息集合底层的文件名
除了FileMessageSet类,该scala还定义了一个object: LogFlushStats——里面只定义了一个定时器,用于统计写入日志段到文件的时间
七、LogSegment.scala
日志都是分段(segment)的,每个日志段有两个组成部分:日志和索引。日志是FileMessageSet对象,该对象包含了真正的消息;而索引部分就是OffsetIndex对象。每个日志段都有一个基础位移。基础位移不大于该日志段中最小的位移,但却一定大于以前日志段的所有位移。
假设一个日志段的基础位移是a,那么Kafka会保存两个文件: a.index和a.log
该scala文件有一个非线程安全的类:LogSegment,用于表示日志段。该类的构造函数有6个参数,分别是:
1. log: FileMessageSet定义的消息集合
2. index: OffsetIndex定义的位移索引,包含了位移到物理文件位置的映射
3. baseOffset: 日志文件的基础位移,也就是这个日志段中最低的位移
4. indexIntervalBytes: 索引文件中索引项的间隔,即Kafka查找下一个物理位置时进行线性查找的最大字节数。
5. rollJitterMs: 指定日志段切分时的jitter time,避免日志切分时出现惊群
6. time: 一个时间变量,主要提供时间方面的服务
下面对LogSegment的一些关键代码进行分析:
1. created变量: 创建一个日志段的时间信息是很有用的,所以需要有个变量保存这个信息
2. size方法: 既然是保存消息的日志段,也必然有个方法保存当前日志段占用的字节数,具体实现方法就是调用LogSegment包含的日志对象的size方法
3. bytesSinceLastIndexEntry变量: 这个变量主要的作用就是用于判断在追加写日志的同时是否需要增加一条索引项。由于log.index.interval.bytes默认是4KB,因此每写4KB就会在索引文件中增加一条索引记录。增加索引项之后需要将该变量置为0重新计算
4. lastModified以及lastModified_方法: Kafka在清理日志段的时候根据当前时间与该方法返回值比较清理那些陈旧的日志段并且根据UAP原则提供了同名的setter方法用于更新日志段对象中日志文件和索引文件的最近修改时间。
5. delete方法: 逻辑很简单的方法,就是删除日志文件和索引文件
6. close方法: 关闭日志段的方法,具体就是关闭底层的日志文件和索引文件
7. changeFileSuffixes方法: 同时更改日志文件和索引文件的后缀名。例如在删除日志段的时候把a.log和a.index更名为a.log.delete和a.index.delete
8. flush方法: 将buffer中的消息和索引项写入磁盘
9. nextOffset方法: 计算这个日志段中下一条消息的位移。这个方法运行起来是有很高的代价的,因为它需要从索引文件中最后一项标识的位移处开始读出一个消息集合。特别注意的是如果索引文件为空的话,它就需要将整个日志段的数据都读出来并返回一个FetchDataInfo对象。这个对象由一个位移元数据加上一个消息集合组成。如果这个FetchDataInfo为空,或者它包含的消息集合为空就只返回baseOffset——即这个日志段开始offset,否则返回offset+1 (主要是因为消息集合本身也就是一组MessageAndOffset对象)
10. truncateTo方法: 给定一个位移,将位于该位移之后的所有索引项和日志项全部清除,如果给定的位移大于日志段本身的最大位移就什么都不做。最后函数返回日志数据总共截断的字节数。值得注意的是,如果把所有日志数据都截断了,那么需要更新这个日志段的创建日期。同时还会将检查是否增加索引项的指针清零。
11. append方法: 将一组消息追加写入到以给定offset开始的日志段中。如果写入超过了4KB(默认的log.index.interval.bytes属性值)则额外写入一条新的索引项记录到索引文件中。这个方法不是线程安全的,所以后面调用的时候需要有锁同步机制的保护
12. translateOffset方法: 给定一个offset,找出该日志段中不小于该offset的第一条消息对应的物理文件位置。这个方法还有一个参数可以用来调优,不必从查询到的索引项中包含的位置开始,可以直接从给定的文件位置开始查找。当然这样做的前提是你必须已经知道这是文件中的一个合法的开始位置并且比最靠近的索引项中包含的位置要大。
13. read方法: 给定一个offset,从不小于这个offset处的第一条开始读消息,不能超过maxSize个字节,也必须在maxOffset(如果提供了maxOffset)处结束——读到的这些消息封装到一个FetchDataInfo对象返回。FetchDataInfo由一个日志位移元数据LogOffsetMetadata对象和一个消息集合组成,所谓的LogOffsetMetadata就是由消息offset加上该日志段的基础位移再加上日志段内的相对物理位置组成。这个方法有一个关键的问题是,要读取消息集合到底多少字节?如果给定的maxSize是0,那么就返回一个空的消息集合。如果maxSize大于0且没有指定maxOffset,那么就表示我们能够读取最多maxSize字节的消息;而如果maxSize>0且指定了maxOffset,程序就需要计算一下maxOffset所表示的物理文件位置与起始位置的差距和maxSize谁大谁小——同时也只能选取小的作为最终的可读取字节数
14. recover方法: 恢复一个日志段——即根据日志文件重建索引并砍掉那些无效的字节,所谓的无效字节就是由参数限定的,任何在maxMessageSize之外的字节都是为无效状态。该方法实现也很简单,就是先将索引项全部截断并将索引文件重置为原来的大小,然后遍历该消息集合,超过indexIntervalBytes之后就追加一条索引记录从而达到重建索引的目的