这里分析Log对象本身的源代码.
Log类是一个topic分区的基础类.一个topic分区的所有基本管理动作.都在这个对象里完成.类源代码文件为Log.scala.在源代码log目录下.
Log类是LogSegment的集合和管理封装.首先看看初始化代码.
class Log(val dir: File, //log的实例化对象在LogManager分析中已经介绍过.这里可以对照一下. @volatile var config: LogConfig, @volatile var recoveryPoint: Long = 0L, scheduler: Scheduler, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { import kafka.log.Log._ //这里是把同文件下的object加载进来.代码在文件的末尾. /* A lock that guards all modifications to the log */ private val lock = new Object //锁对象 /* last time it was flushed */ private val lastflushedTime = new AtomicLong(time.milliseconds) //最后log刷新到磁盘的时间,这个变量贯穿整个管理过程. /* the actual segments of the log */ //这个对象是这个topic下所有分片的集合.这个集合贯彻整个log管理过程.之后所有动作都依赖此集合. private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] loadSegments() //将topic所有的分片加载到segments集合了.并做一些topic分片文件检查工作. /* Calculate the offset of the next message */ @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt) //activeSegment表示当前最后一个分片.因为分片是按大小分布.最大的就是最新的.也就是活跃的分片.这里生成下一个offsetmetadata val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name) //获取topic名称和分区. info("Completed load of log %s with log end offset %d".format(name, logEndOffset)) val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString) //监控度量的映射标签. //下面全是通过metrics做的一些监控. newGauge("NumLogSegments", new Gauge[Int] { def value = numberOfSegments }, tags) newGauge("LogStartOffset", new Gauge[Long] { def value = logStartOffset }, tags) newGauge("LogEndOffset", new Gauge[Long] { def value = logEndOffset }, tags) newGauge("Size", new Gauge[Long] { def value = size }, tags) /** The name of this log */ def name = dir.getName()
上面是Log class初始化的部分.这个部分最重要的就是声明了几个贯穿全过程的对象,并且将分片文件加载到内存对象中.
下面看看主要的加载函数loadSegments.
private def loadSegments() { // create the log directory if it doesn‘t exist dir.mkdirs() //这里是创建topic目录的.本身的注释也说明了这个. // first do a pass through the files in the log directory and remove any temporary files // and complete any interrupted swap operations for(file <- dir.listFiles if file.isFile) { //这个for循环是用来检查分片是否是要被清除或者删除的. if(!file.canRead) throw new IOException("Could not read file " + file) val filename = file.getName if(filename.endsWith(DeletedFileSuffix) || filename.endsWith(CleanedFileSuffix)) { // if the file ends in .deleted or .cleaned, delete it file.delete() } else if(filename.endsWith(SwapFileSuffix)) { //这里检查是不是有swap文件存在.根据不同情况删除或重命名swap文件. // we crashed in the middle of a swap operation, to recover: // if a log, swap it in and delete the .index file // if an index just delete it, it will be rebuilt val baseName = new File(Utils.replaceSuffix(file.getPath, SwapFileSuffix, "")) if(baseName.getPath.endsWith(IndexFileSuffix)) { file.delete() } else if(baseName.getPath.endsWith(LogFileSuffix)){ // delete the index val index = new File(Utils.replaceSuffix(baseName.getPath, LogFileSuffix, IndexFileSuffix)) index.delete() // complete the swap operation val renamed = file.renameTo(baseName) if(renamed) info("Found log file %s from interrupted swap operation, repairing.".format(file.getPath)) else throw new KafkaException("Failed to rename file %s.".format(file.getPath)) } } } // now do a second pass and load all the .log and .index files for(file <- dir.listFiles if file.isFile) { //这个for循环是加载和检查log分片是否存在的. val filename = file.getName if(filename.endsWith(IndexFileSuffix)) { // if it is an index file, make sure it has a corresponding .log file val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix)) if(!logFile.exists) { //这里是如果只有index文件没有对应的log文件.就把index文件清理掉. warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath)) file.delete() } } else if(filename.endsWith(LogFileSuffix)) { //这里是创建LogSegment对象的地方. // if its a log file, load the corresponding log segment val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong val hasIndex = Log.indexFilename(dir, start).exists //确认对应的index文件是否存在. val segment = new LogSegment(dir = dir, startOffset = start, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, time = time) if(!hasIndex) { error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath)) segment.recover(config.maxMessageSize) //对应log文件的index不存在的话,进行recover.这个地方就是平常碰见kafka index出错需要重新建立的时候管理员删除了对应的index会引起的动作. } segments.put(start, segment) //将segment对象添加到总集里. } } if(logSegments.size == 0) { //这里判断是否是一个新的topic分区.尚不存在分片文件.所以创建一个空的分片文件对象. // no existing segments, create a new mutable segment beginning at offset 0 segments.put(0L, new LogSegment(dir = dir, startOffset = 0, indexIntervalBytes = config.indexInterval, maxIndexSize = config.maxIndexSize, rollJitterMs = config.randomSegmentJitter, time = time)) } else { recoverLog() //这里是topic分片不为空的话.就为检查点设置新offset值. // reset the index size of the currently active log segment to allow more entries activeSegment.index.resize(config.maxIndexSize) } // sanity check the index file of every segment to ensure we don‘t proceed with a corrupt segment for (s <- logSegments) s.index.sanityCheck() //index文件检查. }
看看recoverLog是做了哪些工作.
private def recoverLog() { // if we have the clean shutdown marker, skip recovery if(hasCleanShutdownFile) { //看看是否有cleanshutdownfile存在.hasCleanShutdownFile函数就是判断这个文件存不存在 this.recoveryPoint = activeSegment.nextOffset //存在则直接把恢复检查点设置成最后一个分片的最新offset值 return } // okay we need to actually recovery this log val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator //这个是获取检查点到最大值之间是否还有其他的分片.也就是检查检查点是不是就是最后一个分片文件. while(unflushed.hasNext) { //如果不是最后一个分片.就获取这个分片.然后调用这个对象的recover函数如果函数返回错误就删除这个分片. val curr = unflushed.next info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name)) val truncatedBytes = try { curr.recover(config.maxMessageSize) } catch { case e: InvalidOffsetException => val startOffset = curr.baseOffset warn("Found invalid offset during recovery for log " + dir.getName +". Deleting the corrupt segment and " + "creating an empty one with starting offset " + startOffset) curr.truncateTo(startOffset) } if(truncatedBytes > 0) { // we had an invalid message, delete all remaining log warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset)) unflushed.foreach(deleteSegment) } } }
这个函数的处理动作.包装的是LogSegment对同名对象.LogSegment的分析会在后续的部分继续分析.
时间: 2024-11-03 22:42:24