Spark Scheduler 模块(下)

Scheduler 模块中最重要的两个类是 DAGScheduler 和 TaskScheduler。上篇讲了 DAGScheduler,这篇讲 TaskScheduler。

TaskScheduler

前面提到,在 SparkContext 初始化的过程中,根据 master 的类型分别创建不同的 TaskScheduler 的实现。当 master 为 local, spark, mesos 时创建 TaskSchedulerImpl,当 master 为 YARN 时,创建其他的实现,读者可以自行研究。

master match {
  case "local" =>
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalBackend(sc.getConf, scheduler, 1)
    scheduler.initialize(backend)
    (backend, scheduler)

  case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)

  case mesosUrl @ MESOS_REGEX(_) =>
    MesosNativeLibrary.load()
    val scheduler = new TaskSchedulerImpl(sc)
    val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
    val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
    val backend = if (coarseGrained) {
      new CoarseMesosSchedulerBackend(scheduler, sc, url, sc.env.securityManager)
    } else {
      new MesosSchedulerBackend(scheduler, sc, url)
    }
    scheduler.initialize(backend)
    (backend, scheduler)

  ....
}

此时细心的读者就会有疑问了, TaskScheduler 需要将任务调度在不同的资源管理平台上(local, spark, mesos),怎么就能使用同一个 TaskSchedulerImpl 呢?注意这里有个很重要的成员 backend。每种 master 对应的 backend 都不一样,而正是这个 backend 负责与资源管理平台通信。

因为这个层面的调度,需要跟资源管理器通信了,所以也会部分的涉及到 deploy 模块和 executor 模块的内容。因为 Local 模式的过于简单(本地启动多线程处理 task),而 YARN 和 Mesos 需要编程接口相关的背景知识,这里我们选择 SparkDeploySchedulerBackend 着重分析。这是 Spark 自身实现的资源管理系统,有些读者可能已经搭建和使用过。

TaskSchedulerImpl 的启动

在 SparkContext 中(上面代码),首先创建了 TaskSchedulerImpl 和 SparkDeploySchedulerBackend,并将 backend 传入 TaskSchedulerImpl。之后启动了 TaskSchedulerImpl。

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler‘s
// constructor
_taskScheduler.start()

TaskSchedulerImpl.start 方法主要是调用了 backend.start()。SparkDeploySchedulerBackend 的启动首先调用父类 CoarseGrainedSchedulerBackend 的 start 方法,其中创建了一个 driverEndpoint,它是一个本地的 driver,以 RPC 的方式与其他 executor 通信。

// CoarseGrainedSchedulerBackend.scalaoverride def start() {
  driverEndpoint = rpcEnv.setupEndpoint(
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
}
// SparkDeploySchedulerBackend.scala
override def start() {
  super.start()

  // The endpoint for executors to talk to us
  val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
      RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")
  ....
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
  client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  waitForRegistration()
}

这里的创建了一个 client:AppClient,它会连接到 masters(spark://master:7077) 上,附带一条命令 command,用来启动 executor,改命令还有 driver-url 的参数。如此 executor 启动时就能自动连接上 driver。

至此,TaskSchedulerImpl 和 SparkDeploySchedulerBackend 的启动过程已经完成。主要做了两件事情,启动 local driver,通知 masters 启动 executors。并且 driver 和 executors 使用了 RPC 通信。

注:至于 executor 如何启动,等待分析 deploy 和 executor 模块的时候再仔细分析。

TaskSchedulerImpl 提交任务

在上一篇中,我们说到 DAGScheduler 最后调用了 taskScheduler.submitTasks 提交任务。下面继续上篇的分析:

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) // 将任务加入到变量 rootPool 中
  }
  backend.reviveOffers()
}

这里将 TaskSet 再包装成 TaskSetManager,加入到 schedulableBuilder 中。顺便提一下,schedulableBuilder 是 Spark 的调度策略实现,有 FIFO 和 FAIR 两种,默认的是 FIFO。它们最终都是把 TaskSetManager 放到了 rootPool 中。

然后调用了 backend.reviveOffers,这里有个较转折的调用关系。因为 SparkDeploySchedulerBackend 没有方法 reviveOffers,所以是调用了其父类 CoarseGrainedSchedulerBackend 的同名方法。而 CoarseGrainedSchedulerBackend.reviveOffers 实现中只有一行,即

override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}

而 DriverEndpoint 接收到 ReviceOffers,调用了 makeOffers 方法:

private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
  val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}

整个调用关系为 TaskScheduler.submitTasks() -> CoarseGrainedSchedulerBackend.reviveOffers() -> RpcEndpointRef.send(ReviveOffers) ->DriverEndpoint.ReviceOffers -> DriverEndpoint.makeOffers

这里调用 TaskSchedulerImpl 的方法 resourceOffers,该方法给任务分配计算资源。接着调用了 CoarseGrainedSchedulerBackend.launchTasks,真正向 executor 发送计算任务。

// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task) // 序列化 task
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      // 序列化结果超过上限,报警
    }
    else {
      val executorData = executorDataMap(task.executorId) // 选择一个 executor 执行 task
      executorData.freeCores -= scheduler.CPUS_PER_TASK // 标记 executor 的 CPU 资源被占用了一部分
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) // 向 executor 的 RPC 服务器发送执行 task 信息
    }
  }
}

上文提到,TaskSchedulerImpl 启动时,masters 也启动了 executor,具体的启动方法是 org.apache.spark.executor.CoarseGrainedExecutorBackend。所以 executor 接受消息的方法也在该类中:

// org.apache.spark.executor.CoarseGrainedExecutorBackend
override def receive: PartialFunction[Any, Unit] = {
  case LaunchTask(data) =>
    if (executor == null) {
      logError("Received LaunchTask command but executor was null")
      System.exit(1)
    } else {
      val taskDesc = ser.deserialize[TaskDescription](data.value) // 发序列化
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
        taskDesc.name, taskDesc.serializedTask)
    }
  ...
}

executor 执行 task:

// org.apache.spark.executor.Executor
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask) // 创建线程执行 task
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}

TaskRunner 使用 ClassLoader 中从字节中加载 Task,并执行得到结果,把结果序列化使用 RPC 返回。

// org.apache.spark.executor.Executor.TaskRunner
override def run(): Unit = {
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) // executor 通知 driver,正在执行 task
  try {
    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) // 反序列化出依赖文件,jar包,任务自身
    updateDependencies(taskFiles, taskJars)
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
    task.setTaskMemoryManager(taskMemoryManager)

    val (value, accumUpdates) = try {
      val res = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber, metricsSystem = env.metricsSystem) // 执行 task
      res
    } finally {
      ...
    }

    val resultSer = env.serializer.newInstance()
    val valueBytes = resultSer.serialize(value) // 序列化结果

    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) // 以 RPC 的方式返回结果给 driver
} catch {
  ...
}

至此 task 在 TaskSchedulerImpl 内运行的流程有了一个大致介绍。其中略过了很多分支,但不影响读者对整体流程的理解。

总结

Spark Scheduler 模块上下两篇对 Spark 的调度逻辑按照执行的顺序有了一个概括的介绍。

Scheduler 模块的代码架构充分体现了分层和隔离的设计哲学。首先 DAGScheduler 是 Spark 独有的逻辑,而 TaskScheduler 则因资源调度器而各不相同,所以把调度部分割裂成这两部分,前者只需一种实现,而后者可以在不同平台各自实现。即便是 TaskScheduler,在多种平台上也有共性,所以 TaskSchedulerImpl 也是一个较通用的实现,只是与资源调度器的通信部分使用了不同的 backend。

时间: 2024-10-14 00:54:34

Spark Scheduler 模块(下)的相关文章

Spark(五十二):Spark Scheduler模块之DAGScheduler流程

导入 从一个Job运行过程中来看DAGScheduler是运行在Driver端的,其工作流程如下图: 图中涉及到的词汇概念: 1. RDD——Resillient Distributed Dataset 弹性分布式数据集. 2. Operation——作用于RDD的各种操作分为transformation和action. 3. Job——作业,一个JOB包含多个RDD及作用于相应RDD上的各种operation. 4. Stage——一个作业分为多个阶段. 5. Partition——数据分区,

Spark Scheduler 模块(上)

在阅读 Spark 源代码的过程中,发现单步调试并不能很好的帮助理解程序.这样的多线程的分布式系统,更好的阅读源代码的方式是依据模块,分别理解. 在包 org.apache.spark 下面有很多下一级的包,如 deploy, storage, shuffle, scheduler 等.这就是一个个系统模块,本文主要介绍 scheduler 模块. 博客http://jerryshao.me/architecture/2013/04/21/Spark%E6%BA%90%E7%A0%81%E5%8

Spark Deploy 模块

Spark Scheduler 模块的文章中,介绍到 Spark 将底层的资源管理和上层的任务调度分离开来,一般而言,底层的资源管理会使用第三方的平台,如 YARN 和 Mesos.为了方便用户测试和使用,Spark 也单独实现了一个简单的资源管理平台,也就是本文介绍的 Deploy 模块. 一些有经验的读者已经使用过该功能. Spark RPC 的实现 细心的读者在阅读 Scheduler 相关代码时,已经注意到很多地方使用了 RPC 的方式通讯,比如 driver 和 executor 之间

HP C7000 Virtual Connect FlexFabric模块下Windows2008R2网络连接查看与配置

Windows2008R2网络属性查看与配置实例 前提: 1:C7000互联模块Bay1和Bay2配置FlexFabric模块 2:Enclosure Server Bay1:插有一个HP BL460g8服务器(LOM卡支持FlexFabric功能) 3:在VC中配置Server Profile赋予Bay1,配置内容包括:划分2个FlexNIC和2个FlexHBA. 具体步骤: 当我们在VC-FlexFabric中配置Ethernet Network和FCoE SAN Fabric后,将新创建的

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGSchedul

在写Spark程序是遇到问题 Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.orgapacheapachesparkschedulerschedulerDAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283) 这个原因是因为数据过大,而中断(我的天,坑死我了,只有一万条数据啊) 原文地址:https://www.cnblogs.com/chen

re模块下的的常用方法

引入模块: import re 1.查找findall   匹配所有,每一项都是列表中的一个元素 ret=re.findall("\d+","sjkhk172按实际花费928") print(ret)['172', '928'] search  只匹配从左到右的第一个,得到的不是结果,而是一个变量,通过这个变量的group方法来获取结果. ret=re.search("\d+","sjkhk172按实际花费928") prin

Spark源码分析之-scheduler模块

RDD的依赖关系和Stage的分类 在Spark中,每一个RDD是对于数据集在某一状态下的表现形式,而这个状态有可能是从前一状态转换而来的,因此换句话说这一个RDD有可能与之前的RDD(s)有依赖关系.根据依赖关系的不同,可以将RDD分成两种不同的类型:Narrow Dependency和Wide Dependency. Narrow Dependency指的是 child RDD只依赖于parent RDD(s)固定数量的partition. Wide Dependency指的是child R

Hadoop + Spark 在CentOS下的伪分布式部署

一. 软件 centos6.5 jdk1.7 hadoop-2.6.1.tar.gz(在64位平台重新编译好的版本) scala2.11.7.tgz spark-1.5.0-bin-hadoop2.6.tgz   二. 安装前准备 1. 在系统全局安装jdk a. 解压 b. 配置环境变量(可以在/etc/profile.d/下面配置) export JAVA_HOME=/usr/java/jdk1.7.0_21export CLASSPATH=.:$JAVA_HOME/lib:$CLASSPA

【Spark 内核】 Spark 内核解析-下

Spark内核泛指Spark的核心运行机制,包括Spark核心组件的运行机制.Spark任务调度机制.Spark内存管理机制.Spark核心功能的运行原理等,熟练掌握Spark内核原理,能够帮助我们更好地完成Spark代码设计,并能够帮助我们准确锁定项目运行过程中出现的问题的症结所在. Spark Shuffle 解析 Shuffle 的核心要点 ShuffleMapStage与ResultStage 在划分stage时,最后一个stage称为finalStage,它本质上是一个ResultSt