【Spark01】SparkSubmit兼谈Spark集群管理和部署模式

关于Cluster Manager和Deploy Mode的组合在SparkSubmit.scala的createLaunchEnv中有比较详细的逻辑。

Cluster Manager基本上有Standalone,YARN和Mesos三种情况,说明Cluster Manager用来指明集群的资源管理器。这就是说不管是Client还是Cluster部署方式(deployMode的两种可能),都会使用它们做集 群管理器,也就是说Client也是一种集群部署方式???

  /**
   * @return a tuple containing
   *           (1) the arguments for the child process,
   *           (2) a list of classpath entries for the child,
   *           (3) a list of system properties and env vars, and
   *           (4) the main class for the child
   */
//createLaunchEnv的方法返回值
//1.子进程的参数,字符串数组,ArrayBuffer[String]
//2.子进程JVM的classpath路径列表,字符串数组,ArrayBuffer[String]
//3.子进程的系统变量和环境变量 ,HashMap类型
//4.子进程JVM的main class
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {

    // Values to return
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sysProps = new HashMap[String, String]()
    var childMainClass = ""

    // Set the cluster manager
    //集群管理器,这里指定了四种:YARN,STANDALONE,MESON和LOCAL
    //需要注意的是,为什么LOCAL也是一种集群管理器,它的集群含义是什么?
    //根据args.master参数值决定clusterManager,注意,区分大小写
    //这里只检查master是否以yarn, spark, mesos或者local开头,实际中,以yarn开头的master值可能是yarn-client,yarn-cluster,yarn-standalone,所以代码后面对master做了更进一步的检查
    val clusterManager: Int = args.master match {
      case m if m.startsWith("yarn") => YARN
      case m if m.startsWith("spark") => STANDALONE
      case m if m.startsWith("mesos") => MESOS
      case m if m.startsWith("local") => LOCAL
      //如果master不以这四个开头,提示出错信息是***Master***必须以yarn, spark, mesos, or local开头
      case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
    }

    // Set the deploy mode; default is client mode
    //设置部署模式,有两种模式,client和cluster模式
    //如果没有设置deployMode(取值null),则默认认为是client模式
    var deployMode: Int = args.deployMode match {
      case "client" | null => CLIENT
      case "cluster" => CLUSTER
      case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
    }

    // Because "yarn-cluster" and "yarn-client" encapsulate both the master
    // and deploy mode, we have some logic to infer the master and deploy mode
    // from each other if only one is specified, or exit early if they are at odds.

    ///因为yarn-cluster和yarn-client封装了master和deployMode, 这里对yarn-cluster和yarn-client两种集群管理器和部署模式的组合进行了特殊处理
    ///只要知道一个就可以推倒出另一个we have some logic to infer the master and deploy mode from each other
    //如果args.master以yarn开头(导致clusterManager == YARN为true)
    if (clusterManager == YARN) { ///如果clusterManager是YARN,那么说明master是以yarn开头
      if (args.master == "yarn-standalone") { //如果master是yarn-standalone,则提示说master的值yarn-standalone已经不推荐使用,
        printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
        args.master = "yarn-cluster" ///将master值改为yarn-cluster
      }
      (args.master, args.deployMode) match { ///对args.master和args.deployMode的组合进行检查和整理
        case ("yarn-cluster", null) => ///如果args.deployMode为null,那么在前面的逻辑中,deployMode的值是CLIENT,
          deployMode = CLUSTER //根据args.master的取值yarn-cluster,将deployMode的值改为CLUSTER
        case ("yarn-cluster", "client") =>  ////如果args.master, args.deployMode是yarn-cluster,client,那么认为这两个值是冲突的,退出
          printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
        case ("yarn-client", "cluster") =>  ////如果args.master, args.deployMode是yarn-client,cluster,那么认为这两个值是冲突的,退出
          printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
        case (_, mode) =>
          ///其他情况下,将args.master改为如下值(代码执行到这里的前提是args.master以yarn开头)
          ///通过前面的代码,mode只有三种可能的取值:null,cluster和client
          //如果mode是client,那么args.master是yarn-client
          //如果mode是null,那么args.master是yarn-client
          //如果mode是cluster,args.master只能是yarn-cluster,因为(yarn-client,cluster)情况在前面判断过了
          args.master = "yarn-" + Option(mode).getOrElse("client")
      }

      // Make sure YARN is included in our build if we‘re trying to use it
      //在YARN的集群模式下检查Spark安装包的完整性,为什么一定要检查org.apache.spark.deploy.yarn.Client类的存在?因为后面的代码中会通过反射调用Client的main方法
      //注意,Client是的包名中有个yarn,说明这个Client用于跟YARN集群模式有关
      if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
        printErrorAndExit(
          "Could not load YARN classes. " +
          "This copy of Spark may not have been compiled with YARN support.")
      }
    }

    // The following modes are not supported or applicable
    ///对clusterManager和deployMode不支持的组合进行检查
    (clusterManager, deployMode) match {
      case (MESOS, CLUSTER) =>///MESOS和CLUSTER不共存
        printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
      case (_, CLUSTER) if args.isPython =>///Python应用程序不支持集群部署模式
        printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
      case (_, CLUSTER) if isShell(args.primaryResource) => ///Spark Shell不支持集群部署方式?通过Spark Shell提交Application是一种Client方式???
        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
      case _ =>
    }

    // If we‘re running a python app, set the main class to our specific python runner
    if (args.isPython) { ///如果python应用程序,我表示不关心,忽略之
      if (args.primaryResource == PYSPARK_SHELL) {
        args.mainClass = "py4j.GatewayServer"
        args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
      } else {
        // If a python file is provided, add it to the child arguments and list of files to deploy.
        // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
        args.mainClass = "org.apache.spark.deploy.PythonRunner"
        args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
        args.files = mergeFileLists(args.files, args.primaryResource)
      }
      args.files = mergeFileLists(args.files, args.pyFiles)
      if (args.pyFiles != null) {
        sysProps("spark.submit.pyFiles") = args.pyFiles
      }
    }

    // Special flag to avoid deprecation warnings at the client
    sysProps("SPARK_SUBMIT") = "true" ///SPARK_SUBMIT这个参数用来做什么的?注意sysProps是个Scala的HashMap

    // A list of rules to map each argument to system properties or command-line options in
    // each deploy mode; we iterate through these below
    ///这是在做什么??一系列规则(用于在各种部署模式下,把每个argument映射为system properties或者command-line options的规则)
    val options = List[OptionAssigner](

      // All cluster managers
      ///sysProp是指定参数的函数参数传值,此处没有给clOption赋值
      OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
      OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),

      ///args.jars指定了提交的Application的jars相关的东西
      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
      OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
        sysProp = "spark.driver.memory"),
      OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        sysProp = "spark.driver.extraClassPath"),
      OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        sysProp = "spark.driver.extraJavaOptions"),
      OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        sysProp = "spark.driver.extraLibraryPath"),

      // Standalone cluster only
      ///只适用于Standalone集群,属性无需以spark开头,因为都在spark中
      OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
      OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"), //命令行赋值
      OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

      // Yarn client only
      //只适用于Yarn client,因为由Yarn进行管理,属性名都带有spark
      OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
      OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
      OptionAssigner(args.executorCores, YARN, CLIENT, sysProp = "spark.executor.cores"),
      OptionAssigner(args.files, YARN, CLIENT, sysProp = "spark.yarn.dist.files"),
      OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),

      // Yarn cluster only
      ///只适用于Yarn cluster,为命令行选项赋值
      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
      OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
      OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
      OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
      OptionAssigner(args.executorMemory, YARN, CLUSTER, clOption = "--executor-memory"),
      OptionAssigner(args.executorCores, YARN, CLUSTER, clOption = "--executor-cores"),
      OptionAssigner(args.files, YARN, CLUSTER, clOption = "--files"),
      OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
      OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),

      // Other options
      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
        sysProp = "spark.executor.memory"),
      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
        sysProp = "spark.cores.max"),
      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
        sysProp = "spark.files")
    )

    // In client mode, launch the application main class directly
    // In addition, add the main application jar and any added jars (if any) to the classpath
    if (deployMode == CLIENT) {
      childMainClass = args.mainClass
      if (isUserJar(args.primaryResource)) {
        childClasspath += args.primaryResource
      }
      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
      if (args.childArgs != null) { childArgs ++= args.childArgs }
    }

    // Map all arguments to command-line options or system properties for our chosen mode
    for (opt <- options) {
      if (opt.value != null &&
          (deployMode & opt.deployMode) != 0 &&
          (clusterManager & opt.clusterManager) != 0) {
        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
        if (opt.sysProp != null) { sysProps.put(opt.sysProp, opt.value) }
      }
    }

    // Add the application jar automatically so the user doesn‘t have to call sc.addJar
    // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
    // For python files, the primary resource is already distributed as a regular file
    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
    if (!isYarnCluster && !args.isPython) {
      var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
      if (isUserJar(args.primaryResource)) {
        jars = jars ++ Seq(args.primaryResource)
      }
      sysProps.put("spark.jars", jars.mkString(","))
    }

    // In standalone-cluster mode, use Client as a wrapper around the user class
    if (clusterManager == STANDALONE && deployMode == CLUSTER) {
      childMainClass = "org.apache.spark.deploy.Client"
      if (args.supervise) {
        childArgs += "--supervise"
      }
      childArgs += "launch"
      childArgs += (args.master, args.primaryResource, args.mainClass)
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }

    // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    if (isYarnCluster) {
      childMainClass = "org.apache.spark.deploy.yarn.Client"
      if (args.primaryResource != SPARK_INTERNAL) {
        childArgs += ("--jar", args.primaryResource)
      }
      childArgs += ("--class", args.mainClass)
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }

    // Load any properties specified through --conf and the default properties file
    for ((k, v) <- args.sparkProperties) {
      sysProps.getOrElseUpdate(k, v)
    }

    // Ignore invalid spark.driver.host in cluster modes.
    if (deployMode == CLUSTER) {
      sysProps -= ("spark.driver.host")
    }

    // Resolve paths in certain spark properties
    val pathConfigs = Seq(
      "spark.jars",
      "spark.files",
      "spark.yarn.jar",
      "spark.yarn.dist.files",
      "spark.yarn.dist.archives")
    pathConfigs.foreach { config =>
      // Replace old URIs with resolved URIs, if they exist
      sysProps.get(config).foreach { oldValue =>
        sysProps(config) = Utils.resolveURIs(oldValue)
      }
    }

    // Resolve and format python file paths properly before adding them to the PYTHONPATH.
    // The resolving part is redundant in the case of --py-files, but necessary if the user
    // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
    sysProps.get("spark.submit.pyFiles").foreach { pyFiles =>
      val resolvedPyFiles = Utils.resolveURIs(pyFiles)
      val formattedPyFiles = PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
      sysProps("spark.submit.pyFiles") = formattedPyFiles
    }

    (childArgs, childClasspath, sysProps, childMainClass)
  }
时间: 2024-10-05 01:51:27

【Spark01】SparkSubmit兼谈Spark集群管理和部署模式的相关文章

Spark教程-构建Spark集群-配置Hadoop单机模式并运行Wordcount(1)

安装ssh Hadoop是采用ssh进行通信的,此时我们要设置密码为空,即不需要密码登陆,这样免去每次通信时都输入秘密,安装如下: 输入“Y”进行安装并等待自动安装完成. 安装ssh完成后启动服务 以下命令验证服务是否正常启动: 可以看到ssh正常启动: 设置免密码登录,生成私钥和公钥: 在/root/.ssh中生成两个文件:id_rsa和id_rsa.pub,id_rsa为私钥,id_rsa.pub为公钥,我们将公钥id_rsa.pub追加到 authorized_keys中,因为author

Spark集群管理器介绍

Spark可以运行在各种集群管理器上,并通过集群管理器访问集群中的其他机器.Spark主要有三种集群管理器,如果只是想让spark运行起来,可以采用spark自带的独立集群管理器,采用独立部署的模式:如果是想让Spark部署在其他集群上,各应用共享集群的话,可以采取两种集群管理器:Hadoop Yarn 或 Apache Mesos. 一.独立集群管理器 Spark独立集群管理器提供的在集群上运行应用的简单方法.要使用集群启动脚本,按照以下步骤执行即可:1.将编译好的Spark发送到集群的其他节

浅谈分布式集群管理的原理,看看集群究竟是做什么的

本文始发于个人公众号:TechFlow,原创不易,求个关注 今天是分布式专题的第11篇文章,我们一起来聊聊分布式集群资源管理. 在开始文章之前,我们先来问一个问题,为什么是国际上是亚马逊,国内是阿里这两家公司云计算搞得最好呢?这两家公司之间有一个巨大的共同点,就是它们都是电商公司.电商公司的特点很明显,就是流量不是固定的,往往会受到大促.节日的影响.像是国内的双十一和美国的黑色星期五就是典型的大促.在大促的时候的流量会是平常的十倍甚至更多,这么大的流量必须要有更多的机器去应对.但问题是如果去买这

Spark集群任务提交流程----2.1.0源码解析

Spark的应用程序是通过spark-submit提交到Spark集群上运行的,那么spark-submit到底提交了什么,集群是怎样调度运行的,下面一一详解. 0. spark-submit提交任务 0.1 启动脚本解析 分析spark-submit脚本源码可知最终该命令执行./bin/spark-class的Java类脚本,./bin/spark-class脚本启动的类是org.apache.spark.launcher.Main,在spark-submit模式下该类会启动SparkSubm

Spark的集群管理器

上篇文章谈到Driver节点和Executor节点,但是如果想要运行Driver节点和Executor节点,就不能不说spark的集群管理器.spark的集群管理器大致有三种,一种是自带的standalone独立集群管理器,一种是依赖于Hadoop的资源调度器YARN,还有一种就是Apache项目的Mesos集群管理器. Spark 依赖于集群管理器来启动Executor节点,有时候也会依赖集群管理器来启动Driver节点.集群管理器是 Spark 中的可插拔式组件. 在集群管理器中有着主节点(

Spark集群任务提交

1. 集群管理器 Spark当前支持三种集群管理方式 Standalone-Spark自带的一种集群管理方式,易于构建集群. Apache Mesos-通用的集群管理,可以在其上运行Hadoop MapReduce和一些服务应用. Hadoop YARN-Hadoop2中的资源管理器. Tip1: 在集群不是特别大,并且没有mapReduce和Spark同时运行的需求的情况下,用Standalone模式效率最高. Tip2: Spark可以在应用间(通过集群管理器)和应用中(如果一个SparkC

Spark集群模式&amp;Spark程序提交

Spark集群模式&Spark程序提交 1. 集群管理器 Spark当前支持三种集群管理方式 Standalone-Spark自带的一种集群管理方式,易于构建集群. Apache Mesos-通用的集群管理,可以在其上运行Hadoop MapReduce和一些服务应用. Hadoop YARN-Hadoop2中的资源管理器. Tip1: 在集群不是特别大,并且没有mapReduce和Spark同时运行的需求的情况下,用Standalone模式效率最高. Tip2: Spark可以在应用间(通过集

Spark学习笔记5:Spark集群架构

Spark的一大好处就是可以通过增加机器数量并使用集群模式运行,来扩展计算能力.Spark可以在各种各样的集群管理器(Hadoop YARN , Apache Mesos , 还有Spark自带的独立集群管理器)上运行,所以Spark应用既能够适应专用集群,又能用于共享的云计算环境. Spark运行时架构 Spark在分布式环境中的架构如下图: 在分布式环境下,Spark集群采用的是主/从结构.在Spark集群,驱动器节点负责中央协调,调度各个分布式工作节点.执行器节点是工作节点,作为独立的Ja

spark集群与spark HA高可用快速部署 spark研习第一季

1.spark 部署 标签: spark 0 apache spark项目架构 spark SQL -- spark streaming -- MLlib -- GraphX 0.1 hadoop快速搭建,主要利用hdfs存储框架 下载hadoop-2.6.0,解压,到etc/hadoop/目录下 0.2 快速配置文件 cat core-site.xml <configuration> <property> <name>fs.defaultFS</name>