Flume-NG源码阅读之SinkGroups和SinkRunner

  在AbstractConfigurationProvider类中loadSinks方法会调用loadSinkGroups方法将所有的sink和sinkgroup放到了Map<String,
SinkRunner> sinkRunnerMap之中。

  SinkRunner可能对应一个sink也可能对应一个sinkgroup。因为如果配置文件中有sinkgroup则这个sinkgroup对应的sink会组成一个group然后封装为一个sinkRunner,然后不在sinkgroup中的sink会自己成为一个sinkRunner。每个SinkRunner的构造方法的参数是一个SinkProcessor是用来处理多个sink的。

  一、如果一个SinkRunner对应一个sink。SinkProcessor pr = new
DefaultSinkProcessor()是默认的SinkProcessor。loadSinkGroups方法中的相关代码如下:

1           SinkProcessor pr = new DefaultSinkProcessor();
2 List<Sink> sinkMap = new ArrayList<Sink>();
3 sinkMap.add(entry.getValue());
4 pr.setSinks(sinkMap);
5 Configurables.configure(pr, new Context());
6 sinkRunnerMap.put(entry.getKey(),new SinkRunner(pr));

  DefaultSinkProcessor.configure(Context
context)是空方法,所有这句Configurables.configure(pr, new
Context())没啥作用,重要的是DefaultSinkProcessor.process()方法,就一句代码就是return
sink.process()直接调用sink的process。setSinks方法只会设置一个sink。DefaultSinkProcessor的start()方法也会直接调用sink.start()来启动sink。

  二、如果一个SinkRunner对应多个sink。则会构造一个SinkGroup group = new
SinkGroup(groupSinks)然后获取SinkProcessor:group.getProcessor()。loadSinkGroups方法中的相关代码如下:

1           SinkGroup group = new SinkGroup(groupSinks);
2 Configurables.configure(group, groupConf);
3 sinkRunnerMap.put(comp.getComponentName(), new SinkRunner(group.getProcessor()));

  Configurables.configure(group,
groupConf)会调用SinkGroupConfiguration.configure(Context
context),该方法会获取配置文件中关于"processor."的属性通过getKnownSinkProcessor方法获取SinkProcessorType(是FailoverSinkProcessor或者是LoadBalancingSinkProcessor),并执行该SinkProcessor.configure(processorContext)进行实例化和配置。

  1、如果SinkProcessor是LoadBalancingSinkProcessor,这是负载均衡的processor,会将channel中的发送到指定的所有sink。通过配置选择器selector来选择何种方式的负载均衡,1.3有两种:ROUND_ROBIN,轮询,就是轮流向channel发送数据;RANDOM,随机选择channel发送数据。只有这个SinkProcessor有选择器。

  (1)configure(Context
context)方法。先获取选择器selector的类型,默认是ROUND_ROBIN,轮询;获取backoff(是否使用推迟算法,就是sink.process出问题后对这个sink设置惩罚时间,在此期间不再认为其可活动)的boolean值(默认false就是不启用);根据类型构造相应的选择器对象RoundRobinSinkSelector(实际上会构造一个RoundRobinOrderSelector)或者RandomOrderSinkSelector(实际上会构造一个RandomOrderSelector);然后实例化并设置sinks;最后对selector执行其configure(context)方法进行初始化。

  A、RoundRobinSinkSelector的实际操作者是RoundRobinOrderSelector extends
OrderSelector,它实现了createIterator()方法,该方法用来选出所有的sink及其可活动sinkl的索引封装成一个SpecificOrderIterator<T>(indexOrder,
getObjects())并返回,可以通过SpecificOrderIterator.hasNext()方法判断是否还有sink,用next()方法获取下一个sink。这样可以按照索引递增的顺序依次获取sink进行操作。SpecificOrderIterator主要是将两个:一个是索引数组,一个是sink列表。createIterator()方法代码如下:


 1 @Override
2 public Iterator<T> createIterator() {
3 List<Integer> activeIndices = getIndexList();  //会获取最新的活动的sink的索引列表
4 int size = activeIndices.size();
5 // possible that the size has shrunk so gotta adjust nextHead for that
6 if (nextHead >= size) { //可能会出现sink的总数调整,所以总得getIndexList()并调整nextHead
7 nextHead = 0;
8 }
9 int begin = nextHead++; //注意++在后面说明是先赋值,在自加
10 if (nextHead == activeIndices.size()) {
11 nextHead = 0;
12 }
13
14 int[] indexOrder = new int[size];
15
16 for (int i = 0; i < size; i++) {
17 indexOrder[i] = activeIndices.get((begin + i) % size);
18 }
19
20 return new SpecificOrderIterator<T>(indexOrder, getObjects()); //组成两个数组,大小都一样
21 }

  createIterator()方法中总会调用getIndexList()方法,因为可能有sink中断,或者sinkgroup再调整等情况,使得sinkgroup中实际活动的sink数产生变化。nextHead始终指向下一个可活动的sink索引。indexOrder是新的活动sink的索引数组;getObjects()则返回所有sink的List,通过索引即可即可获取此List中对应的sink。

  B、RandomOrderSinkSelector的实际操作者是RandomOrderSelector extends
OrderSelector,它实现了createIterator()方法:


 1 public synchronized Iterator<T> createIterator() {
2 List<Integer> indexList = getIndexList();
3
4 int size = indexList.size();
5 int[] indexOrder = new int[size];
6 //indexList由于remove操作会动态变化,所以一直使用indexList.size()会获得实际大小
7 while (indexList.size() != 1) {
8 int pick = random.nextInt(indexList.size());
9 indexOrder[indexList.size() - 1] = indexList.remove(pick); //取出pick位置的索引,这句总是使得indexOrder从后向前插入数据
10 }
11
12 indexOrder[0] = indexList.get(0); //将最后一个索引放入indexOrder
13
14 return new SpecificOrderIterator<T>(indexOrder, getObjects());
15 }

  这个方法最终只是将"可活动"的sink的顺序按随机的方式打乱了而已。注意的一个是总是调用indexList.size()动态获取最新的大小;一个是indexOrder[indexList.size()
- 1]始终是从后向前插入数据;一个是indexOrder[0] = indexList.get(0)将最后一个插入保证完整性。

  A和B都是OrderSelector抽象类的子类,都只实现了createIterator()方法,对于getIndexList()和sink.process()方法出现错误的时的selector.informSinkFailed(sink)都是一样的这两个方法决定了出现问题的sink的推迟时间,如果要修改推迟时间可以重写这两个方法。当sink.process运行出问题时informSinkFailed会更新对应sink的FailureState(就三个数,sequentialFails记录出错次数、restoreTime记录出错后惩罚恢复时间(在此期间不再认为是可活动的sink,通过getIndexList()来过滤)、lastFail记录上一次出错时间,三个初始化都是0),maxTimeout默认是3000。看informFailure代码:


public void informFailure(T failedObject) {
if (!shouldBackOff) { //不允许推迟
return;
}
FailureState state = stateMap.get(failedObject);
long now = System.currentTimeMillis(); //获取现在系统时间
long delta = now - state.lastFail; //获取和上次失败时间之间的时间间隔
long lastBackoffLength = Math.min(maxTimeout, 1000 * (1 << state.sequentialFails)); //获取上一次要推迟的时间增量
long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE; //CONSIDER_SEQUENTIAL_RANGE=2000
if (allowableDiff > delta) {
if (state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT) { //说明是连续失败
state.sequentialFails++;
}
} else { //说明期间曾重新正确process,重新计数
state.sequentialFails = 1;
}
state.lastFail = now;
state.restoreTime = now + Math.min(maxTimeout, 1000 * (1 << state.sequentialFails));
}

  这个informFailure方法有需要说明的地方:1、如何判断是连续失败?关键在于CONSIDER_SEQUENTIAL_RANGE这个变量,等于2000,因为首先在推迟时间增量是1000的倍数,而且在推迟时间内是“不被认可”的,是不被认为是可活动的,所以超过推迟时间后自然会被重新认为是活动的,if
(allowableDiff > delta)这句代码写的不够友好,比较费解,它等价于if ((now-state.restoreTime) <
CONSIDER_SEQUENTIAL_RANGE),如果再次失败且比推迟时间等于1000说明是当重新认为是活动的第一次执行就失败,明显是连续失败,所以在失败次数不超过EXP_BACKOFF_COUNTER_LIMIT(等于16)时就增加state.sequentialFails,一旦超过16就不再增加就是16;当((now-state.restoreTime)
>=
CONSIDER_SEQUENTIAL_RANGE)成立时说明成功process至少一次但这次失败,需要重置state.sequentialFails为1。

  getIndexList()方法用来过滤“不被认可”的sink的索引。代码如下:


 1 protected List<Integer> getIndexList() {
2 long now = System.currentTimeMillis();
3
4 List<Integer> indexList = new ArrayList<Integer>();
5
6 int i = 0;
7 for (T obj : stateMap.keySet()) { //是sink的集合
8 if (!isShouldBackOff() || stateMap.get(obj).restoreTime < now) { //
9 indexList.add(i); //将索引存储
10 }
11 i++;
12 }
13 return indexList;
14 }

  如果shouldBackOff=true则会返回的列表将是所有的sink的索引。stateMap.get(obj).restoreTime <
now这句会过滤掉当前还处在惩罚时间内不被认可的sink的索引。

  (2)start()方法会先启动AbstractSinkProcessor.start()方法将所有的sink启动(start()),然后启动选择器 selector.start()。

  (3)process()方法遍历sinkIterator一次获取可活动的sink,执行sink.process()方法,如果有异常就跳出循环并执行失败处理informSinkFailed。

  2、如果SinkProcessor是FailoverSinkProcessor,这是容错的processor,一旦有一个sink中断可以使用其他的代替。

  (1)setSinks(List<Sink>
sinks)方法会将sinks列表中的所有sink,先调用父类的setSinks方法为的是可以执行父类的start和stop方法(子类中没有实现这俩方法),然后放入Map<String,
Sink> sinks中。

  (2)configure(Context
context)方法,会先获取中断时间的上限maxPenalty,然后将所有的sink及其对应的优先级放入liveSinks(这是一个TreeMap,默认根据键值的自然顺序排序存储),最后activeSink
= liveSinks.get(liveSinks.lastKey())获取优先级最高的sink作为活动sink。failedSinks = new
PriorityQueue<FailedSink>()是一个保存中断sink的一个优先级队列。

  (3)process()方法。循环执行如果failedSinks不为空并且记录惩罚时间小于当前系统时间,则取出failedSinks的head然后尝试执行getSink().process()如果能获取到Rady状态说明这个节点又重新建立了链接,则将其加入liveSinks,并重新获取优先级最高的sink作为activeSink,如果获取的是backOff状态则重新将其加入failedSinks,返回状态,如果出现异常则cur.incFails()重新记录惩罚时间并加入failedSinks,惩罚时间会动态变化,会根据失败的次数增加(会和设置的比较取较大者)。如果failedSinks为空或者当前系统小于惩罚时间则使用当前活动的sink:activeSink.process()。

  注:惩罚时间是动态变化的,会随着链接失败的次数而变化,失败次数越多到下次使用它的间隔越长。

  返回顶端sinkRunnerMap会在Application.startAllComponents方法中调用,放放到LifecycleSupervisor.supervise方法中去执行,最终会执行SinkRunner.start()方法来启动组件。


 1 public void start() {
2 SinkProcessor policy = getPolicy();
3
4 policy.start();
5
6 runner = new PollingRunner();
7
8 runner.policy = policy;
9 runner.counterGroup = counterGroup;
10 runner.shouldStop = new AtomicBoolean();
11
12 runnerThread = new Thread(runner);
13 runnerThread.setName("SinkRunner-PollingRunner-" +
14 policy.getClass().getSimpleName());
15 runnerThread.start();
16
17 lifecycleState = LifecycleState.START;
18 }

  上述代码中的policy其实就是SinkProcessor,可能是LoadBalancingSinkProcessor、FailoverSinkProcessor、DefaultSinkProcessor三者中其中之一。policy.start()会启动SinkProcessor。然后会启动一个线程PollingRunner,该线程会始终执行policy.process()方法根据返回的状态做一些统计。这就是我们自定义也要实现process方法的所在,及其需要返回Status的原因。

  至此SinkGroup的介绍完结。

Flume-NG源码阅读之SinkGroups和SinkRunner,布布扣,bubuko.com

时间: 2024-10-19 14:11:57

Flume-NG源码阅读之SinkGroups和SinkRunner的相关文章

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th

Flume NG源码分析(三)使用Event接口表示数据流

Flume NG有4个主要的组件: Event表示在Flume各个Agent之间传递的数据流 Source表示从外部源接收Event数据流,然后传递给Channel Channel表示对从Source传递的Event数据流的临时存储 Sink表示从Channel中接收存储的Event数据流,并传递给下游的Source或者终点仓库 这篇看一下Event接口表示的数据流.Source, Channel, Sink操作的数据流都是基于Event接口的封装. public interface Event

【Java】【Flume】Flume-NG源码阅读之AvroSink

org.apache.flume.sink.AvroSink是用来通过网络来传输数据的,可以将event发送到RPC服务器(比如AvroSource),使用AvroSink和AvroSource可以组成分层结构.它继承自AbstractRpcSink  extends AbstractSink implements Configurable这跟其他的sink一样都得extends AbstractSink implements Configurable,所以重点也在confgure.start.

Flume-NG源码阅读之AvroSink

org.apache.flume.sink.AvroSink是用来通过网络来传输数据的,可以将event发送到RPC服务器(比如AvroSource),使用AvroSink和AvroSource可以组成分层结构.它继承自AbstractRpcSink  extends AbstractSink implements Configurable这跟其他的sink一样都得extends AbstractSink implements Configurable,所以重点也在confgure.start.

Flume 源码阅读

Flume架构 主要由3个组件,分别是Source,Channel和Sink,3个组件组成Event在Flume中得数据流向或者说流水线,功能可以由Flume的介绍看出:When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it’s consumed by a Flume sin

Flume-NG源码阅读之FileChannel

FileChannel是flume一个非常重要的channel组件,非常常用.这个channel非常复杂,涉及的文件更多涉及三个包:org.apache.flume.channel.file.org.apache.flume.channel.file.encryption(加密).org.apache.flume.channel.file.proto共计40个源码文件. 一.configure(Context context)方法: 1.首先获取配置文件中的checkpointDir和dataD

【Java】【Fulme】Flume-NG源码阅读之SpoolDirectorySource

org.apache.flume.source.SpoolDirectorySource是flume的一个常用的source,这个源支持从磁盘中某文件夹获取文件数据.不同于其他异步源,这个源能够避免重启或者发送失败后数据丢失.flume可以监控文件夹,当出现新文件时会读取该文件并获取数据.当一个给定的文件被全部读入到通道中时,该文件会被重命名以标志已经完成.同时,该源需要一个清理进程来定期移除完成的文件. 通道可选地将一个完成路径的原始文件插入到每个事件的hearder域中.在读取文件时,sou

Flume-NG源码阅读之HBaseSink

关于HBase的sink的所有内容均在org.apache.flume.sink.hbase包下. 每个sink包括自己定制的,都extends AbstractSink implements Configurable. 一.首先是configure(Context context)方法.该方法是对HBaseSink的参数初始化.主要包括以下几个: tableName:要写入的HBase数据表名,不能为空: columnFamily:数据表对应的列簇名,这个sink目前只支持一个列簇,不能为空:

Spark2.1内部原理剖析与源码阅读、程序设计与企业级应用案例视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv