既然包名是api,说明里面肯定都是一些常用的Kafka API了。
一、ApiUtils.scala
顾名思义,就是一些常见的api辅助类,定义的方法包括:
1. readShortString: 从一个ByteBuffer中读取字符串长度和字符串。这个ByteBuffer的格式应该是:2个字节的字符串长度值N+N个字节的字符串
2. writeShortString: 与readShortString相反,先写入2个字节的长度N,然后写入N个字节到ByteBuffer中
3. shortStringLength: 获取符合上面方法中格式的ByteBuffer长度——即2+N
4. readIntInRange: 返回ByteBuffer当前位置出的一个整数值并判断是否在给定的范围内。如果不在直接抛出异常,但其实调用这个方法时总是传入Int.MaxValue,所以通常要是满足的。这个整数值可以代表分区数、分区id、副本数、ISR数或topic数
5. readShortInRange: 与readIntInRange类似,只是这个方法读取一个2字节的short数,这个short数通常都是被用作error code
6. readLongInRange: 与前两个类似,只是它读取一个Long型的数,不过这个方法貌似没有被调用过
二、RequestOrResponse.scala
Kafka中有很多种客户请求(request),该文件定义了一个Request object抽象出了所有请求共同拥有的属性:
OrdinaryConsumerId: 表示follower的副本id
DebuggingConsumerId: 仅供Debug使用
isValidBrokerId: 是否是合法的Broker id,必须是非负值
下面还定义了一个抽象类,这个类特别重要,因为后面所有种类的请求或响应都继承了该类
RequestOrResponse类——即请求或响应类
如果是表示请求,那么子类必须传入一个requestId表示请求的种类(具体种类在RequestKeys中定义);如果是表示响应,那么子类不需要传入任何参数直接调用无参构造函数。这个多功能类定义了4个抽象方法:
1. sizeInBytes: 计算请求或响应所占字节数
2. writeTo: 将请求或响应写入ByteBuffer
3. handleError: 主要用于处理请求时的错误
4. describe: ?只用于请求,返回对该请求的一个描述字符串
三、RequestKeys.scala
定义了所有的请求种类,包括ProduceKey、FetchKey、OffsetsKey等,每种请求都有一个编号。另外还定义了一个Map将请求种类编号与读取请求或响应的函数关联起来以及两个对应的方法分别返回请求种类的名称以及对应的解析函数。
四、GenericRequestAndHeader.scala
一个抽象类,继承了RequestOrResponse类,自然也要实现RequestOrResponse类定义的4个抽象方法
1. writeTo: 写入版本、correlationId、clientId和body
2. sizeInBytes: 2个字节的版本号+4个字节的correlation号+(2 + N)个字节的客户端id+body的字节数
3. toString/describe: 两个方法一起构成了请求的描述字符串
五、GenericResponseAndHeader.scala
与GenericRequestAndHeader对应的response类,代码中写的是extends RequestOrResponse(requestId),由于所有response应该是extends RequestOrResponse(),所以我谨慎的怀疑它这里写错了,不过反正requestId也没有在该类中使用。该类因为继承了RequestOrResponse,自然也要实现那4个方法: writeTo,sizeInBytes, toString和describe。这里就不赘述了。
下面将Request和Response组合在一起说了,
六、ProducerRequest.scala/ProducerResponse.scala
在具体说对应的request/response之前,先说说Kafka通用的request和response的结构:(以下大部分内容来自:https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest)
RequestOrResponse => size + (requestMessage 或者 responseMessage)。其中size是一个32位的整数,表明后面request或response的长度。
Request格式 => ApiKey + ApiVersion + CorrelationId + ClientId + RequestMessage
——ApiKey: SHOTRT类型的整数,标识这个request的类型,比如是metadata request、producer request还是fectch request等,具体定义在RequestKeys.scala中
——ApiVersion: SHORT类型的整数,主要用于迭代升级使用,目前的值是0,比如说增加一些request的字段,版本变为1等。不过目前统一是0
——CorrelationId: 4个字节的整数,在服务器端和客户端关联reponse使用
——ClientId: 用户自定义的一个名称,可用于记录日志、监控使用。比如监控不同应用产生的请求数
Response格式 => CorrelationId + ResponseMessage
可以看出,response的格式明显比request来的简明。上面2个组成部分的含义都很清晰就不再详细说了。
okay,既然说了request和response的一些共同的格式,下面我们展开说一些具体的request和response
Metadata API
Kafka有提供了几类API,其中一类API可以查询一些元数据信息,比如
- 集群中有哪些topic?
- 每个topic有多少分区?
- 每个分区当前的leader是哪个broker?
- 这些broker的host/port是什么?
于是,这类API会发送Metadata request给Kafka的集群并从集群处获得对应的response。需要注意的是,这类API发送的请求能够被集群中任意一个broker处理的,其他API的request没有这样的能力!
客户发出请求后Kafka也不总是返回所有的topic,客户可以提供一个topic列表选择感兴趣的topic。在开始看Metadata request代码之前,我们有必要先看一下Topic元数据是怎么定义的。
TopicMetadata.scala
这个scala文件结构非常鲜明,两组伴生对象,分别定义了topic级别的metadata和partition级别的metadata。先看Partition级别的metadata。
构造函数
partitionId —— partition号
leader —— 该partition的leader broker,可能是空
replicas —— 该partition的所有副本集合
isr —— 该partition的ISR集合
errorCode —— 错误码,初始为NoError
类方法
sizeInBytes —— 该metadata信息总字节数,包括2个字节的错误码+4个字节的分区号+4个字节的leader号+(4+副本集合大小N1* 4) + (4 + ISR集合大小N2*4)。每对括号中第一个4表示集合长度字节,里面的值就是N1或N2
writeTo —— 按照错误码、分区号、leaderId、副本集合大小、副本集合所有ID、ISR大小、ISR集合所有ID的顺序写入ByteBuffer。特别注意的是,如果没有leader,直接写入-1,表示leader node不存在
toString —— 将以上信息组成字符串输出
formatBroker —— 拼成这样的字符串: broker ID + (broker host : broker : port)
而PartitionMetadata object只提供了一个readFrom方法,从一段ByteBuffer中按照writeTo中的顺序读取各种信息封装到一个PartitionMetadata实例返回。
定义好了partition metadata,topic的metadata就比较简单了,只包含topic信息、一组PartitionMetadata和一个ErrorCode。它的方法包括:
sizeInBytes —— 总的字节数 = 2 + size(topic) + 4 + 集合中所有partitionmetadata字节数
writeTo —— 按照error code、topic、partition数、partitionmetadata集合元素的顺序写入ByteBuffer
toString —— 构造一个供打印输出的字符串
TopicMetadata object也只定义了一个readFrom方法,按照writeTo写入的方法读取到一个ByteBuffer。
好了,现在可以说ProducerRequest/ProducerResponse了。
TopicMetadataRequest的代码结构和其他request很类似,包括writeTo、sizeInBytes、toString和describe方法等以及对应object定义的readFrom方法。这个类允许用户提供了一个Topic集合以获取指定topic的元数据。如果发生错误,handleError方法会创建一组错误response,然后通过RequestChannel的sendResponse方法返回给客户。
TopicMetadataResponse.scala
response比request要简单得多,类只定义了sizeInBytes和writeTo方法,而object还是定义了readFrom方法来读取response信息
七、UpdateMetadataRequest.scala/UpdateMetadataResponse.scala
既然能够查询元数据,自然也要有更新元数据的API。该request除了公共的信息还包含controller id,controller_epoch,topic+partition -> partition状态的映射以及当前可用的Broker。自然writeTo和readFrom时候都要把这些额外的信息加进去。
八、ConsumerMetadataRequest.scala/ConsumerMetaResponse.scala
consumer的元数据request/response,大多都是公共的request字段,不过有一个需要注意的是group,表示consumer group。另外如果response返回的Broker的host是空,port是-1表示这是一个假的broker——其实就是表示没有broker的意思。
九、ControlledShutdownRequest.scala/ControlledShutdownResponse.scala
关闭一个broker的request和response
十、HeartbeatRequestAndHeader.scala/HeartbeatResponseAndHeader.scala
从名字上看,很像是维持心跳的request和response,不过貌似代码中没有使用到
十一、JoinGroupRequestAndHeader.scala/JoinGroupResponseAndHeader.scala
貌似代码中也没有用到
十二、LeaderAndIsrRequest.scala/LeaderAndIsrResponse.scala
LeaderAndIsrRequest.scala中其实定义了三组伴生对象: LeaderAndIsr、PartitionStateInfo和LeaderAndIsrRequest。LeaderAndIsr定义了一个leader以及leader epoch, 一组ISR集合以及对应的zookeeper版本,貌似变更leader或ISR时会将leader_epoch值加一。
再说PartitionStateInfo伴生对象,它包含了该分区的AR集合以及一个LeaderIsrAndControllerEpoch实例,后者简单来说就是保存了leader、isr和controller_epoch信息——后者在controller状态变更时会加1。PartitionStateInfo作为LeaderAndIsrRequest的一部分,也维持了与request/response类似的代码结构,即提供了writeTo、readFrom、sizeInBytes等方法。其中写入/读取的顺序是controller_epoch, leader, leader_epoch, ISR size, ISR set, zkVersion, AR size, AR set。
最后就是LeaderAndIsrRequest伴生对象了,除了常见的request信息之外它还包括了controllerId, controller_epoch以及一组leader和一组PartitionStateInfo信息。而LeaderAndIsrResponse则返回一个映射,key是topic+partition,value是对应的错误码
十三、StopReplicaRequest.scala/StopReplicaResponse.scala
关闭一组分区的副本的request和response。提request时候还需要额外提供controllerId、controller_epoch和副本所在的分区集合以及一个bool值表明是否删除这些分区。
十四、ProducerRequest.scala/ProducerResponse.scala
从这组request,response开始都是比较重要的Kafka请求了。客户端使用producer API提交发送请求发送消息集合给服务器。Kafka允许一次发送属于多个topic分区的消息。Producer request格式如下:
versionId(2Byte) + correlationId(4Byte) + clientId(2Byte + size(clientId)) + requiredAcks(2Byte) + ackTimeoutMs(4Byte) + Topics (Partitions + MessageSetSize + MessageSet)
几个字段重点说一下,
requiredAcks —— 服务器在响应请求前需接收应答的次数。如果是0,服务器不会响应请求;如果是1,表示服务器会等待数据被写入到本地log然后再发送response;如果是-1,那么就要等ISR中所有副本都提交了才发送response。这个值由属性request.required.acks控制
ackTimeoutMs —— 等待应答的最大超时时间,由属性request.timeout.ms指定,默认是10秒。这只是个近似值,因为很多元素都没有被包含在这个超时间隔内。比如它并不包括网络的延时,也不会计算request在队列中的等待时间等。如果要精确地计算这些部分的时间,还是使用Socket的超时比较好
与ProducerRequest对应的响应类就是ProducerResponse——它的格式如下:
correlationId + topic count + [topic + partition count + [partitionId + errorCode + nextOffset]*]*
每个partition都有自己的errorCode,nextOffset表示消息集合的第一条消息的offset
十五、FetchRequest.scala/FetchResponse.scala
Fetch API用于获取一些topic分区的一条或多条消息,只需要客户段代码指定topic、分区和开始获取的起始位移即可。通常来说,返回的消息的位移一般都不小于给定的起始位移。但是如果是压缩消息,那么就有可能比起始位移小。这种消息不会太多,因为fetch api的调用者需要自己来过滤掉这些消息。
FetchRequest的格式如下:
versionId + correlationId + clientId + replicaId + maxWait + minBytes + topic count + [topic + partition count + [partitionId + offset + fetchSize]]*
replicaId —— 发起请求的副本所在节点的ID,通常使用时总是要将其设置为-1
maxWait / MinBytes —— 如果maxWait设置为100ms、MinBytes是64KB的话,Kafka server会等待100ms来收集64KB大小的response数据。
FetchResponse与其他的response相比有些特殊,它为不同的子部分定义了两个类:TopicData和PartitionData,因此格式如下:
corrleationId + topic number + [topic + [partitionId errorCode highWaterOffset MessageSetSize MessageSet ]* ]*
值得一提的是,因为这个response可能返回大量的数据,所以Kafka在构建这个reponse的时候使用了sendfile的机制(交由java.NIO包FileChannel来做)
十六、OffsetRequest.scala/OffsetResponse.scala
这个API主要用于为一组topic分区获取合法的offset范围,和produce和fetch api一样,offset请求也必须发送到分区的leader broker处理——当然可以使用metadata api来获取leader broker id。OffsetResponse返回的是所请求分区每个日志段的起始位移以及日志结束位移(也就是下一条消息被追加到分区的位移)
OffsetRequest中有两个比较重要的概念: LatestTime和EarliestTime,分别为-1和-2。它们和属性auto.offset.reset关系也很亲密,分别对应于largest和smallest。其中largest表示自动地重试位移到最大位移;smallest表示自动地重设位移为最小位移。
OffsetRequest格式如下:
versionId + correlationId + clientId + replicaId + topic count + [topic + partition count + [partitionId + partition time + maxNumOffsets]* ]*
Kafka专门创建了一个PartitionOffsetRequestInfo 类来保存partition time + maxNumOffset。指定Partition time(单位是ms)表示要请求所有在该时间点之前的消息,比如说可以指定OffsetRequest.LatestTime表示请求当前所有消息。
OffsetResponse比较简单,格式如下:
correlationId + topic count + [ topic + partition count + [partition Id + error Code + offset数组长度 + 每个offset]* ]*
返回的offset数组就是某个分区下每个日志段的起始位移。
十七、OffsetCommitRequest.scala/OffsetCommitResponse.scala
这个request与下面要说的OffsetFetchRequest api都是用于集中式管理位移的。其中OffsetCommitRequest格式如下:
versionId + correlationId + clientId + consumer group Id + consumer generationId(0.8.2以后新加的) + consumer id(0.8.2以后新加的) + topic count + [topic + partition count + [partitionId + offset + timestamp(0.8.2以后新加) + metadata]* ]*
其中,offset,timestamp和metadata都是OffsetAndMetadata类提供的信息。
OffsetCommitResponse就要简单的多,它只有一个版本的格式:
correlationId + topic count + [topic + partition count + [partitionId + errorCode]* ]*
十八、OffsetFetchRequest.scala/OffsetFetchResponse.scala
顾名思义,获取offset信息的request,格式如下:
versionId + correlationId + clientId + consumer group id + topic count + [ topic + partition count + [ partitionId ]* ]*
虽然只有一个格式,但需要注意的是0.8.2以前都是从zookeeper中读offset,从0.8.2之后从kafka中读取offset
OffsetFetchResponse格式如下:
correlationId + topic count + [ topic + partition count + [ partitionId + offset + medata + errorCode]* ]*
其中offset和metadata是保存在OffsetAndMetadata实例中。