Apache Flink源码解析之stream-operator

前面我们谈论了Flink stream中的transformation。你可以将transformation看成编写Flink程序并构建流式处理程序的必要组成部分(静态表现形式);而本篇我们将探讨transformation在Flink运行时对应的动态表现形式——operator。他们之间的映射关系见下图:

具体的探讨可以查看前文:Flink中的一些核心概念

StreamOperator

所有operator的最终基类,operator的分类方式,按照输入流个数不同分为:

  • 无输入:StreamSource
  • 单个流输入:OneInputStreamOperator
  • 两个流输入:TwoInputStreamOperator

跟生命周期有关的核心抽象方法:

  • setup : 实例化operator
  • open :该方法会在任何元素被处理之前执行,它的实现通常包含了operator的初始化逻辑
  • close :该方法在所有的元素都进入到operator被处理之后调用
  • dispose :该方法在operator生命周期的最后阶段执行,主要用于回收资源

StreamOperator及其实现类中还包含了一些状态恢复与保存相关的逻辑,但这些不是本文的主题,所有暂时不做探讨。

先来看一下整个package的类关系图:

我们整个剖析方式大致也按照以上operator的分类方式以及类的层次结构来。

StreamSource

作为一个流处理DAG的起点,source operator相比其他operator无疑是特别的(从类的继承关系图也可以看出来)。

它需要接受SourceFunction的实例。并且我们可以看到,它的chaining strategyHEAD(它表示operator不能有前置operator,但可以作为其他operator的前置operator,下文会谈到)。

this.chainingStrategy = ChainingStrategy.HEAD;

StreamSource的实现略显复杂,因为它涉及到我们前面文章谈SourceFunction时谈到的SourceFunction.SourceContext的实现。在这里提供了三个实现,分别对应我们之前谈到的Flink对事件时间的三个分类:

  • NonTimestampContext:针对ProcessingTime,该SourceContext将时间戳设置为-1,并且不发射watermark
  • AutomaticWatermarkContext:针对IngestionTime,提供自动的watermark发射机制的SourceContext
  • ManualWatermarkContext:针对EventTime的人工发射watermarkSourceContext

它们之间的对应关系也体现在其run方法的实现中:

        switch (timeCharacteristic) {
            case EventTime:
                ctx = new ManualWatermarkContext<>(this, lockingObject, collector);
                break;
            case IngestionTime:
                ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,
                        getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
                break;
            case ProcessingTime:
                ctx = new NonTimestampContext<>(this, lockingObject, collector);
                break;
            default:
                throw new Exception(String.valueOf(timeCharacteristic));
        }

run方法内部会调用SourceFunctionrun方法:

try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
        }

StreamSource通过一个属性:canceledOrStopped来控制sourceFunction的停止。

整个StreamSource的运行逻辑由run来表述,通过cancel来控制停止逻辑。

NonTimestampContext

NonTimestampContext会忽略时间戳,因此它的实现里稍微特别一点的地方在下面的这两个方法:

public void collectWithTimestamp(T element, long timestamp) {
    // ignore the timestamp
    collect(element);
}

以及

public void emitWatermark(Watermark mark) {
    owner.checkAsyncException();
    // do nothing else
}

第一个方法忽略了时间戳,第二个方法不发送watermark

ManualWatermarkContext

无需特别说明

AutomaticWatermarkContext

该类是自动发送watermark的实现,在构造器中接收参数watermarkInterval来指定自动发送watermark的时间间隔。具体的实现机制是,新建一个独立的发射线程,以指定的时间间隔发射:

            this.scheduleExecutor = Executors.newScheduledThreadPool(1);

            this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    final long currentTime = System.currentTimeMillis();

                    if (currentTime > nextWatermarkTime) {
                        // align the watermarks across all machines. this will ensure that we
                        // don‘t have watermarks that creep along at different intervals because
                        // the machine clocks are out of sync
                        final long watermarkTime = currentTime - (currentTime % watermarkInterval);

                        synchronized (lockingObjectParam) {
                            if (currentTime > nextWatermarkTime) {
                                outputParam.emitWatermark(new Watermark(watermarkTime));
                                nextWatermarkTime += watermarkInterval;
                            }
                        }
                    }
                }
            }, 0, watermarkInterval, TimeUnit.MILLISECONDS);

除了这种基于时间的以固定频率发射watermark的机制,在collect方法被调用时,也会检查当前的时间戳,如果达到发送条件也会触发emit watermark

而因为该类实现的是自动发送,在构造器中实现一个定时发送机制,所以emitWatermark方法也就不需要再实现发送逻辑(因为已不再需要用户程序调用emitWatermark方法了),而该方法在该类中的主要任务是负责停止自动发送。停止自动发送的触发条件是收到最后一个元素的信号(将最后一个元素的时间戳设置为Long.MAX_VALUE),emitWatermark收到该标识后,再将其往下游传递并关闭定时发送线程。

OneInputStreamOperator

单一输入流的operator接口,继承自StreamOperator。提供了两个接口方法:

  • processElement:处理到达该operator的一个元素
  • processWatermark:处理一个Watermark

TwoInputStreamOperator

支持两个流作为输入的operator,同样继承自StreamOperator。扩充了多个接口方法:

  • processElement1 : 处理来自第一个输入的某个元素
  • processElement2 : 处理来自第二个输入的某个元素
  • processWatermark1 : 处理来自第一个输入的一个Watermark
  • processWatermark2 : 处理来自第二个输入的一个Watermark

辅助实现类

Output

Collector的扩展,增加了发射WaterMark的功能。该接口主要供operator用于发射元素或者WaterMark

  • emitWatermark : 该发射WaterMark将广播给下游的所有operator

TimeCharacteristic

Flink在涉及到时间相关的处理时,将时间划分为三类。而时间类型的定义在Flink中就是用该枚举来表示:

  • ProcessingTime
  • IngestionTime
  • EventTime

这三种时间类型之前我们曾多次提及,这里不再啰嗦

TimestampedCollector

Output的包装器实现,它用于给元素设置时间戳

AbstractStreamOperator

该抽象类为实现一个具体的operator提供基本的支持,Flink内置提供的operator全部都直接或间接继承自AbstractStreamOperator

它内部包含了三大类的属性:

  • 配置属性
  • 运行时属性
  • 键值对状态属性

大都数方法都是辅助方法,值得一提的是setup方法。从这里我们可以看到所有operator标识符的生成方式:

String operatorIdentifier = getClass().getSimpleName() + "_" + config.getVertexID() + "_" + runtimeContext.getIndexOfThisSubtask();

可以看到标识是由”_”间隔的三段拼接而成。三段分别是:类名,vertex id,以及当前subtask的索引。

然后基于此标识,创建了用于存储状态的stateBackend

stateBackend = container.createStateBackend(operatorIdentifier, keySerializer);

stateBackenddispose方法中会被关闭。

AbstractStreamOperator并没有对open/close等生命周期方法提供具体的实现,这些方法的具体实现被后延至后面谈到的AbstractUdfStreamOperator中。

AbstractUdfStreamOperator

该类主要针对operator生命周期相关的方法(open/close/dispose)提供了模板实现。而这些实现都统一针对用户定义的Function的实例(简称udf)。

ChainingStrategy

该枚举定义了operatorchain strategy(链接策略)。当一个operator链接到其前置operator时,意味着它们将在同一个线程上执行。StreamOperator的默认值是HEAD,这意味着它将没有前置operator,不过它有可能成为其他operator的前置operator。大部分StreamOperator将该枚举以ALWAYS覆盖,表示它们将链接到一个前置operator

它的三个枚举值:

  • ALWAYS :上面已经提到过,它允许将当前operator链接到某前置operator,这是提升性能的良好实践,它能够提升operator的并行度
  • NEVER :该策略不支持operator被链接到某前置operator也不支持被作为其他operator的前置operator
  • HEAD :该策略表示operator没有前置operator,不过可以作为其他operatorchain header

内置的Operator实现

StreamCounter

元素累加器,没有什么特别的

StreamProject

这里需要解释一下,此处的project,并非通常所指的项目的意思,而是投射、投影的意思。你可以将其类比于SQL中的SELECT子句。因此他允许你选择你需要的fields集合。这通过其构造器的一个字段索引数组来指定:

processElement方法中,它依次遍历所有需要的字段索引,将元素中需要的字段提取出来,放入一个用于输出的outTuple,最后再将其发射出去:

    public void processElement(StreamRecord<IN> element) throws Exception {
        for (int i = 0; i < this.numFields; i++) {
            outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);
        }
        output.collect(element.replace(outTuple));
    }

StreamFilter

filter operator,处理逻辑很简单,根据自定义的FilterFunction方法,对每个元素进行过滤,如果满足过滤条件,则将该元素emit出去。

StreamMap

map operator,根据传入的MapFunction,对每个元素应用map操作后将其发射出去。

StreamFlatMap

flatmap operator接收FlatMapFunction函数,有一些特别之处:在其open方法中,它初始化了一个TimestampedCollector,作为传递给FlatMapFunctioncollector,该collector是给那些特定的userFunction使用的,并且用于给他们操作的元素设置时间戳。

StreamGroupedFold

分组的fold operatorfold函数的执行依赖于一个初始化值initialValue。因此这里涉及到状态保存。并且状态是跟具体的分区关联的。因此,在open方法的实现中,需要获得跟分区关联的ValueState

        ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null);
        values = getPartitionedState(stateId);

processElement方法的实现,涉及到一系列的操作:从ValueState中获取数据,作为“新”的初始值跟当前元素一起进行fold函数运算,获得结果后更新ValueState,然后将获得的结果emit出去。

StreamGroupedReduce

按分组进行reduce操作的operator.

基于特定的状态名称:

private static final String STATE_NAME = "_op_state";

构建状态id

ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer, null);

然后再获取状态值:

values = getPartitionedState(stateId);

以上两个动作在open方法中实现

processElement方法中,分为两种情况:

  • 如果之前已存在状态值,那么拿当前值跟之前的状态值做reduce并获得结果,将结果再次更新到最新状态并emit出去
  • 如果之前不存在状态值,那么直接将当前值更新到状态中,并将当前值emit出去

StreamSink

sink operator,通常是流处理的最后一个operator。它接收SinkFunction的实例。在processElement中依次调用其invoke方法。

小结

本文主要探讨了stream transformation的运行时形式operator的大致实现。


微信扫码关注公众号:Apache_Flink


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

时间: 2024-10-14 21:52:51

Apache Flink源码解析之stream-operator的相关文章

Apache Flink源码解析之stream-window

window(窗口)是Flink流处理中非常重要的概念,本篇我们来对窗口相关的概念以及关联的实现进行解析.本篇的内容主要集中在package org.apache.flink.streaming.api.windowing下. Window 一个Window代表有限对象的集合.一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点--所有应该进入这个窗口的元素都已经到达. Flink的根窗口对象是一个抽象类,只提供了一个抽象方法: public abstract long maxTimes

Apache Flink源码解析之stream-sink

上一篇我们谈论了Flink stream source,它作为流的数据入口是整个DAG(有向无环图)拓扑的起点.那么与此对应的,流的数据出口就是跟source对应的Sink.这是我们本篇解读的内容. SinkFunction 跟SourceFunction对应,Flink针对Sink的根接口被称为SinkFunction.继承自Function这一标记接口.SinkFunction接口只提供了一个方法: void invoke(IN value) throws Exception; 该方法提供基

Apache Flink源码解析之stream-windowfunction

Window也即窗口,是Flink流处理的特性之一.前一篇文章我们谈到了Winodw的相关概念及其实现.窗口的目的是将无界的流转换为有界的元素集合,但这还不是最终的目的,最终的目的是在这有限的集合上apply(应用)某种函数,这就是我们本篇要谈的主题--WindowFunction(窗口函数). 那么窗口函数会在什么时候被应用呢?还记得上篇文章我们谈到了触发器Trigger,在触发器触发后会返回TriggerResult这个枚举类型的其中一个枚举值.当返回的是FIRE或者FIRE_AND_PUR

Apache Flink源码解析之stream-transformation

之前我们聊了Flink程序的source.sink就差transformation了.今天我们就来解读一下Flink的transformation.它们三者的关系如下图: 当然这还是从Flink编程API的角度来看的(编程视角).所谓的transformation,用于转换一个或多个DataStream从而形成一个新的DataStream对象.Flink提供编程接口,允许你组合这些transformation从而形成非常灵活的拓扑结构. StreamTransformation StreamTr

Flink 源码解析 —— 如何获取 ExecutionGraph ?

https://t.zsxq.com/UnA2jIi 博客 1.Flink 从0到1学习 -- Apache Flink 介绍 2.Flink 从0到1学习 -- Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门 3.Flink 从0到1学习 -- Flink 配置文件详解 4.Flink 从0到1学习 -- Data Source 介绍 5.Flink 从0到1学习 -- 如何自定义 Data Source ? 6.Flink 从0到1学习 -- Data Sink 介绍 7

Flink 源码解析 —— JobManager 处理 SubmitJob 的过程

JobManager 处理 SubmitJob https://t.zsxq.com/3JQJMzZ 博客 1.Flink 从0到1学习 -- Apache Flink 介绍 2.Flink 从0到1学习 -- Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门 3.Flink 从0到1学习 -- Flink 配置文件详解 4.Flink 从0到1学习 -- Data Source 介绍 5.Flink 从0到1学习 -- 如何自定义 Data Source ? 6.Flink

Flink 源码解析 —— 深度解析 Flink 序列化机制

Flink 序列化机制 https://t.zsxq.com/JaQfeMf 博客 1.Flink 从0到1学习 -- Apache Flink 介绍 2.Flink 从0到1学习 -- Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门 3.Flink 从0到1学习 -- Flink 配置文件详解 4.Flink 从0到1学习 -- Data Source 介绍 5.Flink 从0到1学习 -- 如何自定义 Data Source ? 6.Flink 从0到1学习 -- Da

Flink 源码解析 —— 项目结构一览

Flink 源码项目结构一览 https://t.zsxq.com/MNfAYne 博客 1.Flink 从0到1学习 -- Apache Flink 介绍 2.Flink 从0到1学习 -- Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门 3.Flink 从0到1学习 -- Flink 配置文件详解 4.Flink 从0到1学习 -- Data Source 介绍 5.Flink 从0到1学习 -- 如何自定义 Data Source ? 6.Flink 从0到1学习 --

[Apache Spark源码阅读]天堂之门——SparkContext解析

稍微了解Spark源码的人应该都知道SparkContext,作为整个Project的程序入口,其重要性不言而喻,许多大牛也在源码分析的文章中对其做了很多相关的深入分析和解读.这里,结合自己前段时间的阅读体会,与大家共同讨论学习一下Spark的入口对象—天堂之门—SparkContex. SparkContex位于项目的源码路径\spark-master\core\src\main\scala\org\apache\spark\SparkContext.scala中,源文件包含Classs Sp