这里主要分析FileMessageSet类
这个类主要用于管理log消息的内存对象和文件对象，源代码文件在log目录下。该类被LogSegment类代理调用，用来管理日志分片。
下面是完整代码。代码比较简单，就不做过多说明了。这个类是MessageSet抽象类的实现类。
/**
 * An on-disk message set backed by a [[java.nio.channels.FileChannel]]. An instance is either a view
 * of a whole file or a slice [start, end) of one. Used by LogSegment to manage one log segment file.
 *
 * @param file    The backing file (mutable so renameTo can swap it).
 * @param channel The underlying file channel.
 * @param start   The absolute byte position in the file where this set begins.
 * @param end     The absolute byte position in the file where this set ends (exclusive).
 * @param isSlice True if this is a sliced sub-view of a larger message set.
 */
class FileMessageSet private[kafka](@volatile var file: File, private[log] val channel: FileChannel, private[log] val start: Int, private[log] val end: Int, isSlice: Boolean) extends MessageSet with Logging {

  /* the size of the message set in bytes */
  private val _size =
    if(isSlice)
      new AtomicInteger(end - start) // don't check the file size if this is just a slice view
    else
      new AtomicInteger(math.min(channel.size().toInt, end) - start)

  /* if this is not a slice, update the file pointer to the end of the file */
  if (!isSlice)
    /* set the file position to the last byte in the file */
    channel.position(channel.size)

  /**
   * Create a file message set with no slicing.
   */
  def this(file: File, channel: FileChannel) =
    this(file, channel, start = 0, end = Int.MaxValue, isSlice = false)

  /**
   * Create a file message set with no slicing
   */
  def this(file: File) =
    this(file, Utils.openChannel(file, mutable = true))

  /**
   * Create a file message set with mutable option
   */
  def this(file: File, mutable: Boolean) =
    this(file, Utils.openChannel(file, mutable))

  /**
   * Create a slice view of the file message set that begins and ends at the given byte offsets
   */
  def this(file: File, channel: FileChannel, start: Int, end: Int) =
    this(file, channel, start, end, isSlice = true)

  /**
   * Return a message set which is a view into this set starting from the given position and with the given size limit.
   *
   * If the size is beyond the end of the file, the end will be based on the size of the file at the time of the read.
   *
   * If this message set is already sliced, the position will be taken relative to that slicing.
   *
   * @param position The start position to begin the read from
   * @param size The number of bytes after the start position to include
   *
   * @return A sliced wrapper on this message set limited based on the given position and size
   */
  def read(position: Int, size: Int): FileMessageSet = {
    // Returns a sliced read view over this set; no data is copied.
    if(position < 0)
      throw new IllegalArgumentException("Invalid position: " + position)
    if(size < 0)
      throw new IllegalArgumentException("Invalid size: " + size)
    new FileMessageSet(file,
                       channel,
                       start = this.start + position,
                       // cap the slice end at the current size of this set
                       end = math.min(this.start + position + size, sizeInBytes()))
  }

  /**
   * Search forward for the file position of the last offset that is greater than or equal to the target offset
   * and return its physical position. If no such offsets are found, return null.
   * @param targetOffset The offset to search for.
   * @param startingPosition The starting position in the file to begin searching from.
   */
  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
    // Linear scan over the log-entry headers to locate the physical position of targetOffset.
    var position = startingPosition
    // Reusable buffer sized to one entry header (offset + size fields).
    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
    val size = sizeInBytes()
    // Stop once there is not enough room left for a complete header.
    while(position + MessageSet.LogOverhead < size) {
      buffer.rewind()
      channel.read(buffer, position)
      // A partial header read means the file is corrupt or was truncated underneath us.
      if(buffer.hasRemaining)
        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
          .format(targetOffset, startingPosition, file.getAbsolutePath))
      buffer.rewind()
      val offset = buffer.getLong()
      if(offset >= targetOffset)
        return OffsetPosition(offset, position)
      val messageSize = buffer.getInt()
      if(messageSize < Message.MessageOverhead)
        throw new IllegalStateException("Invalid message size: " + messageSize)
      // Skip over this entry's header and payload to the next entry.
      position += MessageSet.LogOverhead + messageSize
    }
    null
  }

  /**
   * Write some of this set to the given channel.
   * @param destChannel The channel to write to.
   * @param writePosition The position in the message set to begin writing from.
   * @param size The maximum number of bytes to write
   * @return The number of bytes actually written.
   */
  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
    // Main write path: hands bytes to the destination channel via FileChannel.transferTo.
    // Ensure that the underlying size has not changed (e.g. by a concurrent truncate).
    val newSize = math.min(channel.size().toInt, end) - start
    if (newSize < _size.get()) {
      throw new KafkaException("Size of FileMessageSet %s has been truncated during write: old size %d, new size %d"
        .format(file.getAbsolutePath, _size.get(), newSize))
    }
    val bytesTransferred = channel.transferTo(start + writePosition, math.min(size, sizeInBytes), destChannel).toInt
    trace("FileMessageSet " + file.getAbsolutePath + " : bytes transferred : " + bytesTransferred
      + " bytes requested for transfer : " + math.min(size, sizeInBytes))
    bytesTransferred
  }

  /**
   * Get a shallow iterator over the messages in the set.
   */
  override def iterator() = iterator(Int.MaxValue)

  /**
   * Get an iterator over the messages in the set. We only do shallow iteration here.
   * @param maxMessageSize A limit on allowable message size to avoid allocating unbounded memory.
   * If we encounter a message larger than this we throw an InvalidMessageException.
   * @return The iterator.
   */
  def iterator(maxMessageSize: Int): Iterator[MessageAndOffset] = {
    // Builds the shallow iterator over this set's entries.
    new IteratorTemplate[MessageAndOffset] {
      // Absolute file position of the next entry to read.
      var location = start
      // Reusable 12-byte buffer: 8-byte offset + 4-byte message size.
      val sizeOffsetBuffer = ByteBuffer.allocate(12)

      override def makeNext(): MessageAndOffset = {
        if(location >= end)
          return allDone()

        // read the size of the item
        sizeOffsetBuffer.rewind()
        channel.read(sizeOffsetBuffer, location)
        // Incomplete header at the tail: treat as end of set.
        if(sizeOffsetBuffer.hasRemaining)
          return allDone()
        sizeOffsetBuffer.rewind()
        val offset = sizeOffsetBuffer.getLong()
        val size = sizeOffsetBuffer.getInt()
        if(size < Message.MinHeaderSize)
          return allDone()
        if(size > maxMessageSize)
          throw new InvalidMessageException("Message size exceeds the largest allowable message size (%d).".format(maxMessageSize))

        // read the item itself
        val buffer = ByteBuffer.allocate(size)
        channel.read(buffer, location + 12)
        if(buffer.hasRemaining)
          return allDone()
        buffer.rewind()

        // increment the location and return the item
        location += size + 12
        // Wrap the raw bytes in a Message here; similar to the approach in ByteBufferMessageSet.
        new MessageAndOffset(new Message(buffer), offset)
      }
    }
  }

  /**
   * The number of bytes taken up by this file set
   */
  def sizeInBytes(): Int = _size.get()

  /**
   * Append these messages to the message set
   */
  def append(messages: ByteBufferMessageSet) {
    // Appends the given messages; called by the upper-level (LogSegment) append.
    val written = messages.writeTo(channel, 0, messages.sizeInBytes)
    _size.getAndAdd(written)
  }

  /**
   * Commit all written data to the physical disk
   */
  def flush() = {
    // Final implementation behind the upper-level flush: force the channel's data to disk.
    channel.force(true)
  }

  /**
   * Close this message set
   */
  def close() {
    flush()
    channel.close()
  }

  /**
   * Delete this message set from the filesystem
   * @return True iff this message set was deleted.
   */
  def delete(): Boolean = {
    // Final implementation of the upper-level delete.
    Utils.swallow(channel.close()) // close the channel first (Utils.swallow presumably suppresses close errors — confirm)
    file.delete()                  // then remove the backing file
  }

  /**
   * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
   * given size falls on a valid message boundary.
   * @param targetSize The size to truncate to.
   * @return The number of bytes truncated off
   */
  def truncateTo(targetSize: Int): Int = {
    val originalSize = sizeInBytes
    if(targetSize > originalSize || targetSize < 0)
      throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
        " size of this log segment is " + originalSize + " bytes.")
    channel.truncate(targetSize)
    // Move the write pointer back so subsequent appends start at the new end.
    channel.position(targetSize)
    _size.set(targetSize)
    originalSize - targetSize
  }

  /**
   * Read from the underlying file into the buffer starting at the given position
   */
  def readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer = {
    channel.read(buffer, relativePosition + this.start)
    buffer.flip()
    buffer
  }

  /**
   * Rename the file that backs this message set
   * @return true iff the rename was successful
   */
  def renameTo(f: File): Boolean = {
    val success = this.file.renameTo(f)
    // Track the new file even if the rename failed, mirroring the original semantics.
    this.file = f
    success
  }
}

/** Metrics holder for log-flush timing. */
object LogFlushStats extends KafkaMetricsGroup {
  // Timer reporting flush rate and duration in milliseconds, per second.
  val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
}
时间: 2024-10-14 00:57:51