flume自动reload配置的源码分析

在1.5.0的flume版本中开始提供这个功能,判断配置文件的更新时间戳来reload服务
原理:
1)在启动中使用EventBus.register注册Application对象,同时Application有一个Subscribe的方法handleConfigurationEvent(参数是MaterializedConfiguration对象)
2)定义了一个计划任务线程池,检测到文件更新情况(判断文件的更新时间)
3)如果检测到文件有更新会使用EventBus.post方法发送这个event(MaterializedConfiguration对象)
4)调用Application.handleConfigurationEvent重启各个组件

下面看具体实现:
在org.apache.flume.node.Application的main方法中:
支持reload时

        EventBus eventBus = new EventBus(agentName + "-event-bus" ); //实例化一个EventBus对象
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30); //默认的检测文件是否更新的interval时间为30s
        components.add(configurationProvider); //添加到启动列表中,在start方法中会启动PollingPropertiesFileConfigurationProvider 的计划任务线程池
        application = new Application(components);
        eventBus.register(application); //向EventBus中注册Application对象,让Application对象作为时间的监听者

org.apache.flume.node.PollingPropertiesFileConfigurationProvider //扩展了PropertiesFileConfigurationProvider类并实现了LifecycleAware接口
PollingPropertiesFileConfigurationProvider是一个有生命周期概念的监控配置更改的服务:
start方法:

  public void start() {
....
    executorService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d" )
                .build()); //定义一个单线程的任务调度池
    FileWatcherRunnable fileWatcherRunnable =
        new FileWatcherRunnable(file , counterGroup ); //构建一个FileWatcherRunnable服务对象
    executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
        TimeUnit. SECONDS); //以interval为时间间隔运行FileWatcherRunnable
    lifecycleState = LifecycleState. START; // 设置为START状态
...
  }

FileWatcherRunnable是一个实现了Runnable 接口的线程类:
其主要的run方法

    public void run() {
...
      long lastModified = file.lastModified(); //调用File.lastModified获取文件的上一次更新时的时间戳
      if (lastModified > lastChange) { //初始lastChange 为0
...
        lastChange = lastModified; // 设置本次的lastChange 为更新时间,如果下一次更新文件,lastModified 会大于这个lastChange 
        try {
          eventBus.post(getConfiguration());
        } 
.....
      }
    }
  }

在检测到更新后调用eventBus.post(getConfiguration())向监听者发送信息,这里为Application对象,监听者调用注释为Subscribe的方法处理信息:

@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { //reload服务
  stopAllComponents();
  startAllComponents(conf);
}
时间: 2024-08-03 20:10:59

flume自动reload配置的源码分析的相关文章

【Flume】flume中拦截器的源码分析,以TimestampInterceptor为例

本文将以TimestampInterceptor为例来分析一下flume中拦截器的工作原理 首先来看下改拦截器的实现结构 1.实现了Interceptor接口 该接口的方法定义如下: public void initialize(); public Event intercept(Event event); public List<Event> intercept(List<Event> events); public void close(); /** Builder imple

(五)myBatis架构以及SQlSessionFactory,SqlSession,通过代理执行crud源码分析---待更

MyBatis架构 首先MyBatis大致上可以分为四层: 1.接口层:这个比较容易理解,就是指MyBatis暴露给我们的各种方法,配置,可以理解为你import进来的各种类.,告诉用户你可以干什么 2.数据处理层:顾名思义对数据的处理,当接收到一个sql语句时,比如 selecr *from person where id=#{id};  会进行这四步:参数处理---sql解析---sql执行----处理结果,这里我们重点关心sql的执行 3.框架支撑层:一些辅助操作,缓存机制,事务管理,连接

SpringMVC关于json、xml自动转换的原理研究[附带源码分析 --转

SpringMVC关于json.xml自动转换的原理研究[附带源码分析] 原文地址:http://www.cnblogs.com/fangjian0423/p/springMVC-xml-json-convert.html 目录 前言 现象 源码分析 实例讲解 关于配置 总结 参考资料 前言 SpringMVC是目前主流的Web MVC框架之一. 如果有同学对它不熟悉,那么请参考它的入门blog:http://www.cnblogs.com/fangjian0423/p/springMVC-in

SpringMVC关于json、xml自动转换的原理研究[附带源码分析]

本文讨论SpringMVC关于json.xml自动转换的原理. 实现这个功能只需要三个配置 1.springmvc配置文件 dispatcher-servlet.xml中的关键配置如下 <mvc:resources location="/static/" mapping="/static/**"/> <!-- 配置包扫描器 --> <context:component-scan base-package="com.winner

Flume 实战(2)--Flume-ng-sdk源码分析

具体参考: 官方用户手册和开发指南 http://flume.apache.org/FlumeDeveloperGuide.html *) 定位和简单例子 1). Flume-ng-sdk是用于编写往flume agent发送数据的client sdk2). 简单示例 RpcClient client = null; try { client = RpcClientFactory.getDefaultInstance("127.0.0.1", 41414); Event event =

Servlet容器Tomcat中web.xml中url-pattern的配置详解[附带源码分析]

前言 今天研究了一下tomcat上web.xml配置文件中url-pattern的问题. 这个问题其实毕业前就困扰着我,当时忙于找工作. 找到工作之后一直忙,也就没时间顾虑这个问题了. 说到底还是自己懒了,没花时间来研究. 今天看了tomcat的部分源码 了解了这个url-pattern的机制.  下面让我一一道来. tomcat的大致结构就不说了, 毕竟自己也不是特别熟悉. 有兴趣的同学请自行查看相关资料. 等有时间了我会来补充这部分的知识的. 想要了解url-pattern的大致配置必须了解

memcached源码分析-----memcached启动参数详解以及关键配置的默认值

转载请注明出处: http://blog.csdn.net/luotuo44/article/details/42672913 本文开启本系列博文的代码分析.本系列博文研究是memcached版本是1.4.21. 本文将给出memcached启动时各个参数的详细解释以及一些关键配置的默认值.以便在分析memcached源码的时候好随时查看.当然也方便使用memcached时可以随时查看各个参数的含义.<如何阅读memcached源码>说到memcached有很多全局变量(也就是关键配置),这些

Spring Boot(3):加载DataSource过程的源码分析及yml中DataSource的配置

Spring Boot实现了自动加载DataSource及相关配置.当然,使用时加上@EnableAutoConfiguration注解是必须的.下面就是对这一部分的源码分析. (1)Spring Boot启动后会调用org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration.下面是部分源码. 1 @Configuration 2 @ConditionalOnClass({ DataSource.class, E

android 从源码分析为什么Listview初次显示时没滚动却自动调用onScroll方法的原因

我们做Listview的分批加载时,需要为Listview调用setOnScrollListener(具体代码可见我上一篇博客) 可是,我们会发现,当运行程序时,listview明明没有滚动,那为什么系统会调用onScroll方法呢?(补充:此时onScrollStateChanged并不会调用) 我们先看setOnScrollListener源码: public void setOnScrollListener(OnScrollListener l) { mOnScrollListener =