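Flume's spooldir source does not recursively watch files in subdirectories, and our workload needed that, so I modified the source code and recompiled it.
The code change is based on: http://blog.csdn.net/yangbutao/article/details/8835563
In 1.4, however, the class to modify is no longer SpoolingFileLineReader but apache-flume-1.4.0-src\flume-ng-core\src\main\java\org\apache\flume\client\avro\ReliableSpoolingFileEventReader.java
Also, the variable directory used in that reference should be changed to spoolDirectory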
/*
 * @author admln
 *
 * @date 2015-04-08 09:37:20
 */
// Recursively collect every file under dir that the filter accepts.
// Needs java.util.ArrayList imported in this class if it is not already.
private void listDirFiles(List<File> files, File dir, FileFilter filter) {
  File[] childs = dir.listFiles(filter);
  if (childs == null) {
    // listFiles returns null when dir cannot be read or is not a directory
    return;
  }
  for (int i = 0; i < childs.length; i++) {
    if (childs[i].isFile()) {
      files.add(childs[i]);
    } else if (childs[i].isDirectory()) {
      listDirFiles(files, childs[i], filter);
    }
  }
}

/**
 * Find and open the oldest file in the chosen directory. If two or more
 * files are equally old, the file name with lower lexicographical value is
 * returned. If the directory is empty, this will return an absent option.
 */
private Optional<FileInfo> getNextFile() {
  /* Filter to exclude finished or hidden files */
  FileFilter filter = new FileFilter() {
    public boolean accept(File candidate) {
      String fileName = candidate.getName();
      // The candidate.isDirectory() check is dropped here: listDirFiles
      // passes this same filter to listFiles, so rejecting directories
      // would keep the recursion from ever reaching a subdirectory.
      if ((fileName.endsWith(completedSuffix))
          || (fileName.startsWith("."))
          || ignorePattern.matcher(fileName).matches()) {
        return false;
      }
      return true;
    }
  };
  // List<File> candidateFiles =
  //     Arrays.asList(spoolDirectory.listFiles(filter));
  List<File> candidateFiles = new ArrayList<File>();
  listDirFiles(candidateFiles, spoolDirectory, filter);
  // (the excerpt ends here; the rest of getNextFile() is not shown)
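To try the recursive listing outside of Flume, here is a minimal standalone sketch. The class name RecursiveSpoolListing and the helper spoolFilter are made up for illustration and are not part of Flume. The ".COMPLETED" suffix is only a sample value for completedSuffix, and the ignorePattern check is left out for brevity. Note that File.listFiles(FileFilter) applies the filter to directories as well, so the filter has to accept directories for the recursion to reach them.

```java
import java.io.File;
import java.io.FileFilter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the Flume source tree.
class RecursiveSpoolListing {

    // Filter that skips finished and hidden entries but lets directories
    // through, so the recursive walk below can descend into them.
    static FileFilter spoolFilter(final String completedSuffix) {
        return new FileFilter() {
            @Override
            public boolean accept(File candidate) {
                String name = candidate.getName();
                return !name.endsWith(completedSuffix) && !name.startsWith(".");
            }
        };
    }

    // Depth-first walk: regular files are collected, directories are recursed.
    static void listDirFiles(List<File> files, File dir, FileFilter filter) {
        File[] children = dir.listFiles(filter);
        if (children == null) {
            return; // dir is unreadable or not a directory
        }
        for (File child : children) {
            if (child.isFile()) {
                files.add(child);
            } else if (child.isDirectory()) {
                listDirFiles(files, child, filter);
            }
        }
    }

    public static void main(String[] args) {
        // Directory to scan: first argument, or the current directory.
        File root = new File(args.length > 0 ? args[0] : ".");
        List<File> found = new ArrayList<File>();
        listDirFiles(found, root, spoolFilter(".COMPLETED"));
        for (File f : found) {
            System.out.println(f.getPath());
        }
    }
}
```

Running it against a spool directory prints every pending file at any depth. Hidden directories (names starting with ".") are skipped along with everything inside them, because the filter rejects them before the walk can descend.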
For the rebuild itself, see: http://www.iteblog.com/archives/1032
Build command:
mvn install -Phadoop-2 -DskipTests -Dtar
The build downloads a large number of jars, which takes a long time
A prebuilt package is available here: http://pan.baidu.com/s/1hqgV9vm (code: g4w6)
Flume-ng-1.4.0 spooling source: added support for recursive directory detection
Time: 2024-10-10 10:34:08