Flink批处理优化器之范围分区重写

为最终计划应用范围分区重写

Flink的批处理程序允许用户使用partitionByRange API来基于某个(或某些)字段进行按范围分区且可以选择性地指定排序顺序,示例代码如下:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

final DataSet<Tuple2<Integer, String>> ds = getTupleDataSet(env);
ds.partitionByRange(0).withOrders(Order.ASCENDING, Order.DESCENDING);

在使用范围分区这一特性时,需要尽可能保证各分区所处理的数据集均衡性以最大化利用计算资源并减少作业的执行时间。为此,优化器提供了范围分区重写器(RangePartitionRewriter)来对范围分区的分区策略进行优化,使其尽可能平均地分配数据,避免数据倾斜。要做到这一点需要对数据集的范围有足够的“了解”,RangePartitionRewriter通过对数据集进行采样来得到分区的范围。接下来我们就来分析RangePartitionRewriter的实现细节。

范围分区重写器

范围分区重写器(RangePartitionRewriter)同样遍历的是最终选择的计划并作用于计划节点(PlanNode),其主要用于在后置遍历时对传输策略为范围分区节点的输入端通道的连接情况进行重写,核心逻辑如下:

//提取当前所有的计划节点的输入通道
final Iterable<Channel> inputChannels = node.getInputs();
//遍历输入通道
for (Channel channel : inputChannels) {
    ShipStrategyType shipStrategy = channel.getShipStrategy();
    // 确保优化的通道的数据传输策略为范围分区
    if (shipStrategy == ShipStrategyType.PARTITION_RANGE) {

        if(channel.getDataDistribution() == null) {
            if (node.isOnDynamicPath()) {
                throw new InvalidProgramException("Range Partitioning not supported within iterations "
                    + " if users do not supply the data distribution.");
            }

            //对该通道的范围分区进行“重写”,并将当前通道从源计划节点的通道中删除,然后加入新的通道集合
            PlanNode channelSource = channel.getSource();
            List<Channel> newSourceOutputChannels = rewriteRangePartitionChannel(channel);
            channelSource.getOutgoingChannels().remove(channel);
            channelSource.getOutgoingChannels().addAll(newSourceOutputChannels);
        }
    }
}

上述代码段的关键在于对rewriteRangePartitionChannel方法的调用,它封装了对最终计划进行改写的逻辑,改写产生的逻辑Dataflow对比示意图如下:

由上图可见,改写后的逻辑dataflow被拆分成两个分支:上层分支主要完成的功能是采样跟构建范围边界,我们将其简称为“采样分支”;下层分支则用于对记录分区的索引进行查找、路由以及相关处理,可简称为“数据处理分支”。两分支之间有一个衔接关系在于:采样分支最终会输出“范围边界”,并将其以广播变量的形式传递给数据处理分支(见图中的虚线部分),数据处理分支将依据范围边界为来自source的记录查找该归属的分区编号。你可能会产生疑惑:依照这种表述来看,采样分支和数据处理分支是有前后的时序依赖关系的,而单纯的逻辑Dataflow中从source分拆的两个分支通常没有这种关系。那么Flink是如何保证该依赖关系的呢?答案在于数据处理分支的第一个channel,其数据交换模式(DataExchangeMode)被设置为Batch模式(见图中括号的标注,没有特别备注的数据交换模式默认都是Pipeline),Batch模式将数据生产者跟消费者解耦并使得它们不必时刻互相依赖(当数据都生产完成之后,消费者才消费),这也避免了数据处理分支开始处理数据流时还没有收到来自采样分支的范围边界广播变量。

具体而言,其核心流程可分解为如下六步:

  1. 为每个分区采样固定数目的记录作为样本;
  2. 让中央协调器从每个分区的样本中采样固定数目的样本作为最终的样本;
  3. 基于最终样本数据构建范围边界;
  4. 将范围边界作为广播变量传递同时为每个记录构建<分区编号,记录>的二元组并输出然后以自定义分区来分区记录;
  5. 找到记录的分区之后,分区编号就没有存在的意义了,因此为流中的记录移除分区编号;
  6. 连接目标节点

关于采样算法的细节我们将会在下一小节专门进行分析,因此这里我们先假设已采样完成并从广播变量中得到了范围边界。接下来我们来分析数据处理分支的核心逻辑。当记录到来后需要确定它要落到哪个分区,这需要对范围边界集合进行查找并定位分区编号,优化器提供了一个RangeBoundaries接口,其定义了一个方法来提供该功能:

int getRangeIndex(T record);

其通用实现CommonRangeBoundaries使用二分查找来实现该方法:

public int getRangeIndex(T record) {
    return binarySearch(record);
}

CommonRangeBoundaries将会被应用在一个名为AssignRangeIndex的UDF(扩展自:RichMapPartitionFunction)中。AssignRangeIndex首先获取“范围边界”这一广播变量,然后构建CommonRangeBoundaries的实例,随之遍历当前聚集的分区数据并一一查找其分区编号以构建二元组,然后输出到下游,代码如下:

public void mapPartition(Iterable<IN> values, Collector<Tuple2<Integer, IN>> out) throws Exception {
    List<Object> broadcastVariable = getRuntimeContext().getBroadcastVariable("RangeBoundaries");
    if (broadcastVariable == null || broadcastVariable.size() != 1) {
        throw new RuntimeException("AssignRangeIndex require a single RangeBoundaries as broadcast input.");
    }
    Object[][] boundaryObjects = (Object[][]) broadcastVariable.get(0);
    RangeBoundaries rangeBoundaries = new CommonRangeBoundaries(typeComparator.createComparator(),
        boundaryObjects);

    Tuple2<Integer, IN> tupleWithPartitionId = new Tuple2<>();

    for (IN record : values) {
        tupleWithPartitionId.f0 = rangeBoundaries.getRangeIndex(record);
        tupleWithPartitionId.f1 = record;
        out.collect(tupleWithPartitionId);
    }
}

以AssignRangeIndex构建的运算符所产生的计划节点连接着自定义的分区器来对为记录路由到指定的分区:

//以下标为0的字段(也即上面查找到的分区索引)作为分区依据
final FieldList keys = new FieldList(0);
partChannel.setShipStrategy(ShipStrategyType.PARTITION_CUSTOM, keys,
    idPartitioner, DataExchangeMode.PIPELINED);

当记录(此时已是上面的二元组了)被路由到正确的分区之后,分区编号已没有用了,不需要再往下游传输了,优化器又定义了一个名为RemoveRangeIndex的UDF来移除分区编号,具体的做法是只输出二元组里下标为1的字段。最终将以RemoveRangeIndex构建的运算符所生成的计划节点替换通道原先的source节点并使得其与target节点进行连接。


微信扫码关注公众号:Apache_Flink


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

时间: 2024-08-29 18:34:56

Flink批处理优化器之范围分区重写的相关文章

浅谈Flink批处理优化器之Join优化

跟传统的关系型数据库类似,Flink提供了优化器"hint"(提示)以告诉优化器选择一些执行策略.目前优化提示主要针对批处理中的连接(join).在批处理中共有三个跟连接有关的转换函数: join:默认为等值连接(Equi-join),维基百科将其归类为内连接(inner join)的一种 https://en.wikipedia.org/wiki/Join_(SQL): outerJoin:外连接,具体细分为left-outer join.right-outer join.full-

Flink批处理优化器之成本估算

成本估算 在基于成本的优化器中,成本估算非常重要,它直接影响着候选计划的生成.在Flink中成本估算依赖于每个不同的运算符所提供的自己的"预算",本篇我们将分析什么是成本.运算符如何提供自己的预算以及如何基于预算估算成本. 什么是成本 Flink以类Costs来定义成本,它封装了一些成本估算的因素同时提供了一些针对成本对象的计算方法(加.减.乘.除)以及对这些因素未知值的认定与校验. "cost"一词也有译作:开销.代价,将其视为同义即可. Flink当前将成本估算

jvm性能优化及内存分区

jvm性能优化及内存分区 2012-09-17 15:51:37 分类: Java Some of the default values for Sun JVMs are listed below. JDK 1.3.1_06 Initial Size Maximum Size Client JVM 1MB 32MB Server JVM 1MB 64MB JDK 1.4.1_01 Initial Size Maximum Size Client JVM 4MB 64MB Server JVM 4

批处理优化项目总结

2015.10-2016.3月,参与批处理优化项目: 用到的技术:oracle执行计划,oracle hint优化器,java缓存,java多线程并发,javaweb监听器, 遇到的问题及解决: 开发一个static时,没有考虑的并发的情况,导致数据出现异常. 在一个类中声明了enum,但是这个类编程后生成3个class文件,但是上线只上了一个同名class,导致nopointernullException 和需求人员沟通是没沟通完整,

深度剖析阿里巴巴对Apache Flink的优化与改进

本文主要从两个层面深度剖析:阿里巴巴对Flink究竟做了哪些优化? 取之开源,用之开源 一.SQL层 为了能够真正做到用户根据自己的业务逻辑开发一套代码,能够同时运行在多种不同的场景,Flink首先需要给用户提供一个统一的API.在经过一番调研之后,阿里巴巴实时计算认为SQL是一个非常适合的选择.在批处理领域,SQL已经经历了几十年的考验,是公认的经典.在流计算领域,近年来也不断有流表二象性.流是表的ChangeLog等理论出现.在这些理论基础之上,阿里巴巴提出了动态表的概念,使得流计算也可以像

Unity中的批处理优化与GPU Instancing【转】

我们都希望能够在场景中投入一百万个物体,不幸的是,渲染和管理大量的游戏对象是以牺牲CPU和GPU性能为代价的,因为有太多Draw Call的问题,最后我们必须找到其他的解决方案. 在本文中,我们将讨论两种优化技术,它们可以帮助您减少Unity游戏中的Draw Call数量以提高整体性能:批处理和GPU Instancing. 批处理开发者在日常工作中遇到的最常见的问题之一是性能不足,这是由于CPU和GPU的运行能力不足.一些游戏可以运行在PC上,但是在移动设备上不行.游戏运行时运行是否流畅受Dr

关于Flink slot 和kafka topic 分区关系的说明

今天又有小伙伴在群里问 slot 和 kafka topic 分区(以下topic,默认为 kafka 的 topic )的关系,大概回答了一下,这里整理一份 首先必须明确的是,Flink Task Manager 的 slot 数 和 topic 的分区数是没有直接关系的,而这个问题其实是问的是: 任务的并发数与 slot 数的关系 最大并发数 = slot 数 这里有两个原因:每个算子的不同并行不能在同一slot,不同的算子可以共享 slot ,所以最大并行度 就等于 slot 数. 这样就

开源CEGUI编辑器之二(MFC重写的ImagesetEditor)

转载请注明出处:帘卷西风的专栏(http://blog.csdn.net/ljxfblog) 最近在整理自己几年前一直在研究的一套代码,使用OGRE+CEGUI开发的客户端引擎框架.当年自己倾心研究的东西,终究还是没能挺过时间的车轮,逐渐失去价值.以后估计再也没有时间去扩展和使用了,所以打算整理好之后将之逐步开源,希望能给喜欢研究这类端游技术的朋友吧. CEGUI自己也有编辑器,但是是使用类似MFC的开源软件wxWidgets实现的.开源的代码能给其他人带来好处,但是也有他的弱点,不够友好,缺乏

Oracle优化器之基数反馈 (Cardinality Feedback)功能

概述 在Oracle 11gR2的版本上推出了基数反馈(Cardinality Feedback 以后简称CFB)功能,通过这个特性,对于某些查询在第一次执行时,如果CBO发现根据统计信息估算出的基数(Computed cardinality)和SQL执行时的实际值差距很大的情况发生时,在SQL下次执行时,会根据实际值调整基数,重新生成执行计划. 另外,基数反馈 (CFB)在12c版本上得到更进一步的扩展改称为统计反馈(Statistics Feedback),成为12c自动重新优化(Autom