spark core源码分析4 worker启动流程

源码位置:org.apache.spark.deploy.worker.Worker.scala

首先查看worker的main方法,与master类似,创建sparkConf,参数解析,以及构造worker对象并创建ActorRef用于对外或者本身的信息交互。这里masters参数可以设置多个

def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new WorkerArguments(argStrings, conf)
  val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
    args.memory, args.masters, args.workDir)
  actorSystem.awaitTermination()
}
程序起来后,同样是先执行akka 的preStart方法
override def preStart() {
  assert(!registered)
  logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
    host, port, cores, Utils.megabytesToString(memory)))
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  logInfo("Spark home: " + sparkHome)
  createWorkDir()//创建worker内部工作目录
  //订阅akka生命周期事件
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  //是否额外的启动一个shuffle服务,确保被executor所读写的shuffle文件在executor退出后被保存,可配
  shuffleService.startIfEnabled()
  webUi = new WorkerWebUI(this, workDir, webUiPort)
  webUi.bind()
  registerWithMaster()//最重要的动作了,见下面

  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
  // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}

向Master注册自己

private def registerWithMaster() {
  // DisassociatedEvent may be triggered multiple times, so don't attempt registration
  // if there are outstanding registration attempts scheduled.
  registrationRetryTimer match {
    case None =>
      registered = false
      //这里向所有的master actorRef发送RegisterWorker消息,上几节有讲master收到该消息后,如果成功处理会反馈RegisteredWorker消息,不成功会发送RegisterWorkerFailed消息
      tryRegisterAllMasters()
      connectionAttemptCount = 0
      //这里在一定时间之后会进入ReregisterWithMaster,里面会判断是否已注册,如果没有会再次发送注册信息。这个是否注册的状态是由master反馈回来的
      registrationRetryTimer = Some {
        context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
          INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
      }
    case Some(_) =>
      logInfo("Not spawning another attempt to register with the master, since there is an" +
        " attempt scheduled already.")
  }
}

看worker收到master的RegisteredWorker消息会怎么做?这里要说一点,worker要注册时并不知道哪台是主,哪台是备,所以向所有配置的master都发送注册信息。主备都收到worker的注册信息之后,只有主才会反馈,并带上自己的masterUrl信息,worker以此来认定主master的actorRef用于真正的信息交互

worker要通过心跳来保持与master的时刻连通,所以注册成功之后,有一个connected标记是否连接正常,在changeMaster方法内部设置connected = true
<pre name="code" class="java">case RegisteredWorker(masterUrl, masterWebUiUrl) =>
  logInfo("Successfully registered with master " + masterUrl)
  registered = true //状态设置为已注册,不然的话,一定时间过后,会发起ReregisterWithMaster而重复注册
  changeMaster(masterUrl, masterWebUiUrl)//这里是将主master的信息保存

  //在注册成功之后,才开启定时器向master发送心跳
  context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
  //定时器清理workDir下很久都没有更新的且app也不在执行状态的目录
  if (CLEANUP_ENABLED) {
    logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
    context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
      CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
  }

如果收到RegisterWorkerFailed消息,则退出



下面看master接受到worker的心跳之后如何处理
由于worker注册时,master已经将workerId存入idToWorker中,所以这里走Some分支。很简单,只是更新该worker的一个时间戳。这里有必要说明一下None分支,在注册消息到达后,在master 的idToWorker和workers中都会保存,但是当master检测到worker超时时,将worker从idToWorker中删除,这样新的任务就选不了该worker了,但不删除workers中的。workers中的只会在间隔很长一段时间之后仍然没有心跳上来,才说明该worker真正无法再工作了,再从workers中删除。这里的None分支就是应对超时过后,心跳又继续上来了,就向worker发送重新注册的消息ReconnectWorker

case Heartbeat(workerId) => {
  idToWorker.get(workerId) match {
    case Some(workerInfo) =>
      workerInfo.lastHeartbeat = System.currentTimeMillis()
    case None =>
      if (workers.map(_.id).contains(workerId)) {
        logWarning(s"Got heartbeat from unregistered worker $workerId." +
          " Asking it to re-register.")
        sender ! ReconnectWorker(masterUrl)
      } else {
        logWarning(s"Got heartbeat from unregistered worker $workerId." +
          " This worker was never registered, so ignoring the heartbeat.")
      }
  }
}

至此,worker启动流程以及主动发送的消息介绍完了,剩下的都是被动接收并处理的流程,在之后结合具体job介绍。。。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-25 03:01:47

spark core源码分析4 worker启动流程的相关文章

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

Tomcat源码分析之—具体启动流程分析

从Tomcat启动调用栈可知,Bootstrap类的main方法为整个Tomcat的入口,在init初始化Bootstrap类的时候为设置Catalina的工作路径也就是Catalina_HOME信息.Catalina.base信息,在initClassLoaders方法中初始化类加载器,然后通过反射初始化org.apache.catalina.startup.Catalina作为catalina守护进程: 一.load Bootstrap中load流程: 反射调用Catalina的load方法

spark core源码分析7 Executor的运行

实际任务的运行,都是通过Executor类来执行的.这一节,我们只介绍Standalone模式. 源码位置:org.apache.spark.executor.CoarseGrainedExecutorBackend private def run( driverUrl: String, executorId: String, hostname: String, cores: Int, appId: String, workerUrl: Option[String], userClassPath

spark core源码分析8 从简单例子看transformation

前面提到过spark自带的一个最简单的例子,也介绍了SparkContext的部分,这节介绍剩余的内容中的transformation. object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(

spark core源码分析14 参数配置

博客地址: http://blog.csdn.net/yueqian_zhu/ spark 参数详解 一.Shuffle 相关 1.spark.shuffle.manager(默认 sort) HashShuffleManager,故名思义也就是在Shuffle的过程中写数据时不做排序操作,只是将数据根据Hash的结果,将各个Reduce分区的数据写到各自的磁盘文件中.带来的问题就是如果Reduce分区的数量比较大的话,将会产生大量的磁盘文件.如果文件数量特别巨大,对文件读写的性能会带来比较大的

Spring源码分析2 — 容器启动流程

1 主要类 部署web应用时,web容器(比如Tomcat)会读取配置在web.xml中的监听器,从而启动spring容器.有了spring容器之后,我们才能使用spring的IOC AOP等特性.弄清spring容器启动流程,有利于理解spring IOC中的各种特性,比如BeanPostProcessor,MessageSource,ApplicationListener等.我们先来看下容器启动流程中涉及的主要类. ContextLoaderListener:注册在web.xml中,web应

spark core源码分析10 Task的运行

这一节介绍具体task的运行以及最终结果的处理 看线程运行的run方法,见代码注释 override def run(): Unit = { val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager) val deserializeStartTime = System.currentTimeMillis() Thread.currentThread.setContextClassLoader(replClass

spark core源码分析9 从简单例子看action操作

上一节举例讲解了transformation操作,这一节以reduce为例讲解action操作 首先看submitJob方法,它将我们reduce中写的处理函数随JobSubmitted消息传递出去,因为每个分区都需要调用它进行计算: 而resultHandler是指最后合并的方法,在每个task完成后,需要调用resultHandler将最终结果合并.所以它不需要随JobSubmitted消息传递,而是保存在JobWaiter中 /** * Submit a job to the job sc

spark core源码分析13 异常情况下的容错保证

博客地址: http://blog.csdn.net/yueqian_zhu/ standalone模式下的框架图如下: 异常分析1: worker异常退出 worker异常退出,比如说有意识的通过kill指令将worker杀死 worker在退出之前,会将自己所管控的所有小弟executor全干掉 worker需要定期向master改善心跳消息的,现在worker进程都已经玩完了,哪有心跳消息,所以Master会在超时处理中意识到有一个"分舵"离开了 Master非常伤心,伤心的Ma