Spark Notes 8: Task/TaskContext

The DAGScheduler ultimately creates a task set and submits it to the TaskScheduler, so the first thing to look at is how a task is defined and executed.

A Task is a single unit of execution.

/**
 * A unit of execution. We have two kinds of Task's in Spark:
 *
 *  - [[org.apache.spark.scheduler.ShuffleMapTask]]
 *  - [[org.apache.spark.scheduler.ResultTask]]
 *
 * A Spark job consists of one or more stages. The very last stage in a job consists of multiple
 * ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
 * and sends the task output back to the driver application. A ShuffleMapTask executes the task
 * and divides the task output to multiple buckets (based on the task's partitioner).
 *
 * @param stageId id of the stage this task belongs to
 * @param partitionId index of the number in the RDD
 */
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
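The class follows a template-method shape: a final run() owns the per-attempt setup, and each concrete task kind supplies the actual work by overriding runTask(). A minimal, self-contained sketch of that shape (toy classes, not Spark code) looks like this:

// Toy illustration only: mirrors Task's final run() delegating to an abstract runTask().
abstract class MiniTask[T](val stageId: Int, val partitionId: Int) extends Serializable {
  // Scheduler-facing entry point: fixed setup, then the subclass-specific work.
  final def run(attemptId: Long): T = {
    println(s"running stage=$stageId partition=$partitionId attempt=$attemptId")
    runTask()
  }
  // Each task kind supplies the actual computation.
  protected def runTask(): T
}

// A toy "result-style" task that just aggregates its partition's data.
class MiniSumTask(stageId: Int, partitionId: Int, data: Seq[Int])
  extends MiniTask[Int](stageId, partitionId) {
  override protected def runTask(): Int = data.sum
}

object MiniTaskDemo extends App {
  println(new MiniSumTask(stageId = 0, partitionId = 0, data = Seq(1, 2, 3)).run(attemptId = 1L))  // 6
}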

Main members:

final def run(attemptId: Long): T = {
  context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
  context.taskMetrics.hostname = Utils.localHostName()
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  runTask(context)
}
def runTask(context: TaskContext): T
// Map output tracker epoch. Will be set by TaskScheduler.
var epoch: Long = -1

var metrics: Option[TaskMetrics] = None

// Task context, to be initialized in run().
@transient protected var context: TaskContext = _

// The actual Thread on which the task is running, if any. Initialized in run().
@volatile @transient private var taskThread: Thread = _
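The _killed flag and taskThread exist so a task can be cancelled while it is running: kill() flips the flag (and may interrupt the thread recorded in run()), and the running code is expected to notice. A rough, self-contained sketch of that cooperative-cancel idea (toy code, not Spark's actual kill()):

// Sketch of the cooperative-kill idea behind _killed / taskThread.
class KillableRunner {
  @volatile private var killed = false           // plays the role of _killed
  @volatile private var runnerThread: Thread = _ // plays the role of taskThread

  // The "task body": records its thread, then periodically checks the kill flag.
  def run(): String = {
    runnerThread = Thread.currentThread()
    var i = 0
    while (i < 100000000 && !killed) { i += 1 }
    if (killed) "stopped early: killed" else "finished normally"
  }

  // The scheduler side: set the flag; interrupting also wakes the thread if it is blocked.
  def kill(interruptThread: Boolean): Unit = {
    killed = true
    if (interruptThread && runnerThread != null) runnerThread.interrupt()
  }
}

In Spark itself, kill() also records the interruption on the TaskContext, which is how user code iterating over the partition can observe it and stop early.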
/**
 * Handles transmission of tasks and their dependencies, because this can be slightly tricky. We
 * need to send the list of JARs and files added to the SparkContext with each task to ensure that
 * worker nodes find out about it, but we can't make it part of the Task because the user's code in
 * the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
 * first writing out its dependencies.
 */
private[spark] object Task {
  /**
   * Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
   */
  def serializeWithDependencies(
/**
 * Deserialize the list of dependencies in a task serialized with serializeWithDependencies,
 * and return the task itself as a serialized ByteBuffer. The caller can then update its
 * ClassLoaders and deserialize the task.
 *
 * @return (taskFiles, taskJars, taskBytes)
 */
def deserializeWithDependencies(serializedTask: ByteBuffer)
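The comments describe the framing: the file and jar dependency lists are written first, and the serialized task bytes come last, so an executor can read the dependencies, update its class loaders, and only then deserialize the task. A small self-contained sketch of that "dependencies first, task last" framing (written from the comments above, not copied from Spark's implementation):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Sketch of the layout: [files][jars][task bytes], with the task bytes left at the tail
// so the reader can hand them to a deserializer after syncing dependencies.
object DependencyFraming {
  def frame(files: Map[String, Long], jars: Map[String, Long], taskBytes: Array[Byte]): Array[Byte] = {
    val out = new ByteArrayOutputStream(4096)
    val dataOut = new DataOutputStream(out)
    def writeDeps(deps: Map[String, Long]): Unit = {
      dataOut.writeInt(deps.size)
      for ((name, timestamp) <- deps) { dataOut.writeUTF(name); dataOut.writeLong(timestamp) }
    }
    writeDeps(files)
    writeDeps(jars)
    dataOut.flush()
    out.write(taskBytes)        // the serialized task itself goes last, unframed
    out.toByteArray
  }

  def unframe(bytes: Array[Byte]): (Map[String, Long], Map[String, Long], Array[Byte]) = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    def readDeps(): Map[String, Long] =
      (0 until in.readInt()).map(_ => in.readUTF() -> in.readLong()).toMap
    val files = readDeps()
    val jars = readDeps()
    val task = new Array[Byte](in.available())   // everything left over is the task
    in.readFully(task)
    (files, jars, task)
  }
}

Spark's real serializeWithDependencies/deserializeWithDependencies differ in the concrete collection types and in working with ByteBuffer, but the write-dependencies-then-task ordering is the point.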

ShuffleMapTask

/**
 * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
 * specified in the ShuffleDependency).
 *
 * See [[org.apache.spark.scheduler.Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
 *                   the type should be (RDD[_], ShuffleDependency[_, _, _]).
 * @param partition partition of the RDD this task is associated with
 * @param locs preferred task execution locations for locality scheduling
 */
private[spark] class ShuffleMapTask(
    stageId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient private var locs: Seq[TaskLocation])
  extends Task[MapStatus](stageId, partition.index) with Logging {
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    return writer.stop(success = true).get
  } catch {
    case e: Exception =>
      if (writer != null) {
        writer.stop(success = false)
      }
      throw e
  } finally {
    context.markTaskCompleted()
  }
}
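The key point of the map side is that every output record is routed to a bucket chosen by the shuffle dependency's partitioner, and the MapStatus returned above tells the reduce side where to fetch those buckets. A minimal stand-alone sketch of the bucket routing (a toy hash partitioner, not Spark's ShuffleWriter):

// Toy hash partitioning, mirroring how a map-side write routes (key, value) records to buckets.
class SimpleHashPartitioner(val numPartitions: Int) {
  def getPartition(key: Any): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod   // keep bucket indices in [0, numPartitions)
  }
}

object BucketDemo extends App {
  val partitioner = new SimpleHashPartitioner(3)
  val mapOutput = Seq("a" -> 1, "b" -> 2, "c" -> 3, "a" -> 4)
  // Group this partition's output by the reduce bucket it will be fetched from.
  val buckets = mapOutput.groupBy { case (k, _) => partitioner.getPartition(k) }
  buckets.toSeq.sortBy(_._1).foreach { case (bucket, records) =>
    println(s"bucket $bucket -> $records")
  }
}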

ResultTask

/**
 * A task that sends back the output to the driver application.
 *
 * See [[Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param taskBinary broadcasted version of the serialized RDD and the function to apply on each
 *                   partition of the given RDD. Once deserialized, the type should be
 *                   (RDD[T], (TaskContext, Iterator[T]) => U).
 * @param partition partition of the RDD this task is associated with
 * @param locs preferred task execution locations for locality scheduling
 * @param outputId index of the task in this job (a job can launch tasks on only a subset of the
 *                 input RDD's partitions).
 */
private[spark] class ResultTask[T, U](
    stageId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient locs: Seq[TaskLocation],
    val outputId: Int)
  extends Task[U](stageId, partition.index) with Serializable {
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

  metrics = Some(context.taskMetrics)
  try {
    func(context, rdd.iterator(partition, context))
  } finally {
    context.markTaskCompleted()
  }
}
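So a ResultTask boils down to "apply the job's function to one partition's iterator and hand the value back to the driver". For intuition, the func shipped by something like a count-style action has roughly this shape (illustrative types only, with TaskContext replaced by a placeholder):

// Illustrative only: the (context, iterator) => result shape that a ResultTask applies.
object ResultFuncDemo extends App {
  // Stand-in for TaskContext; the function shape is what matters here.
  type Ctx = AnyRef
  type ResultFunc[T, U] = (Ctx, Iterator[T]) => U

  // A count-style function: ignore the context, fold over the partition's records.
  val countFunc: ResultFunc[Int, Long] = (_, iter) => iter.foldLeft(0L)((n, _) => n + 1)

  val partitionData = Iterator(10, 20, 30, 40, 50)   // pretend this came from rdd.iterator(partition, context)
  println(countFunc(null, partitionData))            // 5
}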
