Flume-ng-1.4.0 spooling source的方式增加了对目录的递归检测的支持



因为flume的spooldir不支持子目录文件的递归检测,并且业务需要,所以修改了源码,重新编译



代码修改参考自:http://blog.csdn.net/yangbutao/article/details/8835563

不过在1.4中已经不是修改SpoolingFileLineReader类了,而是apache-flume-1.4.0-src\flume-ng-core\src\main\java\org\apache\flume\client\avro\ReliableSpoolingFileEventReader.java

并且变量directory应该改为spoolDirectory

 1     /*
 2      * @author admln
 3      *
 4      * @date 2015年4月8日 上午9:37:20
 5      */
 6     private void listDirFiles(List<File> files, File dir, FileFilter filter) {
 7         File[] childs = dir.listFiles(filter);
 8         for (int i = 0; i < childs.length; i++) {
 9             if (childs[i].isFile()) {
10                 files.add(childs[i]);
11             } else {
12                 if (childs[i].isDirectory()) {
13                     listDirFiles(files, childs[i], filter);
14                 }
15             }
16         }
17     }
18
19     /**
20      * Find and open the oldest file in the chosen directory. If two or more
21      * files are equally old, the file name with lower lexicographical value is
22      * returned. If the directory is empty, this will return an absent option.
23      */
24     private Optional<FileInfo> getNextFile() {
25         /* Filter to exclude finished or hidden files */
26         FileFilter filter = new FileFilter() {
27             public boolean accept(File candidate) {
28                 String fileName = candidate.getName();
29                 if ((candidate.isDirectory())
30                         || (fileName.endsWith(completedSuffix))
31                         || (fileName.startsWith("."))
32                         || ignorePattern.matcher(fileName).matches()) {
33                     return false;
34                 }
35                 return true;
36             }
37         };
38         // List<File> candidateFiles =
39         // Arrays.asList(spoolDirectory.listFiles(filter));
40         List<File> candidateFiles = new ArrayList<File>();
41         listDirFiles(candidateFiles, spoolDirectory, filter);


重新编译的时候可以参考:http://www.iteblog.com/archives/1032
编译命令:

mvn install -Phadoop-2 -DskipTests -Dtar

会下很多各种jar包,很浪费时间



已经编译好的:http://pan.baidu.com/s/1hqgV9vm    g4w6



Flume-ng-1.4.0 spooling source的方式增加了对目录的递归检测的支持

时间: 2024-10-10 10:34:08

Flume-ng-1.4.0 spooling source的方式增加了对目录的递归检测的支持的相关文章

Flume NG 学习笔记(一)简介

一.简介 Flume是一个分布式.可靠.高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力. Flume在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG(next generation),0.9.x的称为Flume OG(originalgeneration). 对于OG版本, Flume NG (1.x.x)的主要变化如下: 1.sources和sinks 使用chann

【转】Flume(NG)架构设计要点及配置实践

Flume(NG)架构设计要点及配置实践 Flume NG是一个分布式.可靠.可用的系统,它能够将不同数据源的海量日志数据进行高效收集.聚合.移动,最后存储到一个中心化数据存储系统中.由原来的Flume OG到现在的Flume NG,进行了架构重构,并且现在NG版本完全不兼容原来的OG版本.经过架构重构后,Flume NG更像是一个轻量的小工具,非常简单,容易适应各种方式日志收集,并支持failover和负载均衡. 架构设计要点 Flume的架构主要有一下几个核心概念: Event:一个数据单元

Flume NG 简介及配置实战

Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th

Flume NG 学习笔记(五)Sinks和Channel配置

一.HDFS Sink Flume Sink是将事件写入到Hadoop分布式文件系统(HDFS)中.主要是Flume在Hadoop环境中的应用,即Flume采集数据输出到HDFS,适用大数据日志场景. 目前,它支持HDFS的文本和序列文件格式,以及支持两个文件类型的压缩.支持将所用的时间.数据大小.事件的数量为操作参数,对HDFS文件进行关闭(关闭当前文件,并创建一个新的).它还可以对事源的机器名(hostname)及时间属性分离数据,即通过时间戳将数据分布到对应的文件路径. HDFS目录路径可

Flume NG源码分析(三)使用Event接口表示数据流

Flume NG有4个主要的组件: Event表示在Flume各个Agent之间传递的数据流 Source表示从外部源接收Event数据流,然后传递给Channel Channel表示对从Source传递的Event数据流的临时存储 Sink表示从Channel中接收存储的Event数据流,并传递给下游的Source或者终点仓库 这篇看一下Event接口表示的数据流.Source, Channel, Sink操作的数据流都是基于Event接口的封装. public interface Event

Flume NG简介及配置

常用的分布式日志收集系统: Apache Flume. Facebook Scribe. Apache Chukwa 1.Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.

高可用Hadoop平台-Flume NG实战图解篇

1.概述 今天补充一篇关于Flume的博客,前面在讲解高可用的Hadoop平台的时候遗漏了这篇,本篇博客为大家讲述以下内容: Flume NG简述 单点Flume NG搭建.运行 高可用Flume NG搭建 Failover测试 截图预览 下面开始今天的博客介绍. 2.Flume NG简述 Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中.轻量,配置简单,适用于各种日志收集,并支持Failover和负载均衡.并且它拥有非常丰富的组件.Fl

【Flume NG用户指南】(2)构造

作者:周邦涛(Timen) Email:[email protected] 转载请注明出处:  http://blog.csdn.net/zhoubangtao/article/details/28277575 上一篇请參考[Flume NG用户指南](1)设置 3. 配置 前边的文章已经介绍过了,Flume Agent配置是从一个具有分层属性的Java属性文件格式的文件里读取的. 3.1 定义数据流 要在一个Flume Agent中定义数据流,你须要通过一个Channel将Source和Sin