spark(二):spark架构及物理执行图


上图是一个job的提交流程图,job提交的具体步骤如下

  1. 一旦有action,就会触发DagScheduler.runJob来提交任务,主要是先生成逻辑执行图DAG,然后调用 finalStage = newStage() 来划分 stage。
  2. new Stage() 的时候会调用 finalRDD 的 getParentStages();
  3. getParentStages() 从 finalRDD 出发,反向 visit 逻辑执行图,遇到 NarrowDependency 就将依赖的 RDD 加入到 stage,遇到 ShuffleDependency 切开 stage,并递归到 ShuffleDepedency 依赖的 stage。
  4. 一个 ShuffleMapStage(不是最后形成 result 的 stage)形成后,会将该 stage 最后一个 RDD 注册到MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size),这一步很重要,因为 shuffle 过程需要 MapOutputTrackerMaster 来指示 ShuffleMapTask 输出数据的位置。
  5. 之后是submitStage(finalStage)
  6. 先确定该 stage 的 missingParentStages,使用getMissingParentStages(stage)。如果 parentStages 都可能已经执行过了,那么就为空了。
  7. 如果 missingParentStages 不为空,那么先递归提交 missing 的 parent stages,并将自己加入到 waitingStages 里面,等到 parent stages 执行结束后,会触发提交 waitingStages 里面的 stage。
  8. 如果 missingParentStages 为空,说明该 stage 可以立即执行,那么就调用submitMissingTasks(stage, jobId)来生成和提交具体的 task。如果 stage 是 ShuffleMapStage,那么 new 出来与该 stage 最后一个 RDD 的 partition 数相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出来与 stage 最后一个 RDD 的 partition 个数相同的 ResultTasks。一个 stage 里面的 task 组成一个 TaskSet,最后调用taskScheduler.submitTasks(taskSet)来提交一整个 taskSet。
  9. taskScheduler会把task发给DriverActor进程,DriverActor序列话之后发给exector真正执行。


上图是task执行流程,具体执行过程如下

  1. Worker 端接收到 tasks 后,executor 将 task 包装成 taskRunner,并从线程池中抽取出一个空闲线程运行 task。
  2. Executor 收到 serialized 的 task 后,先 deserialize 出正常的 task,然后运行 task 得到其执行结果 directResult,这个结果要送回到 driver 那里。
  3. 如果 result 比较大(比如 groupByKey 的 result)先把 result 存放到本地的“内存+磁盘”上,由 blockManager 来管理,只把存储位置信息(indirectResult)发送给 driver。
  4. ShuffleMapTask 生成的是 MapStatus,MapStatus 包含两项内容:一是该 task 所在的 BlockManager 的 BlockManagerId(实际是 executorId + host, port, nettyPort),二是 task 输出的每个 FileSegment 大小。
  5. ResultTask 生成的 result 的是 func 在 partition 上的执行结果。**比如 count() 的 func 就是统计 partition 中 records 的个数。
  6. Driver 收到 task 的执行结果 result 后会进行一系列的操作:
  7. a,首先告诉 taskScheduler 这个 task 已经执行完,然后去分析 result。
  8. b,如果是 ResultTask 的 result,那么可以使用 ResultHandler 对 result 进行 driver 端的计算(比如 count() 会对所有 ResultTask 的 result 作 sum)
  9. c,如果 result 是 ShuffleMapTask 的 MapStatus,那么需要将 MapStatus(ShuffleMapTask 输出的 FileSegment 的位置和大小信息)存放到 mapOutputTrackerMaster 中的 mapStatuses 数据结构中以便以后 reducer shuffle 的时候查询
  10. d,如果 driver 收到的 task 是该 stage 中的最后一个 task,那么可以 submit 下一个 stage,如果该 stage 已经是最后一个 stage,那么告诉 dagScheduler job 已经完成

原文地址:https://blog.51cto.com/4876017/2382943

时间: 2024-10-16 12:37:33

spark(二):spark架构及物理执行图的相关文章

Job 逻辑执行图

General logical plan 典型的 Job 逻辑执行图如上所示,经过下面四个步骤可以得到最终执行结果: 从数据源(可以是本地 file,内存数据结构, HDFS,HBase 等)读取数据创建最初的 RDD.上一章例子中的 parallelize() 相当于 createRDD(). 对 RDD 进行一系列的 transformation() 操作,每一个 transformation() 会产生一个或多个包含不同类型 T 的 RDD[T].T 可以是 Scala 里面的基本类型或数

【转】Spark架构与作业执行流程简介

原文链接 http://www.cnblogs.com/shenh062326/p/3658543.html Spark架构与作业执行流程简介 Local模式 运行Spark最简单的方法是通过Local模式(即伪分布式模式). 运行命令为:./bin/run-example org.apache.spark.examples.SparkPi local 基于standalone的Spark架构与作业执行流程 Standalone模式下,集群启动时包括Master与Worker,其中Master负

从物理执行角度透视Spark Job(23)

一.再次思考pipeline 即使采用pipeline的方式,函数f对依赖的RDD中的数据集合的操作也会有两种方式: 1, f(record),f作用于集合的每一条记录,每次只作用于一条记录: 2, f(records),f一次性作用于集合的全部数据: Spark采用是是第一种方式,原因: 1, 无需等待,可以最大化的使用集群的计算资源: 2, 减少OOM的发生: 3, 最大化的有利于并发: 4, 可以精准的控制每一Partition本身(Dependency)及其内部的计算(compute):

spark(一):spark概览及逻辑执行图

上图是spark框架概要图,spark一些重要概念先简要介绍一下: cluster manager:资源管理集群,比如standalone.yarn: application:用户编写的应用程序: Driver:application中的main函数,创建的SparkContext负责与cluster manager通信,进行资源的申请.任务的分配与监控.一般认为SparkContext就是Driver: worker:集群中可以运行任务的节点: executor:worker上运行任务的进程,

Spark Job具体的物理执行

即使采用pipeline的方式,函数f对依赖的RDD中的数据集合的操作也会有两种方式: 1.f(record),f作用于集合的每一条记录,每次只作用于一条记录 2.f(records),f一次性作用于集合的全部数据: Spark采用的是第一种方式,因为: 1.无需等待,可以最大化的使用集群的计算资源 2.减少OOM的产生 3.最大化的有利于并发 4.可以精准的控制每一个Partition本身(Dependency)及其内部的计算(compute) 5.基于lineage的算子流动式函数式计算,可

从物理执行的角度透视Spark Job(DT大数据梦工厂)

内容: 1.再次思考pipeline: 2.窄依赖物理执行内幕: 3.宽依赖物理执行内幕: 4.Job提交流程: 物理执行是更深层次的角度. ==========再次思考pipeline ============ 即使采用pipeline的方式,函数f对依赖的RDD中的数据集合的操作也会有两种方式: 1.f(record),f作用于集合的每一条记录,每次只作用于一条记录: 2.f(records), f一次性作用于集合的全部数据: Spark运行的时候用的是第一种方式,为什么呢? 1.无需等待,

Spark 教程:Spark的体系架构

最近看到一篇关于Spark架构的博文,作者是 Alexey Grishchenko.看过Alexey博文的同学应该都知道,他对Spark理解地非常深入,读完他的 “spark-architecture” 这篇博文,有种醍醐灌顶的感觉,从JVM内存分配到Spark集群的资源管理,步步深入,感触颇多(腾云科技ty300.com).因此,在周末的业余时间里,将此文的核心内容译成中文,并在这里与大家分享.如在翻译过程中有文字上的表达纰漏,还请大家指出. 首先来看一张Spark 1.3.0 官方给出的图片

3.spark streaming Job 架构和容错解析

一.Spark streaming Job 架构 SparkStreaming框架会自动启动Job并每隔BatchDuration时间会自动触发Job的调用. Spark Streaming的Job 分为两大类: 每隔BatchInterval时间片就会产生的一个个Job,这里的Job并不是Spark Core中的Job,它只是基于DStreamGraph而生成的RDD的DAG而已:从Java角度讲相当于Runnable接口的实现类,要想运行Job需要将Job提交给JobScheduler,在J

Spark(一): 基本架构及原理

Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一,与Hadoop和Storm等其他大数据和MapReduce技术相比,Spark有如下优势: Spark提供了一个全面.统一的框架用于管理各种有着不同性质(文本数据.图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求 官方资料介绍Spark可以将Hadoop集群中的应用在内存中的运行速度提升100倍