Jafka源代码分析——LogManager

在Kafka中,LogManager负责管理broker上所有的Log(每一个topic-partition为一个Log)。通过阅读源代码可知其具体完成的功能如下:

1. 按照预设规则对消息队列进行清理。

2. 按照预设规则对消息队列进行持久化(flush操作)。

3. 连接ZooKeeper进行broker、topic、partition相关的ZooKeeper操作。

4. 管理broker上所有的Log。

下面一一对这些功能的实现进行详细的解析。

一、对于Log的管理

LogManager包含成员变量logs。logs的key是topic,value是Pool<Integer,Log>(该value又是一个Map,主键是partition,value是该partition所对应的Log)。因此LogManager通过logs保存该broker上所有的消息队列。

private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, 	Pool<Integer, Log>>();

LogManager在初始化之后,需要根据配置文件配置的消息队列根目录进行遍历。通过遍历,查找并生成Log。该遍历的具体实现在方法load中:

① 获取消息队列根目录下的所有文件

② 对于根目录下的每一个文件进行如下操作

1.如果是目录,则有可能是一个Log,否则不是并忽略

2.对于通过1的目录分析其文件名,目录的文件名由两部分组成:topic-partition

3.对于通过2的目录,用目录、解析出的topic、解析出的partition生成Log

4.将3生成的Log放入logs日志池

5.最后,判断目录解析的partition与配置文件中配置的partition的大小,如果配置文件较小,则更新配置

二、消息队列清理

消息队列的清理由Scheduler周期性的调用,具体的调用在load函数中,主要的删除实现在cleanLogs函数中。消息队列的清理分为两种情况:一种是超过预设的时间则删除,二是超过预设的大小则删除,分别对应两个函数cleanupExpiredSegments和cleanupSegmentsToMaintainSize。第一种情况比较简单,因为每一个segment对应一个文件,通过对比文件的lastModifiedTime和系统的现在时间来确定其是否超时,如果超时则删除。对于第二种情况,首先比较Log的大小与配置的大小。如果小于配置的大小则不删除;如果大于了配置的大小,则计算超过配置大小的长度(定为差值);然后将小于该差值的segment删除(这地方有点疑惑,这样删除会不会把一些最新的消息队列给删除了)。

 if (this.scheduler != null) {
 	this.scheduler.scheduleWithRate(new Runnable() {
                public void run() {
                    try {
                        cleanupLogs();
                    } catch (IOException e) {
                        logger.error("cleanup log failed.", e);
                    }
                }
            }, 60 * 1000, logCleanupIntervalMs);
}

三、对于消息队列的持久化

对消息队列的flush操作同样由单独的线程来完成。该线程通过比较Log上一次的flush时间和当前的系统时间来确定是否需要flush,如果需要则持久化到文件。注意,消息的队列的持久化在新增消息的时候也会判断,如果一个Log保存的新增消息的条数超过了预设值则进行flush操作。

在Kafka中,LogManager负责管理broker上所有的Log(每一个topic-partition为一个Log)。通过阅读源代码可知其具体完成的功能如下:

1. 按照预设规则对消息队列进行清理。

2. 按照预设规则对消息队列进行持久化(flush操作)。

3. 连接ZooKeeper进行broker、topic、partition相关的ZooKeeper操作。

4. 管理broker上所有的Log。

下面一一对这些功能的实现进行详细的解析。

一、对于Log的管理

LogManager包含成员变量logs。logs的key是topic,value是Pool<Integer,Log>(该value又是一个Map,主键是partition,value是该partition所对应的Log)。因此LogManager通过logs保存该broker上所有的消息队列。

private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, 	Pool<Integer, Log>>();

LogManager在初始化之后,需要根据配置文件配置的消息队列根目录进行遍历。通过遍历,查找并生成Log。该遍历的具体实现在方法load中:

① 获取消息队列根目录下的所有文件

② 对于根目录下的每一个文件进行如下操作

1.如果是目录,则有可能是一个Log,否则不是并忽略

2.对于通过1的目录分析其文件名,目录的文件名由两部分组成:topic-partition

3.对于通过2的目录,用目录、解析出的topic、解析出的partition生成Log

4.将3生成的Log放入logs日志池

5.最后,判断目录解析的partition与配置文件中配置的partition的大小,如果配置文件较小,则更新配置

二、消息队列清理

消息队列的清理由Scheduler周期性的调用,具体的调用在load函数中,主要的删除实现在cleanLogs函数中。消息队列的清理分为两种情况:一种是超过预设的时间则删除,二是超过预设的大小则删除,分别对应两个函数cleanupExpiredSegments和cleanupSegmentsToMaintainSize。第一种情况比较简单,因为每一个segment对应一个文件,通过对比文件的lastModifiedTime和系统的现在时间来确定其是否超时,如果超时则删除。对于第二种情况,首先比较Log的大小与配置的大小。如果小于配置的大小则不删除;如果大于了配置的大小,则计算超过配置大小的长度(定为差值);然后将小于该差值的segment删除(这地方有点疑惑,这样删除会不会把一些最新的消息队列给删除了)。

 if (this.scheduler != null) {
 	this.scheduler.scheduleWithRate(new Runnable() {
                public void run() {
                    try {
                        cleanupLogs();
                    } catch (IOException e) {
                        logger.error("cleanup log failed.", e);
                    }
                }
            }, 60 * 1000, logCleanupIntervalMs);
}

三、对于消息队列的持久化

对消息队列的flush操作同样由单独的线程来完成。该线程通过比较Log上一次的flush时间和当前的系统时间来确定是否需要flush,如果需要则持久化到文件。注意,消息的队列的持久化在新增消息的时候也会判断,如果一个Log保存的新增消息的条数超过了预设值则进行flush操作。

Jafka源代码分析——LogManager,布布扣,bubuko.com

时间: 2024-08-14 05:33:32

Jafka源代码分析——LogManager的相关文章

Jafka源代码分析——随笔

Kafka是一个分布式的消息中间件,可以粗略的将其划分为三部分:Producer.Broker和Consumer.其中,Producer负责产生消息并负责将消息发送给Kafka:Broker可以简单的理解为Kafka集群中的每一台机器,其负责完成消息队列的主要功能(接收消息.消息的持久化存储.为Consumer提供消息.消息清理.....):Consumer从Broker获取消息并进行后续的操作.每个broker会有一个ID标识,该标识由人工在配置文件中配置. Kafka中的消息隶属于topic

Jafka源代码分析——网络架构

在kafka中,每一个broker都是一个服务器.按照一般理解,服务器就是一个SocketServer,其不断接收用户的请求并进行处理.在Java中进行网络连接有两种方式一种为阻塞模式一种为非阻塞模式.Jafka采用非阻塞模式进行网络通讯.在Java的非阻塞模式中,建立socket server的一般流程如下: 1.启动ServerSocketChannel并将其绑定到特定的端口. 2.将ServerSocketChannel以及其感兴趣的操作注册到Selector,在这里感兴趣的操作是Acce

Jafka源代码分析——Processor

Jafka Acceptor接收到客户端请求并建立连接后,Acceptor会将Socket连接交给Processor进行处理.Processor通过以下的处理步骤进行客户端请求的处理: 1. 读取客户端请求. 2. 根据客户端请求类型的不同,调用相应的处理函数进行处理. Processor读取客户端请求是一个比较有意思的事情,需要考虑两个方面的事情:第一,请求规则(Processor需要按照一定的规则进行请求的解析):第二,如何确定一次请求的读取已经结束(因为是非阻塞连接,非常有可能第一次读操作

Kafka 源代码分析之LogManager

这里分析kafka 0.8.2的LogManager logmanager是kafka用来管理log文件的子系统.源代码文件在log目录下. 这里会逐步分析logmanager的源代码.首先看class 初始化部分. private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { //这个函数就是在kafkaServer.start函数里调用的封装函数 val defaultLo

Kafka 源代码分析之LogSegment

这里分析kafka LogSegment源代码 通过一步步分析LogManager,Log源代码之后就会发现,最终的log操作都在LogSegment上实现.LogSegment负责分片的读写恢复刷新删除等动作都在这里实现.LogSegment代码同样在源代码目录log下. LogSegment是一个日志分片的操作最小单元.直接作用与messages之上.负责实体消息的读写追加等等. LogSegment实际上是FileMessageSet类的代理类.LogSegment中的所有最终处理都在Fi

Java中arraylist和linkedlist源代码分析与性能比較

Java中arraylist和linkedlist源代码分析与性能比較 1,简单介绍 在java开发中比較经常使用的数据结构是arraylist和linkedlist,本文主要从源代码角度分析arraylist和linkedlist的性能. 2,arraylist源代码分析 Arraylist底层的数据结构是一个对象数组.有一个size的成员变量标记数组中元素的个数,例如以下图: * The array buffer into which the elements of the ArrayLis

转:RTMPDump源代码分析

0: 主要函数调用分析 rtmpdump 是一个用来处理 RTMP 流媒体的开源工具包,支持 rtmp://, rtmpt://, rtmpe://, rtmpte://, and rtmps://.也提供 Android 版本. 最近研究了一下它内部函数调用的关系. 下面列出几个主要的函数的调用关系. RTMPDump用于下载RTMP流媒体的函数Download: 用于建立网络连接(NetConnect)的函数Connect: 用于建立网络流(NetStream)的函数 rtmpdump源代码

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

pomelo源代码分析(一)

千里之行始于足下,一直说想了解pomelo,对pomelo有兴趣,但一直迟迟没有去碰,尽管对pomelo进行源代码分析,在网络上肯定不止我一个,已经有非常优秀的前辈走在前面,如http://golanger.cn/,在阅读Pomelo代码的时候,已经连载到了11篇了,在我的源代码分析參考了该博客,当然,也会添?我对pomelo的理解,借此希望能提高一下自己对node.js的了解和学习一些优秀的设计. 开发环境:win7 调试环境:webstorm5.0 node.js版本号:v0.8.21 源代