Spark Core Source Code Analysis 7: Running the Executor

Actual task execution is carried out by the Executor class. In this section we only cover Standalone mode.

Source location: org.apache.spark.executor.CoarseGrainedExecutorBackend

private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {
  SignalLogger.register(log)
  SparkHadoopUtil.get.runAsSparkUser { () =>
    // Debug code
    Utils.checkHost(hostname)

    // Bootstrap to fetch the driver's Spark properties.
    val executorConf = new SparkConf // create the executor-side SparkConf
    val port = executorConf.getInt("spark.executor.port", 0)
    // create an Akka-based RpcEnv (internally wrapping an ActorSystem), used only to fetch the driver's properties
    val fetcher = RpcEnv.create(
      "driverPropsFetcher",
      hostname,
      port,
      executorConf,
      new SecurityManager(executorConf))
    // obtain the driver's ActorRef (endpoint reference)
    val driver = fetcher.setupEndpointRefByURI(driverUrl)
    val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
      Seq[(String, String)](("spark.app.id", appId))
    fetcher.shutdown()

    // Create SparkEnv using properties we fetched from the driver.
    val driverConf = new SparkConf() // build a SparkConf from the properties fetched from the driver
    for ((key, value) <- props) {
      // this is required for SSL in standalone mode
      if (SparkConf.isExecutorStartupConf(key)) {
        driverConf.setIfMissing(key, value)
      } else {
        driverConf.set(key, value)
      }
    }
    if (driverConf.contains("spark.yarn.credentials.file")) {
      logInfo("Will periodically update credentials from: " +
        driverConf.get("spark.yarn.credentials.file"))
      SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
    }
    // create the executor's SparkEnv (analyzed below)
    val env = SparkEnv.createExecutorEnv(
      driverConf, executorId, hostname, port, cores, isLocal = false)

    // SparkEnv sets spark.driver.port so it shouldn't be 0 anymore.
    val boundPort = env.conf.getInt("spark.executor.port", 0)
    assert(boundPort != 0)
    // Start the CoarseGrainedExecutorBackend endpoint.
    val sparkHostPort = hostname + ":" + boundPort
    // create the CoarseGrainedExecutorBackend endpoint (the executor's ActorRef); its onStart registers the executor with the driver, see below
    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
    // WorkerWatcher monitors the parent worker and exits this process if the connection to the worker is lost
    workerUrl.foreach { url =>
      env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
    }
    env.rpcEnv.awaitTermination()
    SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
  }
}
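
For context on the bootstrap above: the RetrieveSparkProps request is answered on the driver side inside CoarseGrainedSchedulerBackend's DriverEndpoint. A minimal sketch of that handler, based on my reading of the Spark 1.x source (the exact shape may differ across versions):

// inside DriverEndpoint.receiveAndReply
case RetrieveSparkProps =>
  // reply with the driver's spark.* configuration entries, which the executor
  // uses above to build driverConf before creating its SparkEnv
  context.reply(sparkProperties)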

Let's first look at createExecutorEnv. It is almost identical to the driver side, which was covered earlier, so here we only point out the differences from the driver:

1. On the executor side, mapOutputTracker is a MapOutputTrackerWorker object, and mapOutputTracker.trackerEndpoint actually references the driver's ActorRef.

2. blockManagerMaster also internally holds the driver's ActorRef.

3. outputCommitCoordinator.coordinatorRef likewise contains the driver's ActorRef.
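
All three differences come from the same pattern inside SparkEnv.create: on the driver the real endpoint is registered, while on an executor only a reference to the driver's endpoint is looked up by name. A simplified sketch of that helper, based on my reading of the Spark 1.x source (signatures may vary by version):

def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
  if (isDriver) {
    // driver side: create and register the real endpoint locally
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    // executor side: only look up the driver's endpoint by name, so components such as
    // MapOutputTrackerWorker and BlockManagerMaster end up holding the driver's ref
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
}

As far as I can tell, mapOutputTracker.trackerEndpoint, the BlockManagerMaster's driver endpoint, and outputCommitCoordinator.coordinatorRef are all wired up through this helper.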

Now let's look at the onStart method of CoarseGrainedExecutorBackend and see what it does on its own initiative.

It sends a RegisterExecutor message to the driver to register the executor; once the driver responds successfully, it sends a RegisteredExecutor message to itself.

override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    ref.ask[RegisteredExecutor.type](
      RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) => Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
    }
    case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e)
  }(ThreadUtils.sameThread)
}

Now, how does the driver handle this message? Focus on the makeOffers call at the end: once an executor has registered, any tasks waiting to execute can start. This method will come up again later, and since the task-scheduling chapters have not been covered yet, a full explanation is deferred. For now it is enough to know that an executor registering triggers a round of task scheduling if there are tasks waiting (a short preview of makeOffers follows the code below).

case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
  Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
  if (executorDataMap.contains(executorId)) {
    context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
  } else {
    logInfo("Registered executor: " + executorRef + " with ID " + executorId)
    context.reply(RegisteredExecutor) // reply to the executor with a RegisteredExecutor message
    addressToExecutorId(executorRef.address) = executorId
    totalCoreCount.addAndGet(cores) // add this executor's cores to the running total for each successfully registered executor
    totalRegisteredExecutors.addAndGet(1)
    val (host, _) = Utils.parseHostPort(hostPort)
    val data = new ExecutorData(executorRef, executorRef.address, host, cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    makeOffers()
  }
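
Although scheduling is deferred to a later section, a rough preview of makeOffers (a sketch based on my reading of the Spark 1.x source) shows why registration can kick off task execution: the free cores of every registered executor are described as WorkerOffers and handed to the TaskScheduler, and whatever tasks it returns are launched.

private def makeOffers() {
  // offer the free cores of all registered executors to the task scheduler,
  // then launch the tasks it decides to run
  launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq))
}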

When the executor side receives this, it creates the actual Executor object. The Executor class is the interface for running tasks and keeps track of all tasks running in this executor process.

case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  val (hostname, _) = Utils.parseHostPort(hostPort)
  executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
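
How does the Executor keep track of its tasks? A simplified sketch of the relevant fragment of the Executor class, based on my reading of the Spark 1.x source (details may differ by version): each LaunchTask message eventually calls launchTask, which wraps the task in a TaskRunner, records it in a map of running tasks, and submits it to a thread pool.

// all tasks currently running in this executor process, keyed by taskId
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId, attemptNumber, taskName, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}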

This concludes the executor-side registration logic; later sections will cover the remaining pieces together with actual task execution.



