Flume-NG源码分析-整体结构及配置载入分析

在 http://flume.apache.org 上下载flume-1.6.0版本,将源码导入到Idea开发工具后如下图所示:

一、主要模块说明

  • flume-ng-channels 里面包含了filechannel,jdbcchannel,kafkachannel,memorychannel通道的实现。
  • flume-ng-clients 实现了log4j相关的几个Appender,使得log4j的日志输出可以直接发送给flume-agent;其中有一个LoadBalancingLog4jAppender的实现,提供了多个flume-agent的load balance和ha功能,采用flume作为日志收集的可以考虑将这个appender引入内部的log4j中。
  • flume-ng-configuration 这个主要就是Flume配置信息相关的类,包括载入flume-config.properties配置文件并解析。其中包括了Source的配置,Sink的配置,Channel的配置,在阅读源码前推荐先梳理这部分关系再看其他部分的。
  • flume-ng-core flume整个核心框架,包括了各个模块的接口以及逻辑关系实现。其中instrumentation是flume内部实现的一套metric机制,metric的变化和维护,其核心也就是在MonitoredCounterGroup中通过一个Map<key, AtomicLong>来实现metric的计量。ng-core下几乎大部分代码任然几种在channel、sink、source几个子目录下,其他目录基本完成一个util和辅助的功能。
  • flume-ng-node 实现启动flume的一些基本类,包括main函数的入口(Application.java中)。在理解configuration之后,从application的main函数入手,可以较快的了解整个flume的代码。

二、Flume逻辑结构图

三、flume-ng启动文件介绍

################################
# constants
################################

#设置常量值,主要是针对不同的参数执行相应的类,以启动Flume环境
FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"

#真正启动Flume环境的方法
run_flume() {
  local FLUME_APPLICATION_CLASS

  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi

  if [ ${CLEAN_FLAG} -ne 0 ]; then
    set -x
  fi

  #执行这一行命令,执行相应的启动类,比如org.apache.flume.node.Application
  $EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH"       -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}

################################
# main
################################

# set default params
# 在启动的过程中使用到的参数
FLUME_CLASSPATH=""
FLUME_JAVA_LIBRARY_PATH=""
#默认占用堆空间大小,这一块都可以根据JVM进行重新设置
JAVA_OPTS="-Xmx20m"
LD_LIBRARY_PATH=""

opt_conf=""
opt_classpath=""
opt_plugins_dirs=""
arr_java_props=()
arr_java_props_ct=0
opt_dryrun=""

# 根据不同的参数,执行不同的启动类,每个常量所对应的类路径在代码前面有过介绍。
if [ -n "$opt_agent" ] ; then
  run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
  run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
  run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
  run_flume $FLUME_TOOLS_CLASS $args
else
  error "This message should never appear" 1
fi

这是其中最主要的一部分flume-ng命令行,根据重要性摘取了一段,感兴趣的读者可以自己到bin目录下查看全部。

四、从Flume-NG启动过程开始说起

从bin/flume-ng这个shell脚本可以看到Flume的起始于org.apache.flume.node.Application类,这是flume的main函数所在。

main方法首先会先解析shell命令,如果指定的配置文件不存在就抛出异常。

代码如下所示:

Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);

option = new Option("f", "conf-file", true,
    "specify a config file (required if -z missing)");
option.setRequired(false);
options.addOption(option);

option = new Option(null, "no-reload-conf", false,
    "do not reload config file if changed");
options.addOption(option);

// Options for Zookeeper
option = new Option("z", "zkConnString", true,
    "specify the ZooKeeper connection to use (required if -f missing)");
option.setRequired(false);
options.addOption(option);

option = new Option("p", "zkBasePath", true,
    "specify the base path in ZooKeeper for agent configs");
option.setRequired(false);
options.addOption(option);

option = new Option("h", "help", false, "display help text");
options.addOption(option);

 #命令行解析类
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);

if (commandLine.hasOption(‘h‘)) {
  new HelpFormatter().printHelp("flume-ng agent", options, true);
  return;
}

String agentName = commandLine.getOptionValue(‘n‘);
boolean reload = !commandLine.hasOption("no-reload-conf");

if (commandLine.hasOption(‘z‘) || commandLine.hasOption("zkConnString")) {
  isZkConfigured = true;
}

以上代码是Application类中校验shell命令行的代码,举个例子在启动flume的时候,使用如下命令行:

./bin/flume-ng agent -n agent -c conf -f conf/hw.conf -Dflume.root.logger=INFO,console

里面的-n -f等参数都是在上面代码中校验的。

再往下看main方法里的代码:

File configurationFile = new File(commandLine.getOptionValue(‘f‘));

  /*
   * The following is to ensure that by default the agent will fail on
   * startup if the file does not exist.
   */
  if (!configurationFile.exists()) {
    // If command line invocation, then need to fail fast
    if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
      null) {
      String path = configurationFile.getPath();
      try {
        path = configurationFile.getCanonicalPath();
      } catch (IOException ex) {
        logger.error("Failed to read canonical path for file: " + path,
          ex);
      }
      throw new ParseException(
        "The specified configuration file does not exist: " + path);
    }
  }
  List<LifecycleAware> components = Lists.newArrayList();

  if (reload) {
    EventBus eventBus = new EventBus(agentName + "-event-bus");
    PollingPropertiesFileConfigurationProvider configurationProvider =
      new PollingPropertiesFileConfigurationProvider(
        agentName, configurationFile, eventBus, 30);
    components.add(configurationProvider);
    application = new Application(components);
    eventBus.register(application);
  } else {
    PropertiesFileConfigurationProvider configurationProvider =
      new PropertiesFileConfigurationProvider(
        agentName, configurationFile);
    application = new Application();
    application.handleConfigurationEvent(configurationProvider
      .getConfiguration());
  }
}
application.start();

说明:

根据命令中含有”no-reload-conf”参数,决定采用那种加载配置文件方式:

一、没有此参数,会动态加载配置文件,默认每30秒加载一次配置文件,因此可以动态修改配置文件;

二、有此参数,则只在启动时加载一次配置文件。实现动态加载功能采用了发布订阅模式,使用guava中的EventBus实现。

三、PropertiesFileConfigurationProvider这个类是配置文件加载类。

类图如下:

从图中可以看出在整个PollingPropertiesFileConfigurationProvider类中,它实现了LifecycleAware接口,而这个接口是掌管整个Flume生命周期的一个核心接口,LifecycleSupervisor实现了这个接口,通过上面代码中application.start方法触发LifecyleAware的start方法,下面是这个接口的方法定义及相关类代码:

public interface LifecycleAware {

  /**
   * <p>
   * Starts a service or component.
   * </p>
   * @throws LifecycleException
   * @throws InterruptedException
   */
  public void start();

  /**
   * <p>
   * Stops a service or component.
   * </p>
   * @throws LifecycleException
   * @throws InterruptedException
   */
  public void stop();

  /**
   * <p>
   * Return the current state of the service or component.
   * </p>
   */
  public LifecycleState getLifecycleState();

}

Application.start()方法内容:

public synchronized void start() {
  for(LifecycleAware component : components) {
    supervisor.supervise(component,
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  }
}

LifecycleSupervisor.supervise方法内容如下:

public synchronized void supervise(LifecycleAware lifecycleAware,
    SupervisorPolicy policy, LifecycleState desiredState) {
  if(this.monitorService.isShutdown()
      || this.monitorService.isTerminated()
      || this.monitorService.isTerminating()){
    throw new FlumeException("Supervise called on " + lifecycleAware + " " +
        "after shutdown has been initiated. " + lifecycleAware + " will not" +
        " be started");
  }

  Preconditions.checkState(!supervisedProcesses.containsKey(lifecycleAware),
      "Refusing to supervise " + lifecycleAware + " more than once");

  if (logger.isDebugEnabled()) {
    logger.debug("Supervising service:{} policy:{} desiredState:{}",
        new Object[] { lifecycleAware, policy, desiredState });
  }

  Supervisoree process = new Supervisoree();
  process.status = new Status();

  process.policy = policy;
  process.status.desiredState = desiredState;
  process.status.error = false;

  MonitorRunnable monitorRunnable = new MonitorRunnable();
  monitorRunnable.lifecycleAware = lifecycleAware;
  monitorRunnable.supervisoree = process;
  monitorRunnable.monitorService = monitorService;

  supervisedProcesses.put(lifecycleAware, process);

  ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
      monitorRunnable, 0, 3, TimeUnit.SECONDS);
  monitorFutures.put(lifecycleAware, future);
}

在上面的代码中,会创建MonitorRunnable对象,这个对象是个定时对象,里面的run方法主要是根据supervisoree.status.desiredState的值执行对应的操作。

包括:START,STOP等状态, 大家注意scheduleWithFixedDelay这个方法,这是java线程池自带的,要求每次任务执行完以后再延迟3秒,而不是每隔3秒执行一次,大家注意这一点。

又有同学会问循环调用会不会有问题,这里回应大家其实也没问题,这么做是为了重试机制,看下面代码:

if (!lifecycleAware.getLifecycleState().equals( supervisoree.status.desiredState))

在MonitorRunnable内部有这样一个判断,当getLifecycleState与supervisoree.status.desiredState状态不相等的时候才会执行,而ifecycleAware.getLifecycleState()初始状态是IDLE。

时序调用图如下所示

注:

PollingPropertiesFileConfigurationProvider.start()方法会启动一个单线程FileWatcherRunnable每隔30s去加载一次配置文件:

eventBus.post(getConfiguration())。

getConfiguration()解析了配置文件并且获取所有组件及配置属性

五、配置文件加载详细分析

先看一下FileWatcherRunnable内部的代码:

public MaterializedConfiguration getConfiguration() {
 //初始化三大组件的配置Map,source,channel,sink
  MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
  FlumeConfiguration fconfig = getFlumeConfiguration();
  AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
  if (agentConf != null) {
    Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
    Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
    Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
    try {
      loadChannels(agentConf, channelComponentMap);
      loadSources(agentConf, channelComponentMap, sourceRunnerMap);
      loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
      Set<String> channelNames =
          new HashSet<String>(channelComponentMap.keySet());
      for(String channelName : channelNames) {
        ChannelComponent channelComponent = channelComponentMap.
            get(channelName);
        if(channelComponent.components.isEmpty()) {
          LOGGER.warn(String.format("Channel %s has no components connected" +
              " and has been removed.", channelName));
          channelComponentMap.remove(channelName);
          Map<String, Channel> nameChannelMap = channelCache.
              get(channelComponent.channel.getClass());
          if(nameChannelMap != null) {
            nameChannelMap.remove(channelName);
          }
        } else {
          LOGGER.info(String.format("Channel %s connected to %s",
              channelName, channelComponent.components.toString()));
          conf.addChannel(channelName, channelComponent.channel);
        }
      }
      for(Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
        conf.addSourceRunner(entry.getKey(), entry.getValue());
      }
      for(Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
        conf.addSinkRunner(entry.getKey(), entry.getValue());
      }
    } catch (InstantiationException ex) {
      LOGGER.error("Failed to instantiate component", ex);
    } finally {
      channelComponentMap.clear();
      sourceRunnerMap.clear();
      sinkRunnerMap.clear();
    }
  } else {
    LOGGER.warn("No configuration found for this host:{}", getAgentName());
  }
  return conf;
}

说明:

一、在哪里加载的配置文件

其实是在这里,FlumeConfiguration fconfig = getFlumeConfiguration();

getFlumeConfiguration()这个方法是一个抽象方法,可以通过下图的方式查找加载方式。

我们选择PollingPropertiesFileConfigurationProvider这个,可以看到:

@Override
public FlumeConfiguration getFlumeConfiguration() {
  BufferedReader reader = null;
  try {
    reader = new BufferedReader(new FileReader(file));
    Properties properties = new Properties();
    properties.load(reader);
    return new FlumeConfiguration(toMap(properties));
  } catch (IOException ex) {
    LOGGER.error("Unable to load file:" + file
        + " (I/O failure) - Exception follows.", ex);
  } finally {
    if (reader != null) {
      try {
        reader.close();
      } catch (IOException ex) {
        LOGGER.warn(
            "Unable to close file reader for file: " + file, ex);
      }
    }
  }
  return new FlumeConfiguration(new HashMap<String, String>());
}

就是上面这个方法通过JAVA最基本的流的方式加载的配置文件,也就是图上面我配置的flume的hw.conf配置文件。方法读取配置文件,然后解析成name(输姓名全称,即等号左侧的全部)、value(等号的右侧)对,存入一个Map当中,返回一个封装了这个Map的FlumeConfiguration对象。

FlumeConfiguration类的构造函数会遍历这个Map的所有<name,value>对,调用addRawProperty(String name, String value)处理<name,value>对,addRawProperty方法会先做一些合法性检查,启动Flume的时候会构造一个AgentConfiguration对象aconf,然后agentConfigMap.put(agentName, aconf),以后动态加载配置文件时只需要AgentConfiguration aconf = agentConfigMap.get(agentName)就可以得到,然后调用aconf.addProperty(configKey, value)处理。

二、我们重点看一下addProperty方法内部的parseConfigKey方法,这里会深入解析每一行配置文件内容。

我们举一个配置文件的例子:

agent.sources=s1
agent.channels=c1 c2
agent.sinks=k1 k2

agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /Users/it-od-m-2687/Downloads/abc.log
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
agent.channels.c1.capacity=10000
agent.channels.c1.transactionCapacity=100

agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.brokerList=127.0.0.1:9092

agent.sinks.k1.topic=testKJ1
agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder

agent.sinks.k1.channel=c1

解析上面的文件就是使用下面parseConfigKey这个方法:

cnck = parseConfigKey(key, BasicConfigurationConstants.CONFIG_SINKGROUPS_PREFIX);
public final class BasicConfigurationConstants {

public static final String CONFIG_SOURCES = "sources";
public static final String CONFIG_SOURCES_PREFIX = CONFIG_SOURCES + ".";
public static final String CONFIG_SOURCE_CHANNELSELECTOR_PREFIX = "selector.";

public static final String CONFIG_SINKS = "sinks";
public static final String CONFIG_SINKS_PREFIX = CONFIG_SINKS + ".";
public static final String CONFIG_SINK_PROCESSOR_PREFIX = "processor.";

public static final String CONFIG_SINKGROUPS = "sinkgroups";
public static final String CONFIG_SINKGROUPS_PREFIX = CONFIG_SINKGROUPS + ".";

public static final String CONFIG_CHANNEL = "channel";
public static final String CONFIG_CHANNELS = "channels";
public static final String CONFIG_CHANNELS_PREFIX = CONFIG_CHANNELS + ".";

public static final String CONFIG_CONFIG = "config";
public static final String CONFIG_TYPE = "type";

private BasicConfigurationConstants() {
  // disable explicit object creation
}

1、我们用agent.sources.s1.command=s1来举例:

变量prefix指的是:sink,source,channel等关键字。

如下面代码:

public final class BasicConfigurationConstants {

public static final String CONFIG_SOURCES = "sources";
public static final String CONFIG_SOURCES_PREFIX = CONFIG_SOURCES + ".";
public static final String CONFIG_SOURCE_CHANNELSELECTOR_PREFIX = "selector.";

public static final String CONFIG_SINKS = "sinks";
public static final String CONFIG_SINKS_PREFIX = CONFIG_SINKS + ".";
public static final String CONFIG_SINK_PROCESSOR_PREFIX = "processor.";

public static final String CONFIG_SINKGROUPS = "sinkgroups";
public static final String CONFIG_SINKGROUPS_PREFIX = CONFIG_SINKGROUPS + ".";

public static final String CONFIG_CHANNEL = "channel";
public static final String CONFIG_CHANNELS = "channels";
public static final String CONFIG_CHANNELS_PREFIX = CONFIG_CHANNELS + ".";

public static final String CONFIG_CONFIG = "config";
public static final String CONFIG_TYPE = "type";

private BasicConfigurationConstants() {
  // disable explicit object creation
}

2、上面parseConfigKey方法,首先根据prefix判断prefix的后面,有少多字符。比如:sources.s1.command,在sources后面s1.command一共有10个字符。

3、解析出name变量,如s1,这个是自己定义的。

4、解析出configKey固定关键字,如command,这个是系统定义的。

5、封装new ComponentNameAndConfigKey(name, configKey)返回。

6、将sources、channel、sink配置信息,分别存放到sourceContextMap、channelConfigMap、sinkConfigMap三个HashMap,最后统一封装到AgentConfiguration对象中,然后再把AgentConfiguration存放到agentConfigMap中,key是agentName。说了这么多相信很多同学都已经晕了,agentConfigMap的结构如下图所示:

读源码是一个很痛苦的过程,不仅要分析整体框架的架构,还要理解作者的用意和设计思想,但只要坚持下来你会发现还是能学到很多东西的。

时间: 2024-10-07 00:00:08

Flume-NG源码分析-整体结构及配置载入分析的相关文章

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th

Flume NG源码分析(三)使用Event接口表示数据流

Flume NG有4个主要的组件: Event表示在Flume各个Agent之间传递的数据流 Source表示从外部源接收Event数据流,然后传递给Channel Channel表示对从Source传递的Event数据流的临时存储 Sink表示从Channel中接收存储的Event数据流,并传递给下游的Source或者终点仓库 这篇看一下Event接口表示的数据流.Source, Channel, Sink操作的数据流都是基于Event接口的封装. public interface Event

WorldWind源码剖析系列:配置载入器类ConfigurationLoader

配置载入器类ConfigurationLoader主要从指定的路径中加载保存星球相关参数的xml文件,从中读取数据来构造星球对象及其所关联的可渲染子对象列表并返回.该类的类图如下所示. 该类所包含的主要的方法基本都是静态的,功能说明如下: public static double ParseDouble(string s)将字符串s解析为Double型数字 private static bool ParseBool(string booleanString) 将字符串s解析为bool型 publ

【源码解析】自动配置的这些细节都不知道,别说你会 springboot

spring-boot 相对于 spring,很重要的一个特点就是自动配置,使约定大于配置思想成功落地.xxx-spring-boot-starter 一系列引导器能够开箱即用,或者只需要很少的配置(对于初学人员)就是因为已做了默认的自动配置. 自动配置在一开始就初始化了一些配置,同时提供修改配置的入口. 整体结构spring-boot-autoconfigure 包是 spring-boot-starter 中一个非常重要的包,其中提供了自动配置功能,还对常用依赖,设置了默认配置. 依赖其依赖

分析开源项目源码,我们该如何入手分析?(授人以渔)

1 前言 本文接上篇文章跟大家聊聊我们为什么要学习源码?学习源码对我们有用吗?,那么本篇文章再继续跟小伙伴们聊聊源码这个话题. 在工作之余开始写SpringBoot源码分析专栏前,跟小伙伴们聊聊"分析开源项目源码,我们该如何入手分析?"这个话题,我们就随便扯皮,反正是跟小伙伴们一起学习交流,没必要太正式. 小伙伴们看完本文后,若有自己的源码阅读心得可以在下面进行评论或私聊我进行分享,让我从小伙伴们身上GET多点源码阅读的一些技巧,嘿嘿. 2 学习开源框架源码到底难不难? 那么,先跟小伙

Netty源码学习——EventLoopGroup原理:NioEventLoopGroup分析

类结构图: 不了解Executor接口原理的可以查看concurrent包中的api介绍,这里只介绍Netty中EventExecutorGroup的主要功能! 从类的结构图中可以看到EventExecutorGroup是直接继承ScheduledExecutorService这个接口的,为了说明白Group的原理这里顺便提一下ScheduledExecutorService的用途! java.util.concurrent.ScheduledExecutorService An Executo

Apache Spark源码走读之6 -- 存储子系统分析

欢迎转载,转载请注明出处,徽沪一郎. 楔子 Spark计算速度远胜于Hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk,本文尝试分析Spark中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互关系. 存储子系统概览 上图是Spark存储子系统中几个主要模块的关系示意图,现简要说明如下 CacheManager  RDD在进行计算的时候,通过CacheManager来获取数据,并通过CacheManager来存储计算结果 BlockManager

qemu-kvm-1.1.0源码中关于迁移的代码分析

Description Businesses like to have memorable telephone numbers. One way to make a telephone number memorable is to have it spell a memorable word or phrase. For example, you can call the University of Waterloo by dialing the memorable TUT-GLOP. Some

网狐源码架设:MatchID配置错误

本文引用于:征途源码论坛(www.zhengtuwl.com)-----专注各类精品源码下载的平台 网狐棋牌源码架设常见问题: [配置错误]MatchID配置错误....的解决方法 MATCHID 就是房间ID 房间ID配置上去就好了