以下内容都为自己浅显的理解,用作备忘的流水账,所以写的比较混乱。如理解有错误,请帮忙指正
FLUME-NG中没有之前的对文件的实时流SOURCE,只提供了spoolDir的source,这个source的功能监控指定文件夹,放入文件夹内的文件不能再做任何修改(包括修改时间和文件大小),这2个错误正是对应这2个
在代码中体现为
org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile()方法内
1 File fileToRoll = new File(currentFile.get().getFile().getAbsolutePath()); 2 3 currentFile.get().getDeserializer().close(); 4 5 // Verify that spooling assumptions hold 6 if (fileToRoll.lastModified() != currentFile.get().getLastModified()) { 7 String message = "File has been modified since being read: " + fileToRoll +"\n" 8 + "fileToRoll.lastModified() : " + fileToRoll.lastModified() + 9 "currentFile.get().getLastModified() : " + currentFile.get().getLastModified(); 10 throw new IllegalStateException(message); 11 } 12 if (fileToRoll.length() != currentFile.get().getLength()) { 13 String message = "File has changed size since being read: " + fileToRoll +"\n" 14 + "fileToRoll.length() : " + fileToRoll.length() + 15 "currentFile.get().getLength() : " + currentFile.get().getLength(); 16 throw new IllegalStateException(message); 17 }
但是问题是我们确实没有对文件做出过任何修改啊,会什么还是会报这个错误。查看了代码后,发现他的这个线程频率为500ms,当我们拷贝一个大些的文件的时候,500ms还没有拷贝完成,所以就会出现这样的错误。当然flume被设计成500MS,是因为默认大家都是传很小的文件,每几分钟或者每几秒就做写一个日志文件,就不会存在这样的问题。
org.apache.flume.source.SpoolDirectorySource
//这个检验的太快,当文件比较大的时候,需要拷贝的时间比较超过500毫秒,就会报文件更改或者文件大小的变化,改为5000 private static final int POLL_DELAY_MS = 15000;
默认为500MS,我给改成15000ms。
那么问题既然出来了,我想就把这个值调大点吧,调成15秒总行了吧。但是经过测试后发现,还是不可以。
那这又是为什么呢?原因很简单,即使我们把这个值调成1万秒,当一个大点的文件正好在第9999秒的时候往里面拷贝了,一秒后线程启动了发现一个新文件并记下了这时候的这个文件的修改时间与大小,但是因为文件并没有拷贝完成,所以这2个值都是错误。当刚才的那部分读完后,程序中检查文件的修改时间与大小看看是否被改变,刚才那段时间文件又被copy了一些进来,此时就会报上面的错误了。
知道了根本的原因,就像从根本上解决这个问题,最好的方式就是我们等文件完全拷贝完成,我们再开始读这个文件。那找到代码中获取要读文件的这部分
1 /** 2 * Find and open the oldest file in the chosen directory. If two or more 3 * files are equally old, the file name with lower lexicographical value is 4 * returned. If the directory is empty, this will return an absent option. 5 */ 6 private Optional<FileInfo> getNextFile() { 7 /* Filter to exclude finished or hidden files */ 8 FileFilter filter = new FileFilter() { 9 public boolean accept(File candidate) { 10 String fileName = candidate.getName(); 11 if ((candidate.isDirectory()) || 12 (fileName.endsWith(completedSuffix)) || 13 (fileName.startsWith(".")) || 14 ignorePattern.matcher(fileName).matches()) { 15 return false; 16 } 17 return true; 18 } 19 }; 20 List<File> candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter)); //获取spoolDirectory下满足条件的文件 21 if (candidateFiles.isEmpty()) { 22 return Optional.absent(); 23 } else { 24 Collections.sort(candidateFiles, new Comparator<File>() { //按最后修改时间排序文件 25 public int compare(File a, File b) { 26 int timeComparison = new Long(a.lastModified()).compareTo( 27 new Long(b.lastModified())); 28 if (timeComparison != 0) { 29 return timeComparison; 30 } 31 else { 32 return a.getName().compareTo(b.getName()); 33 } 34 } 35 }); 36 File nextFile = candidateFiles.get(0); //因为每次获取到的文件处理完都会被标记为已完成,所以直接取拍完序的第一个 37 //修复传输大文件报错文件被修改的BUG 38 this.checkFileCpIsOver(nextFile);//此处被阻塞,直到文件拷贝文件或者超过20秒 39 40 try { 41 // roll the meta file, if needed 42 String nextPath = nextFile.getPath(); 43 PositionTracker tracker = 44 DurablePositionTracker.getInstance(metaFile, nextPath); 45 if (!tracker.getTarget().equals(nextPath)) { 46 tracker.close(); 47 deleteMetaFile(); 48 tracker = DurablePositionTracker.getInstance(metaFile, nextPath); 49 } 50 51 // sanity check 52 Preconditions.checkState(tracker.getTarget().equals(nextPath), 53 "Tracker target %s does not equal expected filename %s", 54 tracker.getTarget(), nextPath); 55 56 ResettableInputStream in = 57 new ResettableFileInputStream(nextFile, tracker, 58 ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset, 59 decodeErrorPolicy); 60 EventDeserializer deserializer = EventDeserializerFactory.getInstance 61 (deserializerType, deserializerContext, in); 62 63 return Optional.of(new FileInfo(nextFile, deserializer)); 64 } catch (FileNotFoundException e) { 65 // File could have been deleted in the interim 66 logger.warn("Could not find file: " + nextFile, e); 67 return Optional.absent(); 68 } catch (IOException e) { 69 logger.error("Exception opening file: " + nextFile, e); 70 return Optional.absent(); 71 } 72 } 73 }
在方法的第36行是获取准备要读的文件的部分,之前就是直接拿到文件不会检查文件是否拷贝完成,第38行为自己添加的方法
方法如下:
1 /** 2 * 3 * @Title: checkFileCpIsOver 4 * @Description: TODO(用来检查文件拷贝是否完成) 5 * @param @param currentFile 设定文件 6 * @return void 返回类型 7 * @throws 8 */ 9 private void checkFileCpIsOver(File file) { 10 long modified = file.lastModified();//目前文件的修改时间 11 long length = file.length();//目前文件的大小 12 try { 13 Thread.sleep(1000);//等待1秒钟 14 } catch (InterruptedException e) { 15 // TODO Auto-generated catch block 16 e.printStackTrace(); 17 } 18 File currentFile = new File(file.getAbsolutePath()); 19 int count = 0;//记录循环次数,超过20次,也就是10秒后抛出异常 20 while(currentFile.lastModified() != modified || currentFile.length() != length) { 21 if(count > 20) { 22 String message = "File Copy time too long. please check copy whether exception!" + "\n" 23 + "File at :" + file.getAbsolutePath() + "\n" 24 + "File current length is:" + currentFile.lastModified(); 25 new IllegalStateException(message); 26 } 27 count++; 28 modified = currentFile.lastModified(); 29 length = currentFile.length(); 30 try { 31 Thread.sleep(500);//等待500毫秒 32 } catch (InterruptedException e) { 33 // TODO Auto-generated catch block 34 e.printStackTrace(); 35 } 36 currentFile = new File(file.getAbsolutePath()); 37 38 39 } 40 //一直到文件传输完成就可以退出 41 }
修复完成,将修改涉及到的部分重新打包,替换线上JAR包,再次验证。