【Java】【Flume】Flume-NG启动过程源代码分析(一)

从bin/flume 这个shell脚本能够看到Flume的起始于org.apache.flume.node.Application类,这是flume的main函数所在。

  main方法首先会先解析shell命令,假设指定的配置文件不存在就甩出异常。

  依据命令中含有"no-reload-conf"參数,决定採用那种载入配置文件方式:一、没有此參数。会动态载入配置文件,默认每30秒载入一次配置文件,因此能够动态改动配置文件。二、有此參数,则仅仅在启动时载入一次配置文件。

实现动态载入功能採用了公布订阅模式,使用guava中的EventBus实现。

     EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30); //这里是公布事件的类,这里的30则是动态载入配置文件时间间隔,单位是s
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);    //将订阅类注冊到Bus中

  订阅类是application = new Application(components);公布代码在PollingPropertiesFileConfigurationProvider中的FileWatcherRunnable.run方法中。

在这仅仅是先构建一个PollingPropertiesFileConfigurationProvider对象,PollingPropertiesFileConfigurationProvider
extends  PropertiesFileConfigurationProvider implements LifecycleAware,继续跟踪PropertiesFileConfigurationProvider extends AbstractConfigurationProvider。再跟踪AbstractConfigurationProvider implements  ConfigurationProvider能够看到这些类的构造方法都是初始化。AbstractConfigurationProvid的构造方法初始化了sink、channel、source的工厂类。

  Application.handleConfigurationEvent(MaterializedConfiguration conf)有@Subscribe注解,是订阅方法,当eventBus.post(MaterializedConfiguration
conf)运行时,会触发运行handleConfigurationEvent方法。

  new Application(components)时,会构建一个对象supervisor = new LifecycleSupervisor()会启动10个线程用来执行配置文件里的各个组件,并监控组件的整个执行过程。

  application.start()方法会启动配置文件的载入过程supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); //LifecycleState.START開始执行。在这的component就是上面的PollingPropertiesFileConfigurationProvider对象。supervise方法会对component创建一个MonitorRunnable进程。并放入默认有10个线程的monitorService去执行

    Supervisoree process = new Supervisoree();
    process.status = new Status();

    process.policy = policy;
    process.status.desiredState = desiredState;
    process.status.error = false;

    MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;//组件
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);
    //创建并运行一个在给定初始延迟后首次启用的定期操作。随后。在每一次运行终止和下一次运行開始之间都存在给定的延迟。

假设任务的任一运行遇到异常,就会取消兴许运行。
    ScheduledFuture<?

> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);  //启动MonitorRunnable,结束之后3秒再又一次启动,能够用于重试
    monitorFutures.put(lifecycleAware, future);

  看MonitorRunnable类。其run方法主要是依据supervisoree.status.desiredState的值运行相应的操作。

这里的lifecycleAware就是上面supervise方法中的component,lifecycleAware在构造之初将lifecycleState=IDLE,application.start()方法通过supervisor.supervise方法将supervisoree.status.desiredState=START。所以在run方法中会运行lifecycleAware.start(),也就是PollingPropertiesFileConfigurationProvider.start()方法。

  PollingPropertiesFileConfigurationProvider.start()方法会启动一个单线程FileWatcherRunnable每隔30s去载入一次配置文件(假设配置文件有改动):eventBus.post(getConfiguration())。getConfiguration()是AbstractConfigurationProvider.getConfiguration()这种方法解析了配置文件获取了全部组件及其配置属性。这种方法较为复杂。放在兴许再解说。

  待eventBus.post(getConfiguration())之后会触发Application.handleConfigurationEvent方法:

  @Subscribe
  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
  }

  stopAllComponents()方法会依次stop各个组件的执行。顺序是:source、sink、channel。

之所以有顺序是由于:一、source是不停的读数据放入channel的;二、sink是不停的从channel拿数据的。channel两头都在使用应该最后停止,停止向channel发送数据后sink停止才不会丢数据。stop是通过supervisor.unsupervise方法来完毕的。

  startAllComponents(conf)是启动各个组件的,顺序正好和stopAllComponents()停止顺序相反。相信大伙非常easy理解。是通过supervisor.supervise启动组件的。另外须要注意的是启动channel组件后须要等待一定时间,是为了让所有channel所有启动。

  另外为什么要先stop再start呢?由于考虑到要动态载入配置文件啊。载入配置文件后就须要又一次启动全部组件。所以先停止全部的,再又一次启动全部的。

  main方法的最后另一个钩子函数Runtime.getRuntime().addShutdownHook,主要是用来进行内存清理、对象销毁等操作。

时间: 2024-11-05 16:12:22

【Java】【Flume】Flume-NG启动过程源代码分析(一)的相关文章

Android Content Provider的启动过程源代码分析

本文参考Android应用程序组件Content Provider的启动过程源代码分析http://blog.csdn.net/luoshengyang/article/details/6963418和<Android系统源代码情景分析>,作者罗升阳. 0.总图流程图如下: 1.MainActivity进程向ActivityServiceManager主线程发送GET_CONTENT_PORVIDER_TRANSACTION 如下图: 如图:第一步 ~/Android/frameworks/b

Android系统默认Home应用程序(Launcher)的启动过程源代码分析

在前面一篇文章中,我们分析了Android系统在启动时安装应用程序的过程,这些应用程序安装好之后,还需要有一个 Home应用程序来负责把它们在桌面上展示出来,在Android系统中,这个默认的Home应用程序就是Launcher了,本文将详细分析 Launcher应用程序的启动过程. Android系统的Home应用程序Launcher是由ActivityManagerService启动的,而 ActivityManagerService和PackageManagerService一样,都是在开

Android应用程序组件Content Provider的启动过程源代码分析

文章转载至CSDN社区罗升阳的安卓之旅,原文地址:http://blog.csdn.net/luoshengyang/article/details/6963418 通过前面的学习,我们知道在Android系统中,Content Provider可以为不同的应用程序访问相同的数据提供统一的入口.Content Provider一般是运行在独立的进程中的,每一个Content Provider在系统中只有一个实例存在,其它应用程序首先要找到这个实例,然后才能访问它的数据.那么,系统中的Conten

Android应用程序启动过程源代码分析

文章转载至CSDN社区罗升阳的安卓之旅,原文地址:http://blog.csdn.net/luoshengyang/article/details/6689748 前文简要介绍了Android应用程序的Activity的启动过程.在Android系统中,应用程序是由Activity组成的,因此,应用程 序的启动过程实际上就是应用程序中的默认Activity的启动过程,本文将详细分析应用程序框架层的源代码,了解Android应用程序的启动过程. 在上一篇文章Android应用程序的Activit

Android系统进程间通信(IPC)机制Binder中的Server启动过程源代码分析

文章转载至CSDN社区罗升阳的安卓之旅,原文地址:http://blog.csdn.net/luoshengyang/article/details/6629298 在前面一篇文章浅谈Android系统进程间通信(IPC)机制Binder中的Server和Client获得Service Manager接口之路中, 介绍了在Android系统中Binder进程间通信机制中的Server角色是如何获得Service Manager远程接口的,即defaultServiceManager函数的实现.S

Android系统进程间通信(IPC)机制Binder中的Client获得Server远程接口过程源代码分析

文章转载至CSDN社区罗升阳的安卓之旅,原文地址:http://blog.csdn.net/luoshengyang/article/details/6633311 在上一篇文章中,我 们分析了Android系统进程间通信机制Binder中的Server在启动过程使用Service Manager的addService接口把自己添加到Service Manager守护过程中接受管理.在这一篇文章中,我们将深入到Binder驱动程序源代码去分析Client是如何通过Service Manager的

Android应用程序绑定服务(bindService)的过程源代码分析

文章转载至CSDN社区罗升阳的安卓之旅,原文地址:http://blog.csdn.net/luoshengyang/article/details/6745181 Android应用程序组件Service与Activity一样,既可以在新的进程中启动,也可以在应用程序进程内部 启动:前面我们已经分析了在新的进程中启动Service的过程,本文将要介绍在应用程序内部绑定Service的过程,这是一种在应用程序进程内部启动 Service的方法. 在前面一篇文章Android进程间通信(IPC)机

struts2请求过程源代码分析

struts2请求过程源代码分析 Struts2是Struts社区和WebWork社区的共同成果.我们甚至能够说,Struts2是WebWork的升级版.他採用的正是WebWork的核心,所以.Struts2并非一个不成熟的产品,相反.构建在WebWork基础之上的Struts2是一个执行稳定.性能优异.设计成熟的WEB框架. 我这里的struts2源代码是从官网下载的一个最新的struts-2.3.15.1-src.zip.将其解压就可以. 里面的文件夹页文件很的多,我们仅仅须要定位到stru

Android --- Zygote和System进程启动过程简要分析

Android --- Zygote和System进程启动过程简要分析 在看过<Android情景源代码>的Zygote启动章节后,作如下简要总结.Zygote进程在init进程启动过程中被以service服务的形式启动: service zygote /system/bin/app_process -Xzygote /system/bin --zygote --start-system-server class main socket zygote stream 660 root syste