The DAGScheduler ultimately builds a task set and submits it to the TaskScheduler, so first let's look at how a Task is defined and executed.
A Task is a single unit of execution.
/**
 * A unit of execution. We have two kinds of Task's in Spark:
 *  - [[org.apache.spark.scheduler.ShuffleMapTask]]
 *  - [[org.apache.spark.scheduler.ResultTask]]
 *
 * A Spark job consists of one or more stages. The very last stage in a job consists of multiple
 * ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
 * and sends the task output back to the driver application. A ShuffleMapTask executes the task
 * and divides the task output to multiple buckets (based on the task's partitioner).
 *
 * @param stageId id of the stage this task belongs to
 * @param partitionId index of the number in the RDD
 */
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
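As the scaladoc says, every stage except the last one consists of ShuffleMapTasks, and the final stage consists of ResultTasks. As a minimal illustration (a hedged sketch, not part of the quoted source), the following small job produces both kinds: reduceByKey introduces a shuffle boundary, so an earlier stage runs ShuffleMapTasks and the collect() stage runs ResultTasks.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // implicit PairRDDFunctions for reduceByKey (Spark 1.x)

object TaskKindsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("task-kinds").setMaster("local[2]"))

    // reduceByKey adds a ShuffleDependency, so this job has two stages:
    //  - an earlier stage of ShuffleMapTasks that bucket the (word, 1) pairs by key
    //  - a final stage of ResultTasks that read those buckets and send counts to the driver
    val counts = sc.parallelize(Seq("a", "b", "a", "c"), 2)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()                          // the action that actually submits the job

    counts.foreach(println)
    sc.stop()
  }
}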
Its main members:
  final def run(attemptId: Long): T = {
    context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
    context.taskMetrics.hostname = Utils.localHostName()
    taskThread = Thread.currentThread()
    if (_killed) {
      kill(interruptThread = false)
    }
    runTask(context)
  }
def runTask(context: TaskContext): T
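run() is a final template method: it builds the TaskContext, records the host name and the running thread, honors a pending kill request, and then delegates to runTask(), which each concrete task supplies. A toy sketch of that division of labor (illustrative only, not Spark source; it has to sit under org.apache.spark.scheduler to see the private[spark] Task class):

package org.apache.spark.scheduler

import org.apache.spark.TaskContext

// Illustrative sketch: a trivial concrete Task. The framework drives run(),
// the subclass only provides runTask().
private[spark] class EchoTask(stageId: Int, partition: Int, message: String)
  extends Task[String](stageId, partition) {

  // Invoked by run() after the TaskContext is set up and the kill flag has been checked.
  override def runTask(context: TaskContext): String =
    s"partition $partitionId: $message"   // partitionId is inherited from Task
}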
  // Map output tracker epoch. Will be set by TaskScheduler.
  var epoch: Long = -1

  var metrics: Option[TaskMetrics] = None

  // Task context, to be initialized in run().
  @transient protected var context: TaskContext = _
  // The actual Thread on which the task is running, if any. Initialized in run().
  @volatile @transient private var taskThread: Thread = _
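_killed and taskThread together support cooperative cancellation: the scheduler side sets the flag (and may interrupt the recorded thread), while the running side checks the flag at safe points, as run() does before calling runTask(). A generic sketch of that pattern, not the exact Spark kill() implementation:

// Illustrative sketch of the cooperative-kill pattern enabled by _killed and taskThread.
class KillableWork {
  @volatile private var killed = false
  @volatile private var workThread: Thread = _

  def run(): Unit = {
    workThread = Thread.currentThread()        // record the running thread, as Task.run() does
    try {
      while (!killed) {
        Thread.sleep(10)                       // stand-in for one unit of real work
      }
    } catch {
      case _: InterruptedException => ()       // woken up by kill(interruptThread = true)
    }
  }

  def kill(interruptThread: Boolean): Unit = {
    killed = true                              // the flag run() polls between units of work
    if (interruptThread && workThread != null) {
      workThread.interrupt()                   // unblock the worker if it is sleeping or in I/O
    }
  }
}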
/**
 * Handles transmission of tasks and their dependencies, because this can be slightly tricky. We
 * need to send the list of JARs and files added to the SparkContext with each task to ensure that
 * worker nodes find out about it, but we can't make it part of the Task because the user's code in
 * the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
 * first writing out its dependencies.
 */
private[spark] object Task {
  /**
   * Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
   */
  def serializeWithDependencies(
  /**
   * Deserialize the list of dependencies in a task serialized with serializeWithDependencies,
   * and return the task itself as a serialized ByteBuffer. The caller can then update its
   * ClassLoaders and deserialize the task.
   *
   * @return (taskFiles, taskJars, taskBytes)
   */
  def deserializeWithDependencies(serializedTask: ByteBuffer)
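The point of this pair of helpers is ordering: the executor must learn about the files and jars (and update its class loaders) before it can safely deserialize the task body, because the task's closures may reference classes inside those jars. A hedged sketch of that "dependencies first, task bytes last" framing, written with plain Java streams rather than Spark's actual helpers (field order and names here are illustrative, not the real wire format):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer

// Sketch of the framing idea behind serializeWithDependencies / deserializeWithDependencies.
object DependencyFraming {
  def serialize(files: Map[String, Long], jars: Map[String, Long], taskBytes: Array[Byte]): ByteBuffer = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    def writeMap(m: Map[String, Long]): Unit = {
      out.writeInt(m.size)
      m.foreach { case (name, timestamp) =>
        out.writeUTF(name)                    // something added via sc.addFile / sc.addJar
        out.writeLong(timestamp)              // lets executors detect stale local copies
      }
    }
    writeMap(files)
    writeMap(jars)
    out.write(taskBytes)                      // the already-serialized task goes last
    out.flush()
    ByteBuffer.wrap(bos.toByteArray)
  }

  def deserialize(buf: ByteBuffer): (Map[String, Long], Map[String, Long], ByteBuffer) = {
    val in = new DataInputStream(new ByteArrayInputStream(buf.array(), buf.position(), buf.remaining()))
    def readMap(): Map[String, Long] =
      (0 until in.readInt()).map(_ => in.readUTF() -> in.readLong()).toMap
    val files = readMap()
    val jars = readMap()
    val taskBytes = new Array[Byte](in.available())
    in.readFully(taskBytes)
    (files, jars, ByteBuffer.wrap(taskBytes)) // caller fixes class loaders, then deserializes the task
  }
}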
ShuffleMapTask
/**
 * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
 * specified in the ShuffleDependency).
 *
 * See [[org.apache.spark.scheduler.Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
 *                   the type should be (RDD[_], ShuffleDependency[_, _, _]).
 * @param partition partition of the RDD this task is associated with
 * @param locs preferred task execution locations for locality scheduling
 */
private[spark] class ShuffleMapTask(
    stageId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient private var locs: Seq[TaskLocation])
  extends Task[MapStatus](stageId, partition.index) with Logging {
  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      return writer.stop(success = true).get
    } catch {
      case e: Exception =>
        if (writer != null) {
          writer.stop(success = false)
        }
        throw e
    } finally {
      context.markTaskCompleted()
    }
  }
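runTask deserializes the (RDD, ShuffleDependency) pair from the broadcast, asks the ShuffleManager for a writer for this map partition, and writes out the partition's records; the returned MapStatus tells the driver where each reduce bucket's data lives. Which bucket a record lands in is decided by the partitioner carried in the dependency, as in this small sketch (illustrative; the real ShuffleWriter also handles serialization, spilling, and the shuffle files themselves):

import org.apache.spark.HashPartitioner

// Sketch of the per-record bucketing decision a shuffle write makes.
object BucketingDemo {
  def main(args: Array[String]): Unit = {
    val partitioner = new HashPartitioner(4)          // plays the role of dep.partitioner
    val records = Seq("a" -> 1, "b" -> 2, "c" -> 3)
    records.foreach { case (key, value) =>
      val bucket = partitioner.getPartition(key)      // reduce-side partition this pair goes to
      println(s"($key, $value) -> bucket $bucket")
    }
  }
}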
ResultTask
/**
 * A task that sends back the output to the driver application.
 *
 * See [[Task]] for more information.
 *
 * @param stageId id of the stage this task belongs to
 * @param taskBinary broadcasted version of the serialized RDD and the function to apply on each
 *                   partition of the given RDD. Once deserialized, the type should be
 *                   (RDD[T], (TaskContext, Iterator[T]) => U).
 * @param partition partition of the RDD this task is associated with
 * @param locs preferred task execution locations for locality scheduling
 * @param outputId index of the task in this job (a job can launch tasks on only a subset of the
 *                 input RDD's partitions).
 */
private[spark] class ResultTask[T, U](
    stageId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient locs: Seq[TaskLocation],
    val outputId: Int)
  extends Task[U](stageId, partition.index) with Serializable {
  override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

    metrics = Some(context.taskMetrics)
    try {
      func(context, rdd.iterator(partition, context))
    } finally {
      context.markTaskCompleted()
    }
  }
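Here func is whatever per-partition function the action installed, and its result is sent back to the driver. Roughly speaking (a sketch of the idea, not the exact closures Spark builds), collect() asks each partition to materialize itself and count() asks each partition for its size:

import org.apache.spark.TaskContext

// Hedged sketch of the kind of (TaskContext, Iterator[T]) => U functions a ResultTask runs.
object ResultFuncSketch {
  // roughly what collect() asks of each partition: materialize it as an array
  val collectFunc: (TaskContext, Iterator[Int]) => Array[Int] =
    (_, iter) => iter.toArray

  // roughly what count() asks of each partition: return how many elements it holds
  val countFunc: (TaskContext, Iterator[Int]) => Long =
    (_, iter) => {
      var n = 0L
      while (iter.hasNext) { iter.next(); n += 1 }
      n
    }

  def main(args: Array[String]): Unit = {
    println(countFunc(null, Iterator(1, 2, 3)))   // 3 -- a real task would pass a live TaskContext
  }
}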