Big Data: Spark Standalone Cluster Scheduling (Part 1) — Starting from Remote Debugging: How an Application Is Created

Remote debugging, especially against a cluster, is a very convenient way to understand how the code actually runs, and it is a technique most programmers are fond of.

Although Scala's syntax differs from Java's, Scala runs on the JVM — Scala code is ultimately compiled to bytecode and executed by the JVM — so remote debugging of Scala is just ordinary JVM remote debugging.

On the server side, start the JVM with:

-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7001,suspend=y 

A client can then attach to that socket and debug the code remotely.
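Any JDWP client can do this. As a quick check from the command line (assuming the JVM above is running on a host named raintungmaster — substitute your own server), the JDK's jdb can attach with the socket connector; in practice you would more likely configure a "Remote" debug run in IntelliJ IDEA or Eclipse pointing at the same host and port:

jdb -connect com.sun.jdi.SocketAttach:hostname=raintungmaster,port=7001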

1. Debugging the submit, master, and worker code

1.1 Debugging submit

The client runs the submit step; I will not go into detail here. A Spark job is normally submitted with

spark-submit

to hand a Spark job to the cluster.

Under the hood this is essentially a command like the following:

/usr/java/jdk1.8.0_111/bin/java -cp /work/spark-2.1.0-bin-hadoop2.7/conf/:/work/spark-2.1.0-bin-hadoop2.7/jars/* -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7000,suspend=y -Xmx1g org.apache.spark.deploy.SparkSubmit --master spark://raintungmaster:7077 --class rfcexample --jars /work/spark-2.1.0-bin-hadoop2.7/examples/jars/scopt_2.11-3.3.0.jar,/work/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar /tmp/machinelearning.jar

It invokes the SparkSubmit class to submit the job; to debug it, simply add the debug flags to this command (port 7000 in the example above).
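If you would rather not hand-craft the java command, the same flags can usually be passed through spark-submit itself. A sketch only (in client mode, --driver-java-options ends up on the JVM that runs SparkSubmit, so this should be equivalent to the command above):

spark-submit --master spark://raintungmaster:7077 --class rfcexample \
  --driver-java-options "-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7000,suspend=y" \
  --jars /work/spark-2.1.0-bin-hadoop2.7/examples/jars/scopt_2.11-3.3.0.jar,/work/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar \
  /tmp/machinelearning.jar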

1.2 Setting up debugging for the master and worker

export SPARK_WORKER_OPTS="-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8000,suspend=n"
export SPARK_MASTER_OPTS="-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8001,suspend=n"

Setting these environment variables before starting the daemons is all that is needed.
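A convenient place for them is conf/spark-env.sh, which the start scripts source, so the options apply every time the daemons are restarted (a sketch, with the same arbitrary ports as above):

# conf/spark-env.sh
export SPARK_MASTER_OPTS="-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8001,suspend=n"
export SPARK_WORKER_OPTS="-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=8000,suspend=n"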

2. Debugging executor code

However, even with the worker's environment options set, the code running inside the Spark executor could never be debugged. Executors run on workers, so remote debugging ought to work — why can't the executor be debugged?

3. Spark Standalone cluster scheduling

Since the executor cannot be debugged this way, we need to work out how submit, master, and worker are tied together in scheduling.

3.1 Submit: submitting the job

As described above, submit really just boils down to running the SparkSubmit class, whose main method ends up calling runMain:

try {
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
      case t: Throwable =>
        findCause(t) match {
          case SparkUserAppException(exitCode) =>
            System.exit(exitCode)

          case t: Throwable =>
            throw t
        }
    }

The heart of runMain is invoking the main method of the class we submitted — in the example above, the class given by the argument

--class rfcexample

that is, it calls rfcexample's main method.
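runMain obtains that main method by plain JVM reflection. A minimal self-contained sketch of the same pattern (this is not Spark code; the class name is just the one from the example command):

// Load the --class by name and invoke its static main(Array[String]) via reflection,
// which is essentially what SparkSubmit does with mainMethod.invoke above.
object InvokeUserMain {
  def main(args: Array[String]): Unit = {
    val userClass  = Class.forName("rfcexample")                  // must be on the classpath
    val mainMethod = userClass.getMethod("main", classOf[Array[String]])
    val childArgs  = Array.empty[String]                          // application arguments, if any
    mainMethod.invoke(null, childArgs)                            // static method, so the receiver is null
  }
}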

When we write the class that a Spark job runs, we normally initialize the Spark context:

val sc = new SparkContext(conf)
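For context, a minimal driver class of that shape looks roughly like this (MyDriverApp and the computation are made up for illustration; the article's real rfcexample will differ):

import org.apache.spark.{SparkConf, SparkContext}

// A skeletal driver: this is the main method that SparkSubmit invokes via reflection.
object MyDriverApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MyDriverApp") // --master is supplied by spark-submit
    val sc   = new SparkContext(conf)                    // triggers the scheduling described next
    val evens = sc.parallelize(1 to 1000).filter(_ % 2 == 0).count()
    println(s"even numbers: $evens")
    sc.stop()
  }
}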

When the SparkContext is initialized, it creates and starts the task scheduler:

// Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

In standalone mode this eventually reaches the start method of StandaloneSchedulerBackend.scala:

    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)

It builds an ApplicationDescription and starts a StandaloneAppClient that connects to the master.

3.2 Master: scheduling the application

On the submit side we created a client, built an application description, and registered the application with the master. On the master, the message dispatcher receives the RegisterApplication message:

case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }

The master creates a new application ID and registers the application. An application is bound to a single client endpoint — the same client ip:port can register only one application. In schedule(), the master weighs the application's memory and core requirements against the available workers and decides how many cores to allocate on which workers (a simplified sketch of the idea follows).
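The allocation itself happens in Master.scheduleExecutorsOnWorkers, which by default spreads an application's cores across as many workers as possible. The following is only a highly simplified, illustrative sketch of that round-robin idea — the names are invented, and the real code also honors memory, coresPerExecutor, and per-application executor limits:

// Illustrative only: hand out cores to usable workers round-robin until the
// application's request is satisfied or no worker has capacity left.
final case class WorkerSlot(id: String, var freeCores: Int)

def spreadOutCores(coresWanted: Int, workers: Seq[WorkerSlot]): Map[String, Int] = {
  val granted = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
  var remaining = coresWanted
  var madeProgress = true
  while (remaining > 0 && madeProgress) {
    madeProgress = false
    for (w <- workers if remaining > 0 && w.freeCores > 0) {
      w.freeCores -= 1      // take one core from this worker
      granted(w.id) += 1    // credit it to the application
      remaining -= 1
      madeProgress = true
    }
  }
  granted.toMap
}

For each worker that ends up with executors assigned, the master then calls launchExecutor: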

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }

which sends a serialized LaunchExecutor message to the worker's endpoint.

3.3 Worker: launching the executor

In Worker.scala, the dispatcher receives the LaunchExecutor message:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          // Create the executor's working directory
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          val appLocalDirs = appDirectories.getOrElse(appId,
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq)
          appDirectories(appId) = appLocalDirs
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }

It creates a working directory for the executor and starts an ExecutorRunner:

private[worker] def start() {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run() { fetchAndRunExecutor() }
    }
    workerThread.start()
    // Shutdown hook that kills actors on shutdown.
    shutdownHook = ShutdownHookManager.addShutdownHook { () =>
      // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
      // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
      if (state == ExecutorState.RUNNING) {
        state = ExecutorState.FAILED
      }
      killProcess(Some("Worker shutting down")) }
  }

In ExecutorRunner.scala's start method, a thread named "ExecutorRunner for xxx" is started to run the executor. So is the application's code actually executed inside this thread?

private def fetchAndRunExecutor() {
    try {
      // Launch the process
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)
      val command = builder.command()
      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
      .....
      process = builder.start()
      ......
      val exitCode = process.waitFor()
      state = ExecutorState.EXITED
      val message = "Command exited with code " + exitCode
      worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    } catch {
      ......
    }
  }

Looking into fetchAndRunExecutor, we see builder.start() — builder is a ProcessBuilder, which means the current thread launches a child process to run the command.

That is why we cannot debug the executor by debugging the worker: the executor is a separate process.
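To make the point concrete, here is a minimal sketch (not Spark code) of what fetchAndRunExecutor effectively does: ProcessBuilder forks a brand-new JVM, so the worker's own -Xrunjdwp agent is not inherited — only flags explicitly placed in the child's command line apply to the executor.

// A fresh child JVM knows nothing about how its parent was started.
object ChildProcessDemo {
  def main(args: Array[String]): Unit = {
    val builder = new ProcessBuilder("java", "-version") // hypothetical child command
    builder.inheritIO()                                   // forward the child's stdout/stderr
    val process  = builder.start()                        // fork the child process
    val exitCode = process.waitFor()                      // wait for it, as fetchAndRunExecutor does
    println(s"child exited with code $exitCode")
  }
}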

4. Debugging the executor process

Having traced the code from the master receiving RegisterApplication all the way to it sending the LaunchExecutor message that schedules the worker, we see that the command is never altered along the way: the command the child process runs is taken from the command field of the ApplicationDescription. And we already know that the ApplicationDescription is the one Submit created in section 3.1, so let's go back to the start method of StandaloneSchedulerBackend.scala:

val driverUrl = RpcEndpointAddress(
      sc.conf.get("spark.driver.host"),
      sc.conf.get("spark.driver.port").toInt,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts

Here we can see that the executor's JVM arguments are controlled by javaOpts — in particular by

val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")

So the controlling parameter is spark.executor.extraJavaOptions. Going back to the Spark documentation (a little late, admittedly):

spark.executor.extraJavaOptions (none) — A string of extra JVM options to pass to executors. For instance, GC settings or other logging. Note that it is illegal to set Spark properties or maximum heap size (-Xmx) settings with this option. Spark properties should be set using a SparkConf object or the spark-defaults.conf file used with the spark-submit script. Maximum heap size settings can be set with spark.executor.memory.

Per the documentation, we can pass a conf to spark-submit to set the executor's JVM options:

--conf "spark.executor.extraJavaOptions=-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7001,suspend=y"

The complete command for the submit process then becomes:

/usr/java/jdk1.8.0_111/bin/java -cp /work/spark-2.1.0-bin-hadoop2.7/conf/:/work/spark-2.1.0-bin-hadoop2.7/jars/* -Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7000,suspend=y -Xmx1g org.apache.spark.deploy.SparkSubmit --master spark://raintungmaster:7077 --class rfcexample --jars /work/spark-2.1.0-bin-hadoop2.7/examples/jars/scopt_2.11-3.3.0.jar,/work/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar --conf "spark.executor.extraJavaOptions=-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7001,suspend=y" /tmp/machinelearning.jar
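The same property can also be set programmatically on the SparkConf before the SparkContext is created, in line with the documentation quoted above (a sketch; with suspend=y every executor will wait for a debugger to attach before running any task):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("rfcexample")
  .set("spark.executor.extraJavaOptions",
       "-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=7001,suspend=y")
val sc = new SparkContext(conf)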

Note:

With a fixed debug port like this, a worker cannot start more than one executor for the application — after all, only one process on a given machine can listen on that port.
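If you do need several executors on one worker, one possible workaround (not from the original article, and only practical when you can read each executor's stdout to discover the chosen port) is to let the JDWP agent pick a free port itself:

--conf "spark.executor.extraJavaOptions=-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=0,suspend=y"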

