Flink流处理之迭代案例

当前Flink将迭代的重心集中在批处理上,之前我们谈及了批量迭代和增量迭代主要是针对批处理(DataSet)API而言的,并且Flink为批处理中的迭代提供了针对性的优化。但是对于流处理(DataStream),Flink同样提供了对迭代的支持,这一节我们主要来分析流处理中的迭代,我们将会看到流处理中的迭代相较于批处理有相似之处,但差异也是十分之明显。

可迭代的流处理程序允许定义“步函数”(step function)并将其内嵌到一个可迭代的流(IterativeStream)中。因为一个流处理程序可能永不终止,因此不同于批处理中的迭代机制,流处理中无法设置迭代的最大次数。取而代之的是,你可以指定等待反馈输入的最大时间间隔(如果超过该时间间隔没有反馈元素到来,那么该迭代将会终止)。通过应用split或filter转换,你可以指定流的哪一部分用于反馈给迭代头,哪一部分分发给下游。这里我们以filter作为示例来展示可迭代的流处理程序的API使用模式。

首先,基于输入流构建IterativeStream,这是一个迭代的起始,通常称之为迭代头:

IterativeStream<Integer> iteration = inputStream.iterate();

接着,我们指定一系列的转换操作用于表述在迭代过程中执行的逻辑(这里简单以map转换作为示例),map API所接受的UDF就是我们上文所说的步函数:

DataStream<Integer> iteratedStream = iteration.map(/* this is executed many times */);

然后,作为迭代我们肯定需要有数据反馈给迭代头进行重复计算,所以我们从迭代过的流中过滤出符合条件的元素组成的部分流,我们称之为反馈流:

DataStream<Integer> feedbackStream = iteratedStream.filter(/* one part of the stream */);

将反馈流反馈给迭代头就意味着一个迭代的完整逻辑的完成,那么它就可以“关闭”这个闭合的“环”了。通过调用IterativeStream的closeWith这一实例方法可以关闭一个迭代(也可表述为定义了迭代尾)。传递给closeWith的数据流将会反馈给迭代头:

iteration.closeWith(feedbackStream);

另外,一个惯用的模式是过滤出需要继续向前分发的部分流,这个过滤转换其实定义的是“终止迭代”的逻辑条件,符合条件的元素将被分发给下游而不用于进行下一次迭代:

DataStream<Integer> output = iteratedStream.filter(/* some other part of the stream */);

跟分析批处理中的迭代一样,我们仍然以解决实际问题的案例作为切入点来看看流处理中的迭代跟批处理中的迭代有何不同。

首先描述一下需要解决的问题:产生一个由一系列二元组(两个字段都是在一个区间内产生的正整数来作为斐波那契数列的两个初始值)构成的数据流,然后对该数据流中的二元组不断地迭代使其产生斐波那契数列,直到某次产生的值大于给定的阈值,则停止迭代并输出迭代次数。

该案例参考自Flink随源码发布的迭代示例,此案例问题规模较小并且能够说明问题。但它示例代码中的一系列变量稍显混乱,为了增强程序的表述性,笔者会对其稍作调整。

这个案例如果拆分到对单个元素(二元组)的角度来看,其执行过程如下图所示:

n表示迭代次数,在最初的map转换中初始化为0;m是判定迭代停止的阈值;

另外,T后面跟的是字段索引,比如T2表示取元组中位置为3的字段。且注意随着迭代T在不断变化。

上面我们已经对问题的核心过程进行了分析,接下来我们会分步解决这个问题的构建迭代的流处理程序。

首先,我们先通过source函数创建初始的流对象inputStream:

DataStream<Tuple2<Integer, Integer>> inputStream = env.addSource(new RandomFibonacciSource());

该source函数会生成二元组序列,二元组的两个字段值是随机生成的作为斐波那契数列的初始值:

private static class RandomFibonacciSource
    implements SourceFunction<Tuple2<Integer, Integer>> {    

    private Random random = new Random();
    private volatile boolean isRunning = true;
    private int counter = 0;   

    public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
        while (isRunning && counter < MAX_RANDOM_VALUE) {
            int first = random.nextInt(MAX_RANDOM_VALUE / 2 - 1) + 1;
            int second = random.nextInt(MAX_RANDOM_VALUE / 2 -1) + 1;  

            if (first > second) continue;            

            ctx.collect(new Tuple2<Integer, Integer>(first, second));
            counter++;
            Thread.sleep(50);
        }
    }    

    public void cancel() {
        isRunning = false;
    }
}

为了对新计算的斐波那契数列中的值以及累加的迭代次数进行存储,我们需要将二元组数据流转换为五元组数据流,并据此创建迭代对象:

IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =
    inputStream.map(new TupleTransformMapFunction()).iterate(5000);

注意上面代码段中iterate API的参数5000,不是指迭代5000次,而是等待反馈输入的最大时间间隔为5秒。流被认为是无界的,所以无法像批处理迭代那样指定最大迭代次数。但它允许指定一个最大等待间隔,如果在给定的时间间隔里没有元素到来,那么将会终止迭代。

元组转换的map函数实现:

private static class TupleTransformMapFunction extends RichMapFunction<Tuple2<Integer,
    Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
        Tuple2<Integer, Integer> inputTuples) throws Exception {
        return new Tuple5<Integer, Integer, Integer, Integer, Integer>(
            inputTuples.f0,
            inputTuples.f1,
            inputTuples.f0,
            inputTuples.f1,
            0);
    }
}

上面五元组中,其中索引为0,1这两个位置的元素,始终都是最初生成的两个元素不会变化,而后三个字段都会随着迭代而变化。

在迭代流iterativeStream创建完成之后,我们将基于它执行斐波那契数列的步函数并产生斐波那契数列流fibonacciStream:

DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =
    iterativeStream.map(new FibonacciCalcStepFunction());

这里的fibonacciStream只是一个代称,其中的数据并不是真正的斐波那契数列,其实就是上面那个五元组。

其中用于计算斐波那契数列的步函数实现如下:

private static class FibonacciCalcStepFunction extends
    RichMapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
    Tuple5<Integer, Integer, Integer, Integer, Integer>> {
    public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
        Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) throws Exception {
        return new Tuple5<Integer, Integer, Integer, Integer, Integer>(
            inputTuple.f0,
            inputTuple.f1,
            inputTuple.f3,
            inputTuple.f2 + inputTuple.f3,
            ++inputTuple.f4);
    }
}

正如上文所述,后三个字段会产生变化,在计算之前,数列最后一个元素会被保留,也就是f3对应的元素,然后通过f2元素加上f3元素会产生最新值并更新f3元素,而f4则会累加。

随着迭代次数增加,不是整个数列都会被保留,只有最初的两个元素和最新的两个元素会被保留,这里也没必要保留整个数列,因为我们不需要完整的数列,我们只需要对最新的两个元素进行判断即可。

上文我们对每个元素计算斐波那契数列的新值并产生了fibonacciStream,但是我们需要对最新的两个值进行判断,看它们是否超过了指定的阈值。超过了阈值的元组将会被输出,而没有超过的则会再次参与迭代。因此这将产生两个不同的分支,我们也为此构建了分支流:

SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =
    fibonacciStream.split(new FibonacciOverflowSelector());

而对是否超过阈值的元组进行判断并分离的实现如下:

private static class FibonacciOverflowSelector implements OutputSelector<
    Tuple5<Integer, Integer, Integer, Integer, Integer>> {
    public Iterable<String> select(
        Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) {
        if (inputTuple.f2 < OVERFLOW_THRESHOLD && inputTuple.f3 < OVERFLOW_THRESHOLD) {
            return Collections.singleton(ITERATE_FLAG);
        }        

        return Collections.singleton(OUTPUT_FLAG);
    }
}

在筛选方法select中,我们对不同的分支以不同的常量标识符进行标识:ITERATE_FLAG(还要继续迭代)和OUTPUT_FLAG(直接输出)。

产生了分支流之后,我们就可以从中检出不同的流分支做迭代或者输出处理。对需要再次迭代的,就通过迭代流的closeWith方法反馈给迭代头:

iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));

而对于不需要的迭代就直接让其流向下游处理,这里我们只是简单得将流“重构”了一下然后直接输出:

DataStream<Tuple3<Integer, Integer, Integer>> outputStream = branchedStream
    .select(OUTPUT_FLAG).map(new BuildOutputTupleMapFunction());
outputStream.print();

所谓的重构就是将之前的五元组重新缩减为三元组,实现如下:

private static class BuildOutputTupleMapFunction extends RichMapFunction<
    Tuple5<Integer, Integer, Integer, Integer, Integer>,
    Tuple3<Integer, Integer, Integer>> {
    public Tuple3<Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
        Integer> inputTuple) throws Exception {
        return new Tuple3<Integer, Integer, Integer>(
            inputTuple.f0,
            inputTuple.f1,
            inputTuple.f4);
    }
}

最终我们将会得到类似如下的输出:

(7,14,5)

(18,37,3)

(3,46,3)

(23,32,3)

(31,43,2)

(13,45,2)

(37,42,2)

……

前两个整数是斐波那契数列的两个初始值,第三个整数表示其需要经历多少次迭代其斐波那契数列最新的两个值才会超过阈值。

最终完整的主干程序代码如下:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment().setBufferTimeout(1);

    DataStream<Tuple2<Integer, Integer>> inputStream = env.addSource(new RandomFibonacciSource());    

    IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =
        inputStream.map(new TupleTransformMapFunction()).iterate(5000);    

    DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =
        iterativeStream.map(new FibonacciCalcStepFunction());    

    SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =
        fibonacciStream.split(new FibonacciOverflowSelector());    

    iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));    

    DataStream<Tuple3<Integer, Integer, Integer>> outputStream = branchedStream
        .select(OUTPUT_FLAG).map(new BuildOutputTupleMapFunction());    

    outputStream.print();
    env.execute("Streaming Iteration Example");
}

微信扫码关注公众号:Apache_Flink


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

时间: 2025-01-13 09:41:39

Flink流处理之迭代案例的相关文章

Flink流计算编程--在WindowedStream中体会EventTime与ProcessingTime

一.Flink流处理简介 Flink流处理的API叫做DataStream,可以在保证Exactly-Once的前提下提供高吞吐.低延时的实时流处理.用Flink作为流处理框架成功的案例可参考Flink母公司–Data Artisans官方blog中的2篇文章: How we selected Apache Flink as our Stream Processing Framework at the Otto Group Business Intelligence Department RBE

Flink流计算编程--在双流中体会joinedStream与coGroupedStream

一.joinedStream与coGroupedStream简介 在实际的流计算中,我们经常会遇到多个流进行join的情况,Flink提供了2个Transformations来实现. 如下图: 注意:Join(Cogroups) two data streams on a given key and a common window.这里很明确了,我们要在2个DataStream中指定连接的key以及window下来运算. 二.SQL比较 我们最熟悉的SQL语言中,如果想要实现2个表join,可以

Apache Flink流分区器剖析

这篇文章介绍Flink的分区器,在流进行转换操作后,Flink通过分区器来精确得控制数据流向. StreamPartitioner StreamPartitioner是Flink流分区器的基类,它只定义了一个抽象方法: public abstract StreamPartitioner<T> copy(); 但这个方法并不是各个分区器之间互相区别的地方,定义不同的分区器的核心在于--各个分区器需要实现channel选择的接口方法: int[] selectChannels(T record,

Flink流计算随笔(1)

相比 Spark Stream.Kafka Stream.Storm 等,为什么阿里会选择 Flink 作为新一代流式计算引擎?前期经过了哪些调研和对比? 大沙:我们是 2015 年开始调研新一代流计算引擎的.我们当时的目标就是要设计一款低延迟.exactly once.流和批统一的,能够支撑足够大体量的复杂计算的引擎.Spark streaming 的本质还是一款基于 microbatch 计算的引擎.这种引擎一个天生的缺点就是每个 microbatch 的调度开销比较大,当我们要求越低的延迟

Flink流处理的动态实时亿级全端用户画像系统视频课程分享

基于Flink流处理的动态实时亿级全端用户画像系统课程下载: https://pan.baidu.com/s/1YtMs-XG5-PsTFV9_7-AlfA 提取码: 639m 项目中采用到的算法包含Logistic Regression.Kmeans.TF-IDF等,Flink暂时支持的算法比较少,对于以上算法,本课程将手把手带大家用Flink实现,并且结合真实场景,学完即用.本套教程的Flink算法部分属于国内课程首创. 系统包含所有终端的数据(移动端.pc端.小程序端,快应用等等),支持亿

Flink流式引擎技术分析--大纲

Flink简介 Flink组件栈 Flink特性 流处理特性 API支持 Libraries支持 整合支持 Flink概念 Stream.Transformation.Operator Parallel Dataflow Task.Operator Chain Window Time Flink架构 JobManager TaskManager Client Flink调度 逻辑调度 物理调度 Flink容错 Flink的集群部署 环境准备 集群安装 集群启动 案例测试 整体执行过程 数据流图的

flink流计算随笔(3)

Stateful Computations over Data Streams(在数据流的有状态计算)Apache Flink是一个用于分布式流和批处理数据的开源平台.Flink的核心是一个流数据流引擎,它为数据流上的分布式计算提供数据分布.通信和容错能力.Flink在流引擎之上构建批处理,覆盖本地迭代支持.托管内存和程序优化.通常在程序中的转换和数据流中的操作符之间存在一对一的对应关系.然而,有时一个转换可能包含多个转换操作符. 在串流连接器和批处理连接器文档中记录了源和汇(Sources a

Flink 流处理API之一

1. Environment 1.1 getExecutionEnvironment 创建一个执行环境,表示当前执行程序的上下文. 如果程序是独立调用的,则此方法返回本地执行环境 如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境 也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式. 批处理环境 val env = ExecutionEnvironment.getExecutionEnviron

Flink流处理之窗口算子分析

窗口算子WindowOperator是窗口机制的底层实现,它几乎会牵扯到所有窗口相关的知识点,因此相对复杂.本文将以由面及点的方式来分析WindowOperator的实现.首先,我们来看一下对于最常见的时间窗口(包含处理时间和事件时间)其执行示意图: 上图中,左侧从左往右为事件流的方向.方框代表事件,事件流中夹杂着的竖直虚线代表水印,Flink通过水印分配器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWaterm