
Spark on K8s Source Code Analysis

sparkk8s

time: 2019-12-19

  • Spark on K8s source code analysis

    • 1. Spark Submit

      • spark-submit.sh
      • spark-class.sh
      • SparkSubmit
        • Step 1: Initialize the Spark application configuration
        • Step 2: Run the Spark application

Spark on K8s source code analysis

This article analyzes how a Spark job is submitted to Kubernetes, based on the spark-3.0.0-preview source code.

The Spark on K8s code lives in the resource-managers/kubernetes directory of the Spark source tree.

The kubernetes directory is made up of the following parts:

```
$ tree kubernetes -L 1
kubernetes
├── core
├── docker
└── integration-tests
```


The code under kubernetes/core/src/main is laid out as follows:

```
$ tree core/src/main/scala -L 7
core/src/main/scala
└── org
    └── apache
        └── spark
            ├── deploy
            │   └── k8s
            │       ├── Config.scala
            │       ├── Constants.scala
            │       ├── features
            │       │   ├── BasicDriverFeatureStep.scala
            │       │   ├── BasicExecutorFeatureStep.scala
            │       │   ├── DriverCommandFeatureStep.scala
            │       │   ├── DriverKubernetesCredentialsFeatureStep.scala
            │       │   ├── DriverServiceFeatureStep.scala
            │       │   ├── EnvSecretsFeatureStep.scala
            │       │   ├── ExecutorKubernetesCredentialsFeatureStep.scala
            │       │   ├── HadoopConfDriverFeatureStep.scala
            │       │   ├── KerberosConfDriverFeatureStep.scala
            │       │   ├── KubernetesFeatureConfigStep.scala
            │       │   ├── LocalDirsFeatureStep.scala
            │       │   ├── MountSecretsFeatureStep.scala
            │       │   ├── MountVolumesFeatureStep.scala
            │       │   └── PodTemplateConfigMapStep.scala
            │       ├── KubernetesConf.scala
            │       ├── KubernetesDriverSpec.scala
            │       ├── KubernetesUtils.scala
            │       ├── KubernetesVolumeSpec.scala
            │       ├── KubernetesVolumeUtils.scala
            │       ├── SparkKubernetesClientFactory.scala
            │       ├── SparkPod.scala
            │       └── submit
            │           ├── K8sSubmitOps.scala
            │           ├── KubernetesClientApplication.scala
            │           ├── KubernetesDriverBuilder.scala
            │           ├── LoggingPodStatusWatcher.scala
            │           └── MainAppResource.scala
            └── scheduler
                └── cluster
                    └── k8s
                        ├── ExecutorPodsAllocator.scala
                        ├── ExecutorPodsLifecycleManager.scala
                        ├── ExecutorPodsPollingSnapshotSource.scala
                        ├── ExecutorPodsSnapshot.scala
                        ├── ExecutorPodsSnapshotsStoreImpl.scala
                        ├── ExecutorPodsSnapshotsStore.scala
                        ├── ExecutorPodStates.scala
                        ├── ExecutorPodsWatchSnapshotSource.scala
                        ├── KubernetesClusterManager.scala
                        ├── KubernetesClusterSchedulerBackend.scala
                        └── KubernetesExecutorBuilder.scala

10 directories, 39 files
```

The docker directory contains the Dockerfiles used to build the Spark images:

```
$ tree kubernetes/docker/src/main -L 5
kubernetes/docker/src/main
└── dockerfiles
    └── spark
        ├── bindings
        │   ├── python
        │   │   └── Dockerfile
        │   └── R
        │       └── Dockerfile
        ├── Dockerfile
        └── entrypoint.sh

5 directories, 4 files
```


1. Spark Submit

Let's start by submitting the SparkPi example:

```bash
$ ./bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar
```

spark-submit.sh

```bash
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# Annotation: every spark-submit argument is forwarded unchanged to spark-class
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
```
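For the spark-pi submission above, the exec line therefore expands to the following (a plain substitution of "$@", shown only for illustration):

```bash
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=5 \
  --conf spark.kubernetes.container.image=<spark-image> \
  local:///path/to/examples.jar
```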

spark-class.sh

The heart of this script is the build_command function and the exec of the command it produces (lines 71-74 of the original spark-class file).

Its main job is to take the environment plus the spark-submit arguments and assemble a command of the form

```bash
java -Xmx128m *** -cp *** com.demo.Main ***.jar
```

where the main class launched here is org.apache.spark.deploy.SparkSubmit.
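For our spark-pi example, the command that spark-class finally execs looks roughly like the one below. The java path, classpath, and JVM options depend on the local installation, so treat this purely as an illustration:

```bash
/path/to/java -cp "${SPARK_HOME}/conf/:${SPARK_HOME}/jars/*" \
  org.apache.spark.deploy.SparkSubmit \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=5 \
  --conf spark.kubernetes.container.image=<spark-image> \
  local:///path/to/examples.jar
```

The full spark-class script is listed below.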

```bash
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      # After NULL character is consumed, change the delimiter and consume command string.
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
```
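If you want to see the command the launcher produces without running it, you can invoke org.apache.spark.launcher.Main directly and make the NUL separators visible. This assumes SPARK_HOME points at a full Spark distribution and is only a debugging trick, not part of the submission flow:

```bash
java -Xmx128m -cp "${SPARK_HOME}/jars/*" org.apache.spark.launcher.Main \
  org.apache.spark.deploy.SparkSubmit \
  --master local --class org.apache.spark.examples.SparkPi \
  <path-to-examples.jar> | tr '\0' '\n'
```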


SparkSubmit

Launched via the java command, execution first enters the main method of object SparkSubmit, which:

  • constructs a SparkSubmit object, and
  • calls SparkSubmit.doSubmit.
```scala
override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {
    self =>

    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
        override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

        override protected def logWarning(msg: => String): Unit = self.logWarning(msg)

        override protected def logError(msg: => String): Unit = self.logError(msg)
      }
    }

    override protected def logInfo(msg: => String): Unit = printMessage(msg)

    override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

    override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")

    override def doSubmit(args: Array[String]): Unit = {
      try {
        super.doSubmit(args)
      } catch {
        case e: SparkUserAppException =>
          exitFn(e.exitCode)
      }
    }
  }

  submit.doSubmit(args)
}
```



Now let's step into SparkSubmit.doSubmit, which does two things:

  • parseArguments(args) builds a SparkSubmitArguments object
  • submit() is then called
```scala
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)
  // Annotation: build a SparkSubmitArguments object from the spark-submit arguments
  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
```
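SUBMIT is the action we follow in this article, but the KILL and REQUEST_STATUS branches are also wired up for Kubernetes (see K8sSubmitOps.scala in the tree above). According to the Spark on Kubernetes documentation they are driven from spark-submit roughly like this, where the target is <namespace>:<driver-pod-name> and glob patterns are allowed:

```bash
# kill a running application by deleting its driver pod
./bin/spark-submit --kill <namespace>:<driver-pod-name> \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port>

# query the status of an application
./bin/spark-submit --status <namespace>:<driver-pod-name> \
  --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port>
```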





Next we step into submit():

  • It first checks whether this is a standalone-cluster submission; since our cluster is K8s native, execution falls straight through to the else branch and calls doRunMain().
  • Since our spark-submit did not pass --proxy-user, doRunMain() takes its else branch and calls runMain().
```scala
/**
 * Submit the application using the provided parameters, ensuring to first wrap
 * in a doAs when --proxy-user is specified.
 */
@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(args, uninitLog)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          } else {
            throw e
          }
      }
    } else {
      runMain(args, uninitLog)
    }
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      logInfo("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        logWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args, false)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}
```






runMain has two key steps:

    1. Prepare childArgs, childClasspath, sparkConf, and childMainClass.
    2. Instantiate and start childMainClass.

Here, "child" refers to the client main class that submits the job for the given resource manager and deploy mode.
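For reference, these client main classes are defined as constants in object SparkSubmit; paraphrased, they look like the following, and the Kubernetes one is the class we will follow below:

```scala
// Paraphrased from object SparkSubmit in SparkSubmit.scala
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.yarn.YarnClusterApplication"
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
```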

```scala
/**
 * Run the main method of the child class using the submit arguments.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 *
 * Note that this main class will not be the one provided by the user if we're
 * running cluster deploy mode or python applications.
 */
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  // Let the main class re-initialize the logging system once it starts.
  if (uninitLog) {
    Logging.uninitialize()
  }

  if (args.verbose) {
    logInfo(s"Main class:\n$childMainClass")
    logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
    // sysProps may contain sensitive information, so redact before printing
    logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
    logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    logInfo("\n")
  }
  val loader = getSubmitClassLoader(sparkConf)
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  var mainClass: Class[_] = null

  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      logError(s"Failed to load class $childMainClass.")
      if (childMainClass.contains("thriftserver")) {
        logInfo(s"Failed to load main class $childMainClass.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      logError(s"Failed to load $childMainClass: ${e.getMessage()}")
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        logInfo(s"Failed to load hive class.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
  }

  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }
  // ... (the rest of runMain is shown in step 2 below)
```




Now let's walk through the two steps of runMain() in detail.

Step 1: Initialize the Spark application configuration

```scala
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
```


The key parts of prepareSubmitEnvironment() are the following.

1. Determine the cluster manager and the deploy mode
```scala
// Set the cluster manager
val clusterManager: Int = args.master match {
  case "yarn" => YARN
  case m if m.startsWith("spark") => STANDALONE
  case m if m.startsWith("mesos") => MESOS
  case m if m.startsWith("k8s") => KUBERNETES
  case m if m.startsWith("local") => LOCAL
  case _ =>
    error("Master must either be yarn or start with spark, mesos, k8s, or local")
    -1
}

// Set the deploy mode; default is client mode
var deployMode: Int = args.deployMode match {
  case "client" | null => CLIENT
  case "cluster" => CLUSTER
  case _ =>
    error("Deploy mode must be either client or cluster")
    -1
}
```
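The cluster managers and deploy modes are plain bit flags, which is what lets a single rule later in the method target several of them with a bitwise OR. Paraphrased from object SparkSubmit:

```scala
// Cluster managers
private val YARN = 1
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

// Deploy modes
private val CLIENT = 1
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
```

A rule applies when both masks match, i.e. (clusterManager & opt.clusterManager) != 0 and (deployMode & opt.deployMode) != 0, which is exactly the test used in the OptionAssigner loop further down.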



Once the arguments are confirmed to describe a Kubernetes cluster-mode submission, the method moves on to:

2. Package the application's classpath, files, sparkConf, and childMainClass

The K8s-related code blocks inside prepareSubmitEnvironment() are as follows.

Initialize the Spark master for K8s mode


```scala
if (clusterManager == KUBERNETES) {
  args.master = Utils.checkAndGetK8sMasterUrl(args.master)
  // Make sure KUBERNETES is included in our build if we're trying to use it
  if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
    error(
      "Could not load KUBERNETES classes. " +
      "This copy of Spark may not have been compiled with KUBERNETES support.")
  }
}
```




Build the flag variables used to tell the cluster modes apart

```scala
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
val isKubernetesClusterModeDriver = isKubernetesClient &&
  sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
```


Since our deployment is Kubernetes cluster mode, the isKubernetesCluster and isKubernetesClusterModeDriver flags route us into the K8s-specific dependency-resolution and remote-file-download paths; Yarn and Mesos take different branches:

Add dependency jars to the classpath

```scala
if (!isMesosCluster && !isStandAloneCluster) {
  // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
  // too for packages that include Python code
  val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
    args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
    args.ivySettingsPath)

  if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
    // In K8s client mode, when in the driver, add resolved jars early as we might need
    // them at the submit time for artifact downloading.
    // For example we might use the dependencies for downloading
    // files from a Hadoop Compatible fs eg. S3. In this case the user might pass:
    // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
    if (isKubernetesClusterModeDriver) {
      val loader = getSubmitClassLoader(sparkConf)
      for (jar <- resolvedMavenCoordinates.split(",")) {
        addJarToClasspath(jar, loader)
      }
    } else if (isKubernetesCluster) {
      // We need this in K8s cluster mode so that we can upload local deps
      // via the k8s application, like in cluster mode driver
      childClasspath ++= resolvedMavenCoordinates.split(",")
    } else {
      args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
      if (args.isPython || isInternal(args.primaryResource)) {
        args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
      }
    }
  }
}
```









Download remote dependency files

```scala
// In client mode, download remote files.
var localPrimaryResource: String = null
var localJars: String = null
var localPyFiles: String = null
if (deployMode == CLIENT) {
  localPrimaryResource = Option(args.primaryResource).map {
    downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
  }.orNull
  localJars = Option(args.jars).map {
    downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
  }.orNull
  localPyFiles = Option(args.pyFiles).map {
    downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
  }.orNull

  if (isKubernetesClusterModeDriver) {
    // Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
    // Executors will get the jars from the Spark file server.
    // Explicitly download the related files here
    args.jars = renameResourcesToLocalFS(args.jars, localJars)
    val localFiles = Option(args.files).map {
      downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
    }.orNull
    args.files = renameResourcesToLocalFS(args.files, localFiles)
  }
}
```






Initialize sparkConf

```scala
// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](

  // All cluster managers
  OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
  OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
    confKey = SUBMIT_DEPLOY_MODE.key),
  OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
  OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
  OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
    confKey = DRIVER_MEMORY.key),
  OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
    confKey = DRIVER_CLASS_PATH.key),
  OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
    confKey = DRIVER_JAVA_OPTIONS.key),
  OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
    confKey = DRIVER_LIBRARY_PATH.key),
  OptionAssigner(args.principal, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
    confKey = PRINCIPAL.key),
  OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
    confKey = KEYTAB.key),
  OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),

  // Propagate attributes for dependency resolution at the driver side
  OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
    CLUSTER, confKey = "spark.jars.packages"),
  OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
    CLUSTER, confKey = "spark.jars.repositories"),
  OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
    CLUSTER, confKey = "spark.jars.ivy"),
  OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
    CLUSTER, confKey = "spark.jars.excludes"),

  // Yarn only
  OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
  OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles",
    mergeFn = Some(mergeFileLists(_, _))),
  OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars",
    mergeFn = Some(mergeFileLists(_, _))),
  OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files",
    mergeFn = Some(mergeFileLists(_, _))),
  OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives",
    mergeFn = Some(mergeFileLists(_, _))),

  // Other options
  OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
    confKey = EXECUTOR_INSTANCES.key),
  OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
    confKey = EXECUTOR_CORES.key),
  OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
    confKey = EXECUTOR_MEMORY.key),
  OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
    confKey = CORES_MAX.key),
  OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
    confKey = FILES.key),
  OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
  OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
    confKey = JARS.key),
  OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
    confKey = DRIVER_MEMORY.key),
  OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
    confKey = DRIVER_CORES.key),
  OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
    confKey = DRIVER_SUPERVISE.key),
  OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),

  // An internal option used only for spark-shell to add user jars to repl's classloader,
  // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
  // remote jars, so adding a new option to only specify local jars for spark-shell internally.
  OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
)

// --------------------
// ... some code omitted ...
// --------------------

// Map all arguments to command-line options or system properties for our chosen mode
for (opt <- options) {
  if (opt.value != null &&
      (deployMode & opt.deployMode) != 0 &&
      (clusterManager & opt.clusterManager) != 0) {
    if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
    if (opt.confKey != null) {
      if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) {
        sparkConf.set(opt.confKey, opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value))
      } else {
        sparkConf.set(opt.confKey, opt.value)
      }
    }
  }
}
```









Initialize childMainClass and pass the Spark application's mainClass (or the R/Python primary file) into childArgs

In client mode, childMainClass is simply the application's main class:

```scala
// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
  childMainClass = args.mainClass
  if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
    childClasspath += localPrimaryResource
  }
  if (localJars != null) { childClasspath ++= localJars.split(",") }
}
```



In cluster mode, childMainClass is the Kubernetes client main class, which in turn launches the application's main class.

```scala
if (isKubernetesCluster) {
  // Annotation: KUBERNETES_CLUSTER_SUBMIT_CLASS is
  // org.apache.spark.deploy.k8s.submit.KubernetesClientApplication, which is what actually
  // launches the application's main class
  childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
  if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
    if (args.isPython) {
      childArgs ++= Array("--primary-py-file", args.primaryResource)
      childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
    } else if (args.isR) {
      childArgs ++= Array("--primary-r-file", args.primaryResource)
      childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
    } else {
      childArgs ++= Array("--primary-java-resource", args.primaryResource)
      childArgs ++= Array("--main-class", args.mainClass)
    }
  } else {
    childArgs ++= Array("--main-class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg =>
      childArgs += ("--arg", arg)
    }
  }
}
```
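Applying this branch to the spark-pi submission from the beginning of the article, the result (derived from the code above, not captured from a real run) is:

```scala
childMainClass = "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
childArgs = Seq(
  "--primary-java-resource", "local:///path/to/examples.jar",
  "--main-class", "org.apache.spark.examples.SparkPi")
// no "--arg" entries, because SparkPi was submitted without application arguments
```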








Finally, childArgs, childClasspath, sparkConf, and childMainClass are all initialized and returned.

Step 2: Run the Spark application

After step 1 completes inside runMain, the entry point of childMainClass is invoked; in cluster mode this childMainClass is the client class that submits the job to Yarn, Kubernetes, or Mesos.

The client then launches the Spark main class we wrote, using the SparkPi example here:

```scala
package org.apache.spark.examples

import scala.math.random

import org.apache.spark.sql.SparkSession

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("Spark Pi")
      .getOrCreate()
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * count / (n - 1)}")
    spark.stop()
  }
}
```





So, after spark-class execs the java command, the call chain is SparkSubmit.main -> KubernetesClientApplication.start -> (inside the driver pod, via a second client-mode spark-submit) SparkPi.main.
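The last hop happens because the driver container's entrypoint re-runs spark-submit in client mode inside the pod. Paraphrased from kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh (details may differ between versions):

```bash
case "$1" in
  driver)
    shift 1
    CMD=(
      "$SPARK_HOME/bin/spark-submit"
      --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
      --deploy-mode client
      "$@"
    )
    ;;
esac

exec /usr/bin/tini -s -- "${CMD[@]}"
```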

With this call chain in mind, look at the final fragment of runMain: it ends by calling start(childArgs, sparkConf) on the SparkApplication interface.

```scala
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
  // Annotation: Yarn and Kubernetes cluster mode take this branch, because
  // YarnClusterApplication / KubernetesClientApplication extend SparkApplication
  mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
  // Annotation: client mode takes this branch and simply invokes the main method
  // of the application's mainClass
  new JavaMainApplication(mainClass)
}

@tailrec
def findCause(t: Throwable): Throwable = t match {
  case e: UndeclaredThrowableException =>
    if (e.getCause() != null) findCause(e.getCause()) else e
  case e: InvocationTargetException =>
    if (e.getCause() != null) findCause(e.getCause()) else e
  case e: Throwable =>
    e
}

try {
  app.start(childArgs.toArray, sparkConf)
} catch {
  case t: Throwable =>
    throw findCause(t)
}
```



Now let's step into KubernetesClientApplication.start():

```scala
/**
 * Main class and entry point of application submission in KUBERNETES mode.
 */
private[spark] class KubernetesClientApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val parsedArguments = ClientArguments.fromCommandLineArgs(args)
    run(parsedArguments, conf)
  }

  private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
    val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
    // For constructing the app ID, we can't use the Spark application name, as the app ID is going
    // to be added as a label to group resources belonging to the same application. Label values are
    // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
    // a unique app ID (captured by spark.app.id) in the format below.
    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
    val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
    val kubernetesConf = KubernetesConf.createDriverConf(
      sparkConf,
      kubernetesAppId,
      clientArguments.mainAppResource,
      clientArguments.mainClass,
      clientArguments.driverArgs)
    // The master URL has been checked for validity already in SparkSubmit.
    // We just need to get rid of the "k8s://" prefix here.
    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
    val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None

    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)

    Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
      master,
      Some(kubernetesConf.namespace),
      KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
      SparkKubernetesClientFactory.ClientType.Submission,
      sparkConf,
      None,
      None)) { kubernetesClient =>
        val client = new Client(
          kubernetesConf,
          new KubernetesDriverBuilder(),
          kubernetesClient,
          waitForAppCompletion,
          watcher)
        client.run()
    }
  }
}
```








Here the Client connects to the Kubernetes API server, builds the driver pod, and submits the Spark-on-K8s job.
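Once the submission goes through, the resulting pods can be observed with kubectl. The label names below (spark-role, spark-app-selector) are the ones the Kubernetes backend applies to the pods it creates; adjust the namespace to your own setup:

```bash
kubectl get pods -n <namespace> -l spark-role=driver
kubectl get pods -n <namespace> -l spark-role=executor
kubectl logs -f -n <namespace> <driver-pod-name>
```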

Next, we will analyze how the driver pod in turn builds the executor pods and distributes the work.

To be continued...

  • UML structure analysis of the Spark Kubernetes code
  • Spark on K8s job scheduling flow

Original article: https://www.cnblogs.com/lanrish/p/12081165.html

