flume启动过程分析

1.启动命令

nohup bin/flume-ng agent -n agent-server  -f  agent-server1.conf &

flume-ng是一个shell脚本:

  agent                 run a Flume agent  ---> org.apache.flume.node.Application 类
  avro-client           run an avro Flume client ---> org.apache.flume.client.avro.AvroCLIClient 类
run_flume() { #shell脚本实现
  local FLUME_APPLICATION_CLASS
  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi
  if [ ${CLEAN_FLAG} -ne 0 ]; then
    set -x
  fi
  $EXEC $JAVA_HOME/bin/java $JAVA_OPTS -cp "$FLUME_CLASSPATH"       -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}
......
# finally, invoke the appropriate command
if [ -n "$opt_agent" ] ; then  #如果第一个参数为agent时,opt_agent取值为1
  run_flume $FLUME_AGENT_CLASS $args #FLUME_AGENT_CLASS="org.apache.flume.node.Application"
elif [ -n "$opt_avro_client" ] ; then
  run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
  run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
  run_flume $FLUME_TOOLS_CLASS $args
else
  error "This message should never appear" 1
fi

最终启动的时候调用org.apache.flume.node.Application类的main方法
2.org.apache.flume.node.Application类
1)调用main方法,首先会解析参数,主要是n和f以及no-reload-conf,n为节点名称,f为配置文件,no-reload-conf代表是否支持自动reload(1.5.0才有的功能)
n/f 都有设置的值,no-reload-conf没有设置的项,如果设置了no-reload-conf代表不能自动reload

      Options options = new Options();
      Option option = new Option("n" , "name" , true, "the name of this agent");
      option.setRequired( true);
      options.addOption(option);
      option = new Option("f" , "conf-file" , true, "specify a conf file");
      option.setRequired( true);
      options.addOption(option);
      option = new Option(null , "no-reload-conf" , false, "do not reload " +
        "conf file if changed");
      options.addOption(option);
....
      CommandLineParser parser = new GnuParser();
      CommandLine commandLine = parser.parse(options, args);
      File configurationFile = new File(commandLine.getOptionValue(‘f‘ ));
      String agentName = commandLine.getOptionValue( ‘n‘);
      boolean reload = !commandLine.hasOption( "no-reload-conf");  //获取是否含有no-reload-conf的设置,如果没有设置no-reload-conf则reload为true

2)

 List<LifecycleAware> components = Lists.newArrayList(); //初始化一个List<LifecycleAware>对象,用来存放需要启动的组件,这个只有在支持reload的情况才会使用
      Application application;
      if(reload) {
        EventBus eventBus = new EventBus(agentName + "-event-bus" );
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      } else { //不知道reload的情况
        PropertiesFileConfigurationProvider configurationProvider =
            new PropertiesFileConfigurationProvider(agentName,
                configurationFile); //实例化一个PropertiesFileConfigurationProvider 对象,参数是agent的名称和配置文件(即n和f的设置)
        application = new Application(); //实例化一个Application对象
        application.handleConfigurationEvent(configurationProvider.getConfiguration()); //调用handleConfigurationEvent方法
      }
      application.start(); // 调用start方法

不支持reload的启动方法调用:

main--->handleConfigurationEvent-->stopAllComponents+startAllComponents-->start

3)handleConfigurationEvent方法调用stopAllComponents和startAllComponents方法

  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents(); //用于
    startAllComponents(conf);
  }

这里handleConfigurationEvent方法的参数为MaterializedConfiguration对象(这里为SimpleMaterializedConfiguration实例)
MaterializedConfiguration对象由AbstractConfigurationProvider.getConfiguration方法返回,在AbstractConfigurationProvider.getConfiguration方法中通过
调用loadChannels/loadSources/loadSinks方法来解析flume的配置文件,同时把对应的Channel,SourceRunner,SinkRunner放到对应的hashmap中,并最终通过SimpleMaterializedConfiguration的addChannel/addSourceRunner/addSinkRunner加载到SimpleMaterializedConfiguration对象中,然后供stopAllComponents/startAllComponents使用

stopAllComponents方法用于关闭所有的组件,
其通过调用MaterializedConfiguration对象(这里具体实现类为SimpleMaterializedConfiguration)的getSourceRunners和getChannels来获取需要关闭的SourceRunner和Channel组件对象,然后对各个组件对象调用LifecycleSupervisor.unsupervise来关闭组件,而startAllComponents正好相反,其对各个组件对象调用LifecycleSupervisor.supervise方法用于启动各个组件服务,另外
startAllComponents方法会调用this.loadMonitoring()方法启动监控flume的metrics的服务(而支持reload的方式不会调用这个方法)

4)start方法会对每一个组件调用LifecycleSupervisor.supervise方法,来进行服务的状态管理(在服务异常时可以自动拉起),这个主要是对支持reload的设置有用,
用来启动检测文件更新的计划任务线程池

  public synchronized void start() {
    for(LifecycleAware component : components) {
      supervisor.supervise(component,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    }

}
supervise的实现参见(http://caiguangguang.blog.51cto.com/1652935/1619527)
支持reload的启动方法调用:main--->EventBus.register-->start方法
reload的实现参见(http://caiguangguang.blog.51cto.com/1652935/1619523)

时间: 2024-10-13 00:26:15

flume启动过程分析的相关文章

S5PV210-kernel-内核启动过程分析

1.1.内核启动过程分析前的准备 1.拿到一个内核源码时,先目录下的无用文件删除 2.建立SI工程 3.makefile (1)makefile中不详细的去分析,几个关键的地方,makefile开始部分是kernel的版本号,这个版本号比较重要,因为在模块化驱动安装时会需要用到,要注意会查,会改,版本号在makefile中,改直接改的就行 (2)kernel顶层的makefile中定义的两个变量很重要,一个是ARCH,一个CROSS,ARCH表示我们当前的配置编译路径,如果我们的ARCH =AR

ActivityManagerService启动过程分析

ActivityManagerService启动过程 一 从Systemserver到AMS zygote-> systemserver:java入层口: /** * The main entry point from zygote. */ public static void main(String[] args) { new SystemServer().run(); } 接下来继续看SystemServer run函数执行过程: private void run() { // 准备Syst

Android4.0(Phone)拨号启动过程分析(二)

接上:Android4.0(Phone)拨号启动过程分析(一) InCallScreen处理来电和拨号的界面,接通电话也是这个界面,接下来分析InCallScreen类是如何处理拨号流程的: @Override protected void onCreate(Bundle icicle) { Log.i(LOG_TAG, "onCreate()... this = " + this); Profiler.callScreenOnCreate(); super.onCreate(icic

startActivity启动过程分析(转)

基于Android 6.0的源码剖析, 分析android Activity启动流程,相关源码: frameworks/base/services/core/java/com/android/server/am/ - ActivityManagerService.java - ActivityStackSupervisor.java - ActivityStack.java - ActivityRecord.java - ProcessRecord.java frameworks/base/co

OpenWrt启动过程分析+添加自启动脚本【转】

一.OpenWrt启动过程分析 转自: http://www.eehello.com/?post=107 总结一下OpenWrt的启动流程:1.CFE->2.linux->3./etc/preinit->4./sbin/init ->5./etc/inittab ->6./etc/init.d/rcS->7./etc/rc.d/S* ->8. OpenWrt是一个开放的linux平台,主要用于带wifi的无线路由上. 类似于Ubuntu.Red Hat.之类的li

ARM多核处理器启动过程分析

说明: 该流程图按照代码执行时间顺序划分为4部分: 1.     Bootloader在图片上半部,最先启动: 2.     Kernel在图片下半部,由bootloader引导启动: 3.CPU0执行流程在图片左半部,bootloader代码会进行判断,先行启动CPU0: 4.  Secondary CPUs在图片右半部,由CPU唤醒 具体启动流程如下: 1.     在bootloader启动时,会判断执行代码的是否为CPU0,如果不是,则执行wfe等待CPU0发出sev指令唤醒.如果是CP

Chromium的Plugin进程启动过程分析

前面我们分析了Chromium的Render进程和GPU进程的启动过程,它们都是由Browser进程启动的.在Chromium中,还有一类进程是由Browser进程启动的,它们就是Plugin进程.顾名思义,Plugin进程是用来运行浏览器插件的.浏览器插件的作用是扩展网页功能,它们由第三方开发,安全性和稳定性都无法得到保证,因此运行在独立的进程中.本文接下来就详细分析Plugin进程的启动过程. 老罗的新浪微博:http://weibo.com/shengyangluo,欢迎关注! 在Chro

Chromium的GPU进程启动过程分析

Chromium除了有Browser进程和Render进程,还有GPU进程.GPU进程负责Chromium的GPU操作,例如Render进程通过GPU进程离屏渲染网页,Browser进程也是通过GPU进程将离屏渲染好的网页显示在屏幕上.Chromium之所以将GPU操作运行在独立进程中,是考虑到稳定性问题.毕竟GPU操作是硬件相关操作,硬件的差异性会引发一定的不稳性.本文分析GPU进程的启动过程. 老罗的新浪微博:http://weibo.com/shengyangluo,欢迎关注! GPU进程

iOS程序的完整启动过程分析

1.点击程序图标,打开程序 2.执行main函数,分析如下: ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 int main(int argc, char * argv[]) {     @autoreleasepool {         /*          argc: 系统或者用户传入的参数个数          argv: 系统或者用户传入的实际参数          */         // return UIApplic