这里分析kafka 0.8.2的LogManager
logmanager是kafka用来管理log文件的子系统.源代码文件在log目录下.
这里会逐步分析logmanager的源代码.首先看class 初始化部分.
private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { //这个函数就是在kafkaServer.start函数里调用的封装函数 val defaultLogConfig = LogConfig(segmentSize = config.logSegmentBytes, //创建各种logconfig segmentMs = config.logRollTimeMillis, segmentJitterMs = config.logRollTimeJitterMillis, flushInterval = config.logFlushIntervalMessages, flushMs = config.logFlushIntervalMs.toLong, retentionSize = config.logRetentionBytes, retentionMs = config.logRetentionTimeMillis, maxMessageSize = config.messageMaxBytes, maxIndexSize = config.logIndexSizeMaxBytes, indexInterval = config.logIndexIntervalBytes, deleteRetentionMs = config.logCleanerDeleteRetentionMs, fileDeleteDelayMs = config.logDeleteDelayMs, minCleanableRatio = config.logCleanerMinCleanRatio, compact = config.logCleanupPolicy.trim.toLowerCase == "compact") val defaultProps = defaultLogConfig.toProps val configs = AdminUtils.fetchAllTopicConfigs(zkClient).mapValues(LogConfig.fromProps(defaultProps, _)) // read the log configurations from zookeeper val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, //创建压缩log的配置文件 dedupeBufferSize = config.logCleanerDedupeBufferSize, dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor, ioBufferSize = config.logCleanerIoBufferSize, maxMessageSize = config.messageMaxBytes, maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond, backOffMs = config.logCleanerBackoffMs, enableCleaner = config.logCleanerEnable) //这里就是创建了logmanager的实例对象. new LogManager(logDirs = config.logDirs.map(new File(_)).toArray, //这是logdirs的位置.即配置文件中的logdir topicConfigs = configs, //topic的配置信息,从zookeeper上得到的. defaultConfig = defaultLogConfig, //默认配置信息 cleanerConfig = cleanerConfig, //压缩log配置信息 ioThreads = config.numRecoveryThreadsPerDataDir, //io线程的个数.一般是8 flushCheckMs = config.logFlushSchedulerIntervalMs, //log刷新到磁盘的间隔.一般是10000ms flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs, //log检查点的间隔.一般是10000ms retentionCheckMs = config.logCleanupIntervalMs, //log 清除的时间间隔.即保留时长.一般是7*24hour scheduler = kafkaScheduler, //这个即kafkaserver.start函数里最早启动和声明的对象.用来做后台任务. brokerState = brokerState, //borker状态 time = time) }
上面这个函数就是kafkaserver里创建logmanager对象的入口.下面看看logmanager本身的初始化部分.
class LogManager(val logDirs: Array[File], //这是class声明部分.可以跟调用部分对照.每个参数的类型都很清楚. val topicConfigs: Map[String, LogConfig], val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig, ioThreads: Int, val flushCheckMs: Long, val flushCheckpointMs: Long, val retentionCheckMs: Long, scheduler: Scheduler, val brokerState: BrokerState, private val time: Time) extends Logging { val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint" //默认的检查点文件. val LockFile = ".lock" //默认的锁文件.kafka不正常关闭的时候可以看见这个文件未被清理.在logdir下. val InitialTaskDelayMs = 30*1000 //初始任务时常. private val logCreationOrDeletionLock = new Object private val logs = new Pool[TopicAndPartition, Log]() //logs 是后面所有topic对象的总集.之后关于所有log上的操作都是通过logs. createAndValidateLogDirs(logDirs) //创建和验证logdir. private val dirLocks = lockLogDirs(logDirs) //对logdir加锁.创建锁文件.就是上面的lockFile文件. private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap //为每个log存储路径做一个恢复点检查文件的map集合.数据结构是(存储路径1:File类型->OffsetCheckpoint对象,...),每个存储路径下都有一个恢复文件.文件内容记录按行记录.第一行记录版本,第二行记录所有topic个数,之后的行按"topicname partition lastoffset"的格式记录所有topic的名字,分区,最后的offset.recoveryPointCheckpoints这个对象在之后会用到为每一个topic查询最后的offset用. loadLogs() //将所有存储路径下的log文件创建一组log对象,并put到logs中. // public, so we can access this from kafka.admin.DeleteTopicTest //这一部分就是log归整压缩的功能是否启用.然后创建相应的对象.我从来没用过这个功能. val cleaner: LogCleaner = if(cleanerConfig.enableCleaner) new LogCleaner(cleanerConfig, logDirs, logs, time = time) else null
初始化部分已经介绍完了.下面看看初始化部分用到的具体函数部分.
private def createAndValidateLogDirs(dirs: Seq[File]) { //这个就是初始化部分创建和验证logdir的函数. if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size) //检查logdir的路径是否合法. throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", ")) for(dir <- dirs) { if(!dir.exists) { //不存在就创建. info("Log directory ‘" + dir.getAbsolutePath + "‘ not found, creating it.") val created = dir.mkdirs() if(!created) throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath) } if(!dir.isDirectory || !dir.canRead) //不是目录或不可读就抛出异常 throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.") } } /** * Lock all the given directories */ private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = { dirs.map { dir => val lock = new FileLock(new File(dir, LockFile)) //创建锁文件 if(!lock.tryLock()) //尝试获得锁. throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath + ". A Kafka instance in another process or thread is using this directory.") lock //返回锁. } }
上面两个函数是对目录做的一些检查和枷锁工作.下面的loadlogs函数就是将logdir下所有的日志加载的复杂工作了.
private def loadLogs(): Unit = { info("Loading logs.") val threadPools = mutable.ArrayBuffer.empty[ExecutorService] //初始化了一个线程工厂池,容纳所有存储路径对应的线程池对象. val jobs = mutable.Map.empty[File, Seq[Future[_]]] //jobs用来记录每一个创建log对象的runnable工作结果集. for (dir <- this.logDirs) { //这里开始遍历每一个logdirs下面的log路径.logdirs一般都是"/data1/kafka/logs,/data2/kafka/logs,/data3/kafka/logs"这种数据目录格式.因此dir对应的应该是每一个单独的log存储目录. val pool = Executors.newFixedThreadPool(ioThreads) //这里为每一个存储目录创建一个固定数量的线程池.因为一般为了提高磁盘读写性能都会设置多个磁盘目录.因此这个实际上是为每一个磁盘创建一个固定数量的线程池. threadPools.append(pool) //把创建的线程池加入到线程工厂池里. val cleanShutdownFile = new File(dir, Log.CleanShutdownFile) //这个是获取标志这个log存储目录是否要被恢复的文件对象.Log.CleanShutdownFile在同目录下Log.scala里定义的.值是".kafka_cleanshutdown",这个也会成为下文用来识别每一个存储路径对应的工作线程池的标识. if (cleanShutdownFile.exists) { //这里判断是否跳过恢复这个log存储目录,否则就创建一个新的borkerstate debug( "Found clean shutdown file. " + "Skipping recovery for all logs in data directory: " + dir.getAbsolutePath) } else { // log recovery itself is being performed by `Log` class during initialization brokerState.newState(RecoveringFromUncleanShutdown) } //这个对象是一个Map[TopicAndPartition Long]对象.内容是通过上OffsetCheckpoints里的read方法.将对象存储路径下的恢复文件解析成Map类型,下文会介绍这个类和方法. val recoveryPoints = this.recoveryPointCheckpoints(dir).read //通过for循环来生成工作集. val jobsForDir = for { dirContent <- Option(dir.listFiles).toList logDir <- dirContent if logDir.isDirectory } yield { Utils.runnable { //通过这个方法为每一个topic生成一个runnable类型对象.这个对象的run方法就是这个块. debug("Loading log ‘" + logDir.getName + "‘") //这个对象实现的主要功能就在这里.通过目录名字获得topic名字和分区号.然后为每个分区生成一份默认的配置信息.再通过recoveryPoints对象获得lastoffset val topicPartition = Log.parseTopicPartitionName(logDir.getName) //获得topic信息 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig) //生成配置信息 val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L) //获取最后的offset //根据上面获得到的lastoffset和配置.生成一个log对象.并将这个对象put到logs全局变量中去.供之后操作. val current = new Log(logDir, config, logRecoveryPoint, scheduler, time) //创建topic对应log对象 val previous = this.logs.put(topicPartition, current) //添加到logs全局变量中. if (previous != null) { throw new IllegalArgumentException( "Duplicate log directories found: %s, %s!".format( current.dir.getAbsolutePath, previous.dir.getAbsolutePath)) } } } //最后在这里把上面生成的topic对应的runnable对象放到函数最开始声明的线程池中去执行.并将返回future对象集放到jobs对应的标识中去. jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq } //函数最后在这里获取所有runnable对象执行结果.并且清理cleanShutdownFile文件,关闭所有执行线程. try { for ((cleanShutdownFile, dirJobs) <- jobs) { dirJobs.foreach(_.get) //获取结果 cleanShutdownFile.delete() //清理对象 } } catch { case e: ExecutionException => { error("There was an error in one of the threads during logs loading: " + e.getCause) throw e.getCause } } finally { threadPools.foreach(_.shutdown()) //关闭所有存储路径对应的线程池. } info("Logs loading complete.") }
上面就是所有topic分区被加载的过程.下面插入一些被这个函数用到的一些关键函数.
class OffsetCheckpoint(val file: File) extends Logging { //这个class就是上面用来恢复log的读写类. private val lock = new Object() new File(file + ".tmp").delete() // try to delete any existing temp files for cleanliness file.createNewFile() // in case the file doesn‘t exist //这个方法是写检查点文件的 具体也比较简单.就不再做过多说明 def write(offsets: Map[TopicAndPartition, Long]) { lock synchronized { // write to temp file and then swap with the existing file val temp = new File(file.getAbsolutePath + ".tmp") val fileOutputStream = new FileOutputStream(temp) val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream)) try { // write the current version writer.write(0.toString) writer.newLine() // write the number of entries writer.write(offsets.size.toString) writer.newLine() // write the entries offsets.foreach { case (topicPart, offset) => writer.write("%s %d %d".format(topicPart.topic, topicPart.partition, offset)) writer.newLine() } // flush the buffer and then fsync the underlying file writer.flush() fileOutputStream.getFD().sync() } finally { writer.close() } // swap new offset checkpoint file with previous one if(!temp.renameTo(file)) { // renameTo() fails on Windows if the destination file exists. file.delete() if(!temp.renameTo(file)) throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath, file.getAbsolutePath)) } } } //这个就是loadlogs函数中被调用的read函数.用来读取检查点文件的. def read(): Map[TopicAndPartition, Long] = { lock synchronized { val reader = new BufferedReader(new FileReader(file)) try { var line = reader.readLine() if(line == null) return Map.empty val version = line.toInt version match { case 0 => line = reader.readLine() if(line == null) return Map.empty val expectedSize = line.toInt var offsets = Map[TopicAndPartition, Long]() //这个对象就是最后返回的对象. line = reader.readLine() while(line != null) { val pieces = line.split("\\s+") if(pieces.length != 3) throw new IOException("Malformed line in offset checkpoint file: ‘%s‘.".format(line)) val topic = pieces(0) val partition = pieces(1).toInt val offset = pieces(2).toLong offsets += (TopicAndPartition(topic, partition) -> offset) //将解析的每一行都添加到offset里. line = reader.readLine() } if(offsets.size != expectedSize) throw new IOException("Expected %d entries but found only %d".format(expectedSize, offsets.size)) offsets //最后在这里返回. case _ => throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version) } } finally { reader.close() } } } }
上面这个类是在server目录下.用来恢复和写入检查点文件的.
时间: 2024-10-19 20:18:53