【原创】Kakfa api包源代码分析

既然包名是api,说明里面肯定都是一些常用的Kafka API了。

一、ApiUtils.scala

顾名思义,就是一些常见的api辅助类,定义的方法包括:

1. readShortString: 从一个ByteBuffer中读取字符串长度和字符串。这个ByteBuffer的格式应该是:2个字节的字符串长度值N+N个字节的字符串

2. writeShortString: 与readShortString相反,先写入2个字节的长度N,然后写入N个字节到ByteBuffer中

3. shortStringLength: 获取符合上面方法中格式的ByteBuffer长度——即2+N

4. readIntInRange: 返回ByteBuffer当前位置出的一个整数值并判断是否在给定的范围内。如果不在直接抛出异常,但其实调用这个方法时总是传入Int.MaxValue,所以通常要是满足的。这个整数值可以代表分区数、分区id、副本数、ISR数或topic数

5. readShortInRange: 与readIntInRange类似,只是这个方法读取一个2字节的short数,这个short数通常都是被用作error code

6. readLongInRange: 与前两个类似,只是它读取一个Long型的数,不过这个方法貌似没有被调用过

二、RequestOrResponse.scala

Kafka中有很多种客户请求(request),该文件定义了一个Request object抽象出了所有请求共同拥有的属性:

OrdinaryConsumerId: 表示follower的副本id

DebuggingConsumerId: 仅供Debug使用

isValidBrokerId: 是否是合法的Broker id,必须是非负值

下面还定义了一个抽象类,这个类特别重要,因为后面所有种类的请求或响应都继承了该类

RequestOrResponse类——即请求或响应类

如果是表示请求,那么子类必须传入一个requestId表示请求的种类(具体种类在RequestKeys中定义);如果是表示响应,那么子类不需要传入任何参数直接调用无参构造函数。这个多功能类定义了4个抽象方法:

1. sizeInBytes: 计算请求或响应所占字节数

2. writeTo: 将请求或响应写入ByteBuffer

3. handleError: 主要用于处理请求时的错误

4. describe: ?只用于请求,返回对该请求的一个描述字符串

三、RequestKeys.scala

定义了所有的请求种类,包括ProduceKey、FetchKey、OffsetsKey等,每种请求都有一个编号。另外还定义了一个Map将请求种类编号与读取请求或响应的函数关联起来以及两个对应的方法分别返回请求种类的名称以及对应的解析函数。

四、GenericRequestAndHeader.scala

一个抽象类,继承了RequestOrResponse类,自然也要实现RequestOrResponse类定义的4个抽象方法

1. writeTo: 写入版本、correlationId、clientId和body

2. sizeInBytes: 2个字节的版本号+4个字节的correlation号+(2 + N)个字节的客户端id+body的字节数

3. toString/describe: 两个方法一起构成了请求的描述字符串

五、GenericResponseAndHeader.scala

与GenericRequestAndHeader对应的response类,代码中写的是extends RequestOrResponse(requestId),由于所有response应该是extends RequestOrResponse(),所以我谨慎的怀疑它这里写错了,不过反正requestId也没有在该类中使用。该类因为继承了RequestOrResponse,自然也要实现那4个方法: writeTo,sizeInBytes, toString和describe。这里就不赘述了。

下面将Request和Response组合在一起说了,

六、ProducerRequest.scala/ProducerResponse.scala

在具体说对应的request/response之前,先说说Kafka通用的request和response的结构:(以下大部分内容来自:https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest)

RequestOrResponse => size + (requestMessage 或者 responseMessage)。其中size是一个32位的整数,表明后面request或response的长度。

Request格式 => ApiKey + ApiVersion + CorrelationId + ClientId + RequestMessage

——ApiKey: SHOTRT类型的整数,标识这个request的类型,比如是metadata request、producer request还是fectch request等,具体定义在RequestKeys.scala中

——ApiVersion: SHORT类型的整数,主要用于迭代升级使用,目前的值是0,比如说增加一些request的字段,版本变为1等。不过目前统一是0

——CorrelationId: 4个字节的整数,在服务器端和客户端关联reponse使用

——ClientId: 用户自定义的一个名称,可用于记录日志、监控使用。比如监控不同应用产生的请求数

Response格式 => CorrelationId + ResponseMessage

可以看出,response的格式明显比request来的简明。上面2个组成部分的含义都很清晰就不再详细说了。

okay,既然说了request和response的一些共同的格式,下面我们展开说一些具体的request和response

Metadata API

Kafka有提供了几类API,其中一类API可以查询一些元数据信息,比如

  • 集群中有哪些topic?
  • 每个topic有多少分区?
  • 每个分区当前的leader是哪个broker?
  • 这些broker的host/port是什么?

于是,这类API会发送Metadata request给Kafka的集群并从集群处获得对应的response。需要注意的是,这类API发送的请求能够被集群中任意一个broker处理的,其他API的request没有这样的能力!

客户发出请求后Kafka也不总是返回所有的topic,客户可以提供一个topic列表选择感兴趣的topic。在开始看Metadata request代码之前,我们有必要先看一下Topic元数据是怎么定义的。

TopicMetadata.scala

这个scala文件结构非常鲜明,两组伴生对象,分别定义了topic级别的metadata和partition级别的metadata。先看Partition级别的metadata。

构造函数

partitionId —— partition号

leader —— 该partition的leader broker,可能是空

replicas —— 该partition的所有副本集合

isr —— 该partition的ISR集合

errorCode —— 错误码,初始为NoError

类方法

sizeInBytes —— 该metadata信息总字节数,包括2个字节的错误码+4个字节的分区号+4个字节的leader号+(4+副本集合大小N1* 4) + (4 + ISR集合大小N2*4)。每对括号中第一个4表示集合长度字节,里面的值就是N1或N2

writeTo —— 按照错误码、分区号、leaderId、副本集合大小、副本集合所有ID、ISR大小、ISR集合所有ID的顺序写入ByteBuffer。特别注意的是,如果没有leader,直接写入-1,表示leader node不存在

toString —— 将以上信息组成字符串输出

formatBroker —— 拼成这样的字符串: broker ID + (broker host : broker : port)

而PartitionMetadata object只提供了一个readFrom方法,从一段ByteBuffer中按照writeTo中的顺序读取各种信息封装到一个PartitionMetadata实例返回。

定义好了partition metadata,topic的metadata就比较简单了,只包含topic信息、一组PartitionMetadata和一个ErrorCode。它的方法包括:

sizeInBytes —— 总的字节数 = 2 + size(topic) + 4 + 集合中所有partitionmetadata字节数

writeTo —— 按照error code、topic、partition数、partitionmetadata集合元素的顺序写入ByteBuffer

toString —— 构造一个供打印输出的字符串

TopicMetadata object也只定义了一个readFrom方法,按照writeTo写入的方法读取到一个ByteBuffer。

好了,现在可以说ProducerRequest/ProducerResponse了。

TopicMetadataRequest的代码结构和其他request很类似,包括writeTo、sizeInBytes、toString和describe方法等以及对应object定义的readFrom方法。这个类允许用户提供了一个Topic集合以获取指定topic的元数据。如果发生错误,handleError方法会创建一组错误response,然后通过RequestChannel的sendResponse方法返回给客户。

TopicMetadataResponse.scala

response比request要简单得多,类只定义了sizeInBytes和writeTo方法,而object还是定义了readFrom方法来读取response信息

七、UpdateMetadataRequest.scala/UpdateMetadataResponse.scala

既然能够查询元数据,自然也要有更新元数据的API。该request除了公共的信息还包含controller id,controller_epoch,topic+partition -> partition状态的映射以及当前可用的Broker。自然writeTo和readFrom时候都要把这些额外的信息加进去。

八、ConsumerMetadataRequest.scala/ConsumerMetaResponse.scala

consumer的元数据request/response,大多都是公共的request字段,不过有一个需要注意的是group,表示consumer group。另外如果response返回的Broker的host是空,port是-1表示这是一个假的broker——其实就是表示没有broker的意思。

九、ControlledShutdownRequest.scala/ControlledShutdownResponse.scala

关闭一个broker的request和response

十、HeartbeatRequestAndHeader.scala/HeartbeatResponseAndHeader.scala

从名字上看,很像是维持心跳的request和response,不过貌似代码中没有使用到

十一、JoinGroupRequestAndHeader.scala/JoinGroupResponseAndHeader.scala

貌似代码中也没有用到

十二、LeaderAndIsrRequest.scala/LeaderAndIsrResponse.scala

LeaderAndIsrRequest.scala中其实定义了三组伴生对象: LeaderAndIsr、PartitionStateInfo和LeaderAndIsrRequest。LeaderAndIsr定义了一个leader以及leader epoch, 一组ISR集合以及对应的zookeeper版本,貌似变更leader或ISR时会将leader_epoch值加一。

再说PartitionStateInfo伴生对象,它包含了该分区的AR集合以及一个LeaderIsrAndControllerEpoch实例,后者简单来说就是保存了leader、isr和controller_epoch信息——后者在controller状态变更时会加1。PartitionStateInfo作为LeaderAndIsrRequest的一部分,也维持了与request/response类似的代码结构,即提供了writeTo、readFrom、sizeInBytes等方法。其中写入/读取的顺序是controller_epoch, leader, leader_epoch, ISR size, ISR set, zkVersion, AR size, AR set。

最后就是LeaderAndIsrRequest伴生对象了,除了常见的request信息之外它还包括了controllerId, controller_epoch以及一组leader和一组PartitionStateInfo信息。而LeaderAndIsrResponse则返回一个映射,key是topic+partition,value是对应的错误码

十三、StopReplicaRequest.scala/StopReplicaResponse.scala

关闭一组分区的副本的request和response。提request时候还需要额外提供controllerId、controller_epoch和副本所在的分区集合以及一个bool值表明是否删除这些分区。

十四、ProducerRequest.scala/ProducerResponse.scala

从这组request,response开始都是比较重要的Kafka请求了。客户端使用producer API提交发送请求发送消息集合给服务器。Kafka允许一次发送属于多个topic分区的消息。Producer request格式如下:

versionId(2Byte) + correlationId(4Byte) + clientId(2Byte + size(clientId)) + requiredAcks(2Byte) + ackTimeoutMs(4Byte) + Topics (Partitions + MessageSetSize + MessageSet)

几个字段重点说一下,

requiredAcks —— 服务器在响应请求前需接收应答的次数。如果是0,服务器不会响应请求;如果是1,表示服务器会等待数据被写入到本地log然后再发送response;如果是-1,那么就要等ISR中所有副本都提交了才发送response。这个值由属性request.required.acks控制

ackTimeoutMs —— 等待应答的最大超时时间,由属性request.timeout.ms指定,默认是10秒。这只是个近似值,因为很多元素都没有被包含在这个超时间隔内。比如它并不包括网络的延时,也不会计算request在队列中的等待时间等。如果要精确地计算这些部分的时间,还是使用Socket的超时比较好

与ProducerRequest对应的响应类就是ProducerResponse——它的格式如下:

correlationId + topic count + [topic + partition count + [partitionId + errorCode + nextOffset]*]*

每个partition都有自己的errorCode,nextOffset表示消息集合的第一条消息的offset

十五、FetchRequest.scala/FetchResponse.scala

Fetch API用于获取一些topic分区的一条或多条消息,只需要客户段代码指定topic、分区和开始获取的起始位移即可。通常来说,返回的消息的位移一般都不小于给定的起始位移。但是如果是压缩消息,那么就有可能比起始位移小。这种消息不会太多,因为fetch api的调用者需要自己来过滤掉这些消息。

FetchRequest的格式如下:

versionId + correlationId + clientId + replicaId + maxWait + minBytes + topic count + [topic + partition count + [partitionId + offset + fetchSize]]*

replicaId —— 发起请求的副本所在节点的ID,通常使用时总是要将其设置为-1

maxWait / MinBytes —— 如果maxWait设置为100ms、MinBytes是64KB的话,Kafka server会等待100ms来收集64KB大小的response数据。

FetchResponse与其他的response相比有些特殊,它为不同的子部分定义了两个类:TopicData和PartitionData,因此格式如下:

corrleationId + topic number + [topic + [partitionId errorCode highWaterOffset MessageSetSize MessageSet ]* ]*

值得一提的是,因为这个response可能返回大量的数据,所以Kafka在构建这个reponse的时候使用了sendfile的机制(交由java.NIO包FileChannel来做)

十六、OffsetRequest.scala/OffsetResponse.scala

这个API主要用于为一组topic分区获取合法的offset范围,和produce和fetch api一样,offset请求也必须发送到分区的leader broker处理——当然可以使用metadata api来获取leader broker id。OffsetResponse返回的是所请求分区每个日志段的起始位移以及日志结束位移(也就是下一条消息被追加到分区的位移)

OffsetRequest中有两个比较重要的概念: LatestTime和EarliestTime,分别为-1和-2。它们和属性auto.offset.reset关系也很亲密,分别对应于largest和smallest。其中largest表示自动地重试位移到最大位移;smallest表示自动地重设位移为最小位移。

OffsetRequest格式如下:

versionId + correlationId + clientId + replicaId + topic count + [topic + partition count + [partitionId + partition time + maxNumOffsets]* ]*

Kafka专门创建了一个PartitionOffsetRequestInfo 类来保存partition time + maxNumOffset。指定Partition time(单位是ms)表示要请求所有在该时间点之前的消息,比如说可以指定OffsetRequest.LatestTime表示请求当前所有消息。

OffsetResponse比较简单,格式如下:

correlationId + topic count + [ topic + partition count + [partition Id + error Code + offset数组长度 + 每个offset]* ]*

返回的offset数组就是某个分区下每个日志段的起始位移。

十七、OffsetCommitRequest.scala/OffsetCommitResponse.scala

这个request与下面要说的OffsetFetchRequest api都是用于集中式管理位移的。其中OffsetCommitRequest格式如下:

versionId + correlationId + clientId + consumer group Id + consumer generationId(0.8.2以后新加的) + consumer id(0.8.2以后新加的) + topic count + [topic + partition count + [partitionId + offset + timestamp(0.8.2以后新加) + metadata]* ]*

其中,offset,timestamp和metadata都是OffsetAndMetadata类提供的信息。

OffsetCommitResponse就要简单的多,它只有一个版本的格式:

correlationId + topic count + [topic + partition count + [partitionId + errorCode]* ]*

十八、OffsetFetchRequest.scala/OffsetFetchResponse.scala

顾名思义,获取offset信息的request,格式如下:

versionId + correlationId + clientId + consumer group id + topic count + [ topic + partition count + [ partitionId ]* ]*

虽然只有一个格式,但需要注意的是0.8.2以前都是从zookeeper中读offset,从0.8.2之后从kafka中读取offset

OffsetFetchResponse格式如下:

correlationId + topic count + [ topic + partition count + [ partitionId + offset + medata + errorCode]* ]*

其中offset和metadata是保存在OffsetAndMetadata实例中。

时间: 2024-09-30 06:42:20

【原创】Kakfa api包源代码分析的相关文章

【原创】Kakfa cluster包源代码分析

kafka.cluster包定义了Kafka的基本逻辑概念:broker.cluster.partition和replica——这些是最基本的概念.只有弄懂了这些概念,你才真正地使用kakfa来帮助完成你的需求.因为scala文件不多,还是老规矩,我们一个一个分析. 一.Broker.scala broker可以说是Kafka最基础的概念了,没有broker就没有kafka集群,更不用提负责解耦生产者和消费者了.Kafka使用了一个case class来定义Broker类.一个Broker的属性

【原创】Kakfa log包源代码分析(一)

Kafka日志包是提供的是日志管理系统.主要的类是LogManager——该类负责处理所有的日志,并根据topic/partition分发日志.它还负责flush策略以及日志保存策略.Kafka日志本身是由多个日志段组成(log segment).一个日志是一个FileMessageSet,它包含了日志数据以及OffsetIndex对象,该对象使用位移来读取日志数据. 下面我们一个一个地分析,先说最简单的: 一.LogConfig.scala 该scala定义了Defaults object,里

【原创】Kakfa message包源代码分析

笔者最近在研究Kafka的message包代码,有了一些心得,特此记录一下.其实研究的目的从来都不是只是看源代码,更多地是想借这个机会思考几个问题:为什么是这么实现的?你自己实现方式是什么?比起人家的实现方式,你的方案有哪些优缺点? 任何消息引擎系统最重要的都是定义消息,使用什么数据结构来保存消息和消息队列?刚刚思考这个问题的时候,我自己尝试实现了一下Message的定义: public class Message implements Serializable { private CRC32

【原创】Kakfa network包源代码分析

kafka.network包主要为kafka提供网络服务,通常不包含具体的逻辑,都是一些最基本的网络服务组件.其中比较重要的是Receive.Send和Handler.Receive和Send封装了底层的入站(inbound)和出站(outbound)字节传输请求,而Handler在此二者间做了一个映射.一个handler就代表了一个函数,该函数接收Receive类型的对象并返回Send类型的对象.用户可以处理过冲中添加逻辑代码,并且需要自行捕获传输读写过程中的异常,将其序列化之后发送给客户端.

【原创】Kakfa metrics包源代码分析

这个包主要是与Kafka度量相关的. 一.KafkaTimer.scala 对代码块的运行进行计时.仅提供一个方法: timer——在运行传入函数f的同时为期计时 二.KafkaMetricsConfig.scala 指定reporter类,以逗号分隔的reporter类,比如kafka.metrics.KafkaCSVMetricsReporter,这些类必须要在claasspath中指定.另外指定了度量的轮询间隔,默认是10秒. 三.KafkaMetricsReporter.scala Ka

【原创】Kakfa common包源代码分析

初一看common包的代码吓了一跳,这么多scala文件!后面仔细一看大部分都是Kafka自定义的Exception类,简直可以改称为kafka.exceptions包了.由于那些异常类的名称通常都定义得很直观,笔者就不在一一赘述了.现在我们说说common包中其他的代码. 一.AppInfo.scala 这是一个object,实现了KafkaMetricsGroup trait.后者可以认为是一个创建各种度量元的工厂类.主要利用Yammer Metrics框架创建各种度量元,比如guage,m

【原创】Kakfa serializer包源代码分析

这个包很简单,只有两个scala文件: decoder和encoder,就是提供序列化/反序列化的服务.我们一个一个说. 一.Decoder.scala 首先定义了一个trait: Decoder[T].在这个trait中定义了一个抽象方法fromBytes,用于将一个字节数组转换成一个类型T的对象.实现此trait的子类的构造函数中必须要接收一个VerifiableProperties. Kafka还定义了两个实现了 Decoder的子类: DefaultDecoder和StringDecod

【原创】kafka consumer源代码分析

顾名思义,就是kafka的consumer api包. 一.ConsumerConfig.scala Kafka consumer的配置类,除了一些默认值常量及验证参数的方法之外,就是consumer的配置参数了,比如group.id, consumer.id等,详细列表见官网. 二.ConsumerIterator.scala KafkaStream的迭代器类,当stream底层的阻塞队列为空时该iterator会置于阻塞状态.这个迭代器还提供了一个shutdownCommand对象可作为一个

【原创】kafka server源代码分析(一)

这个是Kafka server的核心包,里面的类也很多,我们还是一个一个分析 一.BrokerStates.scala 定义了目前一个kafka broker的7中状态 —— 1. NotRunning:未运行 2. Starting:启动中 3. RecoveringFromUncleanShutdown:从上次异常恢复中 4. RunningAsBroker:已启动 5. RunningAsController:作为Controller运行 6. PendingControlledShutd