Spark on Yarn: Cluster模式Scheduler实现

  • 背景
  • 主体逻辑
  • 具体实现
    • AM
    • YarnAllocator
    • Executor

背景

Spark on Yarn分yarn-cluster和yarn-client两种模式。

本文通过Cluster模式的TaskScheduler实现入手,梳理一遍spark on yarn的大致实现逻辑。

前提我对两种模式以及yarn任务的整体运行逻辑不是很清楚。

主体逻辑

cluster模式中,使用的TaskScheduler是YarnClusterScheduler

它继承了默认使用的TaskSchedulerImpl类,额外在postStartHook方法里,唤醒了ApplicationMaster类的设置sparkcontext的方法。

ApplicationMaster相当于是spark在yarn上的AM,内部的YarnRMClient类,负责向RM注册和注销AM,以及拿到attemptId。注册AM之后,得到一个可以申请/释放资源的YarnAllocationHandler类,从而可以维护container与executor之间的关系。

下节具体介绍几个主要类的实现逻辑。

具体实现

AM

ApplicationMaster,通过YarnRMClient来完成自己的注册和注销。

AM的启动方式

/**
 * This object does not provide any special functionality. It exists so that it‘s easy to tell
 * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.
 */
object ExecutorLauncher {

  def main(args: Array[String]) = {
    ApplicationMaster.main(args)
  }

}

main里面调用AM的run方法:

  def main(args: Array[String]) = {
    SignalLogger.register(log)
    val amArgs = new ApplicationMasterArguments(args)
    SparkHadoopUtil.get.runAsSparkUser { () =>
      master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
      System.exit(master.run())
    }
  }

如果AM的启动参数里有用户自己定义的类,则是Driver模式,即cluster模式。

run方法里

1. 如果不是Driver模式,执行runExecutorLauncher逻辑:

启动后,执行registerAM,里面new了YarnAllocator的实现,调用allocateResources,申请并执行container。同时,启动一个reporter线程,每隔一段时间调用YarnAllocator的allocateResources方法,或汇报有太多executor fail了。

2. 如果是Driver模式,执行runDriver逻辑:

也是执行registerAM,但是之前需要反射执行jar包里用户定义的driver类。

YarnAllocator

YarnAllocator负责向yarn申请和释放containers,维护containe、executor相关关系,有一个线程池。申请到container之后,在container里执行ExecutorRunnable。需要子类实现的是申请和释放这两个方法:

protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
protected def releaseContainer(container: Container): Unit

YarnAllocationHandler继承了YarnAllocator。

  1. allocateContainers方法:

    Yarn api里提供ResourceRequest这个类,里面包含了一个app向RM索要不同container的信息,包括机器名/机架名,cpu和mem资源数,container数,优先级,locality是否放松。然后组成AllocateRequest类,代表AM向RM从集群里获得resource。调用ApplicationMasterProtocal的allocate(AllocateRequest),由AM**向RM发起资源请求**。

  2. releaseContainer方法:

    每次把需要release的container记录下来。在每次allocateContainers调用的时候,

    会往AllocateRequest里addAllReleases(releasedContainerList),在请求资源的时候顺便把历史资源释放掉。

ExecutorRunnable与Yarn的关系:

1. 向ContainerManager建立连接,让cm来startContainer。

2. ContainerLaunchContext包含了yarn的NodeManager启动一个container需要的所有信息。ExecutorRunnable会构建这个container申请信息。

可以参考这段启动逻辑:

def startContainer = {
    logInfo("Setting up ContainerLaunchContext")

    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]

    ctx.setContainerId(container.getId())
    ctx.setResource(container.getResource())
    val localResources = prepareLocalResources
    ctx.setLocalResources(localResources)

    val env = prepareEnvironment
    ctx.setEnvironment(env)

    ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())

    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))

    val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
      appAttemptId, localResources)
    logInfo("Setting up executor with commands: " + commands)
    ctx.setCommands(commands)

    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))

    // If external shuffle service is enabled, register with the Yarn shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
    }

    // Send the start request to the ContainerManager
    val startReq = Records.newRecord(classOf[StartContainerRequest])
    .asInstanceOf[StartContainerRequest]
    startReq.setContainerLaunchContext(ctx)
    cm.startContainer(startReq)
  }

值得注意的是setServiceData方法,如果在node manager上启动了external shuffle service。Yarn的AuxiliaryService支持在NodeManager上启动辅助服务。spark有一个参数spark.shuffle.service.enabled来设置该服务是否被启用,我看的1.2.0版本里貌似没有服务的实现代码。

Executor

此外,从ExecutorRunnableUtil的prepareCommand方法可以得知,ExecutorRunnable通过命令行启动了CoarseGrainedExecutorBackend进程,与粗粒度的mesos模式和standalone模式一致,task最终落到CoarseGrainedExecutorBackend里面执行。

全文完:)

时间: 2024-10-13 22:33:09

Spark on Yarn: Cluster模式Scheduler实现的相关文章

Spark之Yarn提交模式

一.Client模式 提交命令: ./spark-submit --master yarn --class org.apache.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.7.3.jar 1000 ./spark-submit --master yarn-client --class org.apache.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.7.3.jar 100

理解Spark运行模式(二)(Yarn Cluster)

上一篇说到Spark的yarn client运行模式,它与yarn cluster模式的主要区别就是前者Driver是运行在客户端,后者Driver是运行在yarn集群中.yarn client模式一般用在交互式场景中,比如spark shell, spark sql等程序,但是该模式下运行在客户端的Driver与Yarn集群有大量的网络交互,如果客户端与集群之间的网络不是很好,可能会导致性能问题.因此一般在生产环境中,大部分还是采用yarn cluster模式运行spark程序. 下面具体还是

Spark on YARN两种运行模式介绍

本文出自:Spark on YARN两种运行模式介绍http://www.aboutyun.com/thread-12294-1-1.html(出处: about云开发)   问题导读 1.Spark在YARN中有几种模式? 2.Yarn Cluster模式,Driver程序在YARN中运行,应用的运行结果在什么地方可以查看? 3.由client向ResourceManager提交请求,并上传jar到HDFS上包含哪些步骤? 4.传递给app的参数应该通过什么来指定? 5.什么模式下最后将结果输

Spark On YARN内存分配

本文转自:http://blog.javachen.com/2015/06/09/memory-in-spark-on-yarn.html?utm_source=tuicool 此文解决了Spark yarn-cluster模式运行时,内存不足的问题. Spark yarn-cluster模式运行时,注意yarn.app.mapreduce.am.resource.mb的设置.默认为1G Spark On YARN内存分配 本文主要了解Spark On YARN部署模式下的内存分配情况,因为没有

Spark集群-Standalone 模式

Spark 集群相关 来源于官方, 可以理解为是官方译文, 外加一点自己的理解. 版本是2.4.4 本篇文章涉及到: 集群概述 master, worker, driver, executor的理解 打包提交,发布 Spark application standalone模式 SparkCluster 启动 及相关配置 资源, executor分配 开放网络端口 高可用(Zookeeper) 名词解释 Term(术语) Meaning(含义) Application 用户构建在 Spark 上的

Spark 在yarn上运行模式详解:cluster模式和client模式

1.    官方文档 http://spark.apache.org/docs/latest/running-on-yarn.html 2.    配置安装 2.1.安装hadoop:需要安装HDFS模块和YARN模块,HDFS必须安装,spark运行时要把jar包存放到HDFS上. 2.2.安装Spark:解压Spark安装程序到一台服务器上,修改spark-env.sh配置文件,spark程序将作为YARN的客户端用于提交任务 export JAVA_HOME=/usr/local/jdk1

第31课:Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结

本課主題 Master 资源调度的源码鉴赏 Spark 的 Worker 是基于什么逻辑去启动 Executor 资源调度管理 任務調度與資源是通過 DAGScheduler.TaskScheduler.SchedulerBackend 等進行的作業調度 資源調度是指應用程序如何獲得資源 任務調度是在資源調度的基礎上進行的,沒有資源調度那麼任務調度就成為了無源之水無本之木 Master 资源调度的源码鉴赏 因為 Master 負責資源管理和調度,所以資源調度方法 scheduer 位於 Mast

Apache Spark源码走读之19 -- standalone cluster模式下资源的申请与释放

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文主要讲述在standalone cluster部署模式下,Spark Application在整个运行期间,资源(主要是cpu core和内存)的申请与释放. 构成Standalone cluster部署模式的四大组成部件如下图所示,分别为Master, worker, executor和driver,它们各自运行于独立的JVM进程. 从资源管理的角度来说 Master  掌管整个cluster的资源,主要是指cpu core和memory,但Ma

为什么我们生产上要选择Spark On Yarn模式?

为什么我们生产上要选择Spark On Yarn? 开发上我们选择local[2]模式生产上跑任务Job,我们选择Spark On Yarn模式 , 将Spark Application部署到yarn中,有如下优点: 1.部署Application和服务更加方便 只需要yarn服务,包括Spark,Storm在内的多种应用程序不要要自带服务,它们经由客户端提交后,由yarn提供的分布式缓存机制分发到各个计算节点上. 2.资源隔离机制 yarn只负责资源的管理和调度,完全由用户和自己决定在yarn