Apache Flink流作业提交流程分析

提交流程调用的关键方法链

用户编写的程序逻辑需要提交给Flink才能得到执行。本文来探讨一下客户程序如何提交给Flink。鉴于用户将自己利用Flink的API编写的逻辑打成相应的应用程序包(比如Jar)然后提交到一个目标Flink集群上去运行是比较主流的使用场景,因此我们的分析也基于这一场景进行。

Flink的API针对不同的执行环境有不同的Environment对象,这里我们主要基于常用的RemoteStreamEnvironmentRemoteEnvironment进行分析

在前面我们谈到了Flink中实现了“惰性求值”,只有当最终调用execute方法时,才会“真正”开始执行。因此,execute方法是我们的切入点。

其源码位于org.apache.flink.streaming.api.environment.RemoteStreamEnvironment

首先,我们来看一下其execute方法触发的关键方法调用链示意图:

根据上图的调用链,我们针对这些关键方法进行剖析,当然一些细节性的内容我们可能会暂时略过,这样可以保证主路径一直都很清晰。

getStreamGraph方法用于获得一个StreamGraph的实例,该实例表示流的完整的拓扑结构并且包含了生成JobGraph所必要的相关信息(包含了sourcesink的集合以及这些在图中的“节点”抽象化的表示、一些虚拟的映射关系、执行和检查点的配置等)。

获得StreamGraph之后,通过调用executeRemotely方法进行远程执行。该方法首先根据获取到的用户程序包的路径以及类路径创建加载用户代码的类加载器:

ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(
jarFiles, globalClasspaths,   getClass().getClassLoader());

紧接着根据配置构建Client对象(Client对象是真正跟JobManager对接的内部代理):

Client client;
try {
    client = new Client(configuration);
    client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
}catch (Exception e) {
   throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
}

后面的事情就此被Client接管:

try {
    return client.runBlocking(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader);
}catch (ProgramInvocationException e) {
    throw e;
}catch (Exception e) {
    String term = e.getMessage() == null ? "." : (": " + e.getMessage());
    throw new ProgramInvocationException("The program execution failed" + term, e);
}finally {
    client.shutdown();
}

client对象调用了runBlocking以阻塞式的行为“运行”用户程序并等待返回JobExecutionResult对象作为Job的执行结果。执行完成,最终在finally块中,调用shutdown方法关闭并释放资源。

runBlocking被调用后,调用链跳转到Client类中。为了适配多种提交方式以及运行模式,runBlocking方法有着非常多的重载。在当前的远程执行环境下,runBlocking在多个重载方法之间跳转的过程中,会调用getJobGraph方法获得JobGraph的实例。JobGraph表示Flink dataflow 程序,它将会被JobManager所理解并接收。在某个Job被提交给JobManager之前,通过Flink提供的高层次的API都将会被转化为JobGraph表示。关于如何获得JobGraph的实现,我们后面会进行剖析。这里,让我们忽视这些细节,进入下一个关键方法。

runBlocking_1其实是runBlocking方法的重载,这里加一个后缀标识,只是为了跟上面的runBlocking进行区别。runBlocking_1方法中,首先利用LeaderRetrievalUtils创建了LeaderRetrievalService这一服务对象:

LeaderRetrievalService leaderRetrievalService;
try {
    leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
} catch (Exception e) {
    throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
}

顾名思义,LeaderRetrievalService在Flink中提供查找主节点的服务。它会根据Flink的配置信息(主要是recovery.mode来判断基于哪种恢复机制来创建该服务。当前有两种模式:一种是Standalone的独立运行模式;另一种是基于Zookeeper的高可用模式)。Flink提供了一个称之为LeaderRetrievalListener的回调接口来获得主节点的信息。接下来,就是调用JobClientsubmitJobAndWait方法将产生的JobGraph以及主节点查找的服务对象等相关信息提交给JobManager并等待返回结果:

try {
    this.lastJobID = jobGraph.getJobID();
    return JobClient.submitJobAndWait(actorSystem, leaderRetrievalService, jobGraph,
                                        timeout, printStatusDuringExecution, classLoader);
} catch (JobExecutionException e) {
    throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
}

上面的submitJobAndWait方法的第一个参数actorSystemActorSystem的实例。在构造Client对象时创建,在Job提交并获得返回结果后通过调用Clientshutdown方法关闭:

public void shutdown() {
    if (!this.actorSystem.isTerminated()) {
        this.actorSystem.shutdown();
        this.actorSystem.awaitTermination();
    }
}

该方法的调用见上面executeRemotely方法的代码段的finally语句块。

JobClient的出现可能会让你产生疑惑——它跟Client是什么关系?作用是什么?下面这幅示意图可以用来解释这些疑问:

上面这幅图展示了Client对象与其他几个对象的关系。JobClient在其中起到了“桥接”作用,它在基于API的编程层面上桥接了同步的方法调用和异步的消息通信。更具体得说,JobClient可以看做是一个“静态类”提供了一些静态方法,这里我们主要关注上面的submitJobAndWait方法,该方法内部封装了Actor之间的异步通信(具体的通信对象是JobClientActor,它负责跟JobManagerActorSystemActor进行通信),并以阻塞的形式返回结果。而Client只需调用JobClient的这些方法,而无需关注其内部是如何实现的。

通过调用JobClient的静态方法submitJobAndWait,会触发基于AkkaActor之间的消息通信来完成后续的提交JobGraph的动作。JobClient提交Job的基于消息交互的抽象示意图如下:

总体来说这里总共有两个ActorSystem,一个归属于Client,另一个归属于JobManager。在submitJobAndWait方法中,其首先会创建一个JobClientActorActorRef

ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

然后向其发起一个SubmitJobAndWait消息,该消息将JobGraph的实例提交给jobClientActor。该消息的发起模式是ask,它表示需要一个应答消息。

JobClient向JobClientActor发送消息的代码段如下所示:

Future<Object> future = Patterns.ask(jobClientActor,
                                     new JobClientMessages.SubmitJobAndWait(jobGraph),
                                     new Timeout(AkkaUtils.INF_TIMEOUT()));
answer = Await.result(future, AkkaUtils.INF_TIMEOUT());

JobClient会阻塞等待该future返回结果。在得到返回结果answer之后,先进行解析判断它是Job被成功执行返回的结果还是失败返回的结果。

小结

至此,Client提交Streaming Job的关键方法调用路径已梳理完成。这里为了突出主路线,同时避免被太多的实现细节干扰,我们暂时忽略了一些重要数据结构和关键概念的解读。不过,后续我们会对它们进行分析。


微信扫码关注公众号:Apache_Flink


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

时间: 2024-08-25 10:47:13

Apache Flink流作业提交流程分析的相关文章

Apache Flink流分区器剖析

这篇文章介绍Flink的分区器,在流进行转换操作后,Flink通过分区器来精确得控制数据流向. StreamPartitioner StreamPartitioner是Flink流分区器的基类,它只定义了一个抽象方法: public abstract StreamPartitioner<T> copy(); 但这个方法并不是各个分区器之间互相区别的地方,定义不同的分区器的核心在于--各个分区器需要实现channel选择的接口方法: int[] selectChannels(T record,

Flink流式引擎技术分析--大纲

Flink简介 Flink组件栈 Flink特性 流处理特性 API支持 Libraries支持 整合支持 Flink概念 Stream.Transformation.Operator Parallel Dataflow Task.Operator Chain Window Time Flink架构 JobManager TaskManager Client Flink调度 逻辑调度 物理调度 Flink容错 Flink的集群部署 环境准备 集群安装 集群启动 案例测试 整体执行过程 数据流图的

Flink流处理之窗口算子分析

窗口算子WindowOperator是窗口机制的底层实现,它几乎会牵扯到所有窗口相关的知识点,因此相对复杂.本文将以由面及点的方式来分析WindowOperator的实现.首先,我们来看一下对于最常见的时间窗口(包含处理时间和事件时间)其执行示意图: 上图中,左侧从左往右为事件流的方向.方框代表事件,事件流中夹杂着的竖直虚线代表水印,Flink通过水印分配器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWaterm

Flink on Yarn模式启动流程分析

此文已由作者岳猛授权网易云社区发布. 欢迎访问网易云社区,了解更多网易技术产品运营经验. Flink On Yarn 架构 Paste_Image.png 前提条件首先需要配置YARN_CONF_DIR, HADOOP_CONF_DIR ,HADOOP_CONF_PATH其中一个用来确保Flink能够访问HDFS和Yarn的RM. 主要启动流程 1. 启动进程 首先我们通过下面的命令行启动flink on yarn的集群bin/yarn-session.sh -n 3 -jm 1024 -nm

Apache mina流程分析

Apache mina工作流介绍 apache mina的整体工作流程包含了几个重要的概念和组件,分别是IoService,IoProcessor,IoHandler和IoFilter,在弄清楚整体的运作流程之前需要先介绍下各个组件各自的作用. IoService 这个是mina请求接受器(Acceptor)以及连接器(Connector)的一个抽象的父类,作用就是提供连接和接受请求的服务. IoProcessor 请求处理器,负责请求的处理工作,包括监听事件的更改,filterChain的建立

Apache Flink 整体介绍

前言 Flink 是一种流式计算框架,为什么我会接触到 Flink 呢?因为我目前在负责的是监控平台的告警部分,负责采集到的监控数据会直接往 kafka 里塞,然后告警这边需要从 kafka topic 里面实时读取到监控数据,并将读取到的监控数据做一些 聚合/转换/计算 等操作,然后将计算后的结果与告警规则的阈值进行比较,然后做出相应的告警措施(钉钉群.邮件.短信.电话等).画了个简单的图如下: 目前告警这块的架构是这样的结构,刚进公司那会的时候,架构是所有的监控数据直接存在 ElasticS

Flink流计算随笔(1)

相比 Spark Stream.Kafka Stream.Storm 等,为什么阿里会选择 Flink 作为新一代流式计算引擎?前期经过了哪些调研和对比? 大沙:我们是 2015 年开始调研新一代流计算引擎的.我们当时的目标就是要设计一款低延迟.exactly once.流和批统一的,能够支撑足够大体量的复杂计算的引擎.Spark streaming 的本质还是一款基于 microbatch 计算的引擎.这种引擎一个天生的缺点就是每个 microbatch 的调度开销比较大,当我们要求越低的延迟

Apache Flink 是什么?

架构 Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算.Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算. 接下来,我们来介绍一下 Flink 架构中的重要方面. 处理无界和有界数据 任何类型的数据都可以形成一种事件流.信用卡交易.传感器测量.机器日志.网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流. 数据可以被作为 无界 或者 有界 流来处理. 无界流 有定义流的开始,但没有定义流的结束.它们会无休止地产生

JobTracker启动流程源码级分析

org.apache.hadoop.mapred.JobTracker类是个独立的进程,有自己的main函数.JobTracker是在网络环境中提交及运行MR任务的核心位置. main方法主要代码有两句: 1 //创建jobTracker对象 2 JobTracker tracker = startTracker(new JobConf()); 3 //启动各个服务,包括JT内部一些重要的服务或者线程 4 tracker.offerService(); 一.startTracker(new Jo