Notes: Spark Worker Internals and Source Code Analysis

Source location of the Worker class: org.apache.spark.deploy.worker
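
Before the listings, one piece of context: the Worker is an RPC endpoint, and every message it receives (LaunchDriver, LaunchExecutor, DriverStateChanged, ...) is pattern-matched in its receive method and dispatched to a handler branch. Below is a minimal, self-contained sketch of that dispatch style; it is simplified, the message fields are trimmed for illustration, and it does not reproduce Spark's actual RpcEndpoint API.

sealed trait WorkerMessage
case class LaunchDriver(driverId: String) extends WorkerMessage
case class LaunchExecutor(appId: String, execId: Int) extends WorkerMessage

object DispatchSketch {
  // Mirrors the shape of Worker.receive: one case branch per message type.
  def receive(msg: WorkerMessage): Unit = msg match {
    case LaunchDriver(driverId) =>
      println(s"Asked to launch driver $driverId")
    case LaunchExecutor(appId, execId) =>
      println(s"Asked to launch executor $appId/$execId")
  }

  def main(args: Array[String]): Unit = {
    receive(LaunchDriver("driver-20191219-0001"))
    receive(LaunchExecutor("app-0001", 0))
  }
}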

/**
 * Source analysis: launching a driver
 */
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")

  // Create a DriverRunner, which manages the driver in a dedicated thread
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)

  // Register the DriverRunner in the drivers hash map (keyed by driver ID)
  drivers(driverId) = driver

  // Start the driver
  driver.start() // see Listing 1 for details

  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem

Listing 1
/** Starts a thread to run and manage the driver. */
private[worker] def start() = {

  // DriverRunner mechanics:
  // spawn a dedicated Java thread to run and manage the driver
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }

        // prepare driver jars and run driver
        // Step 1: create the DriverRunner's working directory
        // Step 2: download the user-uploaded jar (our finished Spark application:
        //         for Java, typically packaged with Maven; for Scala, exported as a jar)
        // Step 3: build a ProcessBuilder and run the driver process
        val exitCode = prepareAndRunDriver() // see Listing 2 for details

        // set final state depending on if forcibly killed and process exit code
        // i.e. map the driver's exit status to a final DriverState
        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }

      // notify worker of final driver state, possible exception
      // The DriverRunner thread sends a DriverStateChanged message to the
      // Worker endpoint that owns it
      worker.send(DriverStateChanged(driverId, finalState.get, finalException)) // see Listing 3 for details
    }
  }.start()
}
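
The essence of start() is a three-part pattern: do the blocking work on a named thread, protect it with a JVM shutdown hook while the driver is alive, and always remove the hook afterwards. A minimal sketch of that pattern follows, with Spark's ShutdownHookManager replaced by plain java.lang.Runtime and the driver work stubbed out.

import java.util.concurrent.atomic.AtomicBoolean

object RunnerThreadSketch {
  def main(args: Array[String]): Unit = {
    val killed = new AtomicBoolean(false)

    new Thread("DriverRunner for driver-sketch") {
      override def run(): Unit = {
        // Register a hook so a Worker shutdown also kills the driver.
        val hook = new Thread(() => killed.set(true))
        Runtime.getRuntime.addShutdownHook(hook)
        try {
          // ... prepareAndRunDriver() would block here ...
          println(s"driver finished, killed=${killed.get}")
        } finally {
          // The hook is only needed while the driver is running.
          Runtime.getRuntime.removeShutdownHook(hook)
        }
      }
    }.start()
  }
}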

Listing 2
private[worker] def prepareAndRunDriver(): Int = {
  val driverDir = createWorkingDirectory() // Step 1: create the DriverRunner's working directory
  val localJarFilename = downloadUserJar(driverDir) // Step 2: download the user-uploaded jar

  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }

  // TODO: If we add ability to submit multiple jars they should also be added here

  // Step 3: build the ProcessBuilder, passing in the driver's launch command,
  // the memory it needs, and related settings
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

  runDriver(builder, driverDir, driverDesc.supervise)
}
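
The {{WORKER_URL}} and {{USER_JAR}} placeholders in the driver's launch command are resolved argument by argument through substituteVariables. Here is a minimal sketch of how that resolution plays out on a full command line; the worker URL and jar path are hypothetical values standing in for what the Worker and downloadUserJar() would supply.

object SubstituteSketch {
  // Hypothetical values; the real ones come from the Worker and downloadUserJar().
  val workerUrl = "spark://Worker@192.168.0.10:7078"
  val localJarFilename = "/opt/spark/work/driver-0001/app.jar"

  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}"   => localJarFilename
    case other            => other
  }

  def main(args: Array[String]): Unit = {
    val command = Seq("java", "-cp", "{{USER_JAR}}", "--worker-url", "{{WORKER_URL}}")
    println(command.map(substituteVariables).mkString(" "))
    // java -cp /opt/spark/work/driver-0001/app.jar --worker-url spark://Worker@192.168.0.10:7078
  }
}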

Listing 3
// After the driver finishes, the DriverRunner thread sends its final state to the Worker;
// the Worker then forwards the DriverStateChanged message on to the Master
case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
  handleDriverStateChanged(driverStateChanged) // see Listing 4 for details

Listing 4
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
  val driverId = driverStateChanged.driverId
  val exception = driverStateChanged.exception
  val state = driverStateChanged.state
  state match {
    case DriverState.ERROR =>
      logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
    case DriverState.FAILED =>
      logWarning(s"Driver $driverId exited with failure")
    case DriverState.FINISHED =>
      logInfo(s"Driver $driverId exited successfully")
    case DriverState.KILLED =>
      logInfo(s"Driver $driverId was killed by user")
    case _ =>
      logDebug(s"Driver $driverId changed state to $state")
  }

  // Forward the DriverStateChanged message to the Master,
  // which updates its own record of the driver's state
  sendToMaster(driverStateChanged)

  // Remove the driver from the local cache
  val driver = drivers.remove(driverId).get

  // Move the driver into the finished-drivers collection
  finishedDrivers(driverId) = driver
  trimFinishedDriversIfNecessary()

  // Release the memory and CPU cores the driver was using
  memoryUsed -= driver.driverDesc.mem
  coresUsed -= driver.driverDesc.cores
}
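
The bookkeeping in handleDriverStateChanged boils down to: move the runner from the active map to a bounded finished map, then return its resources to the pool. A minimal sketch of that bookkeeping follows; the retention limit is illustrative and stands in for trimFinishedDriversIfNecessary (the real method evicts based on spark.worker.ui.retainedDrivers).

import scala.collection.mutable

object DriverBookkeepingSketch {
  case class RunnerInfo(cores: Int, mem: Int)

  val drivers = mutable.HashMap("driver-0001" -> RunnerInfo(cores = 2, mem = 1024))
  val finishedDrivers = mutable.LinkedHashMap[String, RunnerInfo]()
  val retainedDrivers = 1000 // illustrative limit
  var coresUsed = 2
  var memoryUsed = 1024

  def finishDriver(driverId: String): Unit = {
    // Move the runner from the active map to the finished map.
    val driver = drivers.remove(driverId).get
    finishedDrivers(driverId) = driver
    // Evict the oldest entries (insertion order) if we retain too many.
    if (finishedDrivers.size > retainedDrivers) {
      val oldest = finishedDrivers.keys.take(finishedDrivers.size - retainedDrivers).toList
      oldest.foreach(finishedDrivers.remove)
    }
    // Return the driver's resources to the pool.
    coresUsed -= driver.cores
    memoryUsed -= driver.mem
  }

  def main(args: Array[String]): Unit = {
    finishDriver("driver-0001")
    println(s"coresUsed=$coresUsed memoryUsed=$memoryUsed finished=${finishedDrivers.keySet}")
  }
}
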
/**
 * Source analysis: launching an executor
 */
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.getOrElse(appId,
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq)
      appDirectories(appId) = appLocalDirs

      // Create the ExecutorRunner
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs, ExecutorState.RUNNING)

      // Register the ExecutorRunner in the local cache
      executors(appId + "/" + execId) = manager

      // Start the ExecutorRunner
      manager.start() // see Listing 5 for details

      // Account for the CPU cores and memory the executor will use
      coresUsed += cores_
      memoryUsed += memory_

      // Send the master an ExecutorStateChanged message so it can update its state
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception =>
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
    }
  }
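
Two kinds of directories are set up in the handler above: the per-executor working directory at <workDir>/<appId>/<execId>, and per-application scratch dirs that reach the executor through the SPARK_EXECUTOR_DIRS environment variable. A minimal sketch of that layout with plain JDK calls; the paths are illustrative, and Spark's Utils.createDirectory/Utils.chmod700 additionally handle retries and cleanup registration.

import java.io.File
import java.nio.file.Files

object ExecutorDirsSketch {
  def main(args: Array[String]): Unit = {
    val workDir = Files.createTempDirectory("spark-work").toFile
    val appId = "app-0001"
    val execId = 0

    // Per-executor working directory (the stdout/stderr files land here).
    val executorDir = new File(workDir, appId + "/" + execId)
    require(executorDir.mkdirs(), s"Failed to create directory $executorDir")

    // Per-application scratch dir, restricted to the owner (chmod 700).
    val appDir = Files.createTempDirectory(workDir.toPath, "executor").toFile
    appDir.setReadable(false, false); appDir.setReadable(true, true)
    appDir.setWritable(false, false); appDir.setWritable(true, true)
    appDir.setExecutable(false, false); appDir.setExecutable(true, true)

    println(s"executorDir=$executorDir")
    println(s"SPARK_EXECUTOR_DIRS=${appDir.getAbsolutePath}")
  }
}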

Listing 5
private[worker] def start() {

  // Spawn a dedicated Java thread
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() } // see Listing 6 for details
  }
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = ShutdownHookManager.addShutdownHook { () =>
    // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
    // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
    if (state == ExecutorState.RUNNING) {
      state = ExecutorState.FAILED
    }
    killProcess(Some("Worker shutting down")) }
}
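
The comment inside the hook is about a race: the JVM can begin shutting down before fetchAndRunExecutor() has updated state, so a still-RUNNING state is downgraded to FAILED before the process is killed. A minimal sketch of that guard; the enum and the hook body are illustrative.

object ShutdownStateSketch {
  object ExecutorState extends Enumeration {
    val RUNNING, FAILED, EXITED = Value
  }

  @volatile var state = ExecutorState.RUNNING

  def main(args: Array[String]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      // If the run loop never got to report a real outcome, report FAILED.
      if (state == ExecutorState.RUNNING) {
        state = ExecutorState.FAILED
      }
      println(s"shutdown hook saw state=$state")
    }))
    // fetchAndRunExecutor() would normally set state = EXITED on completion.
  }
}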

Listing 6
/**
* Download and run the executor described in our ApplicationDescription
*/
private def fetchAndRunExecutor() {
  try {
    // Launch the process

    // Build a ProcessBuilder for the executor's launch command
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
        s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
      } else {
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      }
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    process = builder.start()

    // Redirect the executor process's output streams (its stdout and stderr)
    // to the stdout and stderr files in the executor's local working directory
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    // waitFor() blocks until the executor process (started above by builder.start()) exits
    val exitCode = process.waitFor()

    // The executor has exited; record its final state and exit code
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode

    // Send an ExecutorStateChanged message to the Worker endpoint
    // that owns this ExecutorRunner thread
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))) // see Listing 7 for details
  } catch {
    case interrupted: InterruptedException =>
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    case e: Exception =>
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
  }
}
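
A minimal sketch of the launch-and-wait flow above, using plain ProcessBuilder file redirection in place of Spark's FileAppender (which additionally supports log rolling). The command and paths are illustrative.

import java.io.File
import java.nio.file.Files

object LaunchAndWaitSketch {
  def main(args: Array[String]): Unit = {
    val executorDir = Files.createTempDirectory("executor-sketch").toFile

    val builder = new ProcessBuilder("echo", "hello from executor")
    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", new File(executorDir, "local").getAbsolutePath)
    // Redirect the child's stdout/stderr to files in its working directory.
    builder.redirectOutput(new File(executorDir, "stdout"))
    builder.redirectError(new File(executorDir, "stderr"))

    val process = builder.start()
    // waitFor() blocks until the child exits; 0 normally means a clean shutdown.
    val exitCode = process.waitFor()
    println(s"Command exited with code $exitCode")
  }
}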

Listing 7
// The Worker receives the ExecutorStateChanged message and forwards it to the master
case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  handleExecutorStateChanged(executorStateChanged) // see Listing 8 for details

Listing 8
private[worker] def handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged):
  Unit = {

  // Forward the ExecutorStateChanged message straight to the master
  sendToMaster(executorStateChanged)
  val state = executorStateChanged.state

  // If the executor has reached a terminal (finished) state
  if (ExecutorState.isFinished(state)) {
    val appId = executorStateChanged.appId
    val fullId = appId + "/" + executorStateChanged.execId
    val message = executorStateChanged.message
    val exitStatus = executorStateChanged.exitStatus
    executors.get(fullId) match {
      case Some(executor) =>
        logInfo("Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))

        // Remove the executor from the in-memory map and archive it
        executors -= fullId
        finishedExecutors(fullId) = executor
        trimFinishedExecutorsIfNecessary()

        // Release the memory and CPU cores the executor occupied
        coresUsed -= executor.cores
        memoryUsed -= executor.memory
      case None =>
        logInfo("Unknown Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
    }
    maybeCleanupApplication(appId)
  }
}
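
maybeCleanupApplication(appId) at the end is not shown in this post. As a hedged sketch of its effect (simplified and reconstructed, not the verbatim source): once the application has finished and this worker runs none of its executors anymore, the application's local directories become eligible for deletion. The names mirror the real code, but the logic here is illustrative and the deletion is stubbed out.

import scala.collection.mutable

object CleanupSketch {
  case class Exec(appId: String)

  val finishedApps = mutable.HashSet("app-0001")
  val executors = mutable.HashMap[String, Exec]() // empty: no executors left
  val appDirectories = mutable.HashMap("app-0001" -> Seq("/tmp/spark-work/app-0001"))

  def maybeCleanupApplication(id: String): Unit = {
    // Clean up only when the app has finished AND none of its executors remain.
    val shouldCleanup = finishedApps.contains(id) && !executors.values.exists(_.appId == id)
    if (shouldCleanup) {
      finishedApps -= id
      appDirectories.remove(id).foreach(_.foreach(dir => println(s"would delete $dir")))
    }
  }

  def main(args: Array[String]): Unit = maybeCleanupApplication("app-0001")
}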

Original post: https://www.cnblogs.com/yzqyxq/p/12054358.html
