【Flume】 flume中ExecSource源码的详细分析——执行终端命令获取数据

我们直接看该Source的start方法吧

public void start() {
    logger.info("Exec source starting with command:{}", command);

    executor = Executors.newSingleThreadExecutor();

    runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
        restart, restartThrottle, logStderr, bufferCount, batchTimeout, charset);

    // FIXME: Use a callback-like executor / future to signal us upon failure.
    runnerFuture = executor.submit(runner);

    /*
     * NB: This comes at the end rather than the beginning of the method because
     * it sets our state to running. We want to make sure the executor is alive
     * and well first.
     */
    sourceCounter.start();
    super.start();

    logger.debug("Exec source started");
  }

启动了一个线程来运行,运行的详细过程看runner

它是一个线程,实现了Runnable接口,所以直接看它重写的run方法的逻辑,我们一块一块来看:

 if(shell != null) {
            String[] commandArgs = formulateShellCommand(shell, command);
            process = Runtime.getRuntime().exec(commandArgs);
          }  else {
            String[] commandArgs = command.split("\\s+");
            process = new ProcessBuilder(commandArgs).start();
          }
          reader = new BufferedReader(
              new InputStreamReader(process.getInputStream(), charset));

这里就是执行shell命令,并且将shell命令的输出结果作为输入流读到reader中,InputStreamReader是字节流通向字符流的桥梁,它使用指定的charset读取字节并将其解码为字符,每次调用read方法都会从底层输入流读取一个或多个字节。

 while ((line = reader.readLine()) != null) {
            synchronized (eventList) {
              sourceCounter.incrementEventReceivedCount();
              eventList.add(EventBuilder.withBody(line.getBytes(charset)));
              if(eventList.size() >= bufferCount || timeout()) {
                flushEventBatch(eventList);
              }
            }
          }

如果读入的内容非空,先同步eventList,如果eventList超出一定范围未做处理就会flush

private void flushEventBatch(List<Event> eventList){
      channelProcessor.processEventBatch(eventList);
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }

flush就是现将积攒下来的eventList中的event都处理掉,然后清空

1、将event都放入配置的通道中

 for (Event event : events) {
      List<Channel> reqChannels = selector.getRequiredChannels(event);

      for (Channel ch : reqChannels) {
        List<Event> eventQueue = reqChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          reqChannelQueue.put(ch, eventQueue);
        }
        eventQueue.add(event);
      }

      List<Channel> optChannels = selector.getOptionalChannels(event);

      for (Channel ch: optChannels) {
        List<Event> eventQueue = optChannelQueue.get(ch);
        if (eventQueue == null) {
          eventQueue = new ArrayList<Event>();
          optChannelQueue.put(ch, eventQueue);
        }

        eventQueue.add(event);
      }
    }

这里就是将event放到通道中的详细过程了,但是这里大家注意到有两次selector的getchannel的方法,这是因为通道的选择器模式有两种:复用和复制

  if(restart) {
          logger.info("Restarting in {}ms, exit code {}", restartThrottle,
              exitCode);
          try {
            Thread.sleep(restartThrottle);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        } else {
          logger.info("Command [" + command + "] exited with " + exitCode);
        }
      } while(restart);

restart参数的含义是,当shell命令执行的时候进程死了,是否重启该命令的进程,默认是false

配置为true的话,就会将刚才的所有代码循环一遍

总结:

1、event如何产出的?

 eventList.add(EventBuilder.withBody(line.getBytes(charset)));
 public static Event withBody(byte[] body, Map<String, String> headers) {
    Event event = new SimpleEvent();

    if(body == null) {
      body = new byte[0];
    }
    event.setBody(body);

    if (headers != null) {
      event.setHeaders(new HashMap<String, String>(headers));
    }

    return event;
  }

2、event如何放入通道?

private void flushEventBatch(List<Event> eventList){
      channelProcessor.processEventBatch(eventList);
      sourceCounter.addToEventAcceptedCount(eventList.size());
      eventList.clear();
      lastPushToChannel = systemClock.currentTimeMillis();
    }
时间: 2024-08-01 22:16:40

【Flume】 flume中ExecSource源码的详细分析——执行终端命令获取数据的相关文章

Flume 实战(2)--Flume-ng-sdk源码分析

具体参考: 官方用户手册和开发指南 http://flume.apache.org/FlumeDeveloperGuide.html *) 定位和简单例子 1). Flume-ng-sdk是用于编写往flume agent发送数据的client sdk2). 简单示例 RpcClient client = null; try { client = RpcClientFactory.getDefaultInstance("127.0.0.1", 41414); Event event =

Flume-NG(1.5版本)中SpillableMemoryChannel源码级分析

SpillableMemoryChannel是1.5版本新增的一个channel.这个channel优先将evnet放在内存中,一旦内存达到设定的容量就使用file channel写入磁盘.然后读的时候会按照顺序读取:会通过一个DrainOrderQueue来保证不管是内存中的还是溢出(本文的“溢出”指的是内存channel已满,需要使用file channel存储数据)文件中的顺序.这个Channel是memory channel和file channel的一个折中,虽然在内存中的数据仍然可能

JFinal 源码超详细解析之DB+ActiveRecord

我记得以前有人跟我说,"面试的时候要看spring的源码,要看ioc.aop的源码"那为什么要看这些开源框架的源码呢,其实很多人都是"应急式"的去读,就像读一篇文章一下,用最快的速度把文章从头到尾读一遍,那结果就是当你读完它,你也不清楚它讲了一个什么故事,想表达什么. 一个优秀的架构的源码我认为就好像一本名著一样,你的"文学"水平越高,你就越能读出作者设计的精妙之处.一篇源码在你不同水平的时候,能读出不同的东西,因此,我觉得优秀的框架的源码是经久

opencv2.4.9中stitching_detailed源码环境搭建

今天做了一下老师给的第一套题,第一题是判断一个字符串是否在另一个字符串中:做了一下,感觉有好多种写法,java中的类真的好多啊,要掌握好一些基本类的用法: package com.exam.e120; public class java1 { public static void main(String[]args){ String str1,str2; str1="I am Tom, I am from China."; str2="Tom"; int i=str

如何在Eclipse中连接源码

最近在很多场合都看见设计模式的影子,一直以来,都投入主要时间在搞算法与数据结构,很来发现设计模式真的很重要.有的时候代码的可维护.可重用.可扩展确实胜过单纯的算法效率高.所以拾起大牛书籍<大话设计模式>同时参考网上诸大牛的博客,开始我的设计模式之旅.由于平时编程时用C/C++,现在是Java,也练练Java语法. 今天先介绍一下命令模式. 概念: 命令模式(Command):将一个请求封装成一个对象,从而使你可用不同的请求对象对客户进行参数化,对请求排队或记录请求日志,以及支持可撤销的操作.

Android中关联源码的方法

这里给大家介绍一个很方便的关联源码的方法. 1.打开Android SDK Manager.把你所使用的版本的API给下载下来,如下图所示... 2.关联源码时,将源码关联到对应API的目录,如: E:\开发者工具\android可能工具包\adt-bundle-windows-x86-20130729\sdk\sources\android-18 这时候,就能关联成功了... Android中关联源码的方法

查看support-v4支持包中的源码

在support-v4包里面,添加了很多的支持控件,比如ViewPager,Fragment等,为了解决一些问题,我们有时候想要看一下实现源码,但是点进去之后,源码并不会显示出来,会出现下面的情况. 那么,我们怎么才能看到支持包里面的源码呢? 下面,给大家一个解决方案. 首先,在文件夹libs下面创建一个文件,名称为android-support-v4.jar.properties,如下: 然后,在文件里面写上我们的support-v4包的源码文件夹的所在,比如,我是Mac系统,文件在下面这个目

深入理解 Node.js 中 EventEmitter源码分析(3.0.0版本)

events模块对外提供了一个 EventEmitter 对象,即:events.EventEmitter. EventEmitter 是NodeJS的核心模块events中的类,用于对NodeJS中的事件进行统一管理,使用events可以对特定的API事件进行添加,触发和移除等.我们可以通过 require('events')来访问该模块. 比如如下代码: // 引入 events 模块 const events = require('events'); console.log(events)

Python源码剖析笔记3-Python执行原理初探

Python源码剖析笔记3-Python执行原理初探 本文简书地址:http://www.jianshu.com/p/03af86845c95 之前写了几篇源码剖析笔记,然而慢慢觉得没有从一个宏观的角度理解python执行原理的话,从底向上分析未免太容易让人疑惑,不如先从宏观上对python执行原理有了一个基本了解,再慢慢探究细节,这样也许会好很多.这也是最近这么久没有更新了笔记了,一直在看源码剖析书籍和源码,希望能够从一个宏观层面理清python执行原理.人说读书从薄读厚,再从厚读薄方是理解了