Spark Shuffle 详解(1)

版本:1.6.2

不管是hadoop中map/reduce还是spark中各种算子,shuffle过程都是其中核心过程,shuffle的设计是否高效,基本确定了整个计算过程是否高效。 设计难点在于shuffle过程涉及到大数据的IO操作(包括本地临时文件IO和网络IO),以及可能存在的cpu密集型排序计算操作。

在spark1.6.2版本,spark针对大型数据有三种shuffle 机制,即“sort-based shuffle”,”hash-based shuffle”,”tungsten-sort shuffle"

下面 是官方对其的描述:

/**

 * In sort-based shuffle, incoming records are sorted according to their target partition ids, then

 * written to a single map output file. Reducers fetch contiguous regions of this file in order to

 * read their portion of the map output. In cases where the map output data is too large to fit in

 * memory, sorted subsets of the output can are spilled to disk and those on-disk files are merged

 * to produce the final output file.

 *

 * Sort-based shuffle has two different write paths for producing its map output files:

 *

 *  - Serialized sorting: used when all three of the following conditions hold:

 *    1. The shuffle dependency specifies no aggregation or output ordering.

 *    2. The shuffle serializer supports relocation of serialized values (this is currently

 *       supported by KryoSerializer and Spark SQL‘s custom serializers).

 *    3. The shuffle produces fewer than 16777216 output partitions.

 *  - Deserialized sorting: used to handle all other cases.

 *

 * -----------------------

 * Serialized sorting mode

 * -----------------------

 *

 * In the serialized sorting mode, incoming records are serialized as soon as they are passed to the

 * shuffle writer and are buffered in a serialized form during sorting. This write path implements

 * several optimizations:

 *

 *  - Its sort operates on serialized binary data rather than Java objects, which reduces memory

 *    consumption and GC overheads. This optimization requires the record serializer to have certain

 *    properties to allow serialized records to be re-ordered without requiring deserialization.

 *    See SPARK-4550, where this optimization was first proposed and implemented, for more details.

 *

 *  - It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts

 *    arrays of compressed record pointers and partition ids. By using only 8 bytes of space per

 *    record in the sorting array, this fits more of the array into cache.

 *

 *  - The spill merging procedure operates on blocks of serialized records that belong to the same

 *    partition and does not need to deserialize records during the merge.

 *

 *  - When the spill compression codec supports concatenation of compressed data, the spill merge

 *    simply concatenates the serialized and compressed spill partitions to produce the final output

 *    partition.  This allows efficient data copying methods, like NIO‘s `transferTo`, to be used

 *    and avoids the need to allocate decompression or copying buffers during the merge.

 *

 * For more details on these optimizations, see SPARK-7081.

 */

本文针对shuffle相关的代码逻辑做一次串读,其中包括shuffle的原理,以及shuffle代码级别的实现。

Job,Stage,Task, Dependency

在Spark中,RDD是操作对象的单位,其中操作可以分为转换(transformation)和动作(actions),只有动作操作才会触发一个spark计算操作。

以rdd.map操作和rdd.count操作做比较

/**

 * Return a new RDD by applying a function to all elements of this RDD.

 */

def map[U: ClassTag](f: T => U): RDD[U] = withScope {

val cleanF = sc.clean(f)

new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))

}

/**

 * Return the number of elements in the RDD.

 */

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

map是一个转换操作,它只是在当前的rdd的基础上创建一个MapPartitionsRDD对象,而count是一个动作操作,它会调用sc.runJob向spark提交一个Job

Job是一组rdd的转换以及最后动作的操作集合,它是Spark里面计算最大最虚的概念,甚至在spark的任务页面中都无法看到job这个单位。 但是不管怎么样,在spark用户的角度,job是我们计算目标的单位,每次在一个rdd上做一个动作操作(acions)时,都会触发一个job,完成计算并返回我们想要的数据。

Job是由一组RDD上转换和动作组成,这组RDD之间的转换关系表现为一个有向无环图(DAG),每个RDD的生成依赖于前面1个或多个RDD。

在Spark中,两个RDD之间的依赖关系是Spark的核心。站在RDD的角度,两者依赖表现为点对点依赖, 但是在Spark中,RDD存在分区(partition)的概念,两个RDD之间的转换会被细化为两个RDD分区之间的转换。

如上图所示,站在job角度,RDD_B由RDD_A转换而成,RDD_D由RDD_C转换而成,最后RDD_E由RDD_B和RDD_D转换,最后输出RDD_E上做了一个动作,将结果输出。 但是细化到RDD内分区之间依赖,RDD_B对RDD_A的依赖,RDD_D对RDD_C的依赖是不一样,他们的区别用专业词汇来描述即为窄依赖和宽依赖。

所谓的窄依赖是说子RDD中的每一个数据分区只依赖于父RDD中的对应的有限个固定的数据分区,而宽依赖是指子RDD中的每个数据分区依赖于父RDD中的所有数据分区。

宽依赖很好理解,但是对于窄依赖比较绕口,特别是定义中有限与固定两个要求,宽依赖也满足有限和固定这两个要求?难道他们俩个之间区别也仅仅在于“有限”这个数字的大小? 其实就是这样的理解,“有限”就表现为所依赖的分区数目相比完整分区数相差很大,而且spark靠窄依赖来实现的RDD基本上都大部分都是一对一的依赖,所以就不需要纠结这个有限的关键字。

这里还有一个问题,count操作是依赖父RDD的所有分区进行计算而得到,那么它是宽依赖吗?这么疑问,答案肯定就是否定的,首先这里依赖是父RDD和子RDD之间的关系描述,count操作只有输出, 没有子rdd的概念,就不要把依赖的关系硬套上给你带来麻烦。看上面的实现,count只是把sc.runJob计算返回的Array[U]做一次sum操作而已。

窄依赖和宽依赖的分类是Spark中很重要的特性,不同依赖在实现,任务调度机制,容错恢复上都有不同的机制。

  • 实现上:对于窄依赖,rdd之间的转换可以直接pipe化,而宽依赖需要采用shuffle过程来实现。
  • 任务调度上:窄依赖意味着可以在某一个计算节点上直接通过父RDD的某几块数据(通常是一块)计算得到子RDD某一块的数据; 而相对的,宽依赖意味着子RDD某一块数据的计算必须等到它的父RDD所有数据都计算完成之后才可以进行,而且需要对父RDD的计算结果需要经过shuffle才能被下一个rdd所操作。
  • 容错恢复上:窄依赖的错误恢复会比宽依赖的错误恢复要快很多,因为对于窄依赖来说,只有丢失的那一块数据需要被重新计算, 而宽依赖意味着所有的祖先RDD中所有的数据块都需要被重新计算一遍,这也是我们建议在长“血统”链条特别是有宽依赖的时候,需要在适当的时机设置一个数据检查点以避免过长的容错恢复。

在这边 可以使用:RDD.checkpoint的方法来实现检查点

/**

 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint

 * directory set with `SparkContext#setCheckpointDirand all references to its parent

 * RDDs will be removed. This function must be called before any job has been

 * executed on this RDD. It is strongly recommended that this RDD is persisted in

 * memory, otherwise saving it on a file will require recomputation.

 */

def checkpoint(): Unit = RDDCheckpointData.synchronized {

// NOTE: we use a global lock here due to complexities downstream with ensuring

// children RDD partitions point to the correct parent partitions. In the future

// we should revisit this consideration.

if (context.checkpointDir.isEmpty) {

throw new SparkException("Checkpoint directory has not been set in the SparkContext")

else if (checkpointData.isEmpty) {

checkpointData = Some(new ReliableRDDCheckpointData(this))

}

}

理清楚了Job层面RDD之间的关系,RDD层面分区之间的关系,那么下面讲述一下Stage概念。

Stage的划分是对一个Job里面一系列RDD转换和动作进行划分。

  • 首先job是因动作而产生,因此每个job肯定都有一个ResultStage,否则job就不会启动。
  • 其次,如果Job内部RDD之间存在宽依赖,Spark会针对它产生一个中间Stage,即为ShuffleStage,严格来说应该是ShuffleMapStage,这个stage是针对父RDD而产生的, 相当于在父RDD上做一个父rdd.map().collect()的操作。ShuffleMapStage生成的map输入,对于子RDD,如果检测到所自己所“宽依赖”的stage完成计算,就可以启动一个shuffleFectch,
    从而将父RDD输出的数据拉取过程,进行后续的计算。

因此一个Job由一个ResultStage和多个ShuffleMapStage组成。

无Shuffle Job的执行过程

对一个无Shuffle的job执行过程的剖析可以知晓我们执行一个"动作"时,spark的处理流程. 下面我们就以一个简单例子进行讲解:

sc.textFile(“mahuacai").count

//def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

 

这个例子很简单就是统计这个文件的行数;上面一行代码,对应了下面三个过程中:

  • sc.textFile(“mahucai")会返回一个rdd,
  • 然后在这个rdd上做count动作,触发了一次Job的提交sc.runJob(this, Utils.getIteratorSize _)
  • 对runJob返回的Array结构进行sum操作;

核心过程就是第二步,下面我们以代码片段的方式来描述这个过程,这个过程肯定是线性的,就用step来标示每一步,以及相关的代码类:

//step1:SparkContext

/**

 * Run a function on a given set of partitions in an RDD and return the results as an array.

 */

def runJob[T, U: ClassTag](

rdd: RDD[T],

func: (TaskContext, Iterator[T]) => U,

partitions: Seq[Int]): Array[U] = {

val results = new Array[U](partitions.size)

runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)

results

}

sc.runJob(this, Utils.getIteratorSize _)的过程会经过一组runJob的重载函数,进入上述step1中的runJob函数,相比原始的runJob,到达这边做的工作不多,比如设置partitions个数, Utils.getIteratorSize _到func转化等,以后像这样简单的过程就不再描述.

Step1做的一个很重要的工作是构造一个Array,并构造一个函数对象"(index, res) => results(index) = res"继续传递给runJob函数,然后等待runJob函数运行结束,将results返回; 对这里的解释相当在runJob添加一个回调函数,将runJob的运行结果保存到Array到, 回调函数,index表示mapindex, res为单个map的运行结果,对于我们这里例子.res就为每个分片的 文件行数.

//step2:SparkContext

/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD‘s recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

Step2中runJob就有一个resultHandler参数,这就是Step1构造的回调函数,dagScheduler是Spark里面最外层调度器,通过调用它的runJob函数,将相关参见传入到Spark调度器中. 只有Step1中的runJob函数的返回值有返回值,这里的runJob,包括dagScheduler.runJob都是没有返回值的;返回是通过Step1的回调函数进行设置的.

为什么我要一再强调返回值是通过Step1的回调函数来设置的?这个很重要,否则你都不知道spark调度的job的运行结果是怎么样被我们自己的逻辑代码所获取的!!

还有一点很重要,Step2是Step1以后的直接步骤,所以Step2中的dagScheduler.runJob是堵塞的操作,即直到Spark完成Job的运行之前,rdd.doCheckpoint()是不会执行的;

//Step3:DAGScheduler

/**

 * Run an action job on the given RDD and pass all the results to the resultHandler function as

 * they arrive.

 *

 * @param rdd target RDD to run tasks on

 * @param func a function to run on each partition of the RDD

 * @param partitions set of partitions to run on; some jobs may not want to compute on all

 *   partitions of the target RDD, e.g. for operations like first()

 * @param callSite where in the user program this job was called

 * @param resultHandler callback to pass each result to

 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name

 *

 * @throws Exception when the job fails

 */

def runJob[T, U](

rdd: RDD[T],

func: (TaskContext, Iterator[T]) => U,

partitions: Seq[Int],

callSite: CallSite,

resultHandler: (Int, U) => Unit,

properties: Properties): Unit = {

val start = System.nanoTime

val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

waiter.awaitResult() match {

case JobSucceeded =>

logInfo("Job %d finished: %s, took %f s".format

(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))

case JobFailed(exception: Exception) =>

logInfo("Job %d failed: %s, took %f s".format

(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))

// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.

val callerStackTrace = Thread.currentThread().getStackTrace.tail

exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)

throw exception

}

}

Step2中说了dagScheduler.runJob是堵塞的,堵塞就堵塞在Step3的waiter.awaitResult()操作,即submitJob会返回一个waiter对象,而我们的awaitResult()就堵塞了;

到目前为止,我们终于从runJob这个多处出现的函数名称跳到submitJob这个函数名称;继续下一步

//Step4:DAGScheduler

/**

 * Submit an action job to the scheduler.

 *

 * @param rdd target RDD to run tasks on

 * @param func a function to run on each partition of the RDD

 * @param partitions set of partitions to run on; some jobs may not want to compute on all

 *   partitions of the target RDD, e.g. for operations like first()

 * @param callSite where in the user program this job was called

 * @param resultHandler callback to pass each result to

 * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name

 *

 * @return a JobWaiter object that can be used to block until the job finishes executing

 *         or can be used to cancel the job.

 *

 * @throws IllegalArgumentException when partitions ids are illegal

 */

def submitJob[T, U](

rdd: RDD[T],

func: (TaskContext, Iterator[T]) => U,

partitions: Seq[Int],

callSite: CallSite,

resultHandler: (Int, U) => Unit,

properties: Properties): JobWaiter[U] = {

// Check to make sure we are not launching a task on a partition that does not exist.

val maxPartitions = rdd.partitions.length

partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>

throw new IllegalArgumentException(

"Attempting to access a non-existent partition: " + p + ". " +

"Total number of partitions: " + maxPartitions)

}

val jobId = nextJobId.getAndIncrement()

if (partitions.size == 0) {

// Return immediately if the job is running 0 tasks

return new JobWaiter[U](this, jobId, 0, resultHandler)

}

assert(partitions.size > 0)

val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]

val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)

eventProcessLoop.post(JobSubmitted(

jobId, rdd, func2, partitions.toArray, callSite, waiter,

SerializationUtils.clone(properties)))

waiter

}

在Step4的submitJob中,我们给这次job分配了一个jobID, 通过创建了一个JobWaiter对象,返回给Step3;最重要的步骤就是调用eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,SerializationUtils.clone(properties)))向DAG调度器
发送一个JobSubmitted的消息;

/**

 * Put the event into the event queue. The event thread will process it later.

 */

def post(event: E): Unit = {

eventQueue.put(event)

}

到目前为止我们都没有关系函数的参数,这里我们要分析一下发送的JobSubmitted的消息包:

  • jobId,rdd,func2,partitions.toArray这几个都比较好理解,就不阐述了
  • callSite/properties:个人不是很感兴趣,姑且理解为不重要的
  • waiter就是上面创建的JobWaiter对象,这个很重要,因为这个对象封装了几个重要的参数:
    • jobId:Job编号
    • partitions.size:分区编号
    • resultHandler:我们Step1设置的回调函数

为什么JobWaiter重要,这个对象包含了我们分区的个数.我们知道分区的个数和task个数是相同的,因此JobWaiter成功返回的前提是: 它接受到partitions.size个归属于jobid的task成功运行的结果,并通过resultHandler来将这些task运行结果回调给Step2的Array

这句话应该不难理解,其实这句话也包含了我们后面job调度的整体过程, 下面我们就一步一步来分析从job到Stage,到task以及直到task运行成功,调用我们的resultHandler回调的过程.

//Step5:DAGScheduler

private[scheduler] def handleJobSubmitted(jobId: Int,

finalRDD: RDD[_],

func: (TaskContext, Iterator[_]) => _,

partitions: Array[Int],

callSite: CallSite,

listener: JobListener,

properties: Properties) {

var finalStage: ResultStage = null

  try {

// New stage creation may throw an exception if, for example, jobs are run on a

// HadoopRDD whose underlying HDFS files have been deleted.

finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)

catch {

case e: Exception =>

logWarning("Creating new stage failed due to exception - job: " + jobId, e)

listener.jobFailed(e)

return

  }

val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)

clearCacheLocs()

logInfo("Got job %s (%s) with %d output partitions".format(

job.jobId, callSite.shortForm, partitions.length))

logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")

logInfo("Parents of final stage: " + finalStage.parents)

logInfo("Missing parents: " + getMissingParentStages(finalStage))

val jobSubmissionTime = clock.getTimeMillis()

jobIdToActiveJob(jobId) = job

activeJobs += job

finalStage.setActiveJob(job)

val stageIds = jobIdToStageIds(jobId).toArray

val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))

listenerBus.post(

SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))

submitStage(finalStage)

submitWaitingStages()

}

Step4发送的消息最后被Step5中的handleJobSubmitted函数进行处理,我这里删除了handleJobSubmitted中很多我们不关心的代码,Step5的核心代码就是创建一个finalStage, 并调用 submitStage将stage提交给Dag进行调度;这里我们从Job单位层面进入Stage层;

这个Stage命名很好:

finalStage,它是整个DAG上的最后一个stage,它不是一个集合,而是单一的stage,这说明一个道理,runJob肯定只对应一个finalStage,即最终的输出肯定只有一个, 中间的stage就是我们传说中的shuffleStage,shuffleStage的生成就是在生成finalStage过程中生成的,即newStage.

那么我们就进入newResultStage这个函数,等一下我们还会回到submitStage,来分析怎么将Stage解析为Task提交给Spark进行运行;

//Step5.1:DAGScheduler

/**

 * Create a ResultStage associated with the provided jobId.

 */

private def newResultStage(

rdd: RDD[_],

func: (TaskContext, Iterator[_]) => _,

partitions: Array[Int],

jobId: Int,

callSite: CallSite): ResultStage = {

val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)

val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)

stageIdToStage(id) = stage

updateJobIdStageIdMaps(jobId, stage)

stage

}

getParentStagesAndId 的具体的实现

/**

 * Helper function to eliminate some code re-use when creating new stages.

 */

private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {

val parentStages = getParentStages(rdd, firstJobId)

val id = nextStageId.getAndIncrement()

(parentStages, id)

}

getParentStages 的具体的实现

从实现的代码里面 不难看出 这边使用的 是图算法里面的 广度遍历

/**

 * Get or create the list of parent stages for a given RDD.  The new Stages will be created with

 * the provided firstJobId.

 */

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {

val parents = new HashSet[Stage] // 存储parent stage

val visited = new HashSet[RDD[_]] //存储已经被访问过的RDD

// We are manually maintaining a stack here to prevent StackOverflowError

// caused by recursively visiting // 存储需要被处理的RDD。Stack中得RDD都需要被处理。

val waitingForVisit = new Stack[RDD[_]]

def visit(r: RDD[_]) {

if (!visited(r)) {

visited += r

// Kind of ugly: need to register RDDs with the cache here since

// we can‘t do it in its constructor because # of partitions is unknown

for (dep <- r.dependencies) {

dep match {

case shufDep: ShuffleDependency[_, _, _] =>

parents += getShuffleMapStage(shufDep, firstJobId) // 在ShuffleDependency时需要生成新的stage

case _ =>

waitingForVisit.push(dep.rdd)

}

}

}

}

waitingForVisit.push(rdd) // 输入的rdd作为第一个需要处理的RDD。然后从该rdd开始,顺序访问其parent rdd

while (waitingForVisit.nonEmpty) { //只要stack不为空,则一直处理。

visit(waitingForVisit.pop()) //每次visit如果遇到了ShuffleDependency,那么就会形成一个Stage,否则这些RDD属于同一个Stage

}

parents.toList

}

RDD[_] 表示 是任何类型的 RDD

getShuffleMapStage 的逻辑

**

 * Get or create a shuffle map stage for the given shuffle dependency‘s map side.

 */

private def getShuffleMapStage(

shuffleDep: ShuffleDependency[_, _, _],

firstJobId: Int): ShuffleMapStage = {

shuffleToMapStage.get(shuffleDep.shuffleIdmatch {

case Some(stage) => stage

case None =>

// We are going to register ancestor shuffle dependencies

getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>

shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)

}

// Then register current shuffleDep

val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)

shuffleToMapStage(shuffleDep.shuffleId) = stage

stage

}

}

这边 可以简单的看一下 关于的 ShuffleDependency

@DeveloperApi

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](

@transient private val _rdd: RDD[_ <: Product2[K, V]],

val partitioner: Partitioner,

val serializer: Option[Serializer] = None,

val keyOrdering: Option[Ordering[K]] = None,

val aggregator: Option[Aggregator[K, V, C]] = None,

val mapSideCombine: Boolean = false)

extends Dependency[Product2[K, V]] {

override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName

private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName

// Note: It‘s possible that the combiner class tag is null, if the combineByKey

// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.

private[spark] val combinerClassName: Option[String] =

Option(reflect.classTag[C]).map(_.runtimeClass.getName)

val shuffleId: Int = _rdd.context.newShuffleId()

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(

shuffleId, _rdd.partitions.size, this)

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))

}

Step5.1中首先是在当前的rdd上调用getParentStagesAndId来生成父Stage,父Stages是一个列表;我们这里分析的cache是没有Shuffle的,那么肯定就没有父Stage这个过程;我们就不深入 去分析这个过程;

然后就创建一个Stage对象,并更新Stage和job之间的关系.

/**

 * Helper function to eliminate some code re-use when creating new stages.

 */

private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {

val parentStages = getParentStages(rdd, firstJobId)

val id = nextStageId.getAndIncrement()

(parentStages, id)

}

下面我们要从维度5.1跳转到一个和执行流程无关的代码,即Stage类的实现,毕竟是Spark的核心对象,对它的理解还是很重要的;

官方对stage 的解释 说明

/**

 * A stage is a set of parallel tasks all computing the same function that need to run as part

 * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run

 * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the

 * DAGScheduler runs these stages in topological order.

 *

 * Each Stage can either be a shuffle map stage, in which case its tasks‘ results are input for

 * other stage(s), or a result stage, in which case its tasks directly compute a Spark action

 * (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also

 * track the nodes that each output partition is on.

 *

 * Each Stage also has a firstJobId, identifying the job that first submitted the stage.  When FIFO

 * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered

 * faster on failure.

 *

 * Finally, a single stage can be re-executed in multiple attempts due to fault recovery. In that

 * case, the Stage object will track multiple StageInfo objects to pass to listeners or the web UI.

 * The latest one will be accessible through latestInfo.

 *

 * @param id Unique stage ID

 * @param rdd RDD that this stage runs on: for a shuffle map stage, it‘s the RDD we run map tasks

 *   on, while for a result stage, it‘s the target RDD that we ran an action on

 * @param numTasks Total number of tasks in stage; result stages in particular may not need to

 *   compute all partitions, e.g. for first(), lookup(), and take().

 * @param parents List of stages that this stage depends on (through shuffle dependencies).

 * @param firstJobId ID of the first job this stage was part of, for FIFO scheduling.

 * @param callSite Location in the user program associated with this stage: either where the target

 *   RDD was created, for a shuffle map stage, or where the action for a result stage was called.

 */

private[scheduler] abstract class Stage(

val id: Int,

val rdd: RDD[_],

val numTasks: Int,

val parents: List[Stage],

val firstJobId: Int,

val callSite: CallSite)

extends Logging {

val numPartitions = rdd.partitions.length

/** Set of jobs that this stage belongs to. */

 
val jobIds new HashSet[Int]

val pendingPartitions new HashSet[Int]

/** The ID to use for the next new attempt for this stage. */

 
private var nextAttemptId: Int = 0

val name: String = callSite.shortForm

val details: String = callSite.longForm

private var _internalAccumulators: Seq[Accumulator[Long]] = Seq.empty

/** Internal accumulators shared across all tasks in this stage. */

 
def internalAccumulators: Seq[Accumulator[Long]] = _internalAccumulators

 
/**

   * Re-initialize the internal accumulators associated with this stage.

   *

   * This is called every time the stage is submitted, *except* when a subset of tasks

   * belonging to this stage has already finished. Otherwise, reinitializing the internal

   * accumulators here again will override partial values from the finished tasks.

   */

 
def resetInternalAccumulators(): Unit = {

_internalAccumulators = InternalAccumulator.create(rdd.sparkContext)

}

/**

   * Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized

   * here, before any attempts have actually been created, because the DAGScheduler uses this

   * StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts

   * have been created).

   */

 
private var _latestInfo: StageInfo = StageInfo.fromStage(thisnextAttemptId)

/**

   * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these

   * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.

   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if

   * multiple tasks from the same stage attempt fail (SPARK-5945).

   */

 
private val fetchFailedAttemptIds new HashSet[Int]

private[scheduler] def clearFailures() : Unit = {

fetchFailedAttemptIds.clear()

}

/**

   * Check whether we should abort the failedStage due to multiple consecutive fetch failures.

   *

   * This method updates the running set of failed stage attempts and returns

   * true if the number of failures exceeds the allowable number of failures.

   */

 
private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {

fetchFailedAttemptIds.add(stageAttemptId)

fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES

  }

/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */

 
def makeNewStageAttempt(

numPartitionsToCompute: Int,

taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {

_latestInfo = StageInfo.fromStage(

thisnextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences)

nextAttemptId += 1

}

/** Returns the StageInfo for the most recent attempt for this stage. */

 
def latestInfo: StageInfo = _latestInfo

 
override final def hashCode(): Int = id

override final def equals(other: Any): Boolean = other match {

case stage: Stage => stage != null && stage.id == id

case _ => false

  }

/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */

 
def findMissingPartitions(): Seq[Int]

}

private[scheduler] object Stage {

// The number of consecutive failures allowed before a stage is aborted

val MAX_CONSECUTIVE_FETCH_FAILURES = 4

}

官方的解释

/**

 * ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.

 * They occur right before each shuffle operation, and might contain multiple pipelined operations

 * before that (e.g. map and filter). When executed, they save map output files that can later be

 * fetched by reduce tasks. The `shuffleDepfield describes the shuffle each stage is part of,

 * and variables like `outputLocsand `numAvailableOutputstrack how many map outputs are ready.

 *

 * ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.

 * For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that

 * there can be multiple ActiveJobs trying to compute the same shuffle map stage.

 */

private[spark] class ShuffleMapStage(

id: Int,

rdd: RDD[_],

numTasks: Int,

parents: List[Stage],

firstJobId: Int,

callSite: CallSite,

val shuffleDep: ShuffleDependency[_, _, _])

extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {

private[thisvar _mapStageJobs: List[ActiveJob] = Nil

 
private[thisvar _numAvailableOutputs: Int = 0

/**

   * List of [[MapStatus]] for each partition. The index of the array is the map partition id,

   * and each value in the array is the list of possible [[MapStatus]] for a partition

   * (a single task might run multiple times).

   */

 
private[thisval outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)

override def toString: String = "ShuffleMapStage " + id

/**

   * Returns the list of active jobs,

   * i.e. map-stage jobs that were submitted to execute this stage independently (if any).

   */

 
def mapStageJobs: Seq[ActiveJob] = _mapStageJobs

/** Adds the job to the active job list. */

 
def addActiveJob(job: ActiveJob): Unit = {

_mapStageJobs = job :: _mapStageJobs

}

/** Removes the job from the active job list. */

 
def removeActiveJob(job: ActiveJob): Unit = {

_mapStageJobs = _mapStageJobs.filter(_ != job)

}

/**

   * Number of partitions that have shuffle outputs.

   * When this reaches [[numPartitions]], this map stage is ready.

   * This should be kept consistent as `outputLocs.filter(!_.isEmpty).size`.

   */

 
def numAvailableOutputs: Int = _numAvailableOutputs

 
/**

   * Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.

   * This should be the same as `outputLocs.contains(Nil)`.

   */

 def isAvailable: Boolean = _numAvailableOutputs == numPartitions

 
/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */

 
override def findMissingPartitions(): Seq[Int] = {

val missing = (0 until numPartitions).filter(id => outputLocs(id).isEmpty)

assert(missing.size == numPartitions _numAvailableOutputs,

s"${missing.size} missing, expected ${numPartitions _numAvailableOutputs}")

missing

}

def addOutputLoc(partition: Int, status: MapStatus): Unit = {

val prevList = outputLocs(partition)

outputLocs(partition) = status :: prevList

if (prevList == Nil) {

_numAvailableOutputs += 1

}

}

def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {

val prevList = outputLocs(partition)

val newList = prevList.filterNot(_.location == bmAddress)

outputLocs(partition) = newList

if (prevList != Nil && newList == Nil) {

_numAvailableOutputs -= 1

}

}

/**

   * Returns an array of [[MapStatus]] (index by partition id). For each partition, the returned

   * value contains only one (i.e. the first) [[MapStatus]]. If there is no entry for the partition,

   * that position is filled with null.

   */

 
def outputLocInMapOutputTrackerFormat(): Array[MapStatus] = {

outputLocs.map(_.headOption.orNull)

}

/**

   * Removes all shuffle outputs associated with this executor. Note that this will also remove

   * outputs which are served by an external shuffle server (if one exists), as they are still

   * registered with this execId.

   */

 
def removeOutputsOnExecutor(execId: String): Unit = {

var becameUnavailable = false

    for (partition <- 0 until numPartitions) {

val prevList = outputLocs(partition)

val newList = prevList.filterNot(_.location.executorId == execId)

outputLocs(partition) = newList

if (prevList != Nil && newList == Nil) {

becameUnavailable = true

       
_numAvailableOutputs -= 1

}

}

if (becameUnavailable) {

logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(

this, execId, _numAvailableOutputsnumPartitions, isAvailable))

}

}

}

首先我们看ShuffleMapStage(是 stage 的实现 )几个字段,其中shuffleDep和parents最为重要,首先如果一个Stage的shuffleDep不为空,那么当前的Stage是因为shuffleMap输出而生成的Stage;

怎么解释呢?shuffleDep就是该Stage的生成原因;因为下游rdd对当前的rdd有这个依赖而生成在当前rdd上生成一个Stage. 因此FinalStage,shuffleDep值为none

parents参数就是父Stage列表,当前rdd被调度的前提是所有的父Stage都调度完成;对于我们当前研究这个case来说,shuffleDep和parents都为none;

Stage这个类还有两个比较重要的函数:

//ShuffleMapStage.class

/**

 * Returns true if the map stage is ready, i.e. all partitions have shuffle outputs.

 * This should be the same as `outputLocs.contains(Nil)`.

 */

def isAvailable: Boolean = _numAvailableOutputs == numPartitions

def addOutputLoc(partition: Int, status: MapStatus): Unit = {

val prevList = outputLocs(partition)

outputLocs(partition) = status :: prevList

if (prevList == Nil) {

_numAvailableOutputs += 1

}

}

时间: 2024-10-21 19:16:25

Spark Shuffle 详解(1)的相关文章

Apache Spark源码走读之13 -- hiveql on spark实现详解

欢迎转载,转载请注明出处,徽沪一郎 概要 在新近发布的spark 1.0中新加了sql的模块,更为引人注意的是对hive中的hiveql也提供了良好的支持,作为一个源码分析控,了解一下spark是如何完成对hql的支持是一件非常有趣的事情. Hive简介 Hive的由来 以下部分摘自Hadoop definite guide中的Hive一章 "Hive由Facebook出品,其设计之初目的是让精通SQL技能的分析师能够对Facebook存放在HDFS上的大规模数据集进行分析和查询. Hive大大

第37课:Spark中Shuffle详解及作业

1.什么是Spark的Shuffle 图1 Spark有很多算子,比如:groupByKey.join等等都会产生shuffle. 产生shuffle的时候,首先会产生Stage划分. 上一个Stage会把 计算结果放在LocalSystemFile中,并汇报给Driver: 下一个Stage的运行由Driver触发,Executor向Driver请求,把上一个Stage的计算结果抓取过来. 2.Hadoop的Shuffle过程 图2 该图表达了Hadoop的map和reduce两个阶段,通过S

spark配置详解

对付看把 到这里格式变化太大了,懒得调整了,这是大概spark1.5版本时候的一些参数默认值,现在2.x会有变化 这些皆可在 spark-default.conf配置,或者部分可在 sparkconf().set设置 应用程序属性 |--------------------------------------------------------------------------------------------| 属性名称                                   

Spark技术内幕: Shuffle详解(三)

前两篇文章写了Shuffle Read的一些实现细节.但是要想彻底理清楚这里边的实现逻辑,还是需要更多篇幅的:本篇开始,将按照Job的执行顺序,来讲解Shuffle.即,结果数据(ShuffleMapTask的结果和ResultTask的结果)是如何产生的:结果是如何处理的:结果是如何读取的. 在Worker上接收Task执行命令的是org.apache.spark.executor.CoarseGrainedExecutorBackend.它在接收到LaunchTask的命令后,通过在Driv

spark core源码分析15 Shuffle详解-写流程

博客地址: http://blog.csdn.net/yueqian_zhu/ Shuffle是一个比较复杂的过程,有必要详细剖析一下内部写的逻辑 ShuffleManager分为SortShuffleManager和HashShuffleManager 一.SortShuffleManager 每个ShuffleMapTask不会为每个Reducer生成一个单独的文件:相反,它会将所有的结果写到一个本地文件里,同时会生成一个index文件,Reducer可以通过这个index文件取得它需要处理

Spark分区详解!DT大数据梦工厂王家林老师亲自讲解!

http://www.tudou.com/home/_79823675/playlist?qq-pf-to=pcqq.group 一.分片和分区的区别? 分片是从数据角度,分区是从计算的角度,其实都是从大的状态,split成小的. 二.spark分区理解 rdd作为一个分布式的数据集,是分布在多个worker节点上的.如下图所示,RDD1有五个分区(partition),他们分布在了四个worker nodes 上面,RDD2有三个分区,分布在了三个worker nodes上面. 三.默认分区

Spark函数详解系列--RDD基本转换

摘要:   RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行操作,一个RDD代表一个分区里的数据集   RDD有两种操作算子:          Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作          Ation(执行):触发Spark作业的运行,真正触发转换算子的计算 本系列主要讲解Spark中常用的函数操作:  

spark RPC详解

前段时间看spark,看着迷迷糊糊的.最近终于有点头绪,先梳理了一下spark rpc相关的东西,先记录下来. 1,概述 个人认为,如果把分布式系统(HDFS, HBASE,SPARK等)比作一个人,那么RPC可以认为是人体的血液循环系统.它将系统中各个不同的组件(如Hbase中的master, Regionserver, client)联系了起来.同样,在spark中,不同组件像driver,executor,worker,master(stanalone模式)之间的通信也是基于RPC来实现的

深入探究Spark -- RDD详解

Spark最基本.最根本的数据抽象 RDD基于内存,提高了迭代式.交互式操作的性能 RDD是只读的,只能通过其他RDD批量操作来创建,提高容错性    另外RDD还具有位置感知性调度和可伸缩性 RDD只支持粗粒度转换,记录Lineage,用于恢复丢失的分区,从物理存储的数据计算出相应的RDD分区 RDD的5个主要属性: 1.一组分片,默认的分片个数等于core数.BlockManager进行分配. 2.一个compute计算分区函数,对迭代器进行复合,以分片为单位 3.RDD之间的依赖关系,使数