Apache Spark-1.0.0源码浅析(三 ):作业提交

RDD的操作可以分为Transformations和Actions,Transformations是lazy的不立即执行,Action则会触发作业的提交和执行。例如本例中的foreach

def foreach(f: T => Unit) {
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
}

一句话,Actions调用sc.runJob触发作业运行。

SparkContext中的runJob有多个版本的重载

foreach调用的版本,以rdd和func为参数,返回执行的结果

/**
   * Run a job on all partitions in an RDD and return the results in an array.
   */
  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.size, false)
  }

然后,进入下一个runJob,加入参数partitions和allowLocal

/**
   * Run a job on a given set of partitions of an RDD, but take a function of type
   * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: Iterator[T] => U,
      partitions: Seq[Int],
      allowLocal: Boolean
      ): Array[U] = {
    runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
  }

之后调用下一个runJob,将结果返回到result数组中

/**
   * Run a function on a given set of partitions in an RDD and return the results as an array. The
   * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
   * than shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean
      ): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
    results
  }

最后调用,参数中加入resultHandler句柄

/**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark. The allowLocal
   * flag specifies whether the scheduler can run the computation on the driver rather than
   * shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (dagScheduler == null) {
      throw new SparkException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite)
    val start = System.nanoTime
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
    rdd.doCheckpoint()
  }

sc.runJob最终调用dagScheduler.runJob。

需要提到的一点是

val cleanedFunc = clean(func)

其作用在注释中

/**
   * Clean a closure to make it ready to serialized and send to tasks
   * (removes unreferenced variables in $outer‘s, updates REPL variables)
   */
  private[spark] def clean[F <: AnyRef](f: F): F = {
    ClosureCleaner.clean(f)
    f
  }

END

时间: 2025-01-07 19:21:34

Apache Spark-1.0.0源码浅析(三 ):作业提交的相关文章

ArrayList类源码浅析(三)

1.看一个示例 运行上述代码,抛出一个异常: 这是一个典型的并发修改异常,如果把上述代码中的125行注释,把126行打开,运行就能通过了: 原因: 1)因为在迭代的时候,使用的是Itr类的对象,在调用hasNext()方法的时候,只要cursor和szie不相等就返回true: 2)在Itr类中存在一个属性字段:expectedModCount,每次调用next()方法的时候都会检查expectedModCount和modCount是否相等, 如果不相等,就会抛出异常: 3)调用Itr类的rem

【Spark】DAGScheduler源码浅析2

引入 上一篇文章DAGScheduler源码浅析主要从提交Job的流程角度介绍了DAGScheduler源码中的重要函数和关键点,这篇DAGScheduler源码浅析2主要参考fxjwind的Spark源码分析 – DAGScheduler一文,介绍一下DAGScheduler文件中之前没有介绍的几个重要函数. 事件处理 在Spark 1.0版本之前,在DAGScheduler类中加入eventQueue私有成员,设置eventLoop Thread循环读取事件进行处理.在Spark 1.0源码

Spark 1.0.1源码安装

apache 网站上面已经有了已经构建好了的版本,我这里还是自己利用午休时间重新构建一下(jdk,python,scala的安装就省略了,自己可以去安装) http://www.apache.org/dist/spark/spark-1.0.1/ 具体官网的下载链接可以去这里 我下载的是http://www.apache.org/dist/spark/spark-1.0.1/spark-1.0.1.tgz源码包 下载对应的linux服务器上面,然后解压 wget http://www.apach

Centos 7.0 编译安装LAMP(Linxu+apache+mysql+php)之源码安装Mysql (二)

mysql 简介: MySQL是一个关系型数据库管理系统,关系数据库将数据保存在不同的表中,这样就增加了速度并提高了灵活性.目前其属于 Oracle 旗下产品.MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS (Relational Database Management System,关系数据库管理系统) 应用软件.MySQL所使用的 SQL 语言是用于访问数据库的最常用标准化语言. 安装环境: 系统: centos 7.0 最小化安装 软件

springmvc源码浅析(基于spring3.1.0)

请求处理过程:通过url找到对应Controller类中处理请求的方法,执行方法返回结果视图的过程.大致分为三个步骤: 其一,ApplicationContext初始化时建立所有url和controller类的对应关系(用Map保存); 其二,根据请求url找到对应的controller,并从controller中找到处理请求的方法; 其三,执行方法处理请求,并返回结果视图. 我们首先看第一个步骤,也就是建立Map<url,controller>关系的部分.第一部分的入口类为Applicati

Centos 7.0 编译安装LAMP(Linxu+apache+mysql+php)之源码安装php (三)

PHP简介: PHP(外文名:PHP: Hypertext Preprocessor,中文名:"超文本预处理器")是一种通用开源脚本语言.语法吸收了C语言.Java和Perl的特点,利于学习,使用广泛,主要适用于Web开发领域.PHP 独特的语法混合了C.Java.Perl以及PHP自创的语法.它可以比CGI或者Perl更快速地执行动态网页.用PHP做出的动态页面与其他的编程语言相比,PHP是将程序嵌入到HTML(标准通用标记语言下的一个应用)文档中去执行,执行效率比完全生成HTML标

【Spark Core】任务执行机制和Task源码浅析2

引言 上一小节<任务执行机制和Task源码浅析1>介绍了Executor的注册过程. 这一小节,我将从Executor端,就接收LaunchTask消息之后Executor的执行任务过程进行介绍. 1. Executor的launchTasks函数 DriverActor提交任务,发送LaunchTask指令给CoarseGrainedExecutorBackend,接收到指令之后,让它内部的executor来发起任务,即调用空闲的executor的launchTask函数. 下面是Coars

【Spark Core】TaskScheduler源码与任务提交原理浅析2

引言 上一节<TaskScheduler源码与任务提交原理浅析1>介绍了TaskScheduler的创建过程,在这一节中,我将承接<Stage生成和Stage源码浅析>中的submitMissingTasks函数继续介绍task的创建和分发工作. DAGScheduler中的submitMissingTasks函数 如果一个Stage的所有的parent stage都已经计算完成或者存在于cache中,那么他会调用submitMissingTasks来提交该Stage所包含的Tas

【Spark】Stage生成和Stage源码浅析

引入 上一篇文章<DAGScheduler源码浅析>中,介绍了handleJobSubmitted函数,它作为生成finalStage的重要函数存在,这一篇文章中,我将就DAGScheduler生成Stage过程继续学习,同时介绍Stage的相关源码. Stage生成 Stage的调度是由DAGScheduler完成的.由RDD的有向无环图DAG切分出了Stage的有向无环图DAG.Stage的DAG通过最后执行的Stage为根进行广度优先遍历,遍历到最开始执行的Stage执行,如果提交的St

自己动手编译apache-tomcat-6.0.41-src源码

第一步:下载apache-tomcat-6.0.41-src 第二步:阅读BUILDING.txt.了解所需要的步骤. In order to build a binary distribution version of Apache Tomcat from a source distribution, do the following: (0) Download and Install a Java Development Kit 因为tomcat依赖于apache dbcp 源码进行编译,而