Flink数据流图的生成----简单执行计划的生成

Flink的数据流图的生成主要分为简单执行计划-->StreamGraph的生成-->JobGraph的生成-->ExecutionGraph的生成-->物理执行图。其中前三个(ExecutionGraph的之前都是在client上生成的)。ExectuionGraph是JobGraph的并行版本,是在JobManager(master)端生成的。而物理执行图只是一个抽象的概念,其具体的执行是在多个slave上并行执行的。

原理分析

    

  Flink效仿了传统的关系型数据库在运行SQL时生成运行计划并对其进行优化的思路。在具体生成数据流图之前会生成一个运行计划,当程序执行execute方法时,才具体生成数据流图运行任务。

  首先Flink会加载数据源,读取配置文件,获取配置参数parallelism等,为source 的transformation对应的类型是SourceTransformation,opertorName是source,然后进入flatmap,用户重写了内置的flatmap内核函数,按照空格进行划分单词,获取到其各种配制参数,parallelism以及输出的类型封装Tuple2<String,Integer>,以及operatorName是Flat Map,其对应的Transformation类型是OneInputTransformation。然后开始keyby(0),其中0指的是Tuple2<String, Integer>中的String,其意义是按照word进行重分区,其对应的parallelism是4,operatorName是partition,Transformation的类型是PartitionTransformation,输出类型的封装是Tuple2<String, Integer>。接着sum(1),该函数的作用是把相同的key对应的值进行加1操作。其对应的parallelism是4,operatorName是keyed Aggregation,对应的输出类型封装是Tuple2<String, Integer>,Transformation的类型是OneInputTransformation。最后是进行结果输出处理sink,对应的parallelism是4,输出类型的封装是Tuple2<String, Integer>,对应的operatorName是sink,对应的Transformation类型是SinkTransformation。

源码

以WordCount.java为例:

 1 package org.apache.flink.streaming.examples.wordcount;
 2 public class WordCount {
 3     private static  Logger LOG = LoggerFactory.getLogger(WordCount.class);
 4     private static SimpleDateFormat df=new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
 5     public static long time=0;
 6     public static void main(String[] args) throws Exception {
 7         // Checking input parameters
 8         LOG.info("set up the execution environment: start= "+df.format(System.currentTimeMillis()));
 9         final ParameterTool params = ParameterTool.fromArgs(args);
10         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
11         env.getConfig().setGlobalJobParameters(params);
12         DataStream<String> text;
13         if (params.has("input")) {
14             text = env.readTextFile(params.get("input"));
15         } else {
16             text = env.fromElements(WordCountData.WORDS);
17         }
18         DataStream<Tuple2<String, Integer>> counts =
19             text.flatMap(new Tokenizer()).keyBy(0).sum(1);
20         if (params.has("output")) {
21             counts.writeAsText(params.get("output"));
22         } else {
23             System.out.println("Printing result to stdout. Use --output to specify output path.");
24             counts.print();
25         }
26         env.execute("Streaming WordCount");
27     }
28     public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
29         private static final long serialVersionUID = 1L;
30         @Override
31         public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
32                 throws Exception {
33             String[] tokens = value.toLowerCase().split("\\W+");
34             for (String token : tokens) {
35                 if (token.length() > 0) {
36                     out.collect(new Tuple2<String, Integer>(token, 1));
37                 }
38             }
39         }
40     }
41 }

  Flink在程序执行时,首先会获取程序需要的执行计划,类似数据的惰性加载,当具体执行execute()函数时,程序才会具体真正执行。首先执行

1 text = env.readTextFile(params.get("input"));

  该函数的作用是加载数据文件,获取数据源,形成source的属性信息,包括source的Transformation类型、并行度、输出类型等。源码如下:

 1 public final <OUT> DataStreamSource<OUT> readTextFile(OUT... data) {
 2         TypeInformation<OUT> typeInfo;
 3         try {
 4             typeInfo = TypeExtractor.getForObject(data[0]);
 5         }
 6         return fromCollection(Arrays.asList(data), typeInfo);
 7 }
 8
 9 public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {
10         FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());
11         SourceFunction<OUT> function;
12         try {
13             function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
14         }
15         catch (IOException e) {
16             throw new RuntimeException(e.getMessage(), e);
17         }
18         return addSource(function, "Collection Source", typeInfo).setParallelism(1);
19     }
20
21     public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
22         boolean isParallel = function instanceof ParallelSourceFunction;
23         clean(function);
24         StreamSource<OUT, ?> sourceOperator;
25         if (function instanceof StoppableFunction) {
26             sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
27         } else {
28             sourceOperator = new StreamSource<>(function);
29         }
30         return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
31     }
32
33 public DataStreamSource(StreamExecutionEnvironment environment,
34             TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
35             boolean isParallel, String sourceName) {
36         super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
37         this.isParallel = isParallel;
38         if (!isParallel) {
39             setParallelism(1);
40         }
41     }

获取source信息

  从上述代码可知,这部分会执行addSource()函数,通过new StreamSource,生成source的operator,然后通过new DataStreamSource生成SourceTransformation,获取并行度等。然后就是执行flatmap函数text.flatMap(new Tokenizer()),该函数内和source类似,也是获取Transformation类型、并行度、输出类型等。

 1 public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
 2         TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
 3                 getType(), Utils.getCallLocationName(), true);
 4         SingleOutputStreamOperator result = transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
 5         return result;
 6     }
 7
 8 public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
 9         transformation.getOutputType();
10         OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
11                 this.transformation,
12                 operatorName,
13                 operator,
14                 outTypeInfo,
15                 environment.getParallelism());
16         SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
17         getExecutionEnvironment().addOperator(resultTransform);
18         return returnStream;
19     }

flatmap获取信息

  对应该operator,其Transformation的类型是OneInputTransformation类型,对应着属性信息有该operator的名称,输出类型,执行的并行度等,然后会执行addOperator函数将该operator (flatmap)加入到执行环境中,以便后续执行。 接下来执行.keyBy(0),该函数的作用就是重分区,把word的单词作为key,然后按照key相同的放在一个分区内,方便执行。该函数的内部是形成其transformation类型(PartitionTransformation),以及相关的属性信息等。

 1 private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
 2         return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
 3                 getType(), getExecutionConfig())));
 4     }
 5 public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
 6         super(
 7             dataStream.getExecutionEnvironment(),
 8             new PartitionTransformation<>(
 9                 dataStream.getTransformation(),
10                 new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
11         this.keySelector = keySelector;
12         this.keyType = validateKeyType(keyType);
13         LOG.info("part of keyBy(partition): end= "+df.format(System.currentTimeMillis()));
14     }

keyby获取信息

  在上述代码中,keyby会创建一个PartitionTransformation,作为其Transformation的类型,该在类中会得到input(输入数据),以及partioner分区器。同样会得到执行的并行度、输出类型等信息。 接下来是sum(1),该函数的作用是按照keyby的word作为key,进行加1操作。源码如下:

1 protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
2         StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
3                 clean(aggregate), getType().createSerializer(getExecutionConfig()));
4         return transform("Keyed Aggregation", getType(), operator);
5     }

aggregate

  在上述的代码中,可以看到,该operator的所属的类型是StreamGroupedReduce,对着着核心方法reduce(),通过new该对象,会获取到其operator的名称等属性信息,然后执行transform()函数,该函数的代码之前已经给出,主要的作用是创建一个该operator的Transformation类型,即OneInputTransformtion,会得到并行度、输出类型等属性信息,然后执行addOperator()函数,将operator加入执行环境,让能起能够具体执行任务。 接下来会对结果进行输出,将执行counts.print(),该函数内部对应着一个operator,即sink(具体的逻辑就是结果输出),源码如下:

 1 public DataStreamSink<T> print() {
 2         PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
 3         return addSink(printFunction);
 4     }
 5 public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
 6         transformation.getOutputType();
 7         if (sinkFunction instanceof InputTypeConfigurable) {
 8             ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
 9         }
10         StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
11         DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
12         getExecutionEnvironment().addOperator(sink.getTransformation());
13         return sink;
14     }
15 protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
16         this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
17     }

sink获取信息

  print()函数内部只有一个方法:addSink(),其功能和addSource()一样,首先会创建一个StreamSink,生成一个operator对象,然后创建DataStreamSink,该类中会创建一个该operator的Transformation类型即SinkTransformtion,会得到该operator的名称,并行度,输出类型等属性信息。同样,会执行addOperator()函数,该函数的作用将该operator加入到env执行环境中,用来进行具体操作。

原文地址:https://www.cnblogs.com/liuzhongfeng/p/8653842.html

时间: 2024-10-20 09:53:50

Flink数据流图的生成----简单执行计划的生成的相关文章

执行计划的生成

SQL Server使用许多技术来优化资源消耗: 基于语法的查询优化: 无用计划匹配以避免对简单查询的深度优化: 根据当前分布统计的索引和连接策略: 多阶段的查询优化以控制优化开销: 执行计划缓冲以避免重新生成执行计划: 以上技术按以下顺序执行: 解析器: 代数化器: 查询优化器: 执行计划生成,缓冲和hash计划生成: 查询执行: 其执行顺序如下: 一.解析器(parser) 当查询被提交时,SQL Server将它传递给关系引擎中的解析器. 关系引擎-负责解析.名称和类型解析.优化和按照查询

SqlServer 中如何查看某一个Sql语句是复用了执行计划,还是重新生成了执行计划

我们知道SqlServer的查询优化器会将所执行的Sql语句的执行计划作缓存,如果后续查询可以复用缓存中的执行计划,那么SqlServer就会为后续查询复用执行计划而不是重新生成一个新的执行计划,因为复用执行计划的性能比生成执行计划的性能要高很多,所以SqlServer的这一特性可以大大提高Sql语句的执行效率.特别是对于存储过程,因为存储过程的执行计划是在存储过程第一次执行的时候生成的,存储过程的执行计划生成后就会被缓存到SqlServer的执行计划列表中,如果以后存储过程再被执行,那么存储过

Pig源码分析: 简析执行计划的生成

摘要 本文通过跟代码的方式,分析从输入一批Pig-latin到输出物理执行计划(与launcher引擎有关,一般是MR执行计划,也可以是Spark RDD的执行算子)的整体流程. 不会具体涉及AST如何解析.如何使用了Anltr.逻辑执行计划如何映射.逻辑执行计划如何优化.MR执行计划如何切分为MR Job,而是从输入一批Pig DSL到待执行的真正执行计划的关键变化步骤(方法和类). 执行计划完整解析 入口处书Main类的main函数 /** * The Main-Class for the

Sql Server之旅——第十一站 简单说说sqlserver的执行计划

原文:Sql Server之旅--第十一站 简单说说sqlserver的执行计划 我们知道sql在底层的执行给我们上层人员开了一个窗口,那就是执行计划,有了执行计划之后,我们就清楚了那些烂sql是怎么执行的,这样 就可以方便的找到sql的缺陷和优化点. 一:执行计划生成过程 说到执行计划,首先要知道的是执行计划大概生成的过程,这样就可以做到就心中有数了,下面我画下简图: 1. 分析过程 这三个比较容易理解,首先我们要保证sql的语法不能错误,select和join的表是必须存在的,以及你是有执行

利用DBMS_STATS包修改统计信息,欺骗优化器,生成糟糕的执行计划

在使用基于成本的优化器的优化器时,优化器生产执行计划时要估算每条SQL的执行成本,选择最佳的执行计划来执行sql语句.通过操纵统计信息就可以简介操纵执行计划的生成. 当然 需要强调的一点是,这是非常危险的行为 1 创建测试表 SQL> create table test_stats  as   2  select * from dba_objects ; Table created. 2 收集统计信息 SQL> EXEC dbms_stats.gather_table_stats(ownnam

通过重新生成执行计划解决绑定变量执行计划偏差导致SQL执行时间过长

基本要素(时间.用户.问题) 用户11g环境下有段SQL语句在程序中执行效率非常差,但是在plsql中执行却很快,通过查看执行计划,发现使用了不同的索引导致,程序中执行的如下: PLSQL中执行的效果如下: 可以看到差别,使用门诊费用记录_IX_登记时间索引是在plsql中的执行计划,使用门诊费用记录_UQ_NO的是程序中的执行计划,两者SQL是完全相同的,唯一却别就是前者使用了绑定变量,后者是直接带参数值执行. 问题分析 问题很明显,由于绑定变量生成的执行计划与实际有偏差,11g本来有个绑定变

淘宝数据库OceanBase SQL编译器部分 源码阅读--生成物理查询计划

SQL编译解析三部曲分为:构建语法树,制定逻辑计划,生成物理执行计划.前两个步骤请参见我的博客<<淘宝数据库OceanBase SQL编译器部分 源码阅读--解析SQL语法树>>和<<淘宝数据库OceanBase SQL编译器部分 源码阅读--生成逻辑计划>>.这篇博客主要研究第三步,生成物理查询计划. 一. 什么是物理查询计划 与之前的阅读方法一致,这篇博客的两个主要问题是what 和how.那么什么是物理查询计划?物理查询计划能够直接执行并返回数据结果数

Oracle SQL执行计划基线总结(SQL Plan Baseline)

一.基础概念 Oracle 11g开始,提供了一种新的固定执行计划的方法,即SQL plan baseline,中文名SQL执行计划基线(简称基线),可以认为是OUTLINE(大纲)或者SQL PROFILE的改进版本,基本上它的主要作用可以归纳为如下两个: 1.稳定给定SQL语句的执行计划,防止执行环境或对象统计信息等等因子的改变对SQL语句的执行计划产生影响! 2.减少数据库中出现SQL语句性能退化的概率,理论上不允许一条语句切换到一个比已经执行过的执行计划慢很多的新的执行计划上! 注意:

SQL Server中参数化SQL写法遇到parameter sniff ,导致不合理执行计划重用的一种解决方案

parameter sniff问题是重用其他参数生成的执行计划,导致当前参数采用该执行计划非最优化的现象.想必熟悉数据的同学都应该知道,产生parameter sniff最典型的问题就是使用了参数化的SQL(或者存储过程中使用了参数化)写法,如果存在数据分布不均匀的情况下,正常情况下生成的执行计划,在传入在分布数据较多的参数的情况下,重用了正常参数生成的执行计划,而这种缓存的执行计划并非适合当前参数的一种情况. 这种情况,在实际业务中,出现的频率还是比较高的,因为存储过程一般都是采用参数化的写法