【Java】【Flume】Flume-NG启动过程源码分析(一)

从bin/flume 这个shell脚本可以看到Flume的起始于org.apache.flume.node.Application类,这是flume的main函数所在。

  main方法首先会先解析shell命令,如果指定的配置文件不存在就甩出异常。

  根据命令中含有"no-reload-conf"参数,决定采用那种加载配置文件方式:一、没有此参数,会动态加载配置文件,默认每30秒加载一次配置文件,因此可以动态修改配置文件;二、有此参数,则只在启动时加载一次配置文件。实现动态加载功能采用了发布订阅模式,使用guava中的EventBus实现。

     EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30); //这里是发布事件的类,这里的30则是动态加载配置文件时间间隔,单位是s
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);    //将订阅类注册到Bus中

  订阅类是application = new Application(components);发布代码在PollingPropertiesFileConfigurationProvider中的FileWatcherRunnable.run方法中。在这只是先构建一个PollingPropertiesFileConfigurationProvider对象,PollingPropertiesFileConfigurationProvider
extends  PropertiesFileConfigurationProvider implements LifecycleAware,继续跟踪PropertiesFileConfigurationProvider extends AbstractConfigurationProvider,再跟踪AbstractConfigurationProvider implements  ConfigurationProvider可以看到这些类的构造方法都是初始化,AbstractConfigurationProvid的构造方法初始化了sink、channel、source的工厂类。

  Application.handleConfigurationEvent(MaterializedConfiguration conf)有@Subscribe注解,是订阅方法,当eventBus.post(MaterializedConfiguration
conf)执行时,会触发执行handleConfigurationEvent方法。

  new Application(components)时,会构建一个对象supervisor = new LifecycleSupervisor()会启动10个线程用来执行配置文件中的各个组件,并监控组件的整个运行过程。

  application.start()方法会启动配置文件的加载过程supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); //LifecycleState.START开始运行,在这的component就是上面的PollingPropertiesFileConfigurationProvider对象。supervise方法会对component创建一个MonitorRunnable进程,并放入默认有10个线程的monitorService去执行

    Supervisoree process = new Supervisoree();
    process.status = new Status();

    process.policy = policy;
    process.status.desiredState = desiredState;
    process.status.error = false;

    MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;//组件
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);
    //创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。
    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);  //启动MonitorRunnable,结束之后3秒再重新启动,可以用于重试
    monitorFutures.put(lifecycleAware, future);

  看MonitorRunnable类,其run方法主要是根据supervisoree.status.desiredState的值执行对应的操作。这里的lifecycleAware就是上面supervise方法中的component,lifecycleAware在构造之初将lifecycleState=IDLE,application.start()方法通过supervisor.supervise方法将supervisoree.status.desiredState=START。所以在run方法中会执行lifecycleAware.start(),也就是PollingPropertiesFileConfigurationProvider.start()方法。

  PollingPropertiesFileConfigurationProvider.start()方法会启动一个单线程FileWatcherRunnable每隔30s去加载一次配置文件(如果配置文件有修改):eventBus.post(getConfiguration())。getConfiguration()是AbstractConfigurationProvider.getConfiguration()这个方法解析了配置文件获取了所有组件及其配置属性。这个方法较为复杂,放在后续再讲解。

  待eventBus.post(getConfiguration())之后会触发Application.handleConfigurationEvent方法:

  @Subscribe
  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
  }

  stopAllComponents()方法会依次stop各个组件的运行,顺序是:source、sink、channel。之所以有顺序是因为:一、source是不停的读数据放入channel的;二、sink是不停的从channel拿数据的,channel两头都在使用应该最后停止,停止向channel发送数据后sink停止才不会丢数据。stop是通过supervisor.unsupervise方法来完成的。

  startAllComponents(conf)是启动各个组件的,顺序正好和stopAllComponents()停止顺序相反,相信大伙很容易理解。是通过supervisor.supervise启动组件的。另外需要注意的是启动channel组件后需要等待一定时间,是为了让所有channel全部启动。

  另外为什么要先stop再start呢?因为考虑到要动态加载配置文件啊,加载配置文件后就需要重新启动所有组件,所以先停止所有的,再重新启动所有的。

  main方法的最后还有一个钩子函数Runtime.getRuntime().addShutdownHook,主要是用来进行内存清理、对象销毁等操作。

【Java】【Flume】Flume-NG启动过程源码分析(一),布布扣,bubuko.com

时间: 2024-08-02 06:51:59

【Java】【Flume】Flume-NG启动过程源码分析(一)的相关文章

【Java】【Flume】Flume-NG启动过程源码分析(二)

本节分析配置文件的解析,即PollingPropertiesFileConfigurationProvider.FileWatcherRunnable.run中的eventBus.post(getConfiguration()).分析getConfiguration()方法.此方法在AbstractConfigurationProvider类中实现了,并且这个类也初始化了三大组件的工厂类:this.sourceFactory = new DefaultSourceFactory();this.s

【Java】【Flume】Flume-NG启动过程源码分析(三)

本篇分析加载配置文件后各个组件是如何运行的? 加载完配置文件订阅者Application类会收到订阅信息执行: @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); } MaterializedConfiguration conf就是getConfiguration()

Flume-NG启动过程源码分析(三)(原创)

上一篇文章分析了Flume如何加载配置文件的,动态加载也只是重复运行getConfiguration(). 本篇分析加载配置文件后各个组件是如何运行的? 加载完配置文件订阅者Application类会收到订阅信息执行: @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf)

Activity启动流程源码分析之Launcher启动(二)

1.前述 在前一篇文章中我们简要的介绍Activity的启动流程Activity启动流程源码分析之入门(一),当时只是简单的分析了一下流程,而且在上一篇博客中我们也说了Activity的两种启动方式,现在我们就来分析其中的第一种方式--Launcher启动,这种启动方式的特点是会创建一个新的进程来加载相应的Activity(基于Android5.1源码). 2.Activity启动流程时序图 好啦,接下来我们先看一下Launcher启动Activity的时序图: 好啦,接下来我们将上述时序图用代

Android系统默认Home应用程序(Launcher)的启动过程源码分析

在前面一篇文章中,我们分析了Android系统在启动时安装应用程序的过程,这些应用程序安装好之后,还须要有一个Home应用程序来负责把它们在桌面上展示出来,在Android系统中,这个默认的Home应用程序就是Launcher了,本文将详细分析Launcher应用程序的启动过程. Android系统的Home应用程序Launcher是由ActivityManagerService启动的,而ActivityManagerService和PackageManagerService一样,都是在开机时由

嵌入式linux开发uboot移植(三)——uboot启动过程源码分析

嵌入式linux开发uboot移植(三)--uboot启动过程源码分析 一.uboot启动流程简介 与大多数BootLoader一样,uboot的启动过程分为BL1和BL2两个阶段.BL1阶段通常是开发板的配置等设备初始化代码,需要依赖依赖于SoC体系结构,通常用汇编语言来实现:BL2阶段主要是对外部设备如网卡.Flash等的初始化以及uboot命令集等的自身实现,通常用C语言来实现. 1.BL1阶段 uboot的BL1阶段代码通常放在start.s文件中,用汇编语言实现,其主要代码功能如下:

【Android】应用程序启动过程源码分析

在Android系统中,应用程序是由Activity组成的,因此,应用程序的启动过程实际上就是应用程序中的默认Activity的启动过程,本文将详细分析应用程序框架层的源代码,了解Android应用程序的启动过程. 启动Android应用程序中的Activity的两种情景:其中,在手机屏幕中点击应用程序图标的情景就会引发Android应用程序中的默认Activity的启动,从而把应用程序启动起来.这种启动方式的特点是会启动一个新的进程来加载相应的Activity. 这里,我们以这个例子为例来说明

Openstack liberty 中Cinder-api启动过程源码分析2

在前一篇博文中,我分析了cinder-api启动过程中WSGI应用的加载及路由的建立过程,本文将分析WSGI服务器的启动及其接收客户端请求并完成方法映射的过程,请看下文的分析内容: 创建WSGI服务器 在初始化WSGIService对象的过程中,会创建WSGI服务器,如下: #`cinder.service:WSGIService.__init__` def __init__(self, name, loader=None): """ name = `osapi_volume

spark内核揭秘-13-Worker中Executor启动过程源码分析

进入Worker类源码: 可以看出Worker本身是Akka中的一个Actor. 进入Worker类的LaunchExecutor: 从源代码可以看出Worker节点上要分配CPU和Memory给新的Executor,首先需要创建一个ExecutorRunner: ExecutorRunner是用于维护executor进程的: 1.进入ExecutorRunner 的start方法: 1.1.进入fetchAndRunExecutor()方法(核心方法): 2.进入 master ! Execu