kafka从0.11.0版本开始所使用的消息格式版本为v2,这个版本的消息相比于v0和v1的版本而言改动很大,同时还参考了Protocol Buffer而引入了变长整型(Varints)和ZigZag编码。为了更加形象的说明问题,首先我们来了解一下变长整型。
Varints是使用一个或多个字节来序列化整数的一种方法。数值越小,其所占用的字节数就越少。Varints中每个字节都有一个位于最高位的msb位(most significant bit),除了最后一个字节外其余msb位都设置为1,最后一个字节的msb位设置为0。这个msb位表示其后的字节是否和当前字节一起来表示同一个整数。除msb位之外,剩余的7位用于存储数据本身,这种表示类型又称之为Base 128。通常而言,一个字节8位可以表示256个值,所以称之为Base 256,而这里只能用7位表示,2的7次方即为128。Varints中采用小端字节序,即最小的字节放在最前面。
举个例子,比如数字1,它只占一个字节,所以msb位为0:
0000 0001
再举一个复杂点的例子,如数字300:
1010 1100 0000 0010
300的二进制表示原本为:0000 0001 0010 1100 = 256+32+8+4=300,那么为什么300的变长表示为上面的这种形式?
首先我们先去掉每个字节的msb位,表示如下:
1010 1100 0000 0010
-> 010 1100 000 0010
如前所述,varints是小端字节序的布局方式,所以这里两个字节的位置需要翻转一下:
010 1100 000 0010
-> 000 0010 010 1100 (翻转)
-> 000 0010 ++ 010 1100
-> 0000 0001 0010 1100 = 256+32+8+4=300
Varints可以用来表示int32、int64、uint32、uint64、sint32、sint64、bool、enum等类型。在实际使用过程中,如果当前字段可以表示为负数,那么对于int32/int64和sint32/sint64而言,它们在进行编码时存在着较大的区别。比如你使用int64表示一个负数,那么哪怕是-1,其编码后的长度始终为10个字节(可以通过下面的代码来测试长度),就如同对待一个很大的无符号长整型一样。为了使得编码更加的高效,Varints使用了ZigZag的编码方式。
public int sizeOfLong(int v) {
int bytes = 1;
while ((v & 0xffffffffffffff80L) != 0L) {
bytes += 1;
v >>>= 7;
}
return bytes;
}
ZigZag编码以一种锯齿形(zig-zags)的方式来回穿梭于正负整数之间,以使得带符号整数映射为无符号整数,这样可以使得绝对值较小的负数仍然享有较小的Varints编码值,比如-1编码为1,1编码为2,-2编码为3,参考下表:
对应的公式为:
(n << 1) ^ (n >> 31)
这是对于sint32而言的,sint64对应的公式为:
(n << 1) ^ (n >> 63)
以-1为例,其二进制表现形式为:1111 1111 1111 1111 1111 1111 1111 1111(补码)。
(n << 1) = 1111 1111 1111 1111 1111 1111 1111 1110
(n >> 31) = 0000 0000 0000 0000 0000 0000 0000 0001
(n << 1) ^ (n >> 31) = 1
最终-1的Varints编码为0000 0001,这样原本用4字节表示的-1现在可以用1个字节来表示了。对于1而言就显得非常简单了,其二进制表现形式为:0000 0000 0000 0000 0000 0000 0000 0001。
(n << 1) = 0000 0000 0000 0000 0000 0000 0000 0010
(n >> 31) = 0000 0000 0000 0000 0000 0000 0000 0000
(n << 1) ^ (n >> 31) = 2
最终1的Varints编码为0000 0010,也只占用一个字节。
前面说过Varints中的一个字节中只有7位是有效数值位,即只能表示128个数值,转变成绝对值之后其实质上只能表示64个数值。比如对于消息体长度而言,其值肯定是个大于等于0的正整数,那么一个字节长度的Varints最大只能表示63(从0开始计)。对于64而言,其二进制表示为:
0100 0000
经过ZigZag处理后为:
1000 0000 ^ 0000 0000 = 1000 0000
每个字节的低7位是有效数值位,所以1000 0000进一步转变为:
000 0001 000 0000
而Varints又是小端字节序,所以需要翻转一下位置:
000 0000 000 0001
设置非最后一个字节的msb位为1,最后一个字节的msb位为0,最终有:
1000 0000 0000 0001
所以最终64表示为:1000 0000 0000 0001,而63却表示为:0111 1110。
具体的编码实现如下(针对int32类型):
public static void writeVarint(int value, ByteBuffer buffer) {
int v = (value << 1) ^ (value >> 31);
while ((v & 0xffffff80) != 0L) {
byte b = (byte) ((v & 0x7f) | 0x80);
buffer.put(b);
v >>>= 7;
}
buffer.put((byte) v);
}
对应的解码实现如下(针对int32类型):
public static int readVarint(ByteBuffer buffer) {
int value = 0;
int i = 0;
int b;
while (((b = buffer.get()) & 0x80) != 0) {
value |= (b & 0x7f) << i;
i += 7;
if (i > 28)
throw illegalVarintException(value);
}
value |= b << i;
return (value >>> 1) ^ -(value & 1);
}
回顾一下kafka v0和v1版本的消息格式,如果消息本身没有key,那么key length字段为-1,int类型的需要4个字节来保存,而如果采用Varints来编码则只需要一个字节。根据Varints的规则可以推导出0-63之间的数字占1个字节,64-8191之间的数字占2个字节,8192-1048575之间的数字占3个字节。而kafka broker的配置message.max.bytes的默认大小为1000012(Varints编码占3个字节),如果消息格式中与长度有关的字段采用Varints的编码的话,绝大多数情况下都会节省空间,而v2版本的消息格式也正是这样做的。
不过需要注意的是Varints并非一直会省空间,一个int32最长会占用5个字节(大于默认的4字节),一个int64最长会占用10字节(大于默认的8字节)。下面代码展示如何计算一个int32占用的字节个数:
public static int sizeOfVarint(int value) {
int v = (value << 1) ^ (value >> 31);
int bytes = 1;
while ((v & 0xffffff80) != 0L) {
bytes += 1;
v >>>= 7;
}
return bytes;
}
有关int32/int64的更多实现细节可以参考org.apache.kafka.common.utils.ByteUtils。
原文地址:https://blog.51cto.com/14230003/2399037