从物理执行的角度透视Spark Job(DT大数据梦工厂)

内容:

1、再次思考pipeline;

2、窄依赖物理执行内幕;

3、宽依赖物理执行内幕;

4、Job提交流程;

物理执行是更深层次的角度。

==========再次思考pipeline ============

即使采用pipeline的方式,函数f对依赖的RDD中的数据集合的操作也会有两种方式:

1、f(record),f作用于集合的每一条记录,每次只作用于一条记录;

2、f(records), f一次性作用于集合的全部数据;

Spark运行的时候用的是第一种方式,为什么呢?

1、无需等待,可以最大化的使用集群的计算资源;

2、减少OOM的发生;

3、最大化的有利于并发;

4、可以精准的控制每一个partition本身(Dependency)极其内部的计算(compute);

5、基于lineage的算子流动式函数式编程,节省了中间结果的产生,并且可以最快的恢复;

疑问:会不会增加网络通信?

当然不会!因为在pipeline!把很多算子合并成一个算子,在一个stage。

==========思考Spark Job的物理执行============

Spark Application里面可以产生一个或者多个Job,例如Spark Shell默认启动的时候内部就没有Job,只是作为资源的分配程序,可以在Spark Shell里面写代码产生若干个Job。普通程序中一般而言可以有不同的Action,每一个Action一般也会触发一个Job。

Spark是MapReduce思想的一种更加精致和高效的实现。MapReduce有很多具体不同的实现。

例如:Hadoop的MapReduce基本的计算流程如下:首先是以JVM为对象的并发执行的Mapper,Mapper中的map的执行会产生输出数据,输出的数据会经由partitioner指定的规则放到LocalFile System中,然后再经由Shuffle、Sort、Aggregate变成reducer中的reduce的输入,执行reduce产生最终的执行结果。Hadoop MapReduce执行的流程虽然简单,但是过于死板,尤其是在构造复杂(迭代)算法的时候,非常不利于算法的实现,且执行效率极为低下。

Spark算法构造和物理执行时最最基本的核心:最大化pipeline。pipeline 越多,复用越好。基于pipeline的思想是数据被使用的时候才开始计算。从数据流动流动角度来说,是数据流动到计算的位置!!!实质上从逻辑的角度来看,是算子在数据上流动。

从算法构建角度而言,肯定是算子作用于数据,所以是算子在数据上流动,有利于构建算法。从物理执行的角度而言,是数据流动到计算的位置,有利于系统最高效的计算。

数据要流动到计算的位置,那这个位置在哪个地方?

对于pipeline而言,数据计算的位置就是每个stage位置中最后的RDD。就算一个stage有5000个步骤,真正的计算,也还是在第5000步计算(比如第5000个函数!!!),发给executor之前,所有的算子已经合并成1个了。如果第5步需要cache一下,是系统自己的设计,和前面一段话无关。

一个震撼人心的内幕真相就是:每个stage,除了最后一个RDD算子是真实的之外,前面的算子都是假的

由于计算的lazy特性,导致计算从后往前回溯,形成Computing Chain,导致的结果就是:需要首先计算出具体一个stage内部最左侧的RDD中本次计算依赖的partition

如果stage没有parent stage的话,则stage从最左边的RDD立即执行,每次计算出的record都流入下一个函数。

计算步骤从后往前回溯,计算执行是从前往后。

==========窄依赖物理执行内幕 ============

一个Stage内部的RDD都是窄依赖,窄依赖从逻辑上看是从内幕最左侧的RDD开始立即计算的。

根据计算链条,数据从一个计算步骤流动到下一个计算步骤,以此类推,直到计算到Stage内部的最后一个RDD来产生计算结果。

Computing Chain构建是从后往前回溯而成的,而实际的物理计算则是让数据从前往后在算子上流动,直到流动到不能再流动为止才开始计算下一个record。

所有RDD里面的compute都是iterator来一步步执行。

后面的RDD对前面的RDD的依赖虽然是partition级别的依赖,但是并不需要父RDD把partition中所有的records计算完毕才整体往后流动数据进行计算,这就极大的提高了计算速率

/**
 * An RDD that applies the provided function to every partition of the parent RDD.
 */
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

override def getPartitions: Array[Partition] = firstParent[T].partitions

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
}

==========宽依赖物理执行内幕 ============

必须等到依赖的父Stage中的最后一个RDD把全部数据彻底计算完毕才能够经过shuffle来计算当前的Stage。

这样写代码的时候尽量避免宽依赖!!!

/**
 * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
protected def getDependencies: Seq[Dependency[_]] = deps

compute负责接受父Stage的数据流,计算出record

==========Job提交流程 ============

作业提交,触发Action

/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD‘s recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

/**
 * Run an action job on the given RDD and pass all the results to the resultHandler function as
 * they arrive.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @throws Exception when the job fails
 */
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  waiter.awaitResult() match {
    case JobSucceeded =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

/**
 * Submit an action job to the scheduler.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 *   partitions of the target RDD, e.g. for operations like first()
 * @param callSite where in the user program this job was called
 * @param resultHandler callback to pass each result to
 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
 *
 * @return a JobWaiter object that can be used to block until the job finishes executing
 *         or can be used to cancel the job.
 *
 * @throws IllegalArgumentException when partitions ids are illegal
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}

作业:

写一下我理解中的spark job物理执行。

时间: 2024-08-23 16:55:42

从物理执行的角度透视Spark Job(DT大数据梦工厂)的相关文章

底层战详解使用Java开发Spark程序(DT大数据梦工厂)

Scala开发Spark很多,为什么还要用Java开发原因:1.一般Spark作为数据处理引擎,一般会跟IT其它系统配合,现在业界里面处于霸主地位的是Java,有利于团队的组建,易于移交:2.Scala学习角度讲,比Java难.找Scala的高手比Java难,项目的维护和二次开发比较困难:3.很多人员有Java的基础,确保对Scala不是很熟悉的人可以编写课程中的案例预测:2016年Spark取代Map Reduce,拯救HadoopHadoop+Spark = A winning combat

DT大数据梦工厂第三十五课 Spark系统运行循环流程

本节课内容: 1.     TaskScheduler工作原理 2.     TaskScheduler源码 一.TaskScheduler工作原理 总体调度图: 通过前几节课的讲解,RDD和DAGScheduler以及Worker都已有深入的讲解,这节课我们主要讲解TaskScheduler的运行原理. 回顾: DAGScheduler面向整个Job划分多个Stage,划分是从后往前的回溯过程:运行时从前往后运行的.每个Stage中有很多任务Task,Task是可以并行执行的.它们的执行逻辑完

DT大数据梦工厂Spark机器学习相关视频资料

大数据未来几年发展的重点方向,大数据战略已经在十八届五中全会上作为重点战略方向,中国在大数据方面才刚刚起步,但是在美国已经产生了上千亿的市场价值.举个例子,美国通用公司是一个生产飞机发动机的一个公司,这家公司在飞机发动机的每一个零部件上都安装了传感器,这些传感器在飞机发动机运作的同时不断的把发动机状态的数据传到通用公司的云平台上,通用公司又有很多数据分析中心专门接受这些数据,根据大数据的分析可以随时掌握每一家航空公司发动机的飞行状况,可以告知这些航空公司发动机的哪些部件需要检修或保养,避免飞机事

IDEA下Spark的开发(DT大数据梦工厂)

IDEA越使用效果越好,快捷键方便,阅读源码方便 一般阅读Spark或者Scala的源码都采用IDEA使用 下载IDEA最新版本的社区版本即可, 安装的时候必须安装Scala,这个过程是IDEA自动化的插件管理,所以点击后会自动下载(跳过在setting plugins里面也可以安装) 本地JAVA8和Scala2.10.4软件套件的安装和Eclipse不同 打开 打开之后点击File->Project Structure来设置工程的Libraries 核心是添加Spark的jar依赖 代码拷贝

王家林谈Spark性能优化第一季!(DT大数据梦工厂)

内容: 1.Spark性能优化需要思考的基本问题: 2.CPU和Memory: 3.并行度和Task: 4.网络: ==========王家林每日大数据语录============ 王家林每日大数据语录Spark篇0080(2016.1.26于深圳):如果Spark中CPU的使用率不够高,可以考虑为当前的程序分配更多的Executor,或者增加更多的Worker实例来充分的使用多核的潜能. 王家林每日大数据语录Spark篇0079(2016.1.26于深圳):适当设置Partition分片数是非

Spark Runtime(Driver、Masster、Worker、Executor)内幕解密(DT大数据梦工厂)

内容: 1.再论Spark集群部署: 2.Job提交解密: 3.Job的生成和接受: 4.Task的运行: 5.再论Shuffle: 从一个作业视角,透过Master.Drvier.Executor来透视Spark Runtime ==========再论Spark集群部署============ 官网中关于集群的部署: 默认情况下,每个Worker下有一个Executor,会最大化的使用内存和CPU. Master发指令给Worker来分配资源,不关心Worker能不能分配到这个资源,他发给多

Spark内核架构解密(DT大数据梦工厂)

只有知道内核架构的基础上,才知道为什么要这样写程序? 手工绘图来解密Spark内核架构 通过案例来验证Spark内核架构 Spark架构思考 ==========Spark Runtime的几个概念============ 下载下来运行,基本都是standalone模式,如果掌握了standalone,则yarn和mesos,以后不做特别说明,一律是standalone模式 application=driver+executor,executor是具体处理数据分片,里面是线程池并发的处理数据分片

Spark on Yarn彻底解密(DT大数据梦工厂)

内容: 1.Hadoop Yarn的工作流程解密: 2.Spark on Yarn两种运行模式实战: 3.Spark on Yarn工作流程解密: 4.Spark on Yarn工作内幕解密: 5.Spark on Yarn最佳实践: 资源管理框架Yarn Mesos是分布式集群的资源管理框架,和大数据没关系,但是可以管理大数据的资源 ==========Hadoop Yarn解析============ 1.Yarn是Hadoop推出的资源管理器,是负责分布式(大数据)集群计算的资源管理的,负

Spark运行原理和RDD解析(DT大数据梦工厂)

Spark一般基于内存,一些情况下也会基于磁盘 Spark优先会把数据放到内存中,如果内存实在放不下,也会放到磁盘里面的 不单能计算内存放的下的数据,也能计算内存放不下的数据 实际如果数据大于内存,则要考虑数据放置策略和优化算法,因为Spark初衷是一寨式处理 小到5~10台的分布式大到8000台的规模,Spark都能运行 大数据计算问题:交互式查询(基于shell.sparkSQL).批处理.机器学习和计算等等 底层基于RDD,分布式弹性数据级,支持各种各样的比如流处理.SQL.SparkR等