Kafka 源代码分析之FileMessageSet

这里主要分析FileMessageSet类

这个类主要是管理log消息的内存对象和文件对象的类.源代码文件在log目录下.这个类被LogSegment类代理调用用来管理分片.

下面是完整代码.代码比较简单.就不做过多说明了.这个类是MessageSet抽象类的实现类.

class FileMessageSet private[kafka](@volatile var file: File,
                                    private[log] val channel: FileChannel,
                                    private[log] val start: Int,
                                    private[log] val end: Int,
                                    isSlice: Boolean) extends MessageSet with Logging {

  /* the size of the message set in bytes */
  private val _size =
    if(isSlice)
      new AtomicInteger(end - start) // don‘t check the file size if this is just a slice view
    else
      new AtomicInteger(math.min(channel.size().toInt, end) - start)

  /* if this is not a slice, update the file pointer to the end of the file */
  if (!isSlice)
    /* set the file position to the last byte in the file */
    channel.position(channel.size)

  /**
   * Create a file message set with no slicing.
   */
  def this(file: File, channel: FileChannel) =
    this(file, channel, start = 0, end = Int.MaxValue, isSlice = false)

  /**
   * Create a file message set with no slicing
   */
  def this(file: File) =
    this(file, Utils.openChannel(file, mutable = true))

  /**
   * Create a file message set with mutable option
   */
  def this(file: File, mutable: Boolean) = this(file, Utils.openChannel(file, mutable))

  /**
   * Create a slice view of the file message set that begins and ends at the given byte offsets
   */
  def this(file: File, channel: FileChannel, start: Int, end: Int) =
    this(file, channel, start, end, isSlice = true)

  /**
   * Return a message set which is a view into this set starting from the given position and with the given size limit.
   *
   * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read.
   *
   * If this message set is already sliced, the position will be taken relative to that slicing.
   *
   * @param position The start position to begin the read from
   * @param size The number of bytes after the start position to include
   *
   * @return A sliced wrapper on this message set limited based on the given position and size
   */
  def read(position: Int, size: Int): FileMessageSet = {  //返回读取段对象
    if(position < 0)
      throw new IllegalArgumentException("Invalid position: " + position)
    if(size < 0)
      throw new IllegalArgumentException("Invalid size: " + size)
    new FileMessageSet(file,
                       channel,
                       start = this.start + position,
                       end = math.min(this.start + position + size, sizeInBytes()))
  }

  /**
   * Search forward for the file position of the last offset that is greater than or equal to the target offset
   * and return its physical position. If no such offsets are found, return null.
   * @param targetOffset The offset to search for.
   * @param startingPosition The starting position in the file to begin searching from.
   */
  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {  //搜索读写点的方法
    var position = startingPosition
    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
    val size = sizeInBytes()
    while(position + MessageSet.LogOverhead < size) {
      buffer.rewind()
      channel.read(buffer, position)
      if(buffer.hasRemaining)
        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
                                        .format(targetOffset, startingPosition, file.getAbsolutePath))
      buffer.rewind()
      val offset = buffer.getLong()
      if(offset >= targetOffset)
        return OffsetPosition(offset, position)
      val messageSize = buffer.getInt()
      if(messageSize < Message.MessageOverhead)
        throw new IllegalStateException("Invalid message size: " + messageSize)
      position += MessageSet.LogOverhead + messageSize
    }
    null
  }

  /**
   * Write some of this set to the given channel.
   * @param destChannel The channel to write to.
   * @param writePosition The position in the message set to begin writing from.
   * @param size The maximum number of bytes to write
   * @return The number of bytes actually written.
   */
  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {  //主要写方法
    // Ensure that the underlying size has not changed.
    val newSize = math.min(channel.size().toInt, end) - start
    if (newSize < _size.get()) {
      throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
        .format(file.getAbsolutePath, _size.get(), newSize))
    }
    val bytesTransferred = channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt
    trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
      + " bytes requested for transfer : " + math.min(size, sizeInBytes))
    bytesTransferred
  }

  /**
   * Get a shallow iterator over the messages in the set.
   */
  override def iterator() = iterator(Int.MaxValue)

  /**
   * Get an iterator over the messages in the set. We only do shallow iteration here.
   * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory.
   * If we encounter a message larger than this we throw an InvalidMessageException.
   * @return The iterator.
   */
  def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {  //主要构造器.
    new IteratorTemplate[MessageAndOffset] {
      var location = start
      val sizeOffsetBuffer = ByteBuffer.allocate(12)

      override def makeNext(): MessageAndOffset = {
        if(location >= end)
          return allDone()

        // read the size of the item
        sizeOffsetBuffer.rewind()
        channel.read(sizeOffsetBuffer, location)
        if(sizeOffsetBuffer.hasRemaining)
          return allDone()

        sizeOffsetBuffer.rewind()
        val offset = sizeOffsetBuffer.getLong()
        val size = sizeOffsetBuffer.getInt()
        if(size < Message.MinHeaderSize)
          return allDone()
        if(size > maxMessageSize)
          throw new InvalidMessageException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))

        // read the item itself
        val buffer = ByteBuffer.allocate(size)
        channel.read(buffer, location + 12)
        if(buffer.hasRemaining)
          return allDone()
        buffer.rewind()

        // increment the location and return the item
        location += size + 12
        new MessageAndOffset(new Message(buffer), offset) //在这里做映射.同ByteBufferMessageSet里的实现方法类似.
      }
    }
  }

  /**
   * The number of bytes taken up by this file set
   */
  def sizeInBytes(): Int = _size.get()

  /**
   * Append these messages to the message set
   */
  def append(messages: ByteBufferMessageSet) {  //追加message的方法.被上层的append方法调用.
    val written = messages.writeTo(channel, 0, messages.sizeInBytes)
    _size.getAndAdd(written)
  }

  /**
   * Commit all written data to the physical disk
   */
  def flush() = {  //上层刷新方法的最终实现.
    channel.force(true)
  }

  /**
   * Close this message set
   */
  def close() {
    flush()
    channel.close()
  }

  /**
   * Delete this message set from the filesystem
   * @return True iff this message set was deleted.
   */
  def delete(): Boolean = {  //上层delete函数的最终实现方法
    Utils.swallow(channel.close()) //关闭内存数据
    file.delete()  //删除文件
  }

  /**
   * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
   * given size falls on a valid message boundary.
   * @param targetSize The size to truncate to.
   * @return The number of bytes truncated off
   */
  def truncateTo(targetSize: Int): Int = {
    val originalSize = sizeInBytes
    if(targetSize > originalSize || targetSize < 0)
      throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
                               " size of this log segment is " + originalSize + " bytes.")
    channel.truncate(targetSize)
    channel.position(targetSize)
    _size.set(targetSize)
    originalSize - targetSize
  }

  /**
   * Read from the underlying file into the buffer starting at the given position
   */
  def readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer = {
    channel.read(buffer, relativePosition + this.start)
    buffer.flip()
    buffer
  }

  /**
   * Rename the file that backs this message set
   * @return true iff the rename was successful
   */
  def renameTo(f: File): Boolean = {
    val success = this.file.renameTo(f)
    this.file = f
    success
  }

}

object LogFlushStats extends KafkaMetricsGroup {
  val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
}
时间: 2024-07-31 22:02:10

Kafka 源代码分析之FileMessageSet的相关文章

Kafka 源代码分析.

这里记录kafka源代码笔记.(代码版本是0.8.2.1) 这里不再从kafka启动顺序说起.网上已经一堆kafka启动顺序和框架上的文章了.这里不再罗嗦了,主要详细说一下代码细节部分.细节部分会一直读一直补充.如果想看看kafka 框架及启动顺序之类的文章推荐下面这个链接. http://www.cnblogs.com/davidwang456/p/5173486.html 这个链接作者贴上了框架图和代码.比较清晰. 配置文件解析等工作都在KafkaConfig.scala中实现.这个类文件在

Kafka 源代码分析之LogSegment

这里分析kafka LogSegment源代码 通过一步步分析LogManager,Log源代码之后就会发现,最终的log操作都在LogSegment上实现.LogSegment负责分片的读写恢复刷新删除等动作都在这里实现.LogSegment代码同样在源代码目录log下. LogSegment是一个日志分片的操作最小单元.直接作用与messages之上.负责实体消息的读写追加等等. LogSegment实际上是FileMessageSet类的代理类.LogSegment中的所有最终处理都在Fi

Kafka 源代码分析之ByteBufferMessageSet

这里分析一下message的封装类ByteBufferMessageSet类 ByteBufferMessageSet类的源代码在源代码目录message目录下.这个类主要封装了message,messageset,messageandoffset等类的对象.在Log类中读写log的时候基本上都是以这个类的对象为基本操作对象的. 下面看看类的具体代码.首先是初始化部分. class ByteBufferMessageSet(val buffer: ByteBuffer) extends Mess

Kafka 源代码分析之LogManager

这里分析kafka 0.8.2的LogManager logmanager是kafka用来管理log文件的子系统.源代码文件在log目录下. 这里会逐步分析logmanager的源代码.首先看class 初始化部分. private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { //这个函数就是在kafkaServer.start函数里调用的封装函数 val defaultLo

Kafka 源代码分析之Log

这里分析Log对象本身的源代码. Log类是一个topic分区的基础类.一个topic分区的所有基本管理动作.都在这个对象里完成.类源代码文件为Log.scala.在源代码log目录下. Log类是LogSegment的集合和管理封装.首先看看初始化代码. class Log(val dir: File, //log的实例化对象在LogManager分析中已经介绍过.这里可以对照一下. @volatile var config: LogConfig, @volatile var recovery

Kafka 源代码分析之log框架介绍

这里主要介绍log管理,读写相关的类的调用关系的介绍. 在围绕log的实际处理上.有很多层的封装和调用.这里主要介绍一下调用结构和顺序. 首先从LogManager开始. 调用关系简单如下:LogManager->Log->LogSegment->FileMessageSet->ByteBufferMessageSet->MessageSet->Message LogManager作为kafka一个子系统在管理log的工作上必不可少.LogManager通过Log类来为

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

Jafka源代码分析——随笔

Kafka是一个分布式的消息中间件,可以粗略的将其划分为三部分:Producer.Broker和Consumer.其中,Producer负责产生消息并负责将消息发送给Kafka:Broker可以简单的理解为Kafka集群中的每一台机器,其负责完成消息队列的主要功能(接收消息.消息的持久化存储.为Consumer提供消息.消息清理.....):Consumer从Broker获取消息并进行后续的操作.每个broker会有一个ID标识,该标识由人工在配置文件中配置. Kafka中的消息隶属于topic

Jafka源代码分析——LogManager

在Kafka中,LogManager负责管理broker上所有的Log(每一个topic-partition为一个Log).通过阅读源代码可知其具体完成的功能如下: 1. 按照预设规则对消息队列进行清理. 2. 按照预设规则对消息队列进行持久化(flush操作). 3. 连接ZooKeeper进行broker.topic.partition相关的ZooKeeper操作. 4. 管理broker上所有的Log. 下面一一对这些功能的实现进行详细的解析. 一.对于Log的管理 LogManager包