Jafka源码分析——LogManager

在Kafka中,LogManager负责管理broker上全部的Log(每个topic-partition为一个Log)。

通过阅读源码可知其详细完毕的功能例如以下:

1. 依照预设规则对消息队列进行清理。

2. 依照预设规则对消息队列进行持久化(flush操作)。

3. 连接ZooKeeper进行broker、topic、partition相关的ZooKeeper操作。

4. 管理broker上全部的Log。

以下一一对这些功能的实现进行具体的解析。

一、对于Log的管理

LogManager包括成员变量logs。logs的key是topic,value是Pool<Integer,Log>(该value又是一个Map。主键是partition,value是该partition所相应的Log)。因此LogManager通过logs保存该broker上全部的消息队列。

private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, 	Pool<Integer, Log>>();

LogManager在初始化之后。须要依据配置文件配置的消息队列根文件夹进行遍历。

通过遍历,查找并生成Log。该遍历的详细实如今方法load中:

① 获取消息队列根文件夹下的全部文件

② 对于根文件夹下的每个文件进行例如以下操作

1.假设是文件夹。则有可能是一个Log,否则不是并忽略

2.对于通过1的文件夹分析其文件名称,文件夹的文件名称由两部分组成:topic-partition

3.对于通过2的文件夹。用文件夹、解析出的topic、解析出的partition生成Log

4.将3生成的Log放入logs日志池

5.最后,推断文件夹解析的partition与配置文件里配置的partition的大小,假设配置文件较小。则更新配置

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvenVvY2hhbnhpYW9oZXNoYW5n/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast" >

二、消息队列清理

消息队列的清理由Scheduler周期性的调用,详细的调用在load函数中,基本的删除实如今cleanLogs函数中。

消息队列的清理分为两种情况:一种是超过预设的时间则删除。二是超过预设的大小则删除。分别相应两个函数cleanupExpiredSegments和cleanupSegmentsToMaintainSize。第一种情况比較简单,由于每个segment相应一个文件,通过对照文件的lastModifiedTime和系统的如今时间来确定其是否超时,假设超时则删除。对于另外一种情况,首先比較Log的大小与配置的大小。假设小于配置的大小则不删除。假设大于了配置的大小,则计算超过配置大小的长度(定为差值);然后将小于该差值的segment删除(这地方有点疑惑。这样删除会不会把一些最新的消息队列给删除了)。

 if (this.scheduler != null) {
 	this.scheduler.scheduleWithRate(new Runnable() {
                public void run() {
                    try {
                        cleanupLogs();
                    } catch (IOException e) {
                        logger.error("cleanup log failed.", e);
                    }
                }
            }, 60 * 1000, logCleanupIntervalMs);
}

三、对于消息队列的持久化

对消息队列的flush操作相同由单独的线程来完毕。该线程通过比較Log上一次的flush时间和当前的系统时间来确定是否须要flush。假设须要则持久化到文件。

注意,消息的队列的持久化在新增消息的时候也会推断,假设一个Log保存的新增消息的条数超过了预设值则进行flush操作。

在Kafka中,LogManager负责管理broker上全部的Log(每个topic-partition为一个Log)。通过阅读源码可知其详细完毕的功能例如以下:

1. 依照预设规则对消息队列进行清理。

2. 依照预设规则对消息队列进行持久化(flush操作)。

3. 连接ZooKeeper进行broker、topic、partition相关的ZooKeeper操作。

4. 管理broker上全部的Log。

以下一一对这些功能的实现进行具体的解析。

一、对于Log的管理

LogManager包括成员变量logs。logs的key是topic,value是Pool<Integer,Log>(该value又是一个Map,主键是partition。value是该partition所相应的Log)。

因此LogManager通过logs保存该broker上全部的消息队列。

private final Pool<String, Pool<Integer, Log>> logs = new Pool<String, 	Pool<Integer, Log>>();

LogManager在初始化之后,须要依据配置文件配置的消息队列根文件夹进行遍历。通过遍历,查找并生成Log。该遍历的详细实如今方法load中:

① 获取消息队列根文件夹下的全部文件

② 对于根文件夹下的每个文件进行例如以下操作

1.假设是文件夹。则有可能是一个Log。否则不是并忽略

2.对于通过1的文件夹分析其文件名称,文件夹的文件名称由两部分组成:topic-partition

3.对于通过2的文件夹。用文件夹、解析出的topic、解析出的partition生成Log

4.将3生成的Log放入logs日志池

5.最后。推断文件夹解析的partition与配置文件里配置的partition的大小,假设配置文件较小,则更新配置

二、消息队列清理

消息队列的清理由Scheduler周期性的调用,详细的调用在load函数中。基本的删除实如今cleanLogs函数中。消息队列的清理分为两种情况:一种是超过预设的时间则删除,二是超过预设的大小则删除,分别相应两个函数cleanupExpiredSegments和cleanupSegmentsToMaintainSize。第一种情况比較简单,由于每个segment相应一个文件,通过对照文件的lastModifiedTime和系统的如今时间来确定其是否超时。假设超时则删除。对于另外一种情况。首先比較Log的大小与配置的大小。假设小于配置的大小则不删除;假设大于了配置的大小,则计算超过配置大小的长度(定为差值)。然后将小于该差值的segment删除(这地方有点疑惑。这样删除会不会把一些最新的消息队列给删除了)。

 if (this.scheduler != null) {
 	this.scheduler.scheduleWithRate(new Runnable() {
                public void run() {
                    try {
                        cleanupLogs();
                    } catch (IOException e) {
                        logger.error("cleanup log failed.", e);
                    }
                }
            }, 60 * 1000, logCleanupIntervalMs);
}

三、对于消息队列的持久化

对消息队列的flush操作相同由单独的线程来完毕。该线程通过比較Log上一次的flush时间和当前的系统时间来确定是否须要flush,假设须要则持久化到文件。注意,消息的队列的持久化在新增消息的时候也会推断,假设一个Log保存的新增消息的条数超过了预设值则进行flush操作。

时间: 2024-10-12 14:50:28

Jafka源码分析——LogManager的相关文章

Tomcat7.0源码分析——启动与停止服务

前言 熟悉Tomcat的工程师们,肯定都知道Tomcat是如何启动与停止的.对于startup.sh.startup.bat.shutdown.sh.shutdown.bat等脚本或者批处理命令,大家一定知道改如何使用它,但是它们究竟是如何实现的,尤其是shutdown.sh脚本(或者shutdown.bat)究竟是如何和Tomcat进程通信的呢?本文将通过对Tomcat7.0的源码阅读,深入剖析这一过程. 由于在生产环境中,Tomcat一般部署在Linux系统下,所以本文将以startup.s

Tomcat7.0源码分析——启动与停止服务原理

前言 熟悉Tomcat的工程师们,肯定都知道Tomcat是如何启动与停止的.对于startup.sh.startup.bat.shutdown.sh.shutdown.bat等脚本或者批处理命令,大家一定知道改如何使用它,但是它们究竟是如何实现的,尤其是shutdown.sh脚本(或者shutdown.bat)究竟是如何和Tomcat进程通信的呢?本文将通过对Tomcat7.0的源码阅读,深入剖析这一过程. 由于在生产环境中,Tomcat一般部署在Linux系统下,所以本文将以startup.s

Tomcat源码分析——启动与停止服务

前言 熟悉Tomcat的工程师们,肯定都知道Tomcat是如何启动与停止的.对于startup.sh.startup.bat.shutdown.sh.shutdown.bat等脚本或者批处理命令,大家一定知道改如何使用它,但是它们究竟是如何实现的,尤其是shutdown.sh脚本(或者shutdown.bat)究竟是如何和Tomcat进程通信的呢?本文将通过对Tomcat7.0的源码阅读,深入剖析这一过程. 由于在生产环境中,Tomcat一般部署在Linux系统下,所以本文将以startup.s

3. SOFAJRaft源码分析— 是如何进行选举的?

开篇 在上一篇文章当中,我们讲解了NodeImpl在init方法里面会初始化话的动作,选举也是在这个方法里面进行的,这篇文章来从这个方法里详细讲一下选举的过程. 由于我这里介绍的是如何实现的,所以请大家先看一下原理:SOFAJRaft 选举机制剖析 | SOFAJRaft 实现原理 文章比较长,我也慢慢的写了半个月时间~ 选举过程分析 我在这里只把有关选举的代码列举出来,其他的代码暂且忽略 NodeImpl#init public boolean init(final NodeOptions o

8. SOFAJRaft源码分析— JRaft是如何实现日志复制的?

前言 前几天和腾讯的大佬一起吃饭聊天,说起我对SOFAJRaft的理解,我自然以为我是很懂了的,但是大佬问起了我那SOFAJRaft集群之间的日志是怎么复制的? 我当时哑口无言,说不出是怎么实现的,所以这次来分析一下SOFAJRaft中日志复制是怎么做的. Leader发送探针获取Follower的LastLogIndex Leader 节点在通过 Replicator 和 Follower 建立连接之后,要发送一个 Probe 类型的探针请求,目的是知道 Follower 已经拥有的的日志位置

9. SOFAJRaft源码分析— Follower如何通过Snapshot快速追上Leader日志?

前言 引入快照机制主要是为了解决两个问题: JRaft新节点加入后,如何快速追上最新的数据 Raft 节点出现故障重新启动后如何高效恢复到最新的数据 Snapshot 源码分析 生成 Raft 节点的快照文件 如果用户需开启 SOFAJRaft 的 Snapshot 机制,则需要在其客户端中设置配置参数类 NodeOptions 的"snapshotUri"属性(即为:Snapshot 文件的存储路径),配置该属性后,默认会启动一个定时器任务("JRaft-SnapshotT

TeamTalk源码分析之login_server

login_server是TeamTalk的登录服务器,负责分配一个负载较小的MsgServer给客户端使用,按照新版TeamTalk完整部署教程来配置的话,login_server的服务端口就是8080,客户端登录服务器地址配置如下(这里是win版本客户端): 1.login_server启动流程 login_server的启动是从login_server.cpp中的main函数开始的,login_server.cpp所在工程路径为server\src\login_server.下表是logi

Android触摸屏事件派发机制详解与源码分析二(ViewGroup篇)

1 背景 还记得前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>中关于透过源码继续进阶实例验证模块中存在的点击Button却触发了LinearLayout的事件疑惑吗?当时说了,在那一篇咱们只讨论View的触摸事件派发机制,这个疑惑留在了这一篇解释,也就是ViewGroup的事件派发机制. PS:阅读本篇前建议先查看前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>,这一篇承接上一篇. 关于View与ViewGroup的区别在前一篇的A

HashMap与TreeMap源码分析

1. 引言     在红黑树--算法导论(15)中学习了红黑树的原理.本来打算自己来试着实现一下,然而在看了JDK(1.8.0)TreeMap的源码后恍然发现原来它就是利用红黑树实现的(很惭愧学了Java这么久,也写过一些小项目,也使用过TreeMap无数次,但到现在才明白它的实现原理).因此本着"不要重复造轮子"的思想,就用这篇博客来记录分析TreeMap源码的过程,也顺便瞅一瞅HashMap. 2. 继承结构 (1) 继承结构 下面是HashMap与TreeMap的继承结构: pu