Spark Learning (1): Master Startup Flow

1. Startup Script

sbin/start-master.sh

  1. "$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT

Parameters:

(1) SPARK_MASTER_IP

(2) SPARK_MASTER_PORT

(3) SPARK_MASTER_WEBUI_PORT

The Master class is ultimately launched through the bin/spark-class script.

The argument "1" is the master instance number. It only affects the names of the generated log and pid files and is not passed to the Master class, e.g.:

  1. spark-xxx-org.apache.spark.deploy.master.Master-1-CentOS-01.out
  2. spark-xxx-org.apache.spark.deploy.master.Master-1.pid

The "1" in "Master-1" is that instance number.

2. Master.main

    def main(argStrings: Array[String]) {
      SignalLogger.register(log)
      val conf = new SparkConf
      val args = new MasterArguments(argStrings, conf)
      val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
      actorSystem.awaitTermination()
    }

Responsibilities of main:

(1) Create a MasterArguments object and initialize its members;

(2) Call startSystemAndActor to create the ActorSystem and start the Master actor.

2.1. MasterArguments

    parse(args.toList)
    // This mutates the SparkConf, so all accesses to it must be made after this line
    propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

(1) parse is responsible for parsing the command-line arguments passed by the startup script;

(2) loadDefaultSparkProperties loads Spark runtime properties from a properties file, which defaults to spark-defaults.conf.
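As a rough illustration (not the actual Utils.loadDefaultSparkProperties code), loading the defaults amounts to reading spark-defaults.conf and copying any spark.* entries into the SparkConf only when they are not already set, so values parsed from the command line keep precedence:

    import java.io.FileInputStream
    import java.util.Properties
    import scala.collection.JavaConverters._
    import org.apache.spark.SparkConf

    // Conceptual sketch only: copy spark.* keys from a properties file into the
    // SparkConf unless they were already set (e.g. by command-line parsing).
    def loadDefaultsSketch(conf: SparkConf, path: String): Unit = {
      val props = new Properties()
      val in = new FileInputStream(path)
      try props.load(in) finally in.close()
      props.asScala
        .filter { case (k, _) => k.startsWith("spark.") }
        .foreach { case (k, v) => conf.setIfMissing(k, v.trim) }
    }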

2.2. startSystemAndActor

    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
      securityManager = securityMgr)
    val actor = actorSystem.actorOf(
      Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)

(1) Create an ActorSystem via AkkaUtils.createActorSystem;

(2) Create and start the Master actor inside that ActorSystem.
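For readers unfamiliar with Akka's classic actor API, here is a minimal, self-contained sketch of the same pattern: build an ActorSystem, then start an actor from a Props carrying constructor arguments. DemoMaster, DemoApp, and the port value are illustrative names, not Spark's code:

    import akka.actor.{Actor, ActorSystem, Props}

    // Toy actor standing in for Master; the constructor arguments mirror how
    // Props(classOf[Master], host, boundPort, ...) passes parameters.
    class DemoMaster(host: String, port: Int) extends Actor {
      override def preStart(): Unit = println(s"DemoMaster starting at $host:$port")
      def receive: Receive = {
        case msg => println(s"DemoMaster received: $msg")
      }
    }

    object DemoApp extends App {
      val system = ActorSystem("sparkMaster")
      val master = system.actorOf(Props(classOf[DemoMaster], "localhost", 7077), "Master")
      master ! "hello"
    }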

3. Master Actor

3.1. Key Data Members

    val workers = new HashSet[WorkerInfo]
    val idToWorker = new HashMap[String, WorkerInfo]
    val addressToWorker = new HashMap[Address, WorkerInfo]
    val apps = new HashSet[ApplicationInfo]
    val idToApp = new HashMap[String, ApplicationInfo]
    val actorToApp = new HashMap[ActorRef, ApplicationInfo]
    val addressToApp = new HashMap[Address, ApplicationInfo]
    val waitingApps = new ArrayBuffer[ApplicationInfo]
    val completedApps = new ArrayBuffer[ApplicationInfo]
    var nextAppNumber = 0
    val appIdToUI = new HashMap[String, SparkUI]
    val drivers = new HashSet[DriverInfo]
    val completedDrivers = new ArrayBuffer[DriverInfo]
    val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling

3.2. Master.preStart

    // Listen for remote client disconnection events, since they don't go through Akka's watch()
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

This subscribes the Master to RemotingLifecycleEvent events. RemotingLifecycleEvent is a trait:

    sealed trait RemotingLifecycleEvent extends Serializable {
      def logLevel: Logging.LogLevel
    }

Of these events, the Master only handles DisassociatedEvent.

    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

This starts the timer that detects worker timeouts: every WORKER_TIMEOUT milliseconds it sends a CheckForWorkerTimeOut message to the Master itself. The default timeout is 60 seconds and can be configured via the spark.worker.timeout property.
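The following is a hedged, self-contained sketch of the same periodic self-message pattern inside an Akka classic actor; TimeoutChecker and the surrounding boilerplate are illustrative, not the Master's actual code. Note that spark.worker.timeout is given in seconds and converted to milliseconds internally:

    import akka.actor.{Actor, ActorSystem, Props}
    import scala.concurrent.duration._

    case object CheckForWorkerTimeOut

    class TimeoutChecker(timeoutMs: Long) extends Actor {
      import context.dispatcher // ExecutionContext required by the scheduler

      // schedule CheckForWorkerTimeOut to self on a fixed interval, as preStart does
      override def preStart(): Unit =
        context.system.scheduler.schedule(0.millis, timeoutMs.millis, self, CheckForWorkerTimeOut)

      def receive: Receive = {
        case CheckForWorkerTimeOut => println("checking for timed-out workers")
      }
    }

    object TimeoutDemo extends App {
      val system = ActorSystem("demo")
      system.actorOf(Props(new TimeoutChecker(60 * 1000L)), "checker") // 60 s, the default
    }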

    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      case "CUSTOM" =>
        val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
          .newInstance(conf, SerializationExtension(context.system))
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }

The persistence engine and the leader-election agent are created according to RECOVERY_MODE. RECOVERY_MODE defaults to NONE and is configured via the spark.deploy.recoveryMode property.
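For reference, the other modes are selected through spark.deploy.* properties (normally set in spark-defaults.conf or SPARK_DAEMON_JAVA_OPTS rather than in code). A hedged illustration using SparkConf directly, with placeholder ZooKeeper addresses:

    import org.apache.spark.SparkConf

    // Illustrative only: property names follow the standalone HA documentation;
    // the ZooKeeper quorum and directories below are placeholders.
    val conf = new SparkConf()
      .set("spark.deploy.recoveryMode", "ZOOKEEPER")
      .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181,zk3:2181")
      .set("spark.deploy.zookeeper.dir", "/spark")

    // For FILESYSTEM recovery, the relevant properties would instead be:
    //   spark.deploy.recoveryMode=FILESYSTEM
    //   spark.deploy.recoveryDirectory=/path/to/recovery/dir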

Assume here that RECOVERY_MODE is NONE. In that case:

(1) A BlackHolePersistenceEngine is created, which performs no persistence at all;

(2) A MonarchyLeaderAgent is created; its primary constructor immediately notifies the Master that it has been elected leader (an ElectedLeader message), as sketched below.
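A rough sketch of the idea, with simplified names (the real agent implements Spark's LeaderElectionAgent trait, and the notification reaches the Master actor as an ElectedLeader message; details vary slightly between Spark versions):

    // Sketch only: with no election backend there is exactly one master,
    // so the agent declares it leader as soon as it is constructed.
    trait LeaderElectableSketch {
      def electedLeader(): Unit
    }

    class MonarchyLeaderAgentSketch(master: LeaderElectableSketch) {
      // primary constructor body: immediately notify the single master
      master.electedLeader()
    }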

3.3. Master Message Handling

3.3.1. The ElectedLeader Message

    case ElectedLeader => {
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
        RecoveryState.ALIVE
      } else {
        RecoveryState.RECOVERING
      }
      logInfo("I have been elected leader! New state: " + state)
      if (state == RecoveryState.RECOVERING) {
        beginRecovery(storedApps, storedDrivers, storedWorkers)
        recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
          CompleteRecovery)
      }
    }

Under our assumption that RECOVERY_MODE is NONE, the persistence engine returns no stored applications, drivers, or workers, so no recovery is performed and state is set directly to RecoveryState.ALIVE.

3.3.2. The CheckForWorkerTimeOut Message

    case CheckForWorkerTimeOut => {
      timeOutDeadWorkers()
    }

This checks for timed-out worker nodes. The worker timeout defaults to 60 seconds and is configured via the spark.worker.timeout property.
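A simplified, hypothetical version of that check (WorkerStub and timeOutDeadWorkersSketch are made-up names; the real timeOutDeadWorkers() also logs a warning and keeps already-DEAD workers visible for a few more intervals before dropping them):

    import scala.collection.mutable

    // Simplified sketch: a worker whose last heartbeat is older than the timeout
    // window is considered lost and removed from the registered set.
    case class WorkerStub(id: String, var lastHeartbeat: Long)

    def timeOutDeadWorkersSketch(workers: mutable.Set[WorkerStub], workerTimeoutMs: Long): Unit = {
      val now = System.currentTimeMillis()
      // filter returns a new collection, so removing from `workers` here is safe
      workers.filter(_.lastHeartbeat < now - workerTimeoutMs).foreach { w =>
        println(s"Removing worker ${w.id}: no heartbeat for ${workerTimeoutMs / 1000} seconds")
        workers -= w
      }
    }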

3.3.3. The DisassociatedEvent Message

    case DisassociatedEvent(_, address, _) => {
      // The disconnected client could've been either a worker or an app; remove whichever it was
      logInfo(s"$address got disassociated, removing it.")
      addressToWorker.get(address).foreach(removeWorker)
      addressToApp.get(address).foreach(finishApplication)
      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
    }

The disassociated address may belong to either a Worker or an application, so the Master looks it up in both addressToWorker and addressToApp and removes whichever entry matches.

3.3.4. The RegisterWorker Message

This is the registration message a Worker sends to the Master.

    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      sender, workerUiPort, publicAddress)
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule()
    }

(1) Create a WorkerInfo object;

(2) Call registerWorker to record the worker's information;

(3) Reply to the worker with a RegisteredWorker message;

(4) Call schedule, which allocates resources to waiting drivers and applications; a simplified sketch of such a scheduling pass follows.
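The sketch below is a deliberately simplified, hypothetical illustration of what a scheduling pass does: place waiting drivers on workers with enough free resources, then hand leftover cores to applications that still need them. Slot, DriverReq, and AppReq are made-up types; the real Master.schedule() additionally handles spread-out allocation, executor memory, and recovery state.

    // Hypothetical types standing in for WorkerInfo / DriverInfo / ApplicationInfo.
    case class Slot(id: String, var freeCores: Int, var freeMemMb: Int)
    case class DriverReq(id: String, cores: Int, memMb: Int)
    case class AppReq(id: String, var coresLeft: Int)

    def schedulePassSketch(workers: Seq[Slot], drivers: Seq[DriverReq], apps: Seq[AppReq]): Unit = {
      // 1) FIFO placement of waiting drivers on a worker with enough free resources
      for (d <- drivers) {
        workers.find(w => w.freeCores >= d.cores && w.freeMemMb >= d.memMb).foreach { w =>
          w.freeCores -= d.cores
          w.freeMemMb -= d.memMb
          println(s"launch driver ${d.id} on worker ${w.id}")
        }
      }
      // 2) offer remaining cores to applications that still need them
      for (a <- apps; w <- workers if a.coresLeft > 0 && w.freeCores > 0) {
        val grant = math.min(w.freeCores, a.coresLeft)
        w.freeCores -= grant
        a.coresLeft -= grant
        println(s"grant $grant cores on worker ${w.id} to app ${a.id}")
      }
    }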

3.3.4.1. WorkerInfo

    private[spark] class WorkerInfo(
        val id: String,
        val host: String,
        val port: Int,
        val cores: Int,
        val memory: Int,
        val actor: ActorRef,
        val webUiPort: Int,
        val publicAddress: String)
      extends Serializable {
      ...
      init()
      ...
      private def init() {
        executors = new mutable.HashMap
        drivers = new mutable.HashMap
        state = WorkerState.ALIVE
        coresUsed = 0
        memoryUsed = 0
        lastHeartbeat = System.currentTimeMillis()
      }
      ...
    }

The WorkerInfo constructor calls init(), which creates the executor and driver maps, sets the state to ALIVE, zeroes the resource counters, and records the current time as the last heartbeat.

3.3.4.2. Master.registerWorker

    workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }.foreach { w =>
      workers -= w
    }

First, remove any existing WorkerInfo registered at the same host and port whose state is DEAD.

    val workerAddress = worker.actor.path.address
    if (addressToWorker.contains(workerAddress)) {
      val oldWorker = addressToWorker(workerAddress)
      if (oldWorker.state == WorkerState.UNKNOWN) {
        // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
        // The old worker must thus be dead, so we will remove it and accept the new worker.
        removeWorker(oldWorker)
      } else {
        logInfo("Attempted to re-register worker at same address: " + workerAddress)
        return false
      }
    }
    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker

Then the new WorkerInfo is recorded in workers, idToWorker, and addressToWorker.

4. Startup Complete

At this point, the Master startup process is complete.

The Master now waits for registration and other message requests from Workers and Drivers.
