Kafka 源代码分析之LogSegment

这里分析kafka LogSegment源代码

通过一步步分析LogManager,Log源代码之后就会发现,最终的log操作都在LogSegment上实现.LogSegment负责分片的读写恢复刷新删除等动作都在这里实现.LogSegment代码同样在源代码目录log下.

LogSegment是一个日志分片的操作最小单元.直接作用与messages之上.负责实体消息的读写追加等等.

LogSegment实际上是FileMessageSet类的代理类.LogSegment中的所有最终处理都在FileMessageSet类中实现.FileMessageSet类的最终操作建立在ByteBufferMessageSet这个消息实体类的基础上.通过操作FileChannel对象来实现消息读写.

下面来看看主要的一些函数方法.

  初始化部分

class LogSegment(val log: FileMessageSet,     //实际构造是这个.
                 val index: OffsetIndex,
                 val baseOffset: Long,
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Logging {

  var created = time.milliseconds

  /* the number of bytes since we last added an entry in the offset index */
  private var bytesSinceLastIndexEntry = 0
  //在Log中被调用的构造是这个.可以看见是通过topicAndPartition路径和startOffset来创建index和logfile的.
  def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time) =
    this(new FileMessageSet(file = Log.logFilename(dir, startOffset)),
         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
         startOffset,
         indexIntervalBytes,
         rollJitterMs,
         time)

  添加消息函数append

def append(offset: Long, messages: ByteBufferMessageSet) {
    if (messages.sizeInBytes > 0) { //判断消息不为空.
      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
      // append an entry to the index (if needed)
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        index.append(offset, log.sizeInBytes())
        this.bytesSinceLastIndexEntry = 0
      }
      // append the messages
      log.append(messages) //调用FileMessageSet类的append方法想写消息.实际上最终调用的是ByteBufferMessageSet类方法来操作消息实体的.
      this.bytesSinceLastIndexEntry += messages.sizeInBytes
    }
  }

  刷新消息到磁盘的flush函数

def flush() {
    LogFlushStats.logFlushTimer.time {
      log.flush()   //可以看见调用的FileMessageSet类的方法.最终FileMessageSet.flush方法调用channel.force方法刷新存储设备.
      index.flush() //同上.
    }
  }

  读取消息的read函数

def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
    if(maxSize < 0)
      throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

    val logSize = log.sizeInBytes // this may change, need to save a consistent copy
    val startPosition = translateOffset(startOffset)  //获取对应offset的读取点位置.

    // if the start position is already off the end of the log, return null
    if(startPosition == null) //没有读取点位置则返回空
      return null

    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position) //定义offsetMetadata

    // if the size is zero, still return a log segment but with zero size
    if(maxSize == 0)  //最大读取尺寸是0的话.返回空消息.
      return FetchDataInfo(offsetMetadata, MessageSet.Empty)

    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    val length =   //计算最大读取的消息总长度.
      maxOffset match {
        case None => //未设置maxoffset则使用maxsize.
          // no max offset, just use the max size they gave unmolested
          maxSize
        case Some(offset) => { //如果设置了Maxoffset,则计算对应的消息长度.
          // there is a max offset, translate it to a file position and use that to calculate the max read size
          if(offset < startOffset)  //maxoffset小于startoffset则返回异常
            throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
          val mapping = translateOffset(offset, startPosition.position) //获取相对maxoffset读取点.
          val endPosition =
            if(mapping == null)
              logSize // the max offset is off the end of the log, use the end of the file
            else
              mapping.position
          min(endPosition - startPosition.position, maxSize) //用maxoffset读取点减去开始的读取点.获取需要读取的数据长度.如果长度比maxsize大则返回maxsize
        }
      }
    FetchDataInfo(offsetMetadata, log.read(startPosition.position, length)) //使用FileMessageSet.read读取相应长度的数据返回FetchDataInfo的封装对象.
  }

  读取函数通过映射offset到读取长度.来读取多个offset.

private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {  //用来将offset映射到读取指针位置的函数.
    val mapping = index.lookup(offset) //通过查找index获取对应的指针对象.
    log.searchFor(offset, max(mapping.position, startingFilePosition)) //通过FileMessageSet获取对应的指针位置.
  }

  recover函数.kafka启动检查时用到的各层调用的最后代理函数.

def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    val iter = log.iterator(maxMessageSize)
    try {
      while(iter.hasNext) {
        val entry = iter.next
        entry.message.ensureValid()
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          // we need to decompress the message, if required, to get the offset of the first uncompressed message
          val startOffset =
            entry.message.compressionCodec match {
              case NoCompressionCodec =>
                entry.offset
              case _ =>
                ByteBufferMessageSet.decompress(entry.message).head.offset
          }
          index.append(startOffset, validBytes)
          lastIndexEntry = validBytes
        }
        validBytes += MessageSet.entrySize(entry.message)
      }
    } catch {
      case e: InvalidMessageException =>
        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
    }
    val truncated = log.sizeInBytes - validBytes
    log.truncateTo(validBytes)
    index.trimToValidSize()
    truncated
  }

  分片删除函数

 def delete() {
    val deletedLog = log.delete()   //最终是删除文件,关闭内存数组.在FileMessageSet里实现.
    val deletedIndex = index.delete() //同上.
    if(!deletedLog && log.file.exists)
      throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
    if(!deletedIndex && index.file.exists)
      throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
  }

到这里LogSegment主要函数都分析完了.

时间: 2024-08-05 22:54:25

Kafka 源代码分析之LogSegment的相关文章

Kafka 源代码分析.

这里记录kafka源代码笔记.(代码版本是0.8.2.1) 这里不再从kafka启动顺序说起.网上已经一堆kafka启动顺序和框架上的文章了.这里不再罗嗦了,主要详细说一下代码细节部分.细节部分会一直读一直补充.如果想看看kafka 框架及启动顺序之类的文章推荐下面这个链接. http://www.cnblogs.com/davidwang456/p/5173486.html 这个链接作者贴上了框架图和代码.比较清晰. 配置文件解析等工作都在KafkaConfig.scala中实现.这个类文件在

Kafka 源代码分析之Log

这里分析Log对象本身的源代码. Log类是一个topic分区的基础类.一个topic分区的所有基本管理动作.都在这个对象里完成.类源代码文件为Log.scala.在源代码log目录下. Log类是LogSegment的集合和管理封装.首先看看初始化代码. class Log(val dir: File, //log的实例化对象在LogManager分析中已经介绍过.这里可以对照一下. @volatile var config: LogConfig, @volatile var recovery

Kafka 源代码分析之FileMessageSet

这里主要分析FileMessageSet类 这个类主要是管理log消息的内存对象和文件对象的类.源代码文件在log目录下.这个类被LogSegment类代理调用用来管理分片. 下面是完整代码.代码比较简单.就不做过多说明了.这个类是MessageSet抽象类的实现类. class FileMessageSet private[kafka](@volatile var file: File, private[log] val channel: FileChannel, private[log] v

Kafka 源代码分析之ByteBufferMessageSet

这里分析一下message的封装类ByteBufferMessageSet类 ByteBufferMessageSet类的源代码在源代码目录message目录下.这个类主要封装了message,messageset,messageandoffset等类的对象.在Log类中读写log的时候基本上都是以这个类的对象为基本操作对象的. 下面看看类的具体代码.首先是初始化部分. class ByteBufferMessageSet(val buffer: ByteBuffer) extends Mess

Kafka 源代码分析之LogManager

这里分析kafka 0.8.2的LogManager logmanager是kafka用来管理log文件的子系统.源代码文件在log目录下. 这里会逐步分析logmanager的源代码.首先看class 初始化部分. private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { //这个函数就是在kafkaServer.start函数里调用的封装函数 val defaultLo

Kafka 源代码分析之log框架介绍

这里主要介绍log管理,读写相关的类的调用关系的介绍. 在围绕log的实际处理上.有很多层的封装和调用.这里主要介绍一下调用结构和顺序. 首先从LogManager开始. 调用关系简单如下:LogManager->Log->LogSegment->FileMessageSet->ByteBufferMessageSet->MessageSet->Message LogManager作为kafka一个子系统在管理log的工作上必不可少.LogManager通过Log类来为

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

Jafka源代码分析——随笔

Kafka是一个分布式的消息中间件,可以粗略的将其划分为三部分:Producer.Broker和Consumer.其中,Producer负责产生消息并负责将消息发送给Kafka:Broker可以简单的理解为Kafka集群中的每一台机器,其负责完成消息队列的主要功能(接收消息.消息的持久化存储.为Consumer提供消息.消息清理.....):Consumer从Broker获取消息并进行后续的操作.每个broker会有一个ID标识,该标识由人工在配置文件中配置. Kafka中的消息隶属于topic

Jafka源代码分析——LogManager

在Kafka中,LogManager负责管理broker上所有的Log(每一个topic-partition为一个Log).通过阅读源代码可知其具体完成的功能如下: 1. 按照预设规则对消息队列进行清理. 2. 按照预设规则对消息队列进行持久化(flush操作). 3. 连接ZooKeeper进行broker.topic.partition相关的ZooKeeper操作. 4. 管理broker上所有的Log. 下面一一对这些功能的实现进行详细的解析. 一.对于Log的管理 LogManager包