【原创】Kakfa cluster包源代码分析

kafka.cluster包定义了Kafka的基本逻辑概念:broker、cluster、partition和replica——这些是最基本的概念。只有弄懂了这些概念,你才真正地使用kakfa来帮助完成你的需求。因为scala文件不多,还是老规矩,我们一个一个分析。

一、Broker.scala

broker可以说是Kafka最基础的概念了,没有broker就没有kafka集群,更不用提负责解耦生产者和消费者了。Kafka使用了一个case class来定义Broker类。一个Broker的属性有id, host和port,分别表示这个Broker的id号,主机名或IP以及端口。除此之外还定义了几个很方便的方法:

1. connectionString: 返回host:port这样的字符串,如果是IPV6,返回[host]:port,其实在指定producer的核心配置项:metadata.broker.list时候你也需要按照这种格式指定,当然多个host:port对之间用逗号分隔

2. writeTo: 将broker的属性信息依次写入一个ByteBuffer,具体布局为:4个字节的id信息+2个字节的主机名或IP的长度N信息+N个字节的主机名或IP+4个字节的端口信息

3. sizeInBytes: Broker底层ByteBuffer的总字节数, 2(主机名或IP长度N信息,占2个字节) + N + 4 (broker id) + 4 (port)。比如一个broker的信息是id = 1, host:port = localhost:9092,那么sizeInBytes的值就是2 + 9 + 4 + 4 = 19个字节

4. equals: Broker类复写了equals方法,只有id,host, port相等且host时才是为两个Broker相等

5. hashCode: 类似地,要同时使用id,host和port三个属性一起计算hashcode

除了这个case class,这个scala文件还定义了一个object,提供了一些工厂方法用于创建Broker:

1. createBroker: 接收一个id和一个json格式的字符串来创建Broker。JSON格式的字符串格式大概是这样的:{"jmx_port":-1,"timestamp":"1429495288971","host":"tc_207_97","version":1,"port":9092}

然后该方法从中提取出port和host。这串字符串保存在zookeeper节点/brokers/ids/<borkerId>下

2. readFrom:也是创建一个Broker,只不过这次是从ByteBuffer(上面writeTo方法写入的ByteBuffer)中提取出Broker的属性

二、Cluster.scala

所谓的Kafka集群,其实就是当前可用的Broker集合。既然涉及到多个Broker,Cluster类必然会定义个容器类的变量来保存这些broker。没错,它定义brokers变量,使用了可变的HashMap来做底层实现。key是broker的id,value是对应的Broker实例。创建Kafka集群有两种方式:1. 创建一个空的Cluster实例,然后调用add方法不断添加broker;2. 将一组Broker实例传递给Cluster直接创建出Kafka集群(辅助构造函数)。除此之外,Cluster类还定义了一些方法:

1. getBroker: 根据id号查找对应的Broker

2. add: 增加一个Broker到集群中

3. remove: 从集群中移除一个Broker

4. size: 返回当前集群的Broker数

三、Partition.scala

Partition类是保存Kafka中某个topic分区的数据结构。每个分区都有一个leader分区和一组ISR(in-sync replica)副本(注意,ISR也包括leader)。所有的读写操作都只会传递到leader分区,ISR中的副本只是被称为追随者(follower)。关于这部分,网上有一篇很好的文章,推荐大家去阅读:http://www.infoq.com/cn/articles/kafka-analysis-part-2

这个类有很多代码,我们按照构造函数、类成员变量、类方法的顺序来分析。首先分析构造函数。

1. 构造函数

这个类的构造函数接收4个参数,其含义分别为:

I. topic: 该分区属于哪个topic

II. partitionId: 分区ID号

III. time: 提供日期方面的服务

IV. replicaManager: ReplicaManager对象(后面会说到),主要做副本管理用的

在ReplicaManager中的getOrCreatePartition方法会调用这个构造函数创建新的Partition实例。

2. 类成员变量

localBrokerId —— config文件中配置的broker id号,每个broker id号都应该是一个唯一的正整数

logManager —— 日志管理器,用于创建/删除日志,从replicaManager中获取

zkClient —— ZooKeeper客户端,用于操作zk,从replicaManager中获取

assignedReplicaMap —— 副本的map,key是broker id,value是副本实例(由Replica类表征,后面会说到这个类)。通常用于将本地分区加到对应的map中

leaderIsrUpdateLock —— 锁对象,提供用于多线程读时的同步保护

zkVersion —— 分区在zookeeper上的状态信息,初始状态是0

leaderEpoch —— 该broker成为partition leader的次数,初始值是-1

leaderReplicaIdOpt —— 该分区leader副本的broker id号,当然有可能是None

inSyncReplicas(ISR) —— 该分区的ISR副本集合

controllerEpoch —— controller broker变更该分区leader的次数,初始值是0

插一句,代码中出现的都是Epoch,感觉应该是纪元时间,但Zookeeper中的都是整数值,表示次数

logIdent —— 打印日志使用的字符串

newGauge("UnderReplicated") —— ISR集合数量比所有副本数量少的情况发生的次数

3. 类方法

getReplica —— 根据某个副本ID来获取对应的副本实例,默认是获取本地broker对应的副本对象

leaderReplicaIfLocal —— 如果这个partition的leader副本就在本地broker上返回对应的副本,否则返回None。

isUnderReplicated —— 比较ISR数与总的副本数

isReplicaLocal —— 判断给定的副本是否在本地broker上

addReplicaIfNotExists —— 为副本map增加一个给定的副本

getOrCreateReplica —— 获取副本实例,如果不存在直接创建一个新的Replica。具体做法是根据给定的副本id在副本map中搜寻,如果存在对应记录直接返回;否则先判断是否是本地副本,如果不是本地副本,直接创建一个Replica对象,加入到副本map中返回。如果就是本地的一个副本,创建对应的Log并读出对应目录下的检查点文件中的位移信息封装进一个Replica对象,加入到副本map中返回

assignedReplicas —— 返回该分区下所有的副本对象

removeReplica —— 删除给定的副本对象

delete —— 加锁的方式来删除一个分区,包括清空分区下所有副本对象,重置ISR集合对象并将leaderReplicaIdOpt置空,同时删除该分区下的日志

getLeaderEpoch —— 获取leaderEpoch的值

makeLeader —— 将当前broker选举为给定分区的leader。首先获取该分区所有副本信息以及该分区的leader、ISR集合和controller_epoch信息。第二步是创建出对应的副本实例(刚才得到的只是副本号的集合)并添加到assignedReplicaMap中。第三,根据isr副本号集合,创建一个ISR副本实例集合并删除那些controller已经去除掉的,然后更新ISR,leader和zkVersion以及leaderReplicaIdOpt,然后为这个本地的副本创建新的高水位元数据信息,最后重置非本地副本的日志结束位移为-1。如果这个分区属于__consumer_offsets这个特殊的topic,那么还需要发起异步请求读取这个分区并存入缓存,最后返回true表明选举leader成功

makeFollower —— 将本地副本选举为追随者,做法就是将leader和ISR置空。具体流程依然是获取该分区下所有副本信息以及该分区的leader、isr以及controller_epoch信息,创建出对应的副本实例并加入到副本map中,然后还是移除掉那些controller本已经删除的副本,最后更新isr,leader和zkVersion信息。如果分区属于特殊的offset topic的话,还需清理属于该分区组的cache。最后判断一下如果这个分区的leader没有改变的话,返回false否则更新leader返回true。

maybeIncrementLeaderHW —— 顾名思义,就是推高高水位,其实前面代码中也有高水位的概念——和oracle中的差不多,就是标记分区中最近一次已提交消息的位移。高水位信息会不断地被传给follower并且定期地更新到磁盘的检查点文件中用于broker的恢复。这样当重启一个失败的副本时,它首先会从磁盘中恢复最新的高水位并抛弃掉所有位于高水位之上的消息。这样做主要是因为高水位之上的消息并不能保证是已提交的。这些都做完之后该副本会被重新加入到ISR中,系统重新恢复。okay,重新说回这个方法,值得注意的是,这个方法不需要锁的保护,因为所有调用它的外层方法都已经获取锁了。这个方法具体的逻辑是先获取ISR中所有副本的结束位移,并提取出最小的一个位移作为新的高水位,如果给定leader副本的高水位在新高水位之下自然什么都不需要做;否则的话Kafka就需要更新这个分区的高水位。

getOutofSyncReplicas —— 根据给定的leader选出那些落后leader太多的副本,这里落后太多指满足落后时间已超配置时间或落后的消息数已超配置数。

updateIsr —— 根据给定的Replica集合创建新的LeaderAndIsr实例,然后更新Zookeeper

maybeShrinkIsr —— 将那些落后leader太多的副本从ISR中移除。选出要删除的副本之后将它们从ISR中去除掉之后更新Zk和缓存中的ISR

appendMessagesToLeader —— 将消息集合追加到leader副本。首先要判断这个broker是否是leader,如果不是的话报错返回;否则获取到该leader对应的日志和min.insync.replicas配置。如果当前ISR数少于该配置,那么就不能追加消息;如果这些条件都满足,那么调用Log.append方法将消息集合追加到日志中,最后更新高水位的位置

equals —— 两个分区如果topic或分区Id有不一样的就认为两个分区不同

hashCode —— 根据分区Id重新计算hash code

toString —— 分区信息,包括topic,分区Id,leader,AR(已分配的副本), ISR

四、Replica.scala

Replica类就是表示分区的副本对象。我们还是按照构造函数、类字段和类方法的顺序来说。

1. 构造函数

该构造函数接收4个参数,含义分别为:

① brokerId —— Broker id

② partition —— 该副本属于哪个分区

③ time —— 提供时间方面的服务

④ initialHighWatermarkValue —— 初始的高水位值

⑤ log —— 副本对象底层的日志对象

2. 类字段

highWatermarkMetadata —— 使用初始高水位值创建的LogOffsetMetadata,表示高水位的位移值。如果该副本不是leader的话那么就只保存它的位移值

logEndOffsetMetadata —— 日志结束位移,所有副本都要保存。如果是本地副本,那么该值就是日志的结束位移;如果是远程副本,该值只会被follower更新。初始化该值为-1

logEndOffsetUpdateTimeMsValue —— 更新位移时候的时间戳

topic/partitionId —— 副本所属的topic和partition

3. 类方法

isLocal —— 是否是本地副本,主要看日志对象是否为空,本地副本日志对象不为空

logEndOffset setter —— 根据给定的位移值更新结束位移,当然不能更新本地日志位移(否则抛错)——具体就是更新logEndOffsetMetadata和logEndOffsetUpdateTimeMsValue两个变量的值

logEndOffset getter —— 如果是本地副本,直接返回本地日志的结束位移(插入下一条消息的可用地址),否则返回由logEndOffsetMetadata

logEndOffsetUpdateTimeMs —— 获取最近一次更新offset的时间

highWatermark setter —— 更新本地副本的高水位值。注意,不能更新某分区下非本地副本的高水位值

highWatermark getter —— 返回highWatermarkMetadata保存的高水位offset值

convertHWToLocalOffsetMetadata —— 所有与高水位相关的操作都只能在本地副本上进行,具体做法是调用副本底层日志对象的convertToOffsetMetadata方法传入副本保存的高水位值实现

equals —— 如果两个副本对象的topic、broker id或分区有任何一样不同即视为两个副本对象不等

hashCode —— 根据topic、broker id和分区共同计算hash code

时间: 2024-11-10 12:11:20

【原创】Kakfa cluster包源代码分析的相关文章

【原创】Kakfa log包源代码分析(一)

Kafka日志包是提供的是日志管理系统.主要的类是LogManager——该类负责处理所有的日志,并根据topic/partition分发日志.它还负责flush策略以及日志保存策略.Kafka日志本身是由多个日志段组成(log segment).一个日志是一个FileMessageSet,它包含了日志数据以及OffsetIndex对象,该对象使用位移来读取日志数据. 下面我们一个一个地分析,先说最简单的: 一.LogConfig.scala 该scala定义了Defaults object,里

【原创】Kakfa message包源代码分析

笔者最近在研究Kafka的message包代码,有了一些心得,特此记录一下.其实研究的目的从来都不是只是看源代码,更多地是想借这个机会思考几个问题:为什么是这么实现的?你自己实现方式是什么?比起人家的实现方式,你的方案有哪些优缺点? 任何消息引擎系统最重要的都是定义消息,使用什么数据结构来保存消息和消息队列?刚刚思考这个问题的时候,我自己尝试实现了一下Message的定义: public class Message implements Serializable { private CRC32

【原创】Kakfa network包源代码分析

kafka.network包主要为kafka提供网络服务,通常不包含具体的逻辑,都是一些最基本的网络服务组件.其中比较重要的是Receive.Send和Handler.Receive和Send封装了底层的入站(inbound)和出站(outbound)字节传输请求,而Handler在此二者间做了一个映射.一个handler就代表了一个函数,该函数接收Receive类型的对象并返回Send类型的对象.用户可以处理过冲中添加逻辑代码,并且需要自行捕获传输读写过程中的异常,将其序列化之后发送给客户端.

【原创】Kakfa common包源代码分析

初一看common包的代码吓了一跳,这么多scala文件!后面仔细一看大部分都是Kafka自定义的Exception类,简直可以改称为kafka.exceptions包了.由于那些异常类的名称通常都定义得很直观,笔者就不在一一赘述了.现在我们说说common包中其他的代码. 一.AppInfo.scala 这是一个object,实现了KafkaMetricsGroup trait.后者可以认为是一个创建各种度量元的工厂类.主要利用Yammer Metrics框架创建各种度量元,比如guage,m

【原创】Kakfa serializer包源代码分析

这个包很简单,只有两个scala文件: decoder和encoder,就是提供序列化/反序列化的服务.我们一个一个说. 一.Decoder.scala 首先定义了一个trait: Decoder[T].在这个trait中定义了一个抽象方法fromBytes,用于将一个字节数组转换成一个类型T的对象.实现此trait的子类的构造函数中必须要接收一个VerifiableProperties. Kafka还定义了两个实现了 Decoder的子类: DefaultDecoder和StringDecod

【原创】Kakfa metrics包源代码分析

这个包主要是与Kafka度量相关的. 一.KafkaTimer.scala 对代码块的运行进行计时.仅提供一个方法: timer——在运行传入函数f的同时为期计时 二.KafkaMetricsConfig.scala 指定reporter类,以逗号分隔的reporter类,比如kafka.metrics.KafkaCSVMetricsReporter,这些类必须要在claasspath中指定.另外指定了度量的轮询间隔,默认是10秒. 三.KafkaMetricsReporter.scala Ka

【原创】Kakfa api包源代码分析

既然包名是api,说明里面肯定都是一些常用的Kafka API了. 一.ApiUtils.scala 顾名思义,就是一些常见的api辅助类,定义的方法包括: 1. readShortString: 从一个ByteBuffer中读取字符串长度和字符串.这个ByteBuffer的格式应该是:2个字节的字符串长度值N+N个字节的字符串 2. writeShortString: 与readShortString相反,先写入2个字节的长度N,然后写入N个字节到ByteBuffer中 3. shortStr

【原创】kafka consumer源代码分析

顾名思义,就是kafka的consumer api包. 一.ConsumerConfig.scala Kafka consumer的配置类,除了一些默认值常量及验证参数的方法之外,就是consumer的配置参数了,比如group.id, consumer.id等,详细列表见官网. 二.ConsumerIterator.scala KafkaStream的迭代器类,当stream底层的阻塞队列为空时该iterator会置于阻塞状态.这个迭代器还提供了一个shutdownCommand对象可作为一个

【原创】kafka server源代码分析(一)

这个是Kafka server的核心包,里面的类也很多,我们还是一个一个分析 一.BrokerStates.scala 定义了目前一个kafka broker的7中状态 —— 1. NotRunning:未运行 2. Starting:启动中 3. RecoveringFromUncleanShutdown:从上次异常恢复中 4. RunningAsBroker:已启动 5. RunningAsController:作为Controller运行 6. PendingControlledShutd