Flume SpoolDirectorySource 中的 “File has been modified since being read” 与 “File has changed size since being read” 错误





 1 File fileToRoll = new File(currentFile.get().getFile().getAbsolutePath());
 3  currentFile.get().getDeserializer().close();
 5  // Verify that spooling assumptions hold
 6  if (fileToRoll.lastModified() != currentFile.get().getLastModified()) {
 7    String message = "File has been modified since being read: " + fileToRoll +"\n"
 8            + "fileToRoll.lastModified() : " + fileToRoll.lastModified() +
 9            "currentFile.get().getLastModified() : " + currentFile.get().getLastModified();
10    throw new IllegalStateException(message);
11  }
12  if (fileToRoll.length() != currentFile.get().getLength()) {
13    String message = "File has changed size since  being read: " + fileToRoll +"\n"
14            + "fileToRoll.length() : " + fileToRoll.length() +
15            "currentFile.get().getLength() : " + currentFile.get().getLength();
16    throw new IllegalStateException(message);
17  }



/**
 * Delay in milliseconds (15000 ms = 15 seconds).
 * NOTE(review): presumably the interval between polls of the spool
 * directory; the use site is not shown here -- confirm.
 */
private static final int POLL_DELAY_MS = 15000;





 1 /**
 2  * Find and open the oldest file in the chosen directory. If two or more
 3  * files are equally old, the file name with lower lexicographical value is
 4  * returned. If the directory is empty, this will return an absent option.
 5  */
 6 private Optional<FileInfo> getNextFile() {
 7   /* Filter to exclude finished or hidden files */
 8   FileFilter filter = new FileFilter() {
 9     public boolean accept(File candidate) {
10       String fileName = candidate.getName();
11       if ((candidate.isDirectory()) ||
12           (fileName.endsWith(completedSuffix)) ||
13           (fileName.startsWith(".")) ||
14           ignorePattern.matcher(fileName).matches()) {
15         return false;
16       }
17       return true;
18     }
19   };
20   List<File> candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter)); //获取spoolDirectory下满足条件的文件
21   if (candidateFiles.isEmpty()) {
22     return Optional.absent();
23   } else {
24     Collections.sort(candidateFiles, new Comparator<File>() { //按最后修改时间排序文件
25       public int compare(File a, File b) {
26         int timeComparison = new Long(a.lastModified()).compareTo(
27             new Long(b.lastModified()));
28         if (timeComparison != 0) {
29           return timeComparison;
30         }
31         else {
32           return a.getName().compareTo(b.getName());
33         }
34       }
35     });
36     File nextFile = candidateFiles.get(0); //因为每次获取到的文件处理完都会被标记为已完成,所以直接取拍完序的第一个
37     //修复传输大文件报错文件被修改的BUG
38     this.checkFileCpIsOver(nextFile);//此处被阻塞,直到文件拷贝文件或者超过20秒
40     try {
41       // roll the meta file, if needed
42       String nextPath = nextFile.getPath();
43       PositionTracker tracker =
44           DurablePositionTracker.getInstance(metaFile, nextPath);
45       if (!tracker.getTarget().equals(nextPath)) {
46         tracker.close();
47         deleteMetaFile();
48         tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
49       }
51       // sanity check
52       Preconditions.checkState(tracker.getTarget().equals(nextPath),
53           "Tracker target %s does not equal expected filename %s",
54           tracker.getTarget(), nextPath);
56       ResettableInputStream in =
57           new ResettableFileInputStream(nextFile, tracker,
58               ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
59               decodeErrorPolicy);
60       EventDeserializer deserializer = EventDeserializerFactory.getInstance
61           (deserializerType, deserializerContext, in);
63       return Optional.of(new FileInfo(nextFile, deserializer));
64     } catch (FileNotFoundException e) {
65       // File could have been deleted in the interim
66       logger.warn("Could not find file: " + nextFile, e);
67       return Optional.absent();
68     } catch (IOException e) {
69       logger.error("Exception opening file: " + nextFile, e);
70       return Optional.absent();
71     }
72   }
73 }



 1 /**
 2  *
 3  * @Title: checkFileCpIsOver
 4  * @Description: TODO(用来检查文件拷贝是否完成)
 5  * @param @param currentFile    设定文件
 6  * @return void    返回类型
 7  * @throws
 8  */
 9 private void checkFileCpIsOver(File file) {
10     long modified = file.lastModified();//目前文件的修改时间
11     long length = file.length();//目前文件的大小
12     try {
13       Thread.sleep(1000);//等待1秒钟
14     } catch (InterruptedException e) {
15       // TODO Auto-generated catch block
16       e.printStackTrace();
17     }
18     File currentFile = new File(file.getAbsolutePath());
19     int count = 0;//记录循环次数,超过20次,也就是10秒后抛出异常
20     while(currentFile.lastModified() != modified || currentFile.length() != length) {
21         if(count > 20) {
22             String message = "File Copy time too long. please check copy whether exception!" + "\n"
23                       + "File at :" + file.getAbsolutePath() + "\n"
24                       + "File current length is:" + currentFile.lastModified();
25             new IllegalStateException(message);
26         }
27         count++;
28         modified = currentFile.lastModified();
29         length = currentFile.length();
30         try {
31           Thread.sleep(500);//等待500毫秒
32         } catch (InterruptedException e) {
33           // TODO Auto-generated catch block
34           e.printStackTrace();
35         }
36         currentFile = new File(file.getAbsolutePath());
39     }
40     //一直到文件传输完成就可以退出
41 }


时间: 2024-08-01 23:09:36

