Task执行内幕与结果处理解密

本课主题

  • Task执行内幕与结果处理解密

引言

这一章我们主要关心的是 Task 是怎样被计算的以及结果是怎么被处理的

  • 了解 Task 是怎样被计算的以及结果是怎么被处理的

Task 执行原理流程图

[下图是Task执行原理流程图]

  1. Executor 会通过 TaskRunner 在 ThreadPool 来运行具体的 Task,TaskRunner 内部会做一些准备的工作,例如反序例化 Task,然后通过网络获取需要的文件、Jar等
  2. 运行 Thread 的 run 方法,导致 Task 的 runTask 被调用来执行具体的业务逻辑处理
  3. 在Task 的 runTask内部会调用 RDD 的 iterator( ) 方法,该方法就是我们针对当前 Task 所对应的 Partition 进行计算的关键之所在,在处理内部会迭代 Partition 的元素并交给我们先定义的 Function 进行处理
    • ShuffleMapTask: ShuffleMapTask 在计算具体的 Partition 之后实际上会通过 ShuffleManager 获得的 ShuffleWriter 把当前 Task 计算的数据具体 ShuffleManger 的实现来写入到具体的文件。操作完成后会把 MapStatus 发送给 DAGScheduler; (把 MapStatus 汇报给 MapOutputTracker)
    • ResultTask: 根据前面 Stage 的执行结果进行 Shuffle 产生整个 Job 最后的结果;(MapOutputTracker 會把 ShuffleMapTask 執行結果交給 ResultTask)

Task 执行内幕源码解密

    1. 当 Driver 中的 CoarseGrainedSchedulerBackend 给 CoarseGrainedExecutorBackend 发送 LaunchTask 之后,CoarseGrainedExecutorBackend 在收到 LaunchTask 消息后,首先会判断一下有没有 Executor,没有的话直接退出和打印出提示信息,有的话会反序例化 TaskDescription,在执行具体的业务逻辑前会进行3次反序例化,第一个是 taskDescription,第二个是任务 Task 本身进行反序例化,还有的是RDD 的反序例化。
      [下图是 CoarseGrainedExecutorBackend.scala 接收 LaunchTask case class 信息后的逻辑]

      然后再发 LaunchTask 消息,里面会创建一个 TaskRunner,然后把它交给一个 runningTasks 的数据结构中,然后交给线程池去执行 Thread Pool。
      [下图是 Executor.scala 中的 launchTask 方法]
    2. Executor 会通过 TaskRunner 在ThreadPool 来运行具体的 Task,在 TaskRunner 的 run( )方法中首先会通过调用 stateUpdate 给 Driver 发信息汇报自己的状态,说明自己的RUNNING 状态。
      [下图是 Executor.scala 中的 TaskRunner 类]

      [下图是 Executor.scala 中的 run 方法]

      [下图是 ExecutorBackend.scala 中的 statusUpdate 方法]
    3. TaskRunner 内部会做一些准备的工作,例如反序例化 Task 的依赖,这个反序例化得出一个 Tuple,然后通过网络获取需要的文件、Jar等;
      [下图是在 Executor.scala 中 run 方法内部具体的代码实现]
    4. 在同一个 Stage 的内部需要共享资源。在同一个 Stage 中我们 ExecutorBackend 会有很多并发线程,此时它们所依赖的 Jar 跟文件肯定是一样的,每一个 TaskRunner 运行的时候都会运行在线程中,这个方法会被多个线程去调,所以线程需要一个加锁,而这个方法是有全区中的。这主要是要防止资源竞争。下载一切这个 Task 需要的 Jar 文件,我们通 Executor 在不同的线程中共享全区资源。
      [下图是 Executor.scala 中的 updateDependencies 方法]
    5. 在 Task 的 runTask 内部会调用 RDD 的 iterator( ) 方法,该方法就是我们针对当前 Task 所对应的 Partition 进行计算的关键之所在,在处理内部会迭代 Partition 的元素并交给我们先定义的 Function 进行处理对于 ShuffleMapTask,首先要对 RDD 以及其他的依赖关系进行反序例化:
      [下图是 Executor.scala 中 run 方法内部具体的代码实现]

      [下图是 Task.scala 中的 run 方法]

      因为 Task 是一个 abstract class,它的子类是 ShuffleMapTask 或者是 ResultsMapTask,是乎我们当前的 Task 是那个类型。
      [下图是 ShuffleMapTask.scala 中的 runTask 方法]

      [下图是 RDD.scala 中的 iterator 方法]

      [下图是 RDD.scala 中的 computeOrReadCheckpoint 方法]

      最终计算会调用 RDD 的 compute 的方法具体计算的时候有具体的 RDD,例如 MapPartitionsRDD.compute,其中的 f 就是在当前 Stage 计算具体 Partition 的业务逻辑代码。
      [下图是 RDD.scala 中的 compute 方法]

      [下图是 MapPartitionsRDD.scala 中的 compute 方法]
    6. 调用反序例化后的 Task.run 方法来执行任务并获得执行结果,其中 Task 的 run 方法调用的时候会导致 Task 的抽象方法 runTask 的调用
      [下图是 Executor.scala 中 run 方法内部具体的代码实现]

    7. 把执行结果序例化
      [下图是 Executor.scala 中 run 方法内部具体的代码实现]
    8. 运行 Thread 的 run 方法,导致 Task 的 runTask 被调用来执行具体的业务逻辑处理
    9. 对于 ResultTask
      [下图是 ResultsMapTask.scala 中的 runTask 方法]
    10. 在 Spark 中 AkaFrameSize 是 128MB,所以可以扩播非常大的任务,而任务
    11. 并根据大小判断不同的结果传回给 Driver 的方式
    12. CoraseGrainedExectorBackend 给 DriverEndpoint 发送 StatusUpdate 来传执行结果
      [下图是 Executor.scala 中 run 方法内部具体的代码实现]

      [下图是 CoraseGrainedExectorBackend.scala 中 statusUpdate 方法]

      [下图是 DriveEndPoint.scala 中 receive 方法]
    13. DriverEndpoint 会把执行结果传给 TaskSchedulerImpl 处理,然后交给 TaskResultGetter 去分别处理执行成功和失败时候的不同情况,然后告X DAGScheduler 任务处理结的情况重
      [下图是 TaskSchedulerImpl.scala 中 statusUpdate 方法]

      [下图是 TaskResultsGetter.scala 中 handleSuccessfulTask 方法]

      [下图是 TaskSchedulerImpl.scala 中 handleSuccessfulTask 方法]

      [下图是 TaskSetManager.scala 中 handleSuccessfulTask 方法]

原文地址:https://www.cnblogs.com/sky-sql/p/9079039.html

时间: 2024-11-05 16:40:01

Task执行内幕与结果处理解密的相关文章

第三十七课 Spark之Task执行原理及结果

主要内容 1.     Task执行原理流程图 2.     Task执行源码 3.     Task执行结果在Driver端的处理 一.Task在Executor(worker)端执行及返回Driver流程图 图37-1 Driver端与Executor交互图 二.Executor(worker)端执行源码解析 1.接收Driver端发来的消息 当Driver中的SchedulerBackend给ExecutorBackend发送LaunchTask之后,ExecutorBackend在接收到

Apache Spark-1.0.0浅析(六):资源调度——Task执行

前面说到向executorActor(task.executorID)发送LaunchTask(task)消息,在CoarseGrainedExecutorBackend中定义receive接收launchTask消息,执行executor.launchTask override def receive = { case RegisteredExecutor(sparkProperties) => logInfo("Successfully registered with driver&q

Spark修炼之道(高级篇)——Spark源码阅读:第八节 Task执行

Task执行 在上一节中,我们提到在Driver端CoarseGrainedSchedulerBackend中的launchTasks方法向Worker节点中的Executor发送启动任务命令,该命令的接收者是CoarseGrainedExecutorBackend(Standalone模式),类定义源码如下: private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, driverUrl: St

【Spark2.0源码学习】-10.Task执行与回馈

通过上一节内容,DriverEndpoint最终生成多个可执行的TaskDescription对象,并向各个ExecutorEndpoint发送LaunchTask指令,本节内容将关注ExecutorEndpoint如何处理LaunchTask指令,处理完成后如何回馈给DriverEndpoint,以及整个job最终如何多次调度直至结束. 一.Task的执行流程      承接上一节内容,Executor接受LaunchTask指令后,开启一个新线程TaskRunner解析RDD,并调用RDD的

task执行结果查看

使用debug输出task执行的register: - name: check extract session # script: /app/ansiblecfg/XXX/roles/test/tasks/psgrep.sh > /home/shdxspark/psgrep.log script: /app/ansiblecfg/XXX/roles/test/tasks/psgrep.sh register: psgrep_output #通过debug方式打印psgrep.sh脚本的返回结果p

[转]C# 线程知识--使用Task执行异步操作

C# 线程知识--使用Task执行异步操作 ??????在C#4.0之前需要执行一个复杂的异步操作时,只能使用CLR线程池技术来执行一个任务.线程池执行异步任务时,不知道任务何时完成,以及任务的在任务完成后不能获取到返回值.但是在C#4.0中引人了一个的任务(System.Threading.Tasks命名空间的类型)机制来解决异步操作完成时间和完成后返回值的问题. 1.使用Task类创建并执行简单任务 ??? 通过使用Task的构造函数来创建任务,并调用Start方法来启动任务并执行异步操作.

Execute Sql Task 执行有参数的存储过程

Execute Sql Task 执行有参数的存储过程时,传递参数的方式是不同的,根据使用链接的不同,主要分为两种:OleDB和Ado.Net. create dbo.test (id int) CREATE PROCEDURE dbo.usp_AddItem @id int AS BEGIN SET NOCOUNT ON; insert into dbo.test(id) values(@id) END 1,如果ConnectionType是Oledb,那么使用?代表参数名字,?的序号是从0,

C# 线程知识--使用Task执行异步操作

来源:https://www.cnblogs.com/pengstone/archive/2012/12/23/2830238.html 在C#4.0之前需要执行一个复杂的异步操作时,只能使用CLR线程池技术来执行一个任务.线程池执行异步任务时,不知道任务何时完成,以及任务的在任务完成后不能获取到返回值.但是在C#4.0中引人了一个的任务(System.Threading.Tasks命名空间的类型)机制来解决异步操作完成时间和完成后返回值的问题. 1.使用Task类创建并执行简单任务 通过使用T

Execute Package Task执行失败,而Child Package执行成功

今天Execute 一个 Execute Package Task 去调用其他packages,发现Task报错,而Child Package执行成功. Execute Package Task抛出的错误信息是: Warning: SSIS Warning Code DTS_W_MAXIMUMERRORCOUNTREACHED.  The Execution method succeeded, but the number of errors raised (1) reached the max