Spark Notes 10: TaskScheduler

TaskScheduler is the interface for task scheduling. An application can plug in its own scheduler implementation; currently Spark ships with only one, TaskSchedulerImpl.

===================task scheduler begin====================

-> TaskSchedulerImpl::submitTasks(taskSet: TaskSet)  --entry point for accepting a task set; the body is synchronized

-> new TaskSetManager(this, taskSet, maxTaskFailures)  --every TaskSet gets its own TaskSetManager, which takes over scheduling for it

->activeTaskSets(taskSet.id) = manager  --record it as an active taskSet

->schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)  --submit the TaskSetManager to the scheduler (the schedulable pool)

-> starvationTimer.scheduleAtFixedRate(  --start a starvation-guard timer; by default it warns after 15s if no resources have been accepted

->new TimerTask()

->backend.reviveOffers()  backend = CoarseGrainedSchedulerBackend  --tell the backend scheduler that resources are needed now

->driverActor ! ReviveOffers

->makeOffers

->tasks = scheduler.resourceOffers(offers: Seq[WorkerOffer])  --pass in all available resources as offers

The return value, Seq[Seq[TaskDescription]], holds the list of tasks to launch on each executor.

->val shuffledOffers = Random.shuffle(offers)  --shuffle the offers randomly before using them

->val sortedTaskSets = rootPool.getSortedTaskSetQueue  --get the taskSets to run, ordered by priority

->schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)

Sort all taskSets by priority; the sorting algorithm is examined separately below.

->for (taskSet <- sortedTaskSets)

->taskSet.executorAdded()

->for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels)

->do { ... } while (launchedTask)  --keep offering resources to this taskSet at the current locality level until no more tasks get launched

->for (i <- 0 until shuffledOffers.size)  --walk through all available offers

->for (task <- taskSet.resourceOffer(execId, host, maxLocality))  --every task the taskSet returns for this offer is recorded for launching

->launchTasks(tasks: scala.Seq[scala.Seq[TaskDescription]]) --Launch tasks returned by a set of resource offers

->for (task <- tasks.flatten)

->// if the serialized task is too large (bigger than the akka frame size, 10 MB by default), abort instead of sending it

->executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))

Send each task to the executor it was assigned to; from here the Executor takes over running the task. At this point we are not far from the final task.run().
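To make that size check concrete, here is a small, self-contained sketch (toy types and an assumed 10 MB limit; this is not Spark's actual launchTasks): serialize the task, compare it against the frame limit, and refuse to ship it when it is too large.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

case class ToyTask(id: Long, payload: Array[Byte]) extends Serializable

object TaskSizeCheck {
  // Assumed limit, mirroring the default akka frame size mentioned above.
  val maxFrameBytes: Int = 10 * 1024 * 1024

  def serialize(task: ToyTask): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(task)
    oos.close()
    bos.toByteArray
  }

  // Either an error message (the task set would be aborted) or the bytes that would be shipped.
  def tryLaunch(task: ToyTask): Either[String, Array[Byte]] = {
    val bytes = serialize(task)
    if (bytes.length >= maxFrameBytes)
      Left(s"serialized task is too large (${bytes.length} bytes >= $maxFrameBytes)")
    else
      Right(bytes)
  }

  def main(args: Array[String]): Unit = {
    val small = ToyTask(1L, new Array[Byte](1024))
    val big   = ToyTask(2L, new Array[Byte](11 * 1024 * 1024))
    Seq(small, big).foreach { t =>
      tryLaunch(t) match {
        case Right(bytes) => println(s"task ${t.id}: would send ${bytes.length} bytes to the executor")
        case Left(err)    => println(s"task ${t.id}: $err")
      }
    }
  }
}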

==================end================================

The scheduling direction is a little counter-intuitive: we usually think of taking a task and looking for suitable resources for it, but Spark takes the resources and looks for suitable tasks. This is probably because the tasks (derived from the RDD partitions) are fixed, while the available resources change constantly.
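To make this offer-driven direction concrete, here is a minimal, self-contained toy model (the types and names below are invented for illustration, not Spark's real classes): shuffle the offers, sort the task sets by priority, and let each task set pick a task for every offer it is handed.

import scala.collection.mutable
import scala.util.Random

case class Offer(executorId: String, host: String, cores: Int)
case class TaskDesc(taskSetId: String, taskIndex: Int, executorId: String)

class ToyTaskSet(val id: String, val priority: Int, numTasks: Int) {
  private val pending = mutable.Queue(0 until numTasks: _*)
  // Given one offer, return a task to launch on it, if any remain.
  def resourceOffer(offer: Offer): Option[TaskDesc] =
    if (pending.nonEmpty) Some(TaskDesc(id, pending.dequeue(), offer.executorId)) else None
}

object ToyScheduler {
  // Resources go looking for tasks: offers are handed to task sets in priority order.
  def resourceOffers(offers: Seq[Offer], taskSets: Seq[ToyTaskSet]): Seq[TaskDesc] = {
    val shuffled = Random.shuffle(offers)          // spread load across executors
    val sorted   = taskSets.sortBy(_.priority)     // lowest priority value (earliest job) first
    val launched = mutable.ArrayBuffer[TaskDesc]()
    for (taskSet <- sorted; offer <- shuffled)
      taskSet.resourceOffer(offer).foreach(launched += _)
    launched
  }

  def main(args: Array[String]): Unit = {
    val offers   = Seq(Offer("exec-1", "host-a", 2), Offer("exec-2", "host-b", 2))
    val taskSets = Seq(new ToyTaskSet("stage-0.0", priority = 0, numTasks = 3))
    resourceOffers(offers, taskSets).foreach(println)
  }
}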

TaskScheduler: the scheduling interface; currently only TaskSchedulerImpl implements it. An application can supply its own scheduler.

/**
 * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl.
 * This interface allows plugging in different task schedulers. Each TaskScheduler schedulers tasks
 * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
 * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
 * them, retrying if there are failures, and mitigating stragglers. They return events to the
 * DAGScheduler.
 */
private[spark] trait TaskScheduler {

  def rootPool: Pool

  def schedulingMode: SchedulingMode

  def start(): Unit

  // Invoked after system has successfully initialized (typically in spark context).
  // Yarn uses this to bootstrap allocation of resources based on preferred locations,
  // wait for slave registerations, etc.
  def postStartHook() { }

  // Disconnect from the cluster.
  def stop(): Unit

  // Submit a sequence of tasks to run.
  def submitTasks(taskSet: TaskSet): Unit

  // Cancel a stage.
  def cancelTasks(stageId: Int, interruptThread: Boolean)

  // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit

  // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
  def defaultParallelism(): Int

  /**
   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
      blockManagerId: BlockManagerId): Boolean
}
/**
 * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
 * It can also work with a local setup by using a LocalBackend and setting isLocal to true.
 * It handles common logic, like determining a scheduling order across jobs, waking up to launch
 * speculative tasks, etc.
 *
 * Clients should first call initialize() and start(), then submit task sets through the
 * runTasks method.
 *
 * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
 * threads, so it needs locks in public API methods to maintain its state. In addition, some
 * SchedulerBackends synchronize on themselves when they want to send events here, and then
 * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
 * we are holding a lock on ourselves.
 */
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging

Main fields:

// TaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
val activeTaskSets = new HashMap[String, TaskSetManager]

val taskIdToTaskSetId = new HashMap[Long, String]
val taskIdToExecutorId = new HashMap[Long, String]

// Which executor IDs we have executors on
val activeExecutorIds = new HashSet[String]

// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
protected val executorsByHost = new HashMap[String, HashSet[String]]

protected val hostsByRack = new HashMap[String, HashSet[String]]

protected val executorIdToHost = new HashMap[String, String]

// Listener object to pass upcalls into
var dagScheduler: DAGScheduler = null

var backend: SchedulerBackend = null

val mapOutputTracker = SparkEnv.get.mapOutputTracker

var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null

initialize() selects the scheduling algorithm (FIFO or FAIR):

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
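For reference, the schedulingMode above is normally driven by configuration rather than code: as far as I recall it is read from the spark.scheduler.mode property, and jobs can be routed into a named pool via spark.scheduler.pool. A usage sketch (names of the pool and app are just examples):

import org.apache.spark.{SparkConf, SparkContext}

object FairModeExample {
  def main(args: Array[String]): Unit = {
    // Ask TaskSchedulerImpl to build a FairSchedulableBuilder instead of the default FIFO one.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("fair-scheduling-example")
      .set("spark.scheduler.mode", "FAIR")
    val sc = new SparkContext(conf)

    // Jobs submitted from this thread go into the named pool.
    sc.setLocalProperty("spark.scheduler.pool", "production")
    sc.parallelize(1 to 100).count()
    sc.stop()
  }
}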
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient memory")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {

A TaskSetManager corresponds one-to-one with a TaskSet, and the two can largely be treated as the same thing. resourceOffer is its main function: given an offered resource, it decides which of the taskSet's tasks should run on it.

/**
 * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
 * each task, retries tasks if they fail (up to a limited number of times), and
 * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
 * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
 * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
 *
 * THREADING: This class is designed to only be called from code with a lock on the
 * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
 *
 * @param sched           the TaskSchedulerImpl associated with the TaskSetManager
 * @param taskSet         the TaskSet to manage scheduling for
 * @param maxTaskFailures if any particular task fails more than this number of times, the entire
 *                        task set will be aborted
 */
private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int,
    clock: Clock = SystemClock)
  extends Schedulable with Logging {
val copiesRunning = new Array[Int](numTasks)
val successful = new Array[Boolean](numTasks)
private val numFailures = new Array[Int](numTasks)
// key is taskId, value is a Map of executor id to when it failed
private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()

val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksSuccessful = 0

// Set of pending tasks for each executor. These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
// back at the head of the stack. They are also only cleaned up lazily;
// when a task is launched, it remains in all the pending lists except
// the one that it was launched from, but gets removed from them later.
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

// Set of pending tasks for each host. Similar to pendingTasksForExecutor,
// but at host level.
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]

// Set of pending tasks for each rack -- similar to the above.
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

// Set containing pending tasks with no locality preferences.
var pendingTasksWithNoPrefs = new ArrayBuffer[Int]

// Set containing all pending tasks (also used as a stack, as above).
val allPendingTasks = new ArrayBuffer[Int]

// Tasks that can be speculated. Since these will be a small fraction of total
// tasks, we'll just hold them in a HashSet.
val speculatableTasks = new HashSet[Int]

// Task index, start and finish time for each task attempt (indexed by task ID)
val taskInfos = new HashMap[Long, TaskInfo]

// Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
var myLocalityLevels = computeValidLocalityLevels()
var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level

// Delay scheduling variables: we keep track of our current locality level and the time we
// last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
// We then move down if we manage to launch a "more local" task.
var currentLocalityIndex = 0    // Index of our current locality level in validLocalityLevels
var lastLaunchTime = clock.getTime()  // Time we last launched a task at this level
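A minimal standalone sketch of that delay-scheduling bookkeeping (invented names and wait values; roughly what the TaskSetManager does when deciding which locality level it may currently schedule at): stay at the most local level until its wait expires, then fall back to less local levels; launching a task resets the clock.

object DelaySchedulingToy {
  val levels  = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  val waitsMs = Array(3000L, 3000L, 3000L, 0L)   // assumed per-level waits (Spark's default is a few seconds)

  var currentLevelIndex = 0
  var lastLaunchTimeMs  = System.currentTimeMillis()

  /** Level we are currently allowed to schedule at, given the current time. */
  def allowedLevel(nowMs: Long): String = {
    while (currentLevelIndex < levels.length - 1 &&
           nowMs - lastLaunchTimeMs >= waitsMs(currentLevelIndex)) {
      // Waited long enough without launching at this level: give up some locality.
      lastLaunchTimeMs += waitsMs(currentLevelIndex)
      currentLevelIndex += 1
    }
    levels(currentLevelIndex)
  }

  /** Call when a task is actually launched at some level: reset the clock and maybe move back down. */
  def recordLaunch(levelIndex: Int, nowMs: Long): Unit = {
    currentLevelIndex = math.min(currentLevelIndex, levelIndex)
    lastLaunchTimeMs = nowMs
  }

  def main(args: Array[String]): Unit = {
    val start = System.currentTimeMillis()
    println(allowedLevel(start))          // PROCESS_LOCAL
    println(allowedLevel(start + 7000))   // two waits have expired: RACK_LOCAL
  }
}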
/**
 * Return a speculative task for a given executor if any are available. The task should not have
 * an attempt running on this host, in case the host is slow. In addition, the task should meet
 * the given locality constraint.
 */
private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value)] =
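Which tasks end up in speculatableTasks is decided elsewhere (checkSpeculatableTasks); the rule is roughly the one sketched in this self-contained toy (the quantile and multiplier values are assumptions for illustration, not taken from this source): once enough tasks have finished, any still-running task much slower than the median becomes a speculation candidate.

object SpeculationToy {
  val speculationQuantile   = 0.75   // assumed: fraction of tasks that must finish before speculating
  val speculationMultiplier = 1.5    // assumed: how much slower than the median counts as a straggler

  // finished: durations of completed tasks; running: elapsed time per still-running task index.
  def speculatableTasks(finished: Seq[Long], running: Map[Int, Long], totalTasks: Int): Set[Int] = {
    if (finished.size < totalTasks * speculationQuantile) return Set.empty
    val median    = finished.sorted.apply(finished.size / 2)
    val threshold = math.max(median * speculationMultiplier, 100.0)
    running.collect { case (index, elapsed) if elapsed > threshold => index }.toSet
  }

  def main(args: Array[String]): Unit = {
    // Three of four tasks finished quickly; task 3 has been running far longer than the median.
    println(speculatableTasks(Seq(100L, 120L, 110L), Map(3 -> 900L), totalTasks = 4))  // Set(3)
  }
}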

The executor as seen by the scheduler: note again that the direction is reversed from intuition -- instead of taking a task and finding resources for it, Spark takes an offered resource and asks the task set (via resourceOffer below) for a suitable task, probably because the tasks (derived from the RDD) are fixed while the resources keep changing.

/**
 * Respond to an offer of a single executor from the scheduler by finding a task
 *
 * NOTE: this function is either called with a maxLocality which
 * would be adjusted by delay scheduling algorithm or it will be with a special
 * NO_PREF locality which will be not modified
 *
 * @param execId the executor Id of the offered resource
 * @param host  the host Id of the offered resource
 * @param maxLocality the maximum locality we want to schedule the tasks at
 */
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{

SchedulableBuilder defines the two scheduling algorithms, FIFO and FAIR. Both are wrappers around Pool, so the class to focus on is Pool.
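As a rough, self-contained illustration of what the two algorithms compare (simplified, with made-up field values; Spark's real comparators live in the FIFO and FAIR SchedulingAlgorithm classes): FIFO looks only at job priority and then stage id, while FAIR weighs running task counts against each entity's minShare and weight, so a starved pool jumps the queue.

case class SchedEntity(name: String, priority: Int, stageId: Int,
                       runningTasks: Int, minShare: Int, weight: Int)

object Comparators {
  // FIFO: lower priority value (earlier job) first, then earlier stage.
  def fifo(a: SchedEntity, b: SchedEntity): Boolean =
    if (a.priority != b.priority) a.priority < b.priority else a.stageId < b.stageId

  // FAIR (simplified): entities below their minShare come first,
  // then compare runningTasks/minShare, then runningTasks/weight.
  def fair(a: SchedEntity, b: SchedEntity): Boolean = {
    val aNeedy = a.runningTasks < a.minShare
    val bNeedy = b.runningTasks < b.minShare
    if (aNeedy != bNeedy) aNeedy
    else if (aNeedy && bNeedy)
      a.runningTasks.toDouble / math.max(a.minShare, 1) <
        b.runningTasks.toDouble / math.max(b.minShare, 1)
    else
      a.runningTasks.toDouble / a.weight < b.runningTasks.toDouble / b.weight
  }
}

object ComparatorDemo extends App {
  val entities = Seq(
    SchedEntity("early-but-busy",   priority = 1, stageId = 2, runningTasks = 4, minShare = 2, weight = 1),
    SchedEntity("late-but-starved", priority = 2, stageId = 5, runningTasks = 0, minShare = 2, weight = 1))
  println(entities.sortWith(Comparators.fifo).map(_.name))  // FIFO: the earlier job wins
  println(entities.sortWith(Comparators.fair).map(_.name))  // FAIR: the starved entity wins
}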

/**
 * An interface to build Schedulable tree
 * buildPools: build the tree nodes(pools)
 * addTaskSetManager: build the leaf nodes(TaskSetManagers)
 */
private[spark] trait SchedulableBuilder {
  def rootPool: Pool

  def buildPools()

  def addTaskSetManager(manager: Schedulable, properties: Properties)
}
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)  extends SchedulableBuilder with Logging {

Pool: corresponds to a scheduling group (pool) in the documentation. Each Pool can be configured with its own scheduling algorithm.

/**
 * An Schedulable entity that represent collection of Pools or TaskSetManagers
 */
private[spark] class Pool(
    val poolName: String,
    val schedulingMode: SchedulingMode,
    initMinShare: Int,
    initWeight: Int)
  extends Schedulable
  with Logging {

  val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
  val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]

Schedulable: anything the scheduler operates on has to implement this interface; any class implementing it can be scheduled by priority, much the way a Task is matched to an executor. getSortedTaskSetQueue is the key method: it hands the TaskSetManagers back to the outer scheduler in priority order.

/**
 * An interface for schedulable entities.
 * there are two type of Schedulable entities(Pools and TaskSetManagers)
 */
private[spark] trait Schedulable {
  var parent: Pool
  // child queues
  def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
  def schedulingMode: SchedulingMode
  def weight: Int
  def minShare: Int
  def runningTasks: Int
  def priority: Int
  def stageId: Int
  def name: String

  def addSchedulable(schedulable: Schedulable): Unit
  def removeSchedulable(schedulable: Schedulable): Unit
  def getSchedulableByName(name: String): Schedulable
  def executorLost(executorId: String, host: String): Unit
  def checkSpeculatableTasks(): Boolean
  def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
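A self-contained toy of that recursive collection (invented node types; in Spark the real logic is Pool's getSortedTaskSetQueue, which sorts its children with taskSetSchedulingAlgorithm.comparator and concatenates what each child returns): nested pools flatten into one priority-ordered list of leaves.

import scala.collection.mutable.ArrayBuffer

sealed trait ToyNode { def priority: Int }
case class ToyLeaf(name: String, priority: Int) extends ToyNode
case class ToyPool(children: ArrayBuffer[ToyNode], priority: Int = 0) extends ToyNode

object SortedQueueToy {
  def getSortedLeaves(node: ToyNode): ArrayBuffer[ToyLeaf] = node match {
    case leaf: ToyLeaf => ArrayBuffer(leaf)
    case ToyPool(children, _) =>
      val out = ArrayBuffer[ToyLeaf]()
      for (child <- children.sortBy(_.priority))   // stand-in for the scheduling algorithm's comparator
        out ++= getSortedLeaves(child)
      out
  }

  def main(args: Array[String]): Unit = {
    val root = ToyPool(ArrayBuffer(
      ToyLeaf("stage-3", priority = 3),
      ToyPool(ArrayBuffer(ToyLeaf("stage-1", 1), ToyLeaf("stage-0", 0)), priority = 1)))
    println(getSortedLeaves(root).map(_.name))   // stage-0, stage-1, stage-3
  }
}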

CoarseGrainedSchedulerBackend: the abstraction that shields the scheduler from the different resource managers. The standalone and yarn-client backends (among others) are its subclasses, while local mode uses its own LocalBackend; the scheduler core interacts only with the backend abstraction, which is how it provides this isolation.

/**
 * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
 * This backend holds onto each executor for the duration of the Spark job rather than relinquishing
 * executors whenever a task is done and asking the scheduler to launch a new executor for
 * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the
 * coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode
 * (spark.deploy.*).
 */
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
  extends SchedulerBackend with Logging

Main fields:

// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
var totalRegisteredExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
var minRegisteredRatio =
  math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
val maxRegisteredWaitingTime =
  conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()
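The two spark.scheduler.* properties read above are the knobs a user would actually set; a configuration sketch (the values are illustrative only):

import org.apache.spark.SparkConf

object RegisteredResourcesConfig {
  // Hold task submission back until 80% of the expected resources have registered,
  // but stop waiting after 60 seconds (60000 ms, matching the getInt call above).
  val conf = new SparkConf()
    .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
    .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60000")
}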

class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {

override protected def log = CoarseGrainedSchedulerBackend.this.log

private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
private val executorHost = new HashMap[String, String]
private val freeCores = new HashMap[String, Int]
private val totalCores = new HashMap[String, Int]
private val addressToExecutorId = new HashMap[Address, String]
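A toy sketch of what the DriverActor's makeOffers does with these maps (simplified standalone types, not the real actor code): every registered executor becomes a WorkerOffer, and the whole batch is handed to scheduler.resourceOffers, closing the loop traced at the top of these notes.

import scala.collection.mutable.HashMap

case class ToyWorkerOffer(executorId: String, host: String, cores: Int)

object MakeOffersToy {
  // Stand-ins for the executorHost and freeCores maps kept by the DriverActor.
  val executorHost = HashMap("exec-1" -> "host-a", "exec-2" -> "host-b")
  val freeCores    = HashMap("exec-1" -> 4, "exec-2" -> 2)

  def makeOffers(): Seq[ToyWorkerOffer] =
    executorHost.toSeq.map { case (id, host) => ToyWorkerOffer(id, host, freeCores(id)) }

  def main(args: Array[String]): Unit =
    makeOffers().foreach(println)   // these offers are what resourceOffers() receives
}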
