Kafka 源代码分析之LogManager

这里分析kafka 0.8.2的LogManager

logmanager是kafka用来管理log文件的子系统.源代码文件在log目录下.

这里会逐步分析logmanager的源代码.首先看class 初始化部分.

private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {     //这个函数就是在kafkaServer.start函数里调用的封装函数
    val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes,  //创建各种logconfig
                                     segmentMs = config.logRollTimeMillis,
                                     segmentJitterMs = config.logRollTimeJitterMillis,
                                     flushInterval = config.logFlushIntervalMessages,
                                     flushMs = config.logFlushIntervalMs.toLong,
                                     retentionSize = config.logRetentionBytes,
                                     retentionMs = config.logRetentionTimeMillis,
                                     maxMessageSize = config.messageMaxBytes,
                                     maxIndexSize = config.logIndexSizeMaxBytes,
                                     indexInterval = config.logIndexIntervalBytes,
                                     deleteRetentionMs = config.logCleanerDeleteRetentionMs,
                                     fileDeleteDelayMs = config.logDeleteDelayMs,
                                     minCleanableRatio = config.logCleanerMinCleanRatio,
                                     compact = config.logCleanupPolicy.trim.toLowerCase == "compact")
    val defaultProps = defaultLogConfig.toProps
    val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _))
    // read the log configurations from zookeeper
    val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,  //创建压缩log的配置文件
                                      dedupeBufferSize = config.logCleanerDedupeBufferSize,
                                      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
                                      ioBufferSize = config.logCleanerIoBufferSize,
                                      maxMessageSize = config.messageMaxBytes,
                                      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
                                      backOffMs = config.logCleanerBackoffMs,
                                      enableCleaner = config.logCleanerEnable)    //这里就是创建了logmanager的实例对象.
    new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,  //这是logdirs的位置.即配置文件中的logdir
                   topicConfigs = configs,                             //topic的配置信息,从zookeeper上得到的.
                   defaultConfig = defaultLogConfig,                   //默认配置信息
                   cleanerConfig = cleanerConfig,                      //压缩log配置信息
                   ioThreads = config.numRecoveryThreadsPerDataDir,    //io线程的个数.一般是8
                   flushCheckMs = config.logFlushSchedulerIntervalMs,  //log刷新到磁盘的间隔.一般是10000ms
                   flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, //log检查点的间隔.一般是10000ms
                   retentionCheckMs = config.logCleanupIntervalMs,                //log 清除的时间间隔.即保留时长.一般是7*24hour
                   scheduler = kafkaScheduler,                         //这个即kafkaserver.start函数里最早启动和声明的对象.用来做后台任务.
                   brokerState = brokerState,                          //borker状态
                   time = time)
  }

  上面这个函数就是kafkaserver里创建logmanager对象的入口.下面看看logmanager本身的初始化部分.

class LogManager(val logDirs: Array[File],           //这是class声明部分.可以跟调用部分对照.每个参数的类型都很清楚.
                 val topicConfigs: Map[String, LogConfig],
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 private val time: Time) extends Logging {
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" //默认的检查点文件.
  val LockFile = ".lock"      //默认的锁文件.kafka不正常关闭的时候可以看见这个文件未被清理.在logdir下.
  val InitialTaskDelayMs = 30*1000 //初始任务时常.
  private val logCreationOrDeletionLock = new Object
  private val logs = new Pool[TopicAndPartition, Log]() //logs 是后面所有topic对象的总集.之后关于所有log上的操作都是通过logs.

  createAndValidateLogDirs(logDirs)   //创建和验证logdir.
  private val dirLocks = lockLogDirs(logDirs) //对logdir加锁.创建锁文件.就是上面的lockFile文件.
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap      //为每个log存储路径做一个恢复点检查文件的map集合.数据结构是(存储路径1:File类型->OffsetCheckpoint对象,...),每个存储路径下都有一个恢复文件.文件内容记录按行记录.第一行记录版本,第二行记录所有topic个数,之后的行按"topicname partition lastoffset"的格式记录所有topic的名字,分区,最后的offset.recoveryPointCheckpoints这个对象在之后会用到为每一个topic查询最后的offset用.
  loadLogs()  //将所有存储路径下的log文件创建一组log对象,并put到logs中.

  // public, so we can access this from kafka.admin.DeleteTopicTest  //这一部分就是log归整压缩的功能是否启用.然后创建相应的对象.我从来没用过这个功能.
  val cleaner: LogCleaner =
    if(cleanerConfig.enableCleaner)
      new LogCleaner(cleanerConfig, logDirs, logs, time = time)
    else
      null

  初始化部分已经介绍完了.下面看看初始化部分用到的具体函数部分.

private def createAndValidateLogDirs(dirs: Seq[File]) {   //这个就是初始化部分创建和验证logdir的函数.
    if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size) //检查logdir的路径是否合法.
      throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
    for(dir <- dirs) {
      if(!dir.exists) {  //不存在就创建.
        info("Log directory ‘" + dir.getAbsolutePath + "‘ not found, creating it.")
        val created = dir.mkdirs()
        if(!created)
          throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath)
      }
      if(!dir.isDirectory || !dir.canRead) //不是目录或不可读就抛出异常
        throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
    }
  }

  /**
   * Lock all the given directories
   */
  private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
    dirs.map { dir =>
      val lock = new FileLock(new File(dir, LockFile)) //创建锁文件
      if(!lock.tryLock()) //尝试获得锁.
        throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath +
                               ". A Kafka instance in another process or thread is using this directory.")
      lock //返回锁.
    }
  }

  上面两个函数是对目录做的一些检查和枷锁工作.下面的loadlogs函数就是将logdir下所有的日志加载的复杂工作了.

private def loadLogs(): Unit = {
    info("Loading logs.")

    val threadPools = mutable.ArrayBuffer.empty[ExecutorService]  //初始化了一个线程工厂池,容纳所有存储路径对应的线程池对象.
    val jobs = mutable.Map.empty[File, Seq[Future[_]]]  //jobs用来记录每一个创建log对象的runnable工作结果集.

    for (dir <- this.logDirs) {         //这里开始遍历每一个logdirs下面的log路径.logdirs一般都是"/data1/kafka/logs,/data2/kafka/logs,/data3/kafka/logs"这种数据目录格式.因此dir对应的应该是每一个单独的log存储目录.
      val pool = Executors.newFixedThreadPool(ioThreads) //这里为每一个存储目录创建一个固定数量的线程池.因为一般为了提高磁盘读写性能都会设置多个磁盘目录.因此这个实际上是为每一个磁盘创建一个固定数量的线程池.
      threadPools.append(pool)  //把创建的线程池加入到线程工厂池里.

      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)      //这个是获取标志这个log存储目录是否要被恢复的文件对象.Log.CleanShutdownFile在同目录下Log.scala里定义的.值是".kafka_cleanshutdown",这个也会成为下文用来识别每一个存储路径对应的工作线程池的标识.

      if (cleanShutdownFile.exists) { //这里判断是否跳过恢复这个log存储目录,否则就创建一个新的borkerstate
        debug(
          "Found clean shutdown file. " +
          "Skipping recovery for all logs in data directory: " +
          dir.getAbsolutePath)
      } else {
        // log recovery itself is being performed by `Log` class during initialization
        brokerState.newState(RecoveringFromUncleanShutdown)
      }
   //这个对象是一个Map[TopicAndPartition Long]对象.内容是通过上OffsetCheckpoints里的read方法.将对象存储路径下的恢复文件解析成Map类型,下文会介绍这个类和方法.
      val recoveryPoints = this.recoveryPointCheckpoints(dir).read
      //通过for循环来生成工作集.
      val jobsForDir = for {
        dirContent <- Option(dir.listFiles).toList
        logDir <- dirContent if logDir.isDirectory
      } yield {
        Utils.runnable {  //通过这个方法为每一个topic生成一个runnable类型对象.这个对象的run方法就是这个块.
          debug("Loading log ‘" + logDir.getName + "‘")
                   //这个对象实现的主要功能就在这里.通过目录名字获得topic名字和分区号.然后为每个分区生成一份默认的配置信息.再通过recoveryPoints对象获得lastoffset
          val topicPartition = Log.parseTopicPartitionName(logDir.getName) //获得topic信息
          val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)  //生成配置信息
          val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)  //获取最后的offset
          //根据上面获得到的lastoffset和配置.生成一个log对象.并将这个对象put到logs全局变量中去.供之后操作.
          val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)  //创建topic对应log对象
          val previous = this.logs.put(topicPartition, current)  //添加到logs全局变量中.

          if (previous != null) {
            throw new IllegalArgumentException(
              "Duplicate log directories found: %s, %s!".format(
              current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
          }
        }
      }
      //最后在这里把上面生成的topic对应的runnable对象放到函数最开始声明的线程池中去执行.并将返回future对象集放到jobs对应的标识中去.
      jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
    }

   //函数最后在这里获取所有runnable对象执行结果.并且清理cleanShutdownFile文件,关闭所有执行线程.
    try {
      for ((cleanShutdownFile, dirJobs) <- jobs) {
        dirJobs.foreach(_.get)  //获取结果
        cleanShutdownFile.delete() //清理对象
      }
    } catch {
      case e: ExecutionException => {
        error("There was an error in one of the threads during logs loading: " + e.getCause)
        throw e.getCause
      }
    } finally {
      threadPools.foreach(_.shutdown()) //关闭所有存储路径对应的线程池.
    }

    info("Logs loading complete.")
  }

  上面就是所有topic分区被加载的过程.下面插入一些被这个函数用到的一些关键函数.

class OffsetCheckpoint(val file: File) extends Logging {       //这个class就是上面用来恢复log的读写类.
  private val lock = new Object()
  new File(file + ".tmp").delete() // try to delete any existing temp files for cleanliness
  file.createNewFile() // in case the file doesn‘t exist

//这个方法是写检查点文件的 具体也比较简单.就不再做过多说明
  def write(offsets: Map[TopicAndPartition, Long]) {
    lock synchronized {
      // write to temp file and then swap with the existing file
      val temp = new File(file.getAbsolutePath + ".tmp")

      val fileOutputStream = new FileOutputStream(temp)
      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream))
      try {
        // write the current version
        writer.write(0.toString)
        writer.newLine()

        // write the number of entries
        writer.write(offsets.size.toString)
        writer.newLine()

        // write the entries
        offsets.foreach { case (topicPart, offset) =>
          writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset))
          writer.newLine()
        }

        // flush the buffer and then fsync the underlying file
        writer.flush()
        fileOutputStream.getFD().sync()
      } finally {
        writer.close()
      }

      // swap new offset checkpoint file with previous one
      if(!temp.renameTo(file)) {
        // renameTo() fails on Windows if the destination file exists.
        file.delete()
        if(!temp.renameTo(file))
          throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath))
      }
    }
  }
//这个就是loadlogs函数中被调用的read函数.用来读取检查点文件的.
  def read(): Map[TopicAndPartition, Long] = {
    lock synchronized {
      val reader = new BufferedReader(new FileReader(file))
      try {
        var line = reader.readLine()
        if(line == null)
          return Map.empty
        val version = line.toInt
        version match {
          case 0 =>
            line = reader.readLine()
            if(line == null)
              return Map.empty
            val expectedSize = line.toInt
            var offsets = Map[TopicAndPartition, Long]()   //这个对象就是最后返回的对象.
            line = reader.readLine()
            while(line != null) {
              val pieces = line.split("\\s+")
              if(pieces.length != 3)
                throw new IOException("Malformed line in offset checkpoint file: ‘%s‘.".format(line))

              val topic = pieces(0)
              val partition = pieces(1).toInt
              val offset = pieces(2).toLong
              offsets += (TopicAndPartition(topic, partition) -> offset) //将解析的每一行都添加到offset里.
              line = reader.readLine()
            }
            if(offsets.size != expectedSize)
              throw new IOException("Expected %d entries but found only %d".format(expectedSize, offsets.size))
            offsets  //最后在这里返回.
          case _ =>
            throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
        }
      } finally {
        reader.close()
      }
    }
  }

}

  上面这个类是在server目录下.用来恢复和写入检查点文件的.

时间: 2024-10-19 20:18:53

Kafka 源代码分析之LogManager的相关文章

Kafka 源代码分析.

这里记录kafka源代码笔记.(代码版本是0.8.2.1) 这里不再从kafka启动顺序说起.网上已经一堆kafka启动顺序和框架上的文章了.这里不再罗嗦了,主要详细说一下代码细节部分.细节部分会一直读一直补充.如果想看看kafka 框架及启动顺序之类的文章推荐下面这个链接. http://www.cnblogs.com/davidwang456/p/5173486.html 这个链接作者贴上了框架图和代码.比较清晰. 配置文件解析等工作都在KafkaConfig.scala中实现.这个类文件在

Kafka 源代码分析之Log

这里分析Log对象本身的源代码. Log类是一个topic分区的基础类.一个topic分区的所有基本管理动作.都在这个对象里完成.类源代码文件为Log.scala.在源代码log目录下. Log类是LogSegment的集合和管理封装.首先看看初始化代码. class Log(val dir: File, //log的实例化对象在LogManager分析中已经介绍过.这里可以对照一下. @volatile var config: LogConfig, @volatile var recovery

Kafka 源代码分析之LogSegment

这里分析kafka LogSegment源代码 通过一步步分析LogManager,Log源代码之后就会发现,最终的log操作都在LogSegment上实现.LogSegment负责分片的读写恢复刷新删除等动作都在这里实现.LogSegment代码同样在源代码目录log下. LogSegment是一个日志分片的操作最小单元.直接作用与messages之上.负责实体消息的读写追加等等. LogSegment实际上是FileMessageSet类的代理类.LogSegment中的所有最终处理都在Fi

Kafka 源代码分析之ByteBufferMessageSet

这里分析一下message的封装类ByteBufferMessageSet类 ByteBufferMessageSet类的源代码在源代码目录message目录下.这个类主要封装了message,messageset,messageandoffset等类的对象.在Log类中读写log的时候基本上都是以这个类的对象为基本操作对象的. 下面看看类的具体代码.首先是初始化部分. class ByteBufferMessageSet(val buffer: ByteBuffer) extends Mess

Kafka 源代码分析之FileMessageSet

这里主要分析FileMessageSet类 这个类主要是管理log消息的内存对象和文件对象的类.源代码文件在log目录下.这个类被LogSegment类代理调用用来管理分片. 下面是完整代码.代码比较简单.就不做过多说明了.这个类是MessageSet抽象类的实现类. class FileMessageSet private[kafka](@volatile var file: File, private[log] val channel: FileChannel, private[log] v

Kafka 源代码分析之log框架介绍

这里主要介绍log管理,读写相关的类的调用关系的介绍. 在围绕log的实际处理上.有很多层的封装和调用.这里主要介绍一下调用结构和顺序. 首先从LogManager开始. 调用关系简单如下:LogManager->Log->LogSegment->FileMessageSet->ByteBufferMessageSet->MessageSet->Message LogManager作为kafka一个子系统在管理log的工作上必不可少.LogManager通过Log类来为

Jafka源代码分析——LogManager

在Kafka中,LogManager负责管理broker上所有的Log(每一个topic-partition为一个Log).通过阅读源代码可知其具体完成的功能如下: 1. 按照预设规则对消息队列进行清理. 2. 按照预设规则对消息队列进行持久化(flush操作). 3. 连接ZooKeeper进行broker.topic.partition相关的ZooKeeper操作. 4. 管理broker上所有的Log. 下面一一对这些功能的实现进行详细的解析. 一.对于Log的管理 LogManager包

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

【原创】kafka server源代码分析(二)

十四.AbstractFetcherManager.scala 该scala定义了两个case类和一个抽象类.两个case类很简单: 1. BrokerAndFectherId:封装了一个broker和一个fetcher的数据结构 2. BrokerAndInitialOffset:封装了broker和初始位移的一个数据结构 该scala中最核心的还是那个抽象类:AbstractFetcherManager.它维护了一个获取线程的map,主要保存broker id + fetcher id对应的