Kafka 消息格式中的变长字段(Varints)

kafka从0.11.0版本开始所使用的消息格式版本为v2,这个版本的消息相比于v0和v1的版本而言改动很大,同时还参考了Protocol Buffer而引入了变长整型(Varints)和ZigZag编码。为了更加形象的说明问题,首先我们来了解一下变长整型。

Varints是使用一个或多个字节来序列化整数的一种方法。数值越小,其所占用的字节数就越少。Varints中每个字节都有一个位于最高位的msb位(most significant bit),除了最后一个字节外其余msb位都设置为1,最后一个字节的msb位设置为0。这个msb位表示其后的字节是否和当前字节一起来表示同一个整数。除msb位之外,剩余的7位用于存储数据本身,这种表示类型又称之为Base 128。通常而言,一个字节8位可以表示256个值,所以称之为Base 256,而这里只能用7位表示,2的7次方即为128。Varints中采用小端字节序,即最小的字节放在最前面。

举个例子,比如数字1,它只占一个字节,所以msb位为0:

0000 0001

再举一个复杂点的例子,如数字300:

1010 1100 0000 0010

300的二进制表示原本为:0000 0001 0010 1100 = 256+32+8+4=300,那么为什么300的变长表示为上面的这种形式?

首先我们先去掉每个字节的msb位,表示如下:

1010 1100 0000 0010
-> 010 1100 000 0010

如前所述,varints是小端字节序的布局方式,所以这里两个字节的位置需要翻转一下:

010 1100 000 0010
-> 000 0010 010 1100 (翻转)
-> 000 0010 ++ 010 1100
-> 0000 0001 0010 1100 = 256+32+8+4=300

Varints可以用来表示int32、int64、uint32、uint64、sint32、sint64、bool、enum等类型。在实际使用过程中,如果当前字段可以表示为负数,那么对于int32/int64和sint32/sint64而言,它们在进行编码时存在着较大的区别。比如你使用int64表示一个负数,那么哪怕是-1,其编码后的长度始终为10个字节(可以通过下面的代码来测试长度),就如同对待一个很大的无符号长整型一样。为了使得编码更加的高效,Varints使用了ZigZag的编码方式。

public int sizeOfLong(int v) {
    int bytes = 1;
    while ((v & 0xffffffffffffff80L) != 0L) {
        bytes += 1;
        v >>>= 7;
    }
    return bytes;
}

ZigZag编码以一种锯齿形(zig-zags)的方式来回穿梭于正负整数之间,以使得带符号整数映射为无符号整数,这样可以使得绝对值较小的负数仍然享有较小的Varints编码值,比如-1编码为1,1编码为2,-2编码为3,参考下表:

对应的公式为:

(n << 1) ^ (n >> 31)

这是对于sint32而言的,sint64对应的公式为:

(n << 1) ^ (n >> 63)

以-1为例,其二进制表现形式为:1111 1111 1111 1111 1111 1111 1111 1111(补码)。

(n << 1) = 1111 1111 1111 1111 1111 1111 1111 1110
(n >> 31) = 0000 0000 0000 0000 0000 0000 0000 0001
(n << 1) ^ (n >> 31) = 1

最终-1的Varints编码为0000 0001,这样原本用4字节表示的-1现在可以用1个字节来表示了。对于1而言就显得非常简单了,其二进制表现形式为:0000 0000 0000 0000 0000 0000 0000 0001。

(n << 1) = 0000 0000 0000 0000 0000 0000 0000 0010
(n >> 31) = 0000 0000 0000 0000 0000 0000 0000 0000
(n << 1) ^ (n >> 31) = 2
最终1的Varints编码为0000 0010,也只占用一个字节。

前面说过Varints中的一个字节中只有7位是有效数值位,即只能表示128个数值,转变成绝对值之后其实质上只能表示64个数值。比如对于消息体长度而言,其值肯定是个大于等于0的正整数,那么一个字节长度的Varints最大只能表示63(从0开始计)。对于64而言,其二进制表示为:

0100 0000

经过ZigZag处理后为:

1000 0000 ^ 0000 0000 = 1000 0000

每个字节的低7位是有效数值位,所以1000 0000进一步转变为:

000 0001 000 0000

而Varints又是小端字节序,所以需要翻转一下位置:

000 0000 000 0001

设置非最后一个字节的msb位为1,最后一个字节的msb位为0,最终有:

1000 0000 0000 0001

所以最终64表示为:1000 0000 0000 0001,而63却表示为:0111 1110。

具体的编码实现如下(针对int32类型):

public static void writeVarint(int value, ByteBuffer buffer) {
    int v = (value << 1) ^ (value >> 31);
    while ((v & 0xffffff80) != 0L) {
        byte b = (byte) ((v & 0x7f) | 0x80);
        buffer.put(b);
        v >>>= 7;
    }
    buffer.put((byte) v);
}

对应的解码实现如下(针对int32类型):

public static int readVarint(ByteBuffer buffer) {
    int value = 0;
    int i = 0;
    int b;
    while (((b = buffer.get()) & 0x80) != 0) {
        value |= (b & 0x7f) << i;
        i += 7;
        if (i > 28)
            throw illegalVarintException(value);
    }
    value |= b << i;
    return (value >>> 1) ^ -(value & 1);
}

回顾一下kafka v0和v1版本的消息格式,如果消息本身没有key,那么key length字段为-1,int类型的需要4个字节来保存,而如果采用Varints来编码则只需要一个字节。根据Varints的规则可以推导出0-63之间的数字占1个字节,64-8191之间的数字占2个字节,8192-1048575之间的数字占3个字节。而kafka broker的配置message.max.bytes的默认大小为1000012(Varints编码占3个字节),如果消息格式中与长度有关的字段采用Varints的编码的话,绝大多数情况下都会节省空间,而v2版本的消息格式也正是这样做的。

不过需要注意的是Varints并非一直会省空间,一个int32最长会占用5个字节(大于默认的4字节),一个int64最长会占用10字节(大于默认的8字节)。下面代码展示如何计算一个int32占用的字节个数:

public static int sizeOfVarint(int value) {
    int v = (value << 1) ^ (value >> 31);
    int bytes = 1;
    while ((v & 0xffffff80) != 0L) {
        bytes += 1;
        v >>>= 7;
    }
    return bytes;
}

有关int32/int64的更多实现细节可以参考org.apache.kafka.common.utils.ByteUtils。

原文地址:https://blog.51cto.com/14230003/2399037

时间: 2024-10-08 05:48:47

Kafka 消息格式中的变长字段(Varints)的相关文章

一文看懂Kafka消息格式的演变

摘要 对于一个成熟的消息中间件而言,消息格式不仅关系到功能维度的扩展,还牵涉到性能维度的优化.随着Kafka的迅猛发展,其消息格式也在不断的升级改进,从0.8.x版本开始到现在的1.1.x版本,Kafka的消息格式也经历了3个版本.本文这里主要来讲述Kafka的三个版本的消息格式的演变,文章偏长,建议先关注后鉴定. Kafka根据topic(主题)对消息进行分类,发布到Kafka集群的每条消息都需要指定一个topic,每个topic将被分为多个partition(分区).每个partition在

设计表的时候,对变长字段长度选择的一点思考

不管是在MSSQL还是MySQL或者Oracle,变长字段的长度衡量都是要经常面对的.对于一个变长的字段,在满足业务的情况下(其实所谓的满足业务是一个比较模糊的东西),到底是选择varchar(50)还是varchar(200)亦或是varchar(500)?对于保守型选择,往往是选择一个较大的长度,比如varchar(500)要比varchar(50)更具有兼容性,因为是变长字段的原因,存储空间也一样.这样的选择并不能说就不好,看站在哪个角度来看问题.那么,相对于varchar(50),var

Objective-C中实现变长参数问题

版权声明:本文为博主原创文章,未经博主允许不得转载. ObjC中没有提供直接的变长参数方法,需要使用C标准库中的av_list方法,简单使用如下: -(void)somethingForyou:(NSString *)vString,....{ va_list varList; id arg; NSMutableArray *argsArray = [[NSMutableArray alloc]inti]; if(vString){ va_start(varList,vString); whil

【Unix环境高级编程】编写变长参数函数

文件的格式输入输出函数都支持变长参数.定义时,变长参数列表通过省略号'...'表示, 因此函数定义格式为: type 函数名(parm1, parm2,parmN,...); Unix的变长参数通过va_list对象实现,定义在文件'stdarg.h'中,变长参数的应用模板如下所示: #include <stdarg.h> function(parmN,...){ va_list pvar; ................................. va_start(pvar,par

源码分析 Kafka 消息发送流程(文末附流程图)

温馨提示:本文基于 Kafka 2.2.1 版本.本文主要是以源码的手段一步一步探究消息发送流程,如果对源码不感兴趣,可以直接跳到文末查看消息发送流程图与消息发送本地缓存存储结构. 从上文 初识 Kafka Producer 生产者,可以通过 KafkaProducer 的 send 方法发送消息,send 方法的声明如下: Future<RecordMetadata> send(ProducerRecord<K, V> record) Future<RecordMetada

C99新增内容之变长数组(VLA)

我们在使用多维数组是有一点,任何情况下只能省略第一维的长度.比如在函数中要传一个数组时,数组的行可以在函数调用时传递,当属数组的列却只能在能被预置在函数内部.看下面一个例子: #define COLS 4 int sum2d(int ar[][COLS],int rows) { int r; int c; int tot=0; for(r=0;r<rows;r++) for(c=0;c<COLS;c++) tot+=ar[r][c]; return tot; } 现在假设定义了如下数组: in

变长数组 - 转

http://ericwang.github.io/program/2010/02/10/c_Variable_length_arrays/ C中的Variable length arrays (变长数组) Variable length arrays 是C99的特性,而不是 C++98 的,关于c99标准的变长数组, 在标准的6.7.5.2 Array declarators里面有这样的说明: 2.Only ordinary identifiers (as defined in 6.2.3)

有关变长数组(VLA)(转)

原链接:http://blog.chinaunix.net/uid-24347760-id-1989578.html 在突然听到这个名词后,在网上搜的. 1. 变长数组是分配在堆栈上的, 其实从语义的角度也应该是这样, 变长数组还是一个数组, 还是一个局部变量, 在c语言中, 局部变量是分配在堆栈上的, malloc才是分配在堆上面的. 这里面没有不存在什么 内存泄漏, 因为堆栈上的内存是不需要程序员管理的   2. 变长数组和其他变量共同存在于一个作用域之内时, 变长数组是在最后分配的, 也就

【小白学Lua】之Lua变长参数和unpack函数

一.简介 Lua的变长参数和unpack函数在实际的开发中应用的还挺多的,比如在设计print函数的时候,需要支持对多个变量进行打印输出,这时我们就需要用到Lua中的变长参数和unpack函数了. 二.Lua变长参数与unpack函数 Lua中支持可变参数,用 ... 表示.比如定义下面的这样一个函数: local function func1(...) end 当然它也支持在变长参数前面添加固定参数: local function func1(var,...) --dosomething en