漫游Kafka实现篇之消息和日志

原文地址:http://blog.csdn.net/honglei915/article/details/37760631

消息格式

消息由一个固定长度的头部和可变长度的字节数组组成。头部包含了一个版本号和CRC32校验码。

	/**
	 * 具有N个字节的消息的格式如下
	 *
	 * 如果版本号是0
	 *
	 * 1. 1个字节的 "magic" 标记
	 *
	 * 2. 4个字节的CRC32校验码
	 *
	 * 3. N - 5个字节的具体信息
	 *
	 * 如果版本号是1
	 *
	 * 1. 1个字节的 "magic" 标记
	 *
	 * 2.1个字节的参数允许标注一些附加的信息比如是否压缩了,解码类型等
	 *
	 * 3.4个字节的CRC32校验码
	 *
	 * 4. N - 6 个字节的具体信息
	 *
	 */

日志

一个叫做“my_topic”且有两个分区的的topic,它的日志有两个文件夹组成,my_topic_0和my_topic_1,每个文件夹里放着具体的数据文件,每个数据文件都是一系列的日志实体,每个日志实体有一个4个字节的整数N标注消息的长度,后边跟着N个字节的消息。每个消息都可以由一个64位的整数offset标注,offset标注了这条消息在发送到这个分区的消息流中的起始位置。每个日志文件的名称都是这个文件第一条日志的offset.所以第一个日志文件的名字就是00000000000.kafka.所以每相邻的两个文件名字的差就是一个数字S,S差不多就是配置文件中指定的日志文件的最大容量。

消息的格式都由一个统一的接口维护,所以消息可以在producer,broker和consumer之间无缝的传递。存储在硬盘上的消息格式如下所示:

消息长度:     4 bytes (value: 1+4+n)
版本号:       1 byte
CRC校验码:    4 bytes
具体的消息:   n bytes

写操作

消息被不断的追加到最后一个日志的末尾,当日志的大小达到一个指定的值时就会产生一个新的文件。对于写操作有两个参数,一个规定了消息的数量达到这个值时必须将数据刷新到硬盘上,另外一个规定了刷新到硬盘的时间间隔,这对数据的持久性是个保证,在系统崩溃的时候只会丢失一定数量的消息或者一个时间段的消息。

读操作

读操作需要两个参数:一个64位的offset和一个S字节的最大读取量。S通常比单个消息的大小要大,但在一些个别消息比较大的情况下,S会小于单个消息的大小。这种情况下读操作会不断重试,每次重试都会讲读取量加倍,直到读取到一个完整的消息。可以配置单个消息的最大值,这样服务器就会拒绝大小超过这个值的消息。也可以给客户端指定一个尝试读取的最大上限,避免为了读到一个完整的消息而无限次的重试。

在实际执行读取操纵时,首先需要定位数据所在的日志文件,然后根据offset计算出在这个日志中的offset(前面的的offset是整个分区的offset),然后在这个offset的位置进行读取。定位操作是由二分查找法完成的,Kafka在内存中为每个文件维护了offset的范围。

下面是发送给consumer的结果的格式:

MessageSetSend (fetch result)

total length     : 4 bytes
error code       : 2 bytes
message 1        : x bytes
...
message n        : x bytes
MultiMessageSetSend (multiFetch result)

total length       : 4 bytes
error code         : 2 bytes
messageSetSend 1
...
messageSetSend n

删除

日志管理器允许定制删除策略。目前的策略是删除修改时间在N天之前的日志(按时间删除),也可以使用另外一个策略:保留最后的N GB数据的策略(按大小删除)。为了避免在删除时阻塞读操作,采用了copy-on-write形式的实现,删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的,这类似于Java的CopyOnWriteArrayList。

可靠性保证

日志文件有一个可配置的参数M,缓存超过这个数量的消息将被强行刷新到硬盘。一个日志矫正线程将循环检查最新的日志文件中的消息确认每个消息都是合法的。合法的标准为:所有文件的大小的和最大的offset小于日志文件的大小,并且消息的CRC32校验码与存储在消息实体中的校验码一致。如果在某个offset发现不合法的消息,从这个offset到下一个合法的offset之间的内容将被移除。

有两种情况必须考虑:1,当发生崩溃时有些数据块未能写入。2,写入了一些空白数据块。第二种情况的原因是,对于每个文件,操作系统都有一个inode(inode是指在许多“类Unix文件系统”中的一种数据结构。每个inode保存了文件系统中的一个文件系统对象,包括文件、目录、大小、设备文件、socket、管道, 等等),但无法保证更新inode和写入数据的顺序,当inode保存的大小信息被更新了,但写入数据时发生了崩溃,就产生了空白数据块。CRC校验码可以检查这些块并移除,当然因为崩溃而未写入的数据块也就丢失了。

漫游Kafka实现篇之消息和日志,布布扣,bubuko.com

时间: 2025-01-15 08:17:55

漫游Kafka实现篇之消息和日志的相关文章

【转载】Kafka实现篇之消息和日志

http://blog.csdn.net/honglei915/article/details/37760631 消息格式 日志 一个叫做“my_topic”且有两个分区的的topic,它的日志有两个文件夹组成,my_topic_0和my_topic_1,每个文件夹里放着具体的数据文件,每个数据文件都是一系列的日志实体,每个日志实体有一个4个字节的整数N标注消息的长度,后边跟着N个字节的消息.每个消息都可以由一个64位的整数offset标注,offset标注了这条消息在发送到这个分区的消息流中的

漫游Kafka设计篇之消息传输的事务定义

之前讨论了consumer和producer是怎么工作的,现在来讨论一下数据传输方面.数据传输的事务定义通常有以下三种级别: 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输. 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输. 精确的一次(Exactly once):  不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的. 大多数消息系统声称可以做到“精确的一次”,但是仔细阅读它们的的文档可以看到里面存在误导,比如没有说明当

漫游Kafka入门篇之简单介绍

原文地址:http://blog.csdn.net/honglei915/article/details/37564521 介绍 Kafka是一个分布式的.可分区的.可复制的消息系统.它提供了普通消息系统的功能,但具有自己独特的设计.这个独特的设计是什么样的呢? 首先让我们看几个基本的消息系统术语: Kafka将消息以topic为单位进行归纳. 将向Kafka topic发布消息的程序成为producers. 将预订topics并消费消息的程序成为consumer. Kafka以集群的方式运行,

漫游Kafka设计篇之效率优化

原文地址:http://blog.csdn.net/honglei915/article/details/37564757 Kafka在提高效率方面做了很大努力.Kafka的一个主要使用场景是处理网站活动日志,吞吐量是非常大的,每个页面都会产生好多次写操作.读方面,假设每个消息只被消费一次,读的量的也是很大的,Kafka也尽量使读的操作更轻量化. 我们之前讨论了磁盘的性能问题,线性读写的情况下影响磁盘性能问题大约有两个方面:太多的琐碎的I/O操作和太多的字节拷贝.I/O问题发生在客户端和服务端之

漫游kafka实战篇之搭建Kafka开发环境

转载注明出处:http://blog.csdn.net/honglei915/article/details/37563647 上篇文章中我们搭建了kafka的服务器,并可以使用Kafka的命令行工具创建topic,发送和接收消息.下面我们来搭建kafka的开发环境. 添加依赖 搭建开发环境需要引入kafka的jar包,一种方式是将Kafka安装包中lib下的jar包加入到项目的classpath中,这种比较简单了.不过我们使用另一种更加流行的方式:使用maven管理jar包依赖. 创建好mav

漫游Kafka设计篇之数据持久化

不要畏惧文件系统! Kafka大量依赖文件系统去存储和缓存消息.对于硬盘有个传统的观念是硬盘总是很慢,这使很多人怀疑基于文件系统的架构能否提供优异的性能.实际上硬盘的快慢完全取决于使用它的方式.设计良好的硬盘架构可以和内存一样快.在6块7200转的SATA RAID-5磁盘阵列的线性写速度差不多是600MB/s,但是随即写的速度却是100k/s,差了差不多6000倍.现代的操作系统都对次做了大量的优化,使用了 read-ahead 和 write-behind的技巧,读取的时候成块的预读取数据,

漫游Kafka设计篇之Producer和Consumer

Kafka Producer 消息发送 producer直接将数据发送到broker的leader(主节点),不需要在多个节点进行分发.为了帮助producer做到这点,所有的Kafka节点都可以及时的告知:哪些节点是活动的,目标topic目标分区的leader在哪.这样producer就可以直接将消息发送到目的地了. 客户端控制消息将被分发到哪个分区.可以通过负载均衡随机的选择,或者使用分区函数.Kafka允许用户实现分区函数,指定分区的key,将消息hash到不同的分区上(当然有需要的话,也

漫游Kafka实战篇之客户端API

Kafka Producer APIs 旧版的Procuder API有两种:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它们都实现了同一个接口: [java] view plaincopy class Producer { /* 将消息发送到指定分区 */ public void send(kafka.javaapi.producer.ProducerData<K,V> producerData); /* 批量

(转)漫游Kafka入门篇之简单介绍

转自:http://blog.csdn.net/honglei915/article/details/37564521 原文地址:http://blog.csdn.net/honglei915/article/details/37564521 介绍 Kafka是一个分布式的.可分区的.可复制的消息系统.它提供了普通消息系统的功能,但具有自己独特的设计.这个独特的设计是什么样的呢? 首先让我们看几个基本的消息系统术语: Kafka将消息以topic为单位进行归纳. 将向Kafka topic发布消