Flume Spool Source 源码过程分析(未运行)

主要涉及到的类:

SpoolDirectorySource 读取用户配置,并按照batchSize去读取这么多量的Event从用户指定的Spooling Dir中。SpoolDirectorySource 不会去读取某一个具体的文件,而是通过内部的reader去读取。文件切换等操作,都是reader去实现

内部类:SpoolDirectoryRunnable是一个线程,其中的run方法,完成从Spooling Dir读取Event(使用reader去读取)

 1 @Override
 2     public void run() {
 3       int backoffInterval = 250;
 4       try {
 5         while (!Thread.interrupted()) {
 6           List<Event> events = reader.readEvents(batchSize);
 7           if (events.isEmpty()) {
 8             break;
 9           }
10           sourceCounter.addToEventReceivedCount(events.size());
11           sourceCounter.incrementAppendBatchReceivedCount();
12
13           try {
14             getChannelProcessor().processEventBatch(events);
15             reader.commit();
16           } catch (ChannelException ex) {
17             logger.warn("The channel is full, and cannot write data now. The " +
18               "source will try again after " + String.valueOf(backoffInterval) +
19               " milliseconds");
20             hitChannelException = true;
21             if (backoff) {
22               TimeUnit.MILLISECONDS.sleep(backoffInterval);
23               backoffInterval = backoffInterval << 1;
24               backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
25                                 backoffInterval;
26             }
27             continue;
28           }
29           backoffInterval = 250;
30           sourceCounter.addToEventAcceptedCount(events.size());
31           sourceCounter.incrementAppendBatchAcceptedCount();
32         }
33         logger.info("Spooling Directory Source runner has shutdown.");
34       } catch (Throwable t) {
35         logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
36             "Uncaught exception in SpoolDirectorySource thread. " +
37             "Restart or reconfigure Flume to continue processing.", t);
38         hasFatalError = true;
39         Throwables.propagate(t);
40       }
41     }

ReliableSpoolingFileEventReader 定义在SpoolDirectorySource中的reader。看这个名字就知道碉堡了,reliable的,怎么实现reliable的??

reader的readEvent方法,会根据batchSize大小读取指定的Event

该方法的大致意思:

如果没有提交,如果当前文件空,错,否则获取EventDeserializer

如果已经提交,如果当前文件空,则获得下一个文件,之后,如果文件还是空,则返回空Event列表。

之后,调用EventDeserializer的readEvents。

 1  public List<Event> readEvents(int numEvents) throws IOException {
 2     if (!committed) {
 3       if (!currentFile.isPresent()) {
 4         throw new IllegalStateException("File should not roll when " +
 5             "commit is outstanding.");
 6       }
 7       logger.info("Last read was never committed - resetting mark position.");
 8       currentFile.get().getDeserializer().reset();
 9     } else {
10       // Check if new files have arrived since last call
11       if (!currentFile.isPresent()) {
12         currentFile = getNextFile();
13       }
14       // Return empty list if no new files
15       if (!currentFile.isPresent()) {
16         return Collections.emptyList();
17       }
18     }
19
20     EventDeserializer des = currentFile.get().getDeserializer();
21     List<Event> events = des.readEvents(numEvents);
22
23     /* It‘s possible that the last read took us just up to a file boundary.
24      * If so, try to roll to the next file, if there is one. */
25     if (events.isEmpty()) {
26       retireCurrentFile();
27       currentFile = getNextFile();
28       if (!currentFile.isPresent()) {
29         return Collections.emptyList();
30       }
31       events = currentFile.get().getDeserializer().readEvents(numEvents);
32     }
33
34     //写入header值,略47
48     committed = false;
49     lastFileRead = currentFile;
50     return events;
51   }

在这个方法中,我们看到了

currentFile:该对象采用了谷歌的Optional进行封装,更加容易判断空指针等等。Optional<FileInfo>,该FileInfo封装了普通的File对象和针对该file对象的EventDeserializer(事件序列器)

该currentFile主要在ReliableSpoolingFileEventReader 类中的Optional<FileInfo> openFile(File file),Optional<FileInfo> getNextFile() 方法中调用。

EventDeserializer:事件序列器的主要作用在于定义一些读取的基本操作

其中mark是读取的行position进行标记

EventDeserializer的实现子类,很多,这里只讲LineDeserializer,顾名思义,按照行去读取,一行就是一个Event

虽然EventDeserializer已经涉及到读取行了,但是真正读取记录的还不是他。

我们看openfile函数中

 1 String nextPath = file.getPath();
 2       PositionTracker tracker =
 3           DurablePositionTracker.getInstance(metaFile, nextPath);
 4       if (!tracker.getTarget().equals(nextPath)) {
 5         tracker.close();
 6         deleteMetaFile();
 7         tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
 8       }
15       ResettableInputStream in =
16           new ResettableFileInputStream(file, tracker,
17               ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
18               decodeErrorPolicy);
19       EventDeserializer deserializer = EventDeserializerFactory.getInstance
20           (deserializerType, deserializerContext, in);

因此可以看出EventDeserializer读取记录是靠 ResettableFileInputStream(in对象),ResettableFileInputStream的初始化需要File类和一个DurablePositionTracker,

因此,ResettableFileInputStream在读取File内容同时,使用DurablePositionTracker去记录position的信息。

DurablePositionTracker使用了apache avro来进行持久化

private final DataFileWriter<TransferStateFileMeta> writer;
private final DataFileReader<TransferStateFileMeta> reader;

这样,当我们使用EventDeserializer读取一个event的时候,就会从当前文件流中获取信息,同时也能够记录读取的位置信息。

当读取batchsize数量的event都正确处理后,ReliableSpoolingFileEventReader 会commit(),持久化位置信息

 public void commit() throws IOException {
    if (!committed && currentFile.isPresent()) {
      currentFile.get().getDeserializer().mark();
      committed = true;
    }
  }

这里的mark方法,调用

LineDeserializer的

 @Override
  public void mark() throws IOException {
    ensureOpen();
    in.mark();
  }

在调用ResettableFileInputStream(in)的mark方法

 @Override
  public void mark() throws IOException {
    tracker.storePosition(tell());
  }

之后调用位置tracker的storePostition方法(DurablePositionTracker)

 @Override
  public synchronized void storePosition(long position) throws IOException {
    metaCache.setOffset(position);
    writer.append(metaCache);
    writer.sync();
    writer.flush();
  }

之后,调用avro的DataFileWriter,完成写入操作。

最后,至于postition位置的持久化逻辑判断,基本也能猜到,当出现trash时候,从未读取的地方开始读取,等等,所以说,是ResettableFileInputStream的输入流,因为他能够读取信息,也能持久化读取的信息位置。

时间: 2024-09-30 06:33:52

Flume Spool Source 源码过程分析(未运行)的相关文章

android 源码编译及其运行模拟器相关问题记录

最近一直在看android源码相关的文档,包括编译源码,还有framework层的代码,本人很懒,一直没有写博客,今天想自己在编译一下源码,并且运行在模拟器中. 源码的版本不同,需要的jdk可能也有所不同,一切都参照官方给与的文档,下载源码的方法也参考官方文档. 注意点:1.环境变量要设置正确,不要出现错误 2.基本没有一次就能顺利编译源码的,多少都会出错,根据提示修改错误,安装一些包和库就可以搞定,具体问题具体查找,我遇到的问题也可多了,都是按官方文档解决的.有些问题可能和你的编译环境相关,这

【Java】【Flume】Flume-NG源码阅读之AvroSink

org.apache.flume.sink.AvroSink是用来通过网络来传输数据的,可以将event发送到RPC服务器(比如AvroSource),使用AvroSink和AvroSource可以组成分层结构.它继承自AbstractRpcSink  extends AbstractSink implements Configurable这跟其他的sink一样都得extends AbstractSink implements Configurable,所以重点也在confgure.start.

Zico源码分析:运行启动过程分析和总结

其实已经有童鞋对Zico的源码和运行过程进行了总结,例如:http://www.cnblogs.com/shuaiwang/p/4522905.html.这里我再补充一些内容.当我们使用mvn install将Zico打包成war包后,将其解压后可以看到MANIFEST.MF文件中指定的主类是:com.jitlogic.zico.main.ZicoMain,简单看一下这个类,发现其主要作用是启动Jetty Web Server,并加载配置文件,实现一些安全配置.那么我们需要问了,这个war包是怎

【Flume】【源码分析】flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用? ERROR hdfs.BucketWriter: Hit max consecutive under-replication rotations (30)

[转载] http://blog.csdn.net/simonchi/article/details/43231891 ERROR hdfs.BucketWriter: Hit max consecutive under-replication rotations (30) 本人在测试hdfs的sink,发现sink端的文件滚动配置项起不到任何作用,配置如下: [plain] view plain copy print? a1.sinks.k1.type=hdfs a1.sinks.k1.cha

DroidPlugin源码分析插件运行环境初始化

从DroidPlugin的官方文档中我们知道. 2 在AndroidManifest.xml中使用插件的com.morgoo.droidplugin.PluginApplication: 或者在自定义的Application的onCreate()函数中,调用PluginHelper.getInstance().applicationOnCreate(getBaseContext()); 在Application的attachBaseContext()函数中,调用 PluginHelper.get

Monkey源码分析之运行流程

在<MonkeyRunner源码分析之与Android设备通讯方式>中,我们谈及到MonkeyRunner控制目标android设备有多种方法,其中之一就是在目标机器启动一个monkey服务来监听指定的一个端口,然后monkeyrunner再连接上这个端口来发送命令,驱动monkey去完成相应的工作. 当时我们只分析了monkeyrunner这个客户端的代码是怎么实现这一点的,但没有谈monkey那边是如何接受命令,接受到命令又是如何处理的. 所以自己打开源码看了一个晚上,大概有了概念.但今天

Docker环境下编译android源码|编译可运行xposed

前言 因为我的电脑是Ubuntu18的版本,成功编译xposed刷入手机之后无法启动,检查了所有的环境,没有问题,发现可能是Ubuntu系统的兼容库的问题,但是我不可能重新安装系统吧,毕竟有点蠢,所以最好的方式就是在docker的Ubuntu容器中编译,统一环境问题,也可以隔离环境平时 工作开发环境,下面开始操作 安装配置docker docker加速,采用对国人友好的镜像地址 通过修改daemon配置文件/etc/docker/daemon.json来使用加速器,执行以下命令: 您可以配置 D

kettle系列-1.kettle源码获取与运行

第一次写博客,心里有点小激动,肯定有很多需要改进的地方,望海涵. kettle算是我相对较为深入研究过的开源软件了,也是我最喜欢的开源软件之一,它可以完成工作中很多体力劳动,在ETL数据抽取方面得到了广泛的使用.我本身对kettle的各个控件使用也不是很熟悉,只会使用最常见的部分控件,就是这样简单的使用也被它的美深深的吸引住了. 好了,进入正题,这里假设你熟悉java开发.git一般使用.kettle一般使用.kettle源码之前托管在kettle官方的svn上,后来迁移到了github上,在g

Robotium源码分析之运行原理

从上一章<Robotium源码分析之Instrumentation进阶>中我们了解到了Robotium所基于的Instrumentation的一些进阶基础,比如它注入事件的原理等,但Robotium作为一个测试框架,其功能远不止于只是方便我们注入事件,其应该还包含其他高级的功能,参照我们前面其他框架如MonkeyRunner,UiAutomator和Appium的源码分析,我们知道一个移动平台自动化测试框架的基本功能除了事件注入外起码还应该有控件获取的功能.所以,这篇文章我们主要是围绕Robo