SourceRunner与ExecSource实现分析

在agent启动时,会启动Channel,SourceRunner,SinkRunner,比如在org.apache.flume.agent.embedded.EmbeddedAgent类的doStart方法中:

  private void doStart() {
    boolean error = true;
    try {
      channel.start(); //调用Channel.start启动Channel
      sinkRunner.start(); //调用SinkRunner.start启动SinkRunner
      sourceRunner.start(); //调用SourceRunner.start启动SourceRunner
      supervisor.supervise(channel,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      supervisor.supervise(sinkRunner,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      supervisor.supervise(sourceRunner,
          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      error = false;
.....

SourceRunner目前有两大类PollableSourceRunner和EventDrivenSourceRunner,分别对应PollableSource和EventDrivenSource,PollableSource相关类需要外部驱动来确定source中是否有消息可以使用,而EventDrivenSource相关类不需要外部驱动,自己实现了事件驱动机制,目前常见的Source类都属于EventDrivenSource类型。在org.apache.flume.conf.source.SourceType中定义了常见的Source类:

OTHER(null),
SEQ("org.apache.flume.source.SequenceGeneratorSource"),
NETCAT("org.apache.flume.source.NetcatSource"),
EXEC("org.apache.flume.source.ExecSource"),
AVRO("org.apache.flume.source.AvroSource"),
SYSLOGTCP("org.apache.flume.source.SyslogTcpSource"),
MULTIPORT_SYSLOGTCP("org.apache.flume.source.MultiportSyslogTCPSource"),
SYSLOGUDP("org.apache.flume.source.SyslogUDPSource"),
SPOOLDIR("org.apache.flume.source.SpoolDirectorySource"),
HTTP("org.apache.flume.source.http.HTTPSource");

可以由org.apache.flume.source.DefaultSourceFactory获取其对应的实例.
SourceRunner则负责启动Source,比如在org.apache.flume.source.EventDrivenSourceRunner的start方法:

  public void start() {
    Source source = getSource(); //通过getSource获取Source对象
    ChannelProcessor cp = source.getChannelProcessor(); //获取ChannelProcessor 对象
    cp.initialize(); //调用ChannelProcessor.initialize方法
    source.start(); //调用Source.start方法
    lifecycleState = LifecycleState. START;
  }

这里以execsource为例,配置source的类型为EXEC时,使用org.apache.flume.source.ExecSource.
主要原理是通过启动操作系统中的进程把每一行文本信息转换为Event,exec source不提供事务的特性(不能保证数据被成功发送到channel中),execsource关心的是标准输出,标准错误输出会被忽略(虽然可以打印到log中,但不会作为Event发送),可以设置restart的特性让source在退出时自动重启
主要方法分析:
1.configure方法用来设置相关参数
参数项:

command  执行的命令
restart 在命令退出后是否自动重启,默认是false
restartThrottle 重启命令的等待时间,默认是10s
logStderr 是否记录错误日志(只会放到日志中,不会做为Event),默认是false
batchSize 一次向channel发送的event的数量,批量发送的功能,默认是20
charset 字符编码设置,默认是UTF-8

2.start方法,SourceRunner的start方法会调用这个start方法:

  public void start() {
    logger.info( "Exec source starting with command:{}" , command );
    executor = Executors. newSingleThreadExecutor(); // 初始化一个单线程的线程池,private ExecutorService executor;
    counterGroup = new CounterGroup(); //初始化性能计数器
    runner = new ExecRunnable( command, getChannelProcessor(), counterGroup,
        restart, restartThrottle, logStderr , bufferCount , charset ); //实例化一个ExecRunnable线程
    // FIXME : Use a callback-like executor / future to signal us upon failure.
    runnerFuture = executor.submit( runner);  //private Future<?> runnerFuture;
    super.start(); //设置状态为start,lifecycleState = LifecycleState.START;
    logger.debug( "Exec source started");
  }

3.核心是一个实现Runnable接口的线程类ExecRunnable,用于启动读取数据的进程(在一个do...while循环中)

public void run() {
  do {
    String exitCode = "unknown";
    BufferedReader reader = null;
    try {
      String[] commandArgs = command.split("\\s+"); //由命令生成字符串数组
      process = new ProcessBuilder(commandArgs).start(); //生成一个Process对象,启动命令进程
      reader = new BufferedReader(
          new InputStreamReader(process.getInputStream(), charset)); //读取标准输出至缓冲区对象
      // StderrLogger dies as soon as the input stream is invalid
      StderrReader stderrReader = new StderrReader(new BufferedReader(
          new InputStreamReader(process.getErrorStream(), charset)), logStderr); //StderrReader线程用于记录标准错误日志
      stderrReader.setName("StderrReader-[" + command + "]");
      stderrReader.setDaemon(true);
      stderrReader.start();
       /*
     while((line = input.readLine()) != null) {
      if(logStderr) {
        logger.info("StderrLogger[{}] = ‘{}‘", ++i, line); //日志只会写到log中,不会转换为Event
      }
    }
     */
      String line = null;
      List<Event> eventList = new ArrayList<Event>();
      while ((line = reader.readLine()) != null) { //循环读取缓冲区中的内容
        counterGroup.incrementAndGet("exec.lines.read");
        eventList.add(EventBuilder.withBody(line.getBytes(charset)));  //使用EventBuilder.withBody转换成Event并放入eventList中
        if(eventList.size() >= bufferCount) { //达到batchSize的设置时调用ChannelProcessor.processEventBatch处理Event
          channelProcessor.processEventBatch(eventList);
          eventList.clear();
        }
      }
      if(!eventList.isEmpty()) {
        channelProcessor.processEventBatch(eventList);
      }
    } 
....
  } while(restart); //如果设置了restart,会在process退出后重新运行do代码块,否则退出
}
时间: 2024-10-11 09:05:32

SourceRunner与ExecSource实现分析的相关文章

【Flume】 flume中ExecSource源码的详细分析——执行终端命令获取数据

我们直接看该Source的start方法吧 public void start() { logger.info("Exec source starting with command:{}", command); executor = Executors.newSingleThreadExecutor(); runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter, restart, re

Flume-NG源码分析-整体结构及配置载入分析

在 http://flume.apache.org 上下载flume-1.6.0版本,将源码导入到Idea开发工具后如下图所示: 一.主要模块说明 flume-ng-channels 里面包含了filechannel,jdbcchannel,kafkachannel,memorychannel通道的实现. flume-ng-clients 实现了log4j相关的几个Appender,使得log4j的日志输出可以直接发送给flume-agent:其中有一个LoadBalancingLog4jApp

大数据系统数据采集产品的架构分析

任何完整的大数据平台,一般包括以下的几个过程: 数据采集 数据存储 数据处理 数据展现(可视化,报表和监控) 其中,数据采集是所有数据系统必不可少的,随着大数据越来越被重视,数据采集的挑战也变的尤为突出.这其中包括: 数据源多种多样 数据量大,变化快 如何保证数据采集的可靠性的性能 如何避免重复数据 如何保证数据的质量 我们今天就来看看当前可用的一些数据采集的产品,重点关注一些它们是如何做到高可靠,高性能和高扩展. Apache Flume Flume 是Apache旗下,开源,高可靠,高扩展,

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th

Flume-NG启动过程源码分析(三)(原创)

上一篇文章分析了Flume如何加载配置文件的,动态加载也只是重复运行getConfiguration(). 本篇分析加载配置文件后各个组件是如何运行的? 加载完配置文件订阅者Application类会收到订阅信息执行: @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf)

【Java】【Flume】Flume-NG启动过程源码分析(二)

本节分析配置文件的解析,即PollingPropertiesFileConfigurationProvider.FileWatcherRunnable.run中的eventBus.post(getConfiguration()).分析getConfiguration()方法.此方法在AbstractConfigurationProvider类中实现了,并且这个类也初始化了三大组件的工厂类:this.sourceFactory = new DefaultSourceFactory();this.s

【Java】【Flume】Flume-NG启动过程源码分析(三)

本篇分析加载配置文件后各个组件是如何运行的? 加载完配置文件订阅者Application类会收到订阅信息执行: @Subscribe public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) { stopAllComponents(); startAllComponents(conf); } MaterializedConfiguration conf就是getConfiguration()

ChannelProcessor的实现分析

org.apache.flume.channel.ChannelProcessor 用于实际的Event到Channel的操作(在Source中用到),可以把它想象成channel的proxy,用于控制把Event put到哪些Channel中,以及怎么put(bacth或者单个),同时在put之前会使用 Interceptor对Event进行处理.把Event put到哪些Channel中是由ChannelSelector 控制的,根据selector的设置,目前主要有两种: REPLICAT

爱奇艺、优酷、腾讯视频竞品分析报告2016(一)

1 背景 1.1 行业背景 1.1.1 移动端网民规模过半,使用时长份额超PC端 2016年1月22日,中国互联网络信息中心 (CNNIC)发布第37次<中国互联网络发展状况统计报告>,报告显示,网民的上网设备正在向手机端集中,手机成为拉动网民规模增长的主要因素.截至2015年12月,我国手机网民规模达6.20亿,有90.1%的网民通过手机上网. 图 1  2013Q1~2015Q3在线视频移动端和PC端有效使用时长份额对比 根据艾瑞网民行为监测系统iUserTracker及mUserTrac