Kafka 源代码分析.

这里记录kafka源代码笔记.(代码版本是0.8.2.1)

这里不再从kafka启动顺序说起.网上已经一堆kafka启动顺序和框架上的文章了.这里不再罗嗦了,主要详细说一下代码细节部分.细节部分会一直读一直补充.如果想看看kafka 框架及启动顺序之类的文章推荐下面这个链接.

http://www.cnblogs.com/davidwang456/p/5173486.html

这个链接作者贴上了框架图和代码.比较清晰.

配置文件解析等工作都在KafkaConfig.scala中实现.这个类文件在server目录下.下文使用的配置文件项都是由这个类提供.这个类继承ZKConfig类实现.ZKConfig在Utils.scala文件中.功能是将zookeeper相关参数分离出来.

1,首先从kafkaScheduler.start()启动说起.

  KafkaScheduler是同名类的实例对象.类源文件在源代码目录的utils目录下.这个对象的初始化是在KafkaServer 类的开头部分.实例化参数是config.backgroundThreads.这个参数对应的是配置文件中的background.threads参数.默认是4.

  KafkaScheduler的是封装了ScheduldThreadPoolExecutor,在这个类的start函数里实例化stpe类.并设置了在执行器关闭的时候不再执行现有和周期性任务.通过setThreadFactory 设置了自己的工厂函数.工厂函数里调用了utils目录下的Utils类中的newThread方法.做了一些记录性工作.

  KafkaScheduler类同时实现了scheduler方法.这个方法是封装了stpe的scheduler的方法.用来传递需要执行的方法.

  shutdown函数也是封装了stpe的shutdown和awaitshutdown方法实现的.

  这个对象通过createlogmanager函数传递到LogManager类中.在这个类中的start方法里调用scheduler方法将logmanager的任务放入线程池中.logmanager具体任务是log保留,log刷新,log检查这三个.这三个任务的详细情况将在LogManager部分解释.

2,之后是通过Zkinit()建立了zkclient对象.

  zkinit()函数做的就是通过ZkClient初始化zkclient对象,并且做一些kafka在zookeeper里路径设置的工作.

  首先检查config.Zkconnect是否指定了zookeeper中的根路径.ZKconnect就是配置文件中的zookeeper.connect参数.这个参数可以在ip_list后面加上/xxx/xxx这种指定的zookeeper工作路径.如:192.168.1.100,127.0.0.1:2181/kafka/dir,代码中直接使用substr(index(‘/‘))的方式截取chroot.如果未设置则直接使用ZkUtils.setupCommonPaths(zkClient)来创建kafka的所有应用目录.具体应用目录可以见ZkUtils.scala文件中.这个文件在utils目录下.

  不过这个函数代码有点小疑问,这个疑问先暂时保留.也可能是我未看完全部代码导致的.

private def initZk(): ZkClient = {
    info("Connecting to zookeeper on " + config.zkConnect)
//一般配置zookeeper.connect选项的时候很少会在后面跟路径.但是当跟路径之后chroot就会是追加的路径.
    val chroot = {
      if (config.zkConnect.indexOf("/") > 0)
        config.zkConnect.substring(config.zkConnect.indexOf("/"))
      else
        ""
    }
//chroot是追加路径的时候.会执行下面这段代码.
    if (chroot.length > 1) {  //疑问在这个地方.
      val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))//这是取出zookeeper ip list
      val zkClientForChrootCreation = new ZkClient(zkConnForChrootCreation, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)//然后建立zkclient对象.
      ZkUtils.makeSurePersistentPathExists(zkClientForChrootCreation, chroot)//之后通过这个方法创建这个路径.
      info("Created zookeeper path " + chroot)//打印一个create info
      zkClientForChrootCreation.close()//这里清理这个追加路径的zkclient对象.
    }
//做完这些事之后.又正常的用zkconnect来建立一个zkClient对象........如果chroot.length>1的话.这个时候zkconnect应该是跟if块里的一样需要提取ip list才对.//如果一开始就没有追加路径的话.这里是没有任何问题的.
    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
    ZkUtils.setupCommonPaths(zkClient) //这个函数是调用ZkUtils.makeSurePersistentPathExists函数来建立kafka路径.zkClient }

def setupCommonPaths(zkClient: ZkClient) { //可以看见这个函数是如何工作的
  for(path <- Seq(ConsumersPath, BrokerIdsPath, BrokerTopicsPath, TopicConfigChangesPath, TopicConfigPath, DeleteTopicsPath))
  makeSurePersistentPathExists(zkClient, path) //以上这些路径是提前定义好的.定义如下.
}


val ConsumersPath = "/consumers"  //这些路径是在ZkUtils类开头定义的.可以看见这个定义里无论你是否追加了chroot.也没有任何影响.
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
val TopicConfigPath = "/config/topics"
val TopicConfigChangesPath = "/config/changes"
val ControllerPath = "/controller"
val ControllerEpochPath = "/controller_epoch"
val ReassignPartitionsPath = "/admin/reassign_partitions"
val DeleteTopicsPath = "/admin/delete_topics"
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"

 

  因为是版本0.8.2.1的代码所以我又去对照trunk版本的代码看了一下.这个函数变了.不是直接调用zkclient实现是调用zkutils实现得了.

  

  

时间: 2024-07-29 15:17:05

Kafka 源代码分析.的相关文章

Kafka 源代码分析之ByteBufferMessageSet

这里分析一下message的封装类ByteBufferMessageSet类 ByteBufferMessageSet类的源代码在源代码目录message目录下.这个类主要封装了message,messageset,messageandoffset等类的对象.在Log类中读写log的时候基本上都是以这个类的对象为基本操作对象的. 下面看看类的具体代码.首先是初始化部分. class ByteBufferMessageSet(val buffer: ByteBuffer) extends Mess

Kafka 源代码分析之LogManager

这里分析kafka 0.8.2的LogManager logmanager是kafka用来管理log文件的子系统.源代码文件在log目录下. 这里会逐步分析logmanager的源代码.首先看class 初始化部分. private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { //这个函数就是在kafkaServer.start函数里调用的封装函数 val defaultLo

Kafka 源代码分析之Log

这里分析Log对象本身的源代码. Log类是一个topic分区的基础类.一个topic分区的所有基本管理动作.都在这个对象里完成.类源代码文件为Log.scala.在源代码log目录下. Log类是LogSegment的集合和管理封装.首先看看初始化代码. class Log(val dir: File, //log的实例化对象在LogManager分析中已经介绍过.这里可以对照一下. @volatile var config: LogConfig, @volatile var recovery

Kafka 源代码分析之LogSegment

这里分析kafka LogSegment源代码 通过一步步分析LogManager,Log源代码之后就会发现,最终的log操作都在LogSegment上实现.LogSegment负责分片的读写恢复刷新删除等动作都在这里实现.LogSegment代码同样在源代码目录log下. LogSegment是一个日志分片的操作最小单元.直接作用与messages之上.负责实体消息的读写追加等等. LogSegment实际上是FileMessageSet类的代理类.LogSegment中的所有最终处理都在Fi

Kafka 源代码分析之FileMessageSet

这里主要分析FileMessageSet类 这个类主要是管理log消息的内存对象和文件对象的类.源代码文件在log目录下.这个类被LogSegment类代理调用用来管理分片. 下面是完整代码.代码比较简单.就不做过多说明了.这个类是MessageSet抽象类的实现类. class FileMessageSet private[kafka](@volatile var file: File, private[log] val channel: FileChannel, private[log] v

Kafka 源代码分析之log框架介绍

这里主要介绍log管理,读写相关的类的调用关系的介绍. 在围绕log的实际处理上.有很多层的封装和调用.这里主要介绍一下调用结构和顺序. 首先从LogManager开始. 调用关系简单如下:LogManager->Log->LogSegment->FileMessageSet->ByteBufferMessageSet->MessageSet->Message LogManager作为kafka一个子系统在管理log的工作上必不可少.LogManager通过Log类来为

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

Jafka源代码分析——随笔

Kafka是一个分布式的消息中间件,可以粗略的将其划分为三部分:Producer.Broker和Consumer.其中,Producer负责产生消息并负责将消息发送给Kafka:Broker可以简单的理解为Kafka集群中的每一台机器,其负责完成消息队列的主要功能(接收消息.消息的持久化存储.为Consumer提供消息.消息清理.....):Consumer从Broker获取消息并进行后续的操作.每个broker会有一个ID标识,该标识由人工在配置文件中配置. Kafka中的消息隶属于topic

Jafka源代码分析——LogManager

在Kafka中,LogManager负责管理broker上所有的Log(每一个topic-partition为一个Log).通过阅读源代码可知其具体完成的功能如下: 1. 按照预设规则对消息队列进行清理. 2. 按照预设规则对消息队列进行持久化(flush操作). 3. 连接ZooKeeper进行broker.topic.partition相关的ZooKeeper操作. 4. 管理broker上所有的Log. 下面一一对这些功能的实现进行详细的解析. 一.对于Log的管理 LogManager包