Flume spoolDirectorySource errors: "File has been modified since being read" and "File has changed size since being read"

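  What follows is only my own shallow understanding, written down as a rambling memo, so it is somewhat disorganized. If I have misunderstood anything, please point it out.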

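  Flume NG no longer has the real-time streaming source for files that earlier versions had. It only provides the spoolDir source. This source monitors a specified directory, and a file placed in that directory must not be modified in any way afterwards: neither its modification time nor its size may change. The two errors correspond exactly to these two constraints.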

In the code, the checks live in the method org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile():

File fileToRoll = new File(currentFile.get().getFile().getAbsolutePath());

currentFile.get().getDeserializer().close();

// Verify that spooling assumptions hold
if (fileToRoll.lastModified() != currentFile.get().getLastModified()) {
  String message = "File has been modified since being read: " + fileToRoll + "\n"
      + "fileToRoll.lastModified() : " + fileToRoll.lastModified() + "\n"
      + "currentFile.get().getLastModified() : " + currentFile.get().getLastModified();
  throw new IllegalStateException(message);
}
if (fileToRoll.length() != currentFile.get().getLength()) {
  String message = "File has changed size since being read: " + fileToRoll + "\n"
      + "fileToRoll.length() : " + fileToRoll.length() + "\n"
      + "currentFile.get().getLength() : " + currentFile.get().getLength();
  throw new IllegalStateException(message);
}

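But we really had not modified the file at all, so why did the error still occur? After reading the code I found that the source's polling thread runs every 500 ms. When we copy a larger file, the copy has not finished within 500 ms, so this error appears. Flume uses 500 ms because it assumes by default that everyone transfers small files, for example one new log file every few minutes or every few seconds, in which case the problem does not arise.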

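In org.apache.flume.source.SpoolDirectorySource:

// This check runs too soon: when a file is large, copying it takes longer than 500 ms, which triggers the "modified" or "changed size" error, so the delay is raised to 15000 ms
private static final int POLL_DELAY_MS = 15000;

The default is 500 ms; I changed it to 15000 ms.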

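Since that seemed to be the cause, I thought I would simply raise this value; surely 15 seconds would be enough. Testing showed, however, that it still did not work.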

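Why is that? The reason is simple. Even if we set the value to 10,000 seconds, suppose a large file starts being copied into the directory at second 9,999. One second later the thread runs, discovers the new file, and records its modification time and size at that moment. Because the copy has not finished, both recorded values are wrong. When the source has finished reading the part that was already there, it checks whether the file's modification time and size have changed; in the meantime more of the file has been copied in, so the errors above are thrown.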
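The race described above can be reproduced outside Flume with a few lines. This is a minimal sketch, not Flume code: the class SpoolRaceDemo and the method changedSince are hypothetical names, and java.nio.file is used in place of java.io.File. A snapshot of modification time and size is taken while the "copy" is still in progress, more data is then appended, and the same comparison that retireCurrentFile() performs reports a change.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SpoolRaceDemo {
    // Hypothetical helper mirroring the two checks in retireCurrentFile():
    // has the modification time or the size moved since the snapshot?
    static boolean changedSince(Path file, long mtimeAtOpen, long lengthAtOpen) throws IOException {
        return Files.getLastModifiedTime(file).toMillis() != mtimeAtOpen
                || Files.size(file) != lengthAtOpen;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("spool-race", ".log");
        // Only part of the file has been copied when the source discovers it
        Files.write(file, "first part\n".getBytes(StandardCharsets.UTF_8));
        long mtimeAtOpen = Files.getLastModifiedTime(file).toMillis();
        long lengthAtOpen = Files.size(file);

        // While the source reads, the copy keeps appending data
        Files.write(file, "second part\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);

        // At retirement the size no longer matches the snapshot
        System.out.println("changed since snapshot: " + changedSince(file, mtimeAtOpen, lengthAtOpen));
        // prints "changed since snapshot: true"
        Files.delete(file);
    }
}
```

No poll interval removes this window, because the snapshot can always be taken while a copy is still running.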

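With the root cause known, the proper fix is to wait until the file has been completely copied before starting to read it. So I looked for the part of the code that picks the file to read: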

 1 /**
 2  * Find and open the oldest file in the chosen directory. If two or more
 3  * files are equally old, the file name with lower lexicographical value is
 4  * returned. If the directory is empty, this will return an absent option.
 5  */
 6 private Optional<FileInfo> getNextFile() {
 7   /* Filter to exclude finished or hidden files */
 8   FileFilter filter = new FileFilter() {
 9     public boolean accept(File candidate) {
10       String fileName = candidate.getName();
11       if ((candidate.isDirectory()) ||
12           (fileName.endsWith(completedSuffix)) ||
13           (fileName.startsWith(".")) ||
14           ignorePattern.matcher(fileName).matches()) {
15         return false;
16       }
17       return true;
18     }
19   };
 20   List<File> candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter)); // files in spoolDirectory that pass the filter
21   if (candidateFiles.isEmpty()) {
22     return Optional.absent();
23   } else {
 24     Collections.sort(candidateFiles, new Comparator<File>() { // sort files by last-modified time
25       public int compare(File a, File b) {
26         int timeComparison = new Long(a.lastModified()).compareTo(
27             new Long(b.lastModified()));
28         if (timeComparison != 0) {
29           return timeComparison;
30         }
31         else {
32           return a.getName().compareTo(b.getName());
33         }
34       }
35     });
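 36     File nextFile = candidateFiles.get(0); // every processed file is marked as completed, so simply take the first file after sorting
 37     // Fix for the "file has been modified" error when transferring large files
 38     this.checkFileCpIsOver(nextFile); // blocks here until the copy has finished or the polling limit (about 10 seconds) is reached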
39
40     try {
41       // roll the meta file, if needed
42       String nextPath = nextFile.getPath();
43       PositionTracker tracker =
44           DurablePositionTracker.getInstance(metaFile, nextPath);
45       if (!tracker.getTarget().equals(nextPath)) {
46         tracker.close();
47         deleteMetaFile();
48         tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
49       }
50
51       // sanity check
52       Preconditions.checkState(tracker.getTarget().equals(nextPath),
53           "Tracker target %s does not equal expected filename %s",
54           tracker.getTarget(), nextPath);
55
56       ResettableInputStream in =
57           new ResettableFileInputStream(nextFile, tracker,
58               ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
59               decodeErrorPolicy);
60       EventDeserializer deserializer = EventDeserializerFactory.getInstance
61           (deserializerType, deserializerContext, in);
62
63       return Optional.of(new FileInfo(nextFile, deserializer));
64     } catch (FileNotFoundException e) {
65       // File could have been deleted in the interim
66       logger.warn("Could not find file: " + nextFile, e);
67       return Optional.absent();
68     } catch (IOException e) {
69       logger.error("Exception opening file: " + nextFile, e);
70       return Optional.absent();
71     }
72   }
73 }

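Line 36 of the method is where the file to be read is obtained. Previously the file was used directly, without checking whether its copy had finished. Line 38 calls a method I added.

The method is as follows: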

/**
 * Checks whether the copy of a file has finished: blocks until the file's
 * size and last-modified time stop changing.
 *
 * @param file the file that is about to be read
 * @throws IllegalStateException if the file is still changing after about 10 seconds
 */
private void checkFileCpIsOver(File file) {
  long modified = file.lastModified(); // current modification time of the file
  long length = file.length();         // current size of the file
  try {
    Thread.sleep(1000); // wait 1 second before the first comparison
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // keep the interrupt status and stop waiting
    return;
  }
  int count = 0; // number of polls; after more than 20 (about 10 seconds) an exception is thrown
  // File.lastModified() and File.length() query the file system on every call,
  // so the same File object can be reused inside the loop
  while (file.lastModified() != modified || file.length() != length) {
    if (count > 20) {
      String message = "File copy is taking too long, please check whether the copy failed!" + "\n"
          + "File at: " + file.getAbsolutePath() + "\n"
          + "File current length is: " + file.length();
      throw new IllegalStateException(message);
    }
    count++;
    modified = file.lastModified();
    length = file.length();
    try {
      Thread.sleep(500); // wait 500 ms
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return;
    }
  }
  // Size and modification time no longer change: the copy is considered finished
}
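To try the same idea outside a Flume build, here is a self-contained, parameterized variant. It is a sketch under assumptions, not part of Flume: the class CopyCompletionCheck and the method waitUntilStable are hypothetical names, the interval and poll limit are parameters instead of fixed values, and it returns false rather than throwing, so the caller decides whether to skip the file or fail. Like the method above, it is a heuristic: a copy that stalls for longer than one interval will look finished.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CopyCompletionCheck {
    /**
     * Polls size and modification time every intervalMs. Returns true as soon as
     * two consecutive readings agree, or false after maxPolls polls that all
     * saw the file changing.
     */
    static boolean waitUntilStable(Path file, long intervalMs, int maxPolls)
            throws IOException, InterruptedException {
        long lastLength = Files.size(file);
        long lastModified = Files.getLastModifiedTime(file).toMillis();
        for (int i = 0; i < maxPolls; i++) {
            Thread.sleep(intervalMs);
            long length = Files.size(file);
            long modified = Files.getLastModifiedTime(file).toMillis();
            if (length == lastLength && modified == lastModified) {
                return true; // unchanged for one full interval: assume the copy is done
            }
            lastLength = length;
            lastModified = modified;
        }
        return false; // still changing after maxPolls polls
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("spool-stable", ".log");
        Files.write(file, new byte[] {1, 2, 3});
        // Nothing writes to the file any more, so the first poll already sees it stable
        System.out.println("stable: " + waitUntilStable(file, 100, 5));
        Files.delete(file);
    }
}
```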

With the fix done, repackage the modified parts, replace the JAR in production, and verify again.
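If patching and rebuilding Flume is not an option, a commonly used alternative works on the writer side. The filter in getNextFile() skips any file whose name starts with ".", so a file can be copied into the spool directory under a hidden temporary name and renamed to its final name only once the copy is complete. A rename does not change the file's size or modification time, so the checks in retireCurrentFile() see a finished file. The sketch below assumes the temporary file and its final name are in the same directory on one file system, so that the rename is atomic; SpoolDropper and dropIntoSpool are hypothetical names, not Flume APIs.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class SpoolDropper {
    /**
     * Copies source into spoolDir under a dot-prefixed temporary name (skipped by
     * the spooling source's filter), then renames it to its final name.
     */
    static Path dropIntoSpool(Path source, Path spoolDir) throws IOException {
        String name = source.getFileName().toString();
        Path tmp = spoolDir.resolve("." + name + ".tmp"); // hidden while the copy is running
        Path target = spoolDir.resolve(name);
        Files.copy(source, tmp, StandardCopyOption.REPLACE_EXISTING);
        // ATOMIC_MOVE throws AtomicMoveNotSupportedException instead of
        // silently falling back to a (non-atomic) copy
        return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path spoolDir = Files.createTempDirectory("spool");
        Path source = Files.createTempFile("app", ".log");
        Files.write(source, "line 1\nline 2\n".getBytes(StandardCharsets.UTF_8));
        Path placed = dropIntoSpool(source, spoolDir);
        System.out.println("placed " + placed.getFileName() + " (" + Files.size(placed) + " bytes)");
    }
}
```

The Flume source then only ever sees complete files, independent of POLL_DELAY_MS.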

Time: 2024-08-01 23:09:36
