Kafka 源代码分析之Log

这里分析Log对象本身的源代码.

Log类是一个topic分区的基础类.一个topic分区的所有基本管理动作.都在这个对象里完成.类源代码文件为Log.scala.在源代码log目录下.

Log类是LogSegment的集合和管理封装.首先看看初始化代码.

class Log(val dir: File,                           //log的实例化对象在LogManager分析中已经介绍过.这里可以对照一下.
          @volatile var config: LogConfig,
          @volatile var recoveryPoint: Long = 0L,
          scheduler: Scheduler,
          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {

  import kafka.log.Log._    //这里是把同文件下的object加载进来.代码在文件的末尾.

  /* A lock that guards all modifications to the log */
  private val lock = new Object             //锁对象

  /* last time it was flushed */
  private val lastflushedTime = new AtomicLong(time.milliseconds)  //最后log刷新到磁盘的时间,这个变量贯穿整个管理过程.

  /* the actual segments of the log */  //这个对象是这个topic下所有分片的集合.这个集合贯彻整个log管理过程.之后所有动作都依赖此集合.
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
  loadSegments() //将topic所有的分片加载到segments集合了.并做一些topic分片文件检查工作.

  /* Calculate the offset of the next message */
  @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)  //activeSegment表示当前最后一个分片.因为分片是按大小分布.最大的就是最新的.也就是活跃的分片.这里生成下一个offsetmetadata

  val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)  //获取topic名称和分区.

  info("Completed load of log %s with log end offset %d".format(name, logEndOffset))

  val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString) //监控度量的映射标签.
 //下面全是通过metrics做的一些监控.
  newGauge("NumLogSegments",
    new Gauge[Int] {
      def value = numberOfSegments
    },
    tags)

  newGauge("LogStartOffset",
    new Gauge[Long] {
      def value = logStartOffset
    },
    tags)

  newGauge("LogEndOffset",
    new Gauge[Long] {
      def value = logEndOffset
    },
    tags)

  newGauge("Size",
    new Gauge[Long] {
      def value = size
    },
    tags)

  /** The name of this log */
  def name  = dir.getName()

  上面是Log class初始化的部分.这个部分最重要的就是声明了几个贯穿全过程的对象,并且将分片文件加载到内存对象中.

  下面看看主要的加载函数loadSegments.

private def loadSegments() {
    // create the log directory if it doesn‘t exist
    dir.mkdirs()       //这里是创建topic目录的.本身的注释也说明了这个.

    // first do a pass through the files in the log directory and remove any temporary files
    // and complete any interrupted swap operations
    for(file <- dir.listFiles if file.isFile) {            //这个for循环是用来检查分片是否是要被清除或者删除的.
      if(!file.canRead)
        throw new IOException("Could not read file " + file)
      val filename = file.getName
      if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) {
        // if the file ends in .deleted or .cleaned, delete it
        file.delete()
      } else if(filename.endsWith(SwapFileSuffix)) {       //这里检查是不是有swap文件存在.根据不同情况删除或重命名swap文件.
        // we crashed in the middle of a swap operation, to recover:
        // if a log, swap it in and delete the .index file
        // if an index just delete it, it will be rebuilt
        val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
        if(baseName.getPath.endsWith(IndexFileSuffix)) {
          file.delete()
        } else if(baseName.getPath.endsWith(LogFileSuffix)){
          // delete the index
          val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix))
          index.delete()
          // complete the swap operation
          val renamed = file.renameTo(baseName)
          if(renamed)
            info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath))
          else
            throw new KafkaException("Failed to rename file %s.".format(file.getPath))
        }
      }
    }

    // now do a second pass and load all the .log and .index files
    for(file <- dir.listFiles if file.isFile) {   //这个for循环是加载和检查log分片是否存在的.
      val filename = file.getName
      if(filename.endsWith(IndexFileSuffix)) {
        // if it is an index file, make sure it has a corresponding .log file
        val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
        if(!logFile.exists) {   //这里是如果只有index文件没有对应的log文件.就把index文件清理掉.
          warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
          file.delete()
        }
      } else if(filename.endsWith(LogFileSuffix)) {   //这里是创建LogSegment对象的地方.
        // if its a log file, load the corresponding log segment
        val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
        val hasIndex = Log.indexFilename(dir, start).exists  //确认对应的index文件是否存在.
        val segment = new LogSegment(dir = dir,
                                     startOffset = start,
                                     indexIntervalBytes = config.indexInterval,
                                     maxIndexSize = config.maxIndexSize,
                                     rollJitterMs = config.randomSegmentJitter,
                                     time = time)
        if(!hasIndex) {
          error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
          segment.recover(config.maxMessageSize)  //对应log文件的index不存在的话,进行recover.这个地方就是平常碰见kafka index出错需要重新建立的时候管理员删除了对应的index会引起的动作.
        }
        segments.put(start, segment) //将segment对象添加到总集里.
      }
    }

    if(logSegments.size == 0) {  //这里判断是否是一个新的topic分区.尚不存在分片文件.所以创建一个空的分片文件对象.
      // no existing segments, create a new mutable segment beginning at offset 0
      segments.put(0L, new LogSegment(dir = dir,
                                     startOffset = 0,
                                     indexIntervalBytes = config.indexInterval,
                                     maxIndexSize = config.maxIndexSize,
                                     rollJitterMs = config.randomSegmentJitter,
                                     time = time))
    } else {
      recoverLog()  //这里是topic分片不为空的话.就为检查点设置新offset值.
      // reset the index size of the currently active log segment to allow more entries
      activeSegment.index.resize(config.maxIndexSize)
    }

    // sanity check the index file of every segment to ensure we don‘t proceed with a corrupt segment
    for (s <- logSegments)
      s.index.sanityCheck()  //index文件检查.
  }

  看看recoverLog是做了哪些工作.

private def recoverLog() {
    // if we have the clean shutdown marker, skip recovery
    if(hasCleanShutdownFile) {    //看看是否有cleanshutdownfile存在.hasCleanShutdownFile函数就是判断这个文件存不存在
      this.recoveryPoint = activeSegment.nextOffset //存在则直接把恢复检查点设置成最后一个分片的最新offset值
      return
    }

    // okay we need to actually recovery this log
    val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator //这个是获取检查点到最大值之间是否还有其他的分片.也就是检查检查点是不是就是最后一个分片文件.
    while(unflushed.hasNext) { //如果不是最后一个分片.就获取这个分片.然后调用这个对象的recover函数如果函数返回错误就删除这个分片.
      val curr = unflushed.next
      info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
      val truncatedBytes =
        try {
          curr.recover(config.maxMessageSize)
        } catch {
          case e: InvalidOffsetException =>
            val startOffset = curr.baseOffset
            warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " +
                 "creating an empty one with starting offset " + startOffset)
            curr.truncateTo(startOffset)
        }
      if(truncatedBytes > 0) {
        // we had an invalid message, delete all remaining log
        warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset))
        unflushed.foreach(deleteSegment)
      }
    }
  }

   这个函数的处理动作.包装的是LogSegment对同名对象.LogSegment的分析会在后续的部分继续分析.

时间: 2024-11-03 22:42:24

Kafka 源代码分析之Log的相关文章

Kafka 源代码分析之log框架介绍

这里主要介绍log管理,读写相关的类的调用关系的介绍. 在围绕log的实际处理上.有很多层的封装和调用.这里主要介绍一下调用结构和顺序. 首先从LogManager开始. 调用关系简单如下:LogManager->Log->LogSegment->FileMessageSet->ByteBufferMessageSet->MessageSet->Message LogManager作为kafka一个子系统在管理log的工作上必不可少.LogManager通过Log类来为

Kafka 源代码分析之LogManager

这里分析kafka 0.8.2的LogManager logmanager是kafka用来管理log文件的子系统.源代码文件在log目录下. 这里会逐步分析logmanager的源代码.首先看class 初始化部分. private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { //这个函数就是在kafkaServer.start函数里调用的封装函数 val defaultLo

Kafka 源代码分析之LogSegment

这里分析kafka LogSegment源代码 通过一步步分析LogManager,Log源代码之后就会发现,最终的log操作都在LogSegment上实现.LogSegment负责分片的读写恢复刷新删除等动作都在这里实现.LogSegment代码同样在源代码目录log下. LogSegment是一个日志分片的操作最小单元.直接作用与messages之上.负责实体消息的读写追加等等. LogSegment实际上是FileMessageSet类的代理类.LogSegment中的所有最终处理都在Fi

Kafka 源代码分析之FileMessageSet

这里主要分析FileMessageSet类 这个类主要是管理log消息的内存对象和文件对象的类.源代码文件在log目录下.这个类被LogSegment类代理调用用来管理分片. 下面是完整代码.代码比较简单.就不做过多说明了.这个类是MessageSet抽象类的实现类. class FileMessageSet private[kafka](@volatile var file: File, private[log] val channel: FileChannel, private[log] v

Kafka 源代码分析.

这里记录kafka源代码笔记.(代码版本是0.8.2.1) 这里不再从kafka启动顺序说起.网上已经一堆kafka启动顺序和框架上的文章了.这里不再罗嗦了,主要详细说一下代码细节部分.细节部分会一直读一直补充.如果想看看kafka 框架及启动顺序之类的文章推荐下面这个链接. http://www.cnblogs.com/davidwang456/p/5173486.html 这个链接作者贴上了框架图和代码.比较清晰. 配置文件解析等工作都在KafkaConfig.scala中实现.这个类文件在

Kafka 源代码分析之ByteBufferMessageSet

这里分析一下message的封装类ByteBufferMessageSet类 ByteBufferMessageSet类的源代码在源代码目录message目录下.这个类主要封装了message,messageset,messageandoffset等类的对象.在Log类中读写log的时候基本上都是以这个类的对象为基本操作对象的. 下面看看类的具体代码.首先是初始化部分. class ByteBufferMessageSet(val buffer: ByteBuffer) extends Mess

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

Jafka源代码分析——LogManager

在Kafka中,LogManager负责管理broker上所有的Log(每一个topic-partition为一个Log).通过阅读源代码可知其具体完成的功能如下: 1. 按照预设规则对消息队列进行清理. 2. 按照预设规则对消息队列进行持久化(flush操作). 3. 连接ZooKeeper进行broker.topic.partition相关的ZooKeeper操作. 4. 管理broker上所有的Log. 下面一一对这些功能的实现进行详细的解析. 一.对于Log的管理 LogManager包

转:RTMPDump源代码分析

0: 主要函数调用分析 rtmpdump 是一个用来处理 RTMP 流媒体的开源工具包,支持 rtmp://, rtmpt://, rtmpe://, rtmpte://, and rtmps://.也提供 Android 版本. 最近研究了一下它内部函数调用的关系. 下面列出几个主要的函数的调用关系. RTMPDump用于下载RTMP流媒体的函数Download: 用于建立网络连接(NetConnect)的函数Connect: 用于建立网络流(NetStream)的函数 rtmpdump源代码