1.启动命令
nohup bin/flume-ng agent -n agent-server -f agent-server1.conf &
flume-ng是一个shell脚本:
agent run a Flume agent ---> org.apache.flume.node.Application 类 avro-client run an avro Flume client ---> org.apache.flume.client.avro.AvroCLIClient 类
run_flume() { #shell脚本实现 local FLUME_APPLICATION_CLASS if [ "$#" -gt 0 ]; then FLUME_APPLICATION_CLASS=$1 shift else error "Must specify flume application class" 1 fi if [ ${CLEAN_FLAG} -ne 0 ]; then set -x fi $EXEC $JAVA_HOME/bin/java $JAVA_OPTS -cp "$FLUME_CLASSPATH" -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $* } ...... # finally, invoke the appropriate command if [ -n "$opt_agent" ] ; then #如果第一个参数为agent时,opt_agent取值为1 run_flume $FLUME_AGENT_CLASS $args #FLUME_AGENT_CLASS="org.apache.flume.node.Application" elif [ -n "$opt_avro_client" ] ; then run_flume $FLUME_AVRO_CLIENT_CLASS $args elif [ -n "${opt_version}" ] ; then run_flume $FLUME_VERSION_CLASS $args elif [ -n "${opt_tool}" ] ; then run_flume $FLUME_TOOLS_CLASS $args else error "This message should never appear" 1 fi
最终启动的时候调用org.apache.flume.node.Application类的main方法
2.org.apache.flume.node.Application类
1)调用main方法,首先会解析参数,主要是n和f以及no-reload-conf,n为节点名称,f为配置文件,no-reload-conf代表是否支持自动reload(1.5.0才有的功能)
n/f 都有设置的值,no-reload-conf没有设置的项,如果设置了no-reload-conf代表不能自动reload
Options options = new Options(); Option option = new Option("n" , "name" , true, "the name of this agent"); option.setRequired( true); options.addOption(option); option = new Option("f" , "conf-file" , true, "specify a conf file"); option.setRequired( true); options.addOption(option); option = new Option(null , "no-reload-conf" , false, "do not reload " + "conf file if changed"); options.addOption(option); .... CommandLineParser parser = new GnuParser(); CommandLine commandLine = parser.parse(options, args); File configurationFile = new File(commandLine.getOptionValue(‘f‘ )); String agentName = commandLine.getOptionValue( ‘n‘); boolean reload = !commandLine.hasOption( "no-reload-conf"); //获取是否含有no-reload-conf的设置,如果没有设置no-reload-conf则reload为true
2)
List<LifecycleAware> components = Lists.newArrayList(); //初始化一个List<LifecycleAware>对象,用来存放需要启动的组件,这个只有在支持reload的情况才会使用 Application application; if(reload) { EventBus eventBus = new EventBus(agentName + "-event-bus" ); PollingPropertiesFileConfigurationProvider configurationProvider = new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30); components.add(configurationProvider); application = new Application(components); eventBus.register(application); } else { //不知道reload的情况 PropertiesFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider(agentName, configurationFile); //实例化一个PropertiesFileConfigurationProvider 对象,参数是agent的名称和配置文件(即n和f的设置) application = new Application(); //实例化一个Application对象 application.handleConfigurationEvent(configurationProvider.getConfiguration()); //调用handleConfigurationEvent方法 } application.start(); // 调用start方法
不支持reload的启动方法调用:
main--->handleConfigurationEvent-->stopAllComponents+startAllComponents-->start
3)handleConfigurationEvent方法调用stopAllComponents和startAllComponents方法
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); //用于 startAllComponents(conf); }
这里handleConfigurationEvent方法的参数为MaterializedConfiguration对象(这里为SimpleMaterializedConfiguration实例)
MaterializedConfiguration对象由AbstractConfigurationProvider.getConfiguration方法返回,在AbstractConfigurationProvider.getConfiguration方法中通过
调用loadChannels/loadSources/loadSinks方法来解析flume的配置文件,同时把对应的Channel,SourceRunner,SinkRunner放到对应的hashmap中,并最终通过SimpleMaterializedConfiguration的addChannel/addSourceRunner/addSinkRunner加载到SimpleMaterializedConfiguration对象中,然后供stopAllComponents/startAllComponents使用
stopAllComponents方法用于关闭所有的组件,
其通过调用MaterializedConfiguration对象(这里具体实现类为SimpleMaterializedConfiguration)的getSourceRunners和getChannels来获取需要关闭的SourceRunner和Channel组件对象,然后对各个组件对象调用LifecycleSupervisor.unsupervise来关闭组件,而startAllComponents正好相反,其对各个组件对象调用LifecycleSupervisor.supervise方法用于启动各个组件服务,另外
startAllComponents方法会调用this.loadMonitoring()方法启动监控flume的metrics的服务(而支持reload的方式不会调用这个方法)
4)start方法会对每一个组件调用LifecycleSupervisor.supervise方法,来进行服务的状态管理(在服务异常时可以自动拉起),这个主要是对支持reload的设置有用,
用来启动检测文件更新的计划任务线程池
public synchronized void start() { for(LifecycleAware component : components) { supervisor.supervise(component, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); }
}
supervise的实现参见(http://caiguangguang.blog.51cto.com/1652935/1619527)
支持reload的启动方法调用:main--->EventBus.register-->start方法
reload的实现参见(http://caiguangguang.blog.51cto.com/1652935/1619523)