【Java】【Fulme】Flume-NG源代码阅读之SpoolDirectorySource

org.apache.flume.source.SpoolDirectorySource是flume的一个经常使用的source,这个源支持从磁盘中某目录获取文件数据。不同于其它异步源,这个源可以避免重新启动或者发送失败后数据丢失。flume可以监控目录,当出现新文件时会读取该文件并获取数据。当一个给定的文件被所有读入到通道中时,该文件会被重命名以标志已经完毕。同一时候,该源须要一个清理进程来定期移除完毕的文件。

  通道可选地将一个完毕路径的原始文件插入到每一个事件的hearder域中。在读取文件时,source缓存文件数据到内存中。同一时候,须要确定设置了bufferMaxLineLength选项,以确保该数据远大于输入数据中数据最长的某一行。

注意!!!channel仅仅接收spooling directory中唯一命名的文件。假设文件名称反复或文件在读取过程中被改动,则会有读取失败返回异常信息。这样的场景下,同名的文件拷贝到这个文件夹时建议带唯一标示,比方时间戳。

一、configure(Context context)方法。代码例如以下:

public void configure(Context context) {
    spoolDirectory = context.getString(SPOOL_DIRECTORY);
    Preconditions.checkState(spoolDirectory != null,
        "Configuration must specify a spooling directory");

    completedSuffix = context.getString(SPOOLED_FILE_SUFFIX,
        DEFAULT_SPOOLED_FILE_SUFFIX);
    deletePolicy = context.getString(DELETE_POLICY, DEFAULT_DELETE_POLICY);
    fileHeader = context.getBoolean(FILENAME_HEADER,
        DEFAULT_FILE_HEADER);
    fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
        DEFAULT_FILENAME_HEADER_KEY);
    batchSize = context.getInteger(BATCH_SIZE,
        DEFAULT_BATCH_SIZE);
    inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);

    ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
    trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);

    deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER);
    deserializerContext = new Context(context.getSubProperties(DESERIALIZER +
        "."));

    // "Hack" to support backwards compatibility with previous generation of
    // spooling directory source, which did not support deserializers
    Integer bufferMaxLineLength = context.getInteger(BUFFER_MAX_LINE_LENGTH);
    if (bufferMaxLineLength != null && deserializerType != null &&
        deserializerType.equals(DEFAULT_DESERIALIZER)) {
      deserializerContext.put(LineDeserializer.MAXLINE_KEY,
          bufferMaxLineLength.toString());
    }

  }

1、spoolDirectory是监控文件夹,不能为空,没有默认值。这个source不具有监控子文件夹的功能,也就是不能递归监控。假设须要,这须要自己去实现,http://blog.csdn.net/yangbutao/article/details/8835563 这里有递归检測的实现;

  2、completedSuffix是文件读取完毕后给完毕文件加入的标记后缀,默认是".COMPLETED";

  3、deletePolicy这是是否删除读取完成的文件,默认是"never",就是不删除,眼下仅仅支持"never"和“IMMEDIATE”;

  4、fileHeader是否在event的Header中加入文件名称,boolean类型

  5、fileHeaderKey这是event的Header中的key,value是文件名称

  6、batchSize这个是一次处理的记录数,默认是100;

  7、inputCharset编码方式,默认是"UTF-8";

  8、ignorePattern忽略符合条件的文件名称

  9、trackerDirPath被处理文件元数据的存储文件夹,默认".flumespool"

  10、deserializerType将文件里的数据序列化成event的方式,默认是“LINE”---org.apache.flume.serialization.LineDeserializer

  11、deserializerContext这个主要用在Deserializer中设置编码方式outputCharset和文件每行最大长度maxLineLength。

  

  二、start()方法。代码例如以下:

public void start() {
    logger.info("SpoolDirectorySource source starting with directory: {}",
        spoolDirectory);

    ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();
    counterGroup = new CounterGroup();

    File directory = new File(spoolDirectory);
    try {
      reader = new ReliableSpoolingFileEventReader.Builder()
          .spoolDirectory(directory)
          .completedSuffix(completedSuffix)
          .ignorePattern(ignorePattern)
          .trackerDirPath(trackerDirPath)
          .annotateFileName(fileHeader)
          .fileNameHeader(fileHeaderKey)
          .deserializerType(deserializerType)
          .deserializerContext(deserializerContext)
          .deletePolicy(deletePolicy)
          .inputCharset(inputCharset)
          .build();
    } catch (IOException ioe) {
      throw new FlumeException("Error instantiating spooling event parser",
          ioe);
    }

    Runnable runner = new SpoolDirectoryRunnable(reader, counterGroup);
    executor.scheduleWithFixedDelay(
        runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);

    super.start();
    logger.debug("SpoolDirectorySource source started");
  }

 1、构建了一个org.apache.flume.client.avro.ReliableSpoolingFileEventReader的对象reader;

  2、启动了一个每隔POLL_DELAY_MS(默认500,单位ms)运行一次SpoolDirectoryRunnable的进程;

  三、读取并发送event进程。代码例如以下:

private class SpoolDirectoryRunnable implements Runnable {
    private ReliableSpoolingFileEventReader reader;
    private CounterGroup counterGroup;

    public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reader,
        CounterGroup counterGroup) {
      this.reader = reader;
      this.counterGroup = counterGroup;
    }

    @Override
    public void run() {
      try {
        while (true) {
          List<Event> events = reader.readEvents(batchSize);  //读取batchSize个记录
          if (events.isEmpty()) {
            break;
          }
          counterGroup.addAndGet("spooler.events.read", (long) events.size());

          getChannelProcessor().processEventBatch(events);  //将events批量发送到channel
          reader.commit();
        }
      } catch (Throwable t) {
        logger.error("Uncaught exception in Runnable", t);
        if (t instanceof Error) {
          throw (Error) t;
        }
      }
    }
  }

  该进程实现了批量读取reader所指向的文件的数据,并发送到channel。

四、org.apache.flume.client.avro.ReliableSpoolingFileEventReader的构造方法首先是先尝试对spoolDirectory是否有创建文件、读、写、删除等权限;然后在构造"$spoolDirectory/.flumespool/.flumespool-main.meta"元数据文件

五、上面SpoolDirectoryRunnable.run方法中的List<Event> events = reader.readEvents(batchSize),是org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(batchSize):

 public List<Event> readEvents(int numEvents) throws IOException {
    if (!committed) {
      if (!currentFile.isPresent()) {//为空,假设Optional包括非null的引用(引用存在),返回true
        throw new IllegalStateException("File should not roll when " +
            "commit is outstanding.");
      }
      logger.info("Last read was never committed - resetting mark position.");
      currentFile.get().getDeserializer().reset();
    } else {//已经committed成功
      // Check if new files have arrived since last call
      //Returns true if this holder contains a (non-null) instance
      if (!currentFile.isPresent()) {//为空,获取下一个文件,初次调用
        currentFile = getNextFile();
      }
      // Return empty list if no new files
      if (!currentFile.isPresent()) {//为空,已经没有可读的文件了
        return Collections.emptyList();
      }
    //其他的说明是currentFile眼下还在读
    }

    EventDeserializer des = currentFile.get().getDeserializer();
    List<Event> events = des.readEvents(numEvents);//加入event的body

    /* It's possible that the last read took us just up to a file boundary.
     * If so, try to roll to the next file, if there is one. */
    if (events.isEmpty()) {
      retireCurrentFile();  //改名字
      currentFile = getNextFile();//换下一个文件
      if (!currentFile.isPresent()) {
        return Collections.emptyList();
      }
      events = currentFile.get().getDeserializer().readEvents(numEvents);//继续读,加入event的body
    }

    if (annotateFileName) {
      String filename = currentFile.get().getFile().getAbsolutePath();
      for (Event event : events) {
        event.getHeaders().put(fileNameHeader, filename);//加入header
      }
    }

    committed = false;
    lastFileRead = currentFile;
    return events;
  }

1,committed初始化时是true,所以第一次执行就是通过getNextFile()获取当前要去读的文件。假设是空就返回空值了。

2,使用deserializer(默认是org.apache.flume.serialization.LineDeserializer)的readEvents(numEvents)去批量读数据封装成event。

3,如获取的批量events为空,说明这个文件读完了,须要对这个读完的文件做个“删除”(retireCurrentFile()方法,在这也会删除元数据文件),就是依据deletePolicy(删除还是加入去读完成后缀completedSuffix);可是这个本方法是有返回值的就是events,所以须要获取下一个文件,即再次执行getNextFile(),并events = currentFile.get().getDeserializer().readEvents(numEvents)

4,是否要对这些events的Header中加入文件名称

5,committed = false;    lastFileRead = currentFile; 并返回events。

这种方法还有几点须要解释:

其一、就是committed參数,此參数关系到这一批量的event是否已经正确处理完成。能够看到上面的5中所讲,每调用一次ReliableSpoolingFileEventReader.readEvents(batchSize)均会在最后将committed设置为false,可是在SpoolDirectoryRunnable.run()方法中也能够看出在调用readEvents方法后还会调用ReliableSpoolingFileEventReader.commit()方法,代码例如以下:

/** Commit the last lines which were read. */
  @Override
  public void commit() throws IOException {
    if (!committed && currentFile.isPresent()) {
      currentFile.get().getDeserializer().mark();
      committed = true;
    }
  }

这种方法说明满足两个条件就能够:一、向trackerFile写入读到的记录位置,mark()方法会将syncPosition写入trackerFile,而ResettableFileInputStream中的position用来暂存位置添加的,待到何时会syncPosition=position,这样是为了防止出现异常时用于恢复丢失的数据;二、将committed  =
true。两个条件:一个是committed=false,这个运行完readEvents最后会置为false;二、currentFile“非空”,代表有正在读的文件。假设committed在readEvents中開始时为false,说明:一、event提交到channel时出现了问题,没有运行reader.commit;二、currentFile已经“为空”,说明没有能够读的文件。这两点也体如今readEvents開始部分,committed=false时,假设没有可读文件就会抛出异常File
should not roll when commit is outstanding.";假设是在提交到channel时出问题会通过currentFile.get().getDeserializer().reset()又一次撤回到上次正确提交channel的位置,这样能够使得不丢失数据。

其二、就是getNextFile()方法。这种方法会首先过滤检測文件夹的子文件夹(也就是不能递归)、隐藏文件(以"."开头的文件)、已经读完的文件(有completedSuffix后缀的)、符合ignorePattern的文件;然后将过滤后的文件按时间的先后顺序排序,再创建一个新的相应的元数据文件;构造一个读取文件的输入流ResettableFileInputStream,并将此输入流作为參数传递给deserializer,终于返回一个Optional.of(new FileInfo(nextFile, deserializer));

其三、就是LineDeserializer)的readEvents(numEvents)方法。这种方法会多次(numEvents)调用LineDeserializer(默认)的readLine()获取一行数据封装成event。readLine()会通过org.apache.flume.serialization.ResettableFileInputStream.readChar()不断的去获取数据,读完正行后推断每行的长度是否超过规定值maxLineLength。readChar()方法除了不断读取一个字符外,还会记下字符的位置,等待将位置写入元数据文件里(通过deserializer.mark()写入)

时间: 2025-01-12 13:24:45

【Java】【Fulme】Flume-NG源代码阅读之SpoolDirectorySource的相关文章

Java 推荐读物与源代码阅读

1. Java语言基础     谈到Java语言基础学习的书籍,大家肯定会推荐Bruce Eckel的<Thinking in Java>.它是一本写的相当深刻的技术书籍,Java语言基础部分基本没有其它任何一本书可以超越它.该书的作者Bruce Eckel在网络上被称为天才的投机者,作者的<Thinking in C++>在1995年曾获SoftwareDevelopment Jolt Award最佳书籍大奖,<Thinking in Java>被评为1999年Jav

Flume NG源代码分析(二)支持执行时动态改动配置的配置模块

在上一篇中讲了Flume NG配置模块主要的接口的类,PropertiesConfigurationProvider提供了基于properties配置文件的静态配置的能力,这篇细说一下PollingPropertiesFileConfigurationProvider提供的执行时动态改动配置并生效的能力. 要实现动态改动配置文件并生效,主要有两个待实现的功能 1. 观察配置文件是否改动 2. 假设改动,将改动的内容通知给观察者 对于第一点,监控配置文件是否改动,Flume NG定义了一个File

【Java】【Fulme】Flume-NG源码阅读之SpoolDirectorySource

org.apache.flume.source.SpoolDirectorySource是flume的一个常用的source,这个源支持从磁盘中某文件夹获取文件数据.不同于其他异步源,这个源能够避免重启或者发送失败后数据丢失.flume可以监控文件夹,当出现新文件时会读取该文件并获取数据.当一个给定的文件被全部读入到通道中时,该文件会被重命名以标志已经完成.同时,该源需要一个清理进程来定期移除完成的文件. 通道可选地将一个完成路径的原始文件插入到每个事件的hearder域中.在读取文件时,sou

【Java】【Flume】Flume-NG阅读源代码AvroSink

org.apache.flume.sink.AvroSink是用来通过网络来数据传输的.能够将event发送到RPCserver(比方AvroSource),使用AvroSink和AvroSource能够组成分层结构. 它继承自AbstractRpcSink  extends AbstractSink implements Configurable这跟其它的sink一样都得extends AbstractSink implements Configurable.所以重点也在confgure.st

分布式实时日志系统(二) 环境搭建之 flume 集群搭建/flume ng资料

最近公司业务数据量越来越大,以前的基于消息队列的日志系统越来越难以满足目前的业务量,表现为消息积压,日志延迟,日志存储日期过短,所以,我们开始着手要重新设计这块,业界已经有了比较成熟的流程,即基于流式处理,采用 flume 收集日志,发送到 kafka 队列做缓冲,storm 分布式实时框架进行消费处理,短期数据落地到 hbase.mongo中,长期数据进入 hadoop 中存储. 接下来打算将这其间所遇到的问题.学习到的知识记录整理下,作为备忘,作为分享,带给需要的人. 学习flume ng的

Flume NG 简介及配置实战

Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了

OpenJDK 源代码阅读之 Collections

概要 类继承关系 java.lang.Object java.util.Collections 定义 public class Collections extends Object 实现 sort public static <T extends Comparable<? super T>> void sort(List<T> list) { Object[] a = list.toArray(); Arrays.sort(a); ListIterator<T&g

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th

【Flume NG用户指南】(2)构造

作者:周邦涛(Timen) Email:[email protected] 转载请注明出处:  http://blog.csdn.net/zhoubangtao/article/details/28277575 上一篇请參考[Flume NG用户指南](1)设置 3. 配置 前边的文章已经介绍过了,Flume Agent配置是从一个具有分层属性的Java属性文件格式的文件里读取的. 3.1 定义数据流 要在一个Flume Agent中定义数据流,你须要通过一个Channel将Source和Sin