1. Startup Script
sbin/start-master.sh
"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
Parameters:
(1) SPARK_MASTER_IP
(2) SPARK_MASTER_PORT
(3) SPARK_MASTER_WEBUI_PORT
The Master class is ultimately launched through the bin/spark-class script.
The argument "1" denotes the master instance number; it only affects the names of the generated log and pid files and is never passed to the Master class.
spark-xxx-org.apache.spark.deploy.master.Master-1-CentOS-01.out
spark-xxx-org.apache.spark.deploy.master.Master-1.pid
The "1" in "Master-1" is that master instance number.
2. Master.main
def main(argStrings: Array[String]) {
SignalLogger.register(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
actorSystem.awaitTermination()
}
Responsibilities of main:
(1) Create a MasterArguments object and initialize its members;
(2) Call startSystemAndActor to create the ActorSystem and start the Master actor.
2.1. MasterArguments
parse(args.toList)
// This mutates the SparkConf, so all accesses to it must be made after this line
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
(1) parse handles the command-line arguments passed by the startup script (the parsing style is sketched below);
(2) loadDefaultSparkProperties loads Spark runtime properties from the properties file, which defaults to spark-defaults.conf.
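parse walks the argument list with a recursive pattern match. The following is a minimal, self-contained sketch of that style; the option set and field names are simplified stand-ins, not the actual MasterArguments code:

object ParseSketch {
  var host = "localhost"
  var port = 7077
  var webUiPort = 8080
  var propertiesFile: String = null

  // Recursively peel options off the head of the list, as MasterArguments.parse does
  def parse(args: List[String]): Unit = args match {
    case ("--host" | "-h") :: value :: tail =>
      host = value
      parse(tail)
    case ("--port" | "-p") :: value :: tail =>
      port = value.toInt
      parse(tail)
    case "--webui-port" :: value :: tail =>
      webUiPort = value.toInt
      parse(tail)
    case "--properties-file" :: value :: tail =>
      propertiesFile = value
      parse(tail)
    case Nil => // all arguments consumed
    case _ => sys.error("Unrecognized option: " + args.head)
  }

  def main(args: Array[String]): Unit = {
    parse(List("--host", "CentOS-01", "--port", "7077", "--webui-port", "8080"))
    println(s"host=$host, port=$port, webUiPort=$webUiPort")
  }
}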
2.2. startSystemAndActor
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
(1) Create an ActorSystem via AkkaUtils.createActorSystem;
(2) Create and start the Master actor (the underlying Akka pattern is sketched below).
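For reference, the Akka machinery beneath these two calls follows the standard classic-actors pattern shown in this self-contained sketch (assuming Akka 2.3-era APIs, which Spark 1.x used; EchoActor is a hypothetical stand-in for Master):

import akka.actor.{Actor, ActorSystem, Props}

class EchoActor extends Actor {
  def receive = {
    case msg => println(s"received: $msg")
  }
}

object ActorSketch {
  def main(args: Array[String]): Unit = {
    // AkkaUtils.createActorSystem ultimately builds an ActorSystem like this,
    // with remoting configured so the bound port can be reported back
    val system = ActorSystem("sparkMaster")
    // actorOf(Props(classOf[...], ctorArgs...), name) mirrors how the Master actor is created
    val echo = system.actorOf(Props(classOf[EchoActor]), "EchoActor")
    echo ! "hello"
    Thread.sleep(500)  // let the message be processed before shutting down
    system.shutdown()  // Akka 2.3-era API; newer versions use terminate()
  }
}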
3. Master Actor
3.1. Key Data Members
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
val addressToWorker = new HashMap[Address, WorkerInfo]
val apps = new HashSet[ApplicationInfo]
val idToApp = new HashMap[String, ApplicationInfo]
val actorToApp = new HashMap[ActorRef, ApplicationInfo]
val addressToApp = new HashMap[Address, ApplicationInfo]
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
var nextAppNumber = 0
val appIdToUI = new HashMap[String, SparkUI]
val drivers = new HashSet[DriverInfo]
val completedDrivers = new ArrayBuffer[DriverInfo]
val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling
3.2. Master.preStart
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
The Master subscribes to RemotingLifecycleEvent, which is a sealed trait:
sealed trait RemotingLifecycleEvent extends Serializable {
def logLevel: Logging.LogLevel
}
Of its subtypes, the Master only handles the DisassociatedEvent message.
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
Start a timer that checks for Worker timeouts: a CheckForWorkerTimeOut message is sent to the Master itself once per worker-timeout period. The timeout defaults to 60 seconds and can be set through the spark.worker.timeout property. (The underlying Akka pattern is sketched below.)
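This is the standard Akka pattern of an actor scheduling periodic messages to itself from preStart. A self-contained sketch, with the period shortened to one second for demonstration (TimerSketchActor is a stand-in, not Spark code):

import scala.concurrent.duration._
import akka.actor.{Actor, ActorSystem, Props}

case object CheckForWorkerTimeOut

class TimerSketchActor extends Actor {
  import context.dispatcher // ExecutionContext required by the scheduler

  override def preStart(): Unit = {
    // same shape as in Master.preStart: fire immediately, then once per period
    context.system.scheduler.schedule(0.millis, 1.second, self, CheckForWorkerTimeOut)
  }

  def receive = {
    case CheckForWorkerTimeOut => println("checking for timed-out workers")
  }
}

object TimerSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sketch")
    system.actorOf(Props(classOf[TimerSketchActor]), "timer")
    Thread.sleep(3500)  // observe a few ticks
    system.shutdown()
  }
}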
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
.newInstance(conf, SerializationExtension(context.system))
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
The persistence engine and the leader election agent are created according to RECOVERY_MODE, which defaults to NONE and is configured via the spark.deploy.recoveryMode property.
Assuming RECOVERY_MODE is NONE:
(1) A BlackHolePersistenceEngine is created, which performs no persistence at all;
(2) A MonarchyLeaderAgent is created, whose primary constructor sends an ElectedLeader message to the Master (sketched below).
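The following self-contained sketch illustrates the MonarchyLeaderAgent idea (the classes are simplified stand-ins, not the real Spark ones): with no election backend there is exactly one candidate, so construction itself "elects" the Master.

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

case object ElectedLeader

class MasterStub extends Actor {
  def receive = {
    case ElectedLeader => println("I have been elected leader!")
  }
}

// The "election" is unconditional: the primary constructor notifies the Master
class MonarchyLeaderAgentSketch(master: ActorRef) {
  master ! ElectedLeader
}

object LeaderSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sketch")
    val master = system.actorOf(Props(classOf[MasterStub]), "master")
    new MonarchyLeaderAgentSketch(master)
    Thread.sleep(500)
    system.shutdown()
  }
}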
3.3. Master Message Handling
3.3.1. The ElectedLeader Message
case ElectedLeader => {
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
} else {
RecoveryState.RECOVERING
}
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedDrivers, storedWorkers)
recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
CompleteRecovery)
}
}
Since we assumed RECOVERY_MODE is NONE, the persisted apps, drivers, and workers are all empty, so no recovery is performed and state is set directly to RecoveryState.ALIVE.
3.3.2. The CheckForWorkerTimeOut Message
case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}
Check for timed-out Worker nodes by calling timeOutDeadWorkers (the timeout is the same spark.worker.timeout value described above, 60 seconds by default); a simplified sketch of the sweep follows.
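A self-contained sketch of the sweep timeOutDeadWorkers performs (stub types; the real method also logs through the Spark logger and has extra handling for workers already marked DEAD):

object TimeoutSweepSketch {
  case class WorkerStub(id: String, var lastHeartbeat: Long)

  val WORKER_TIMEOUT = 60 * 1000L // spark.worker.timeout default, in milliseconds
  val workers = scala.collection.mutable.HashSet.empty[WorkerStub]

  def timeOutDeadWorkers(): Unit = {
    val now = System.currentTimeMillis()
    // any worker whose last heartbeat is older than the timeout is removed
    val expired = workers.filter(_.lastHeartbeat < now - WORKER_TIMEOUT).toArray
    for (w <- expired) {
      println(s"Removing ${w.id}: no heartbeat in ${WORKER_TIMEOUT / 1000} seconds")
      workers -= w
    }
  }

  def main(args: Array[String]): Unit = {
    workers += WorkerStub("worker-1", System.currentTimeMillis())
    workers += WorkerStub("worker-2", System.currentTimeMillis() - 120 * 1000)
    timeOutDeadWorkers()                      // removes only worker-2
    println(workers.map(_.id).mkString(", ")) // worker-1
  }
}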
3.3.3. The DisassociatedEvent Message
case DisassociatedEvent(_, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
logInfo(s"$address got disassociated, removing it.")
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
3.3.4. The RegisterWorker Message
This is the registration message a Worker sends to the Master.
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
sender, workerUiPort, publicAddress)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
}
(1) Create a WorkerInfo object;
(2) Call registerWorker to record the worker's information;
(3) Reply to the Worker with a RegisteredWorker message;
(4) Call schedule, which allocates resources to waiting drivers and apps (its core idea is sketched below).
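schedule() deserves its own discussion, but its core "spread out" allocation strategy can be sketched as follows (heavily simplified, with made-up numbers; the real method also launches waiting drivers and respects memory constraints):

object SpreadOutSketch {
  def main(args: Array[String]): Unit = {
    val freeCores = Array(4, 4, 4)                     // free cores on each usable worker
    var coresToAssign = math.min(7, freeCores.sum)     // cores the waiting app still needs,
                                                       // capped at what is actually available
    val assigned = Array.fill(freeCores.length)(0)
    var pos = 0
    // hand out one core at a time, round-robin across workers,
    // mirroring the spreadOutApps path in Master.schedule
    while (coresToAssign > 0) {
      if (freeCores(pos) - assigned(pos) > 0) {
        assigned(pos) += 1
        coresToAssign -= 1
      }
      pos = (pos + 1) % freeCores.length
    }
    println(assigned.mkString(", ")) // 3, 2, 2
  }
}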
3.3.4.1. WorkerInfo
private[spark] class WorkerInfo(
val id: String,
val host: String,
val port: Int,
val cores: Int,
val memory: Int,
val actor: ActorRef,
val webUiPort: Int,
val publicAddress: String)
extends Serializable {
...
init()
...
private def init() {
executors = new mutable.HashMap
drivers = new mutable.HashMap
state = WorkerState.ALIVE
coresUsed = 0
memoryUsed = 0
lastHeartbeat = System.currentTimeMillis()
}
A WorkerInfo object is created, and init() initializes its mutable state: the executor and driver maps, the ALIVE state, the usage counters, and the last-heartbeat timestamp.
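The usage counters init() sets are what scheduling later consults; free capacity is derived from them. A stub sketch (the coresFree/memoryFree accessors mirror ones defined on the real WorkerInfo):

object WorkerCapacitySketch {
  class WorkerInfoStub(val cores: Int, val memory: Int) {
    var coresUsed = 0
    var memoryUsed = 0
    def coresFree: Int = cores - coresUsed
    def memoryFree: Int = memory - memoryUsed
  }

  def main(args: Array[String]): Unit = {
    val w = new WorkerInfoStub(cores = 8, memory = 16384) // memory in MB
    w.coresUsed = 3
    w.memoryUsed = 4096
    println(s"coresFree=${w.coresFree}, memoryFree=${w.memoryFree} MB") // 5, 12288 MB
  }
}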
3.3.4.2. Master.registerWorker
workers.filter { w =>
(w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
workers -= w
}
First, remove any existing WorkerInfo with the same host and port whose state is DEAD.
val workerAddress = worker.actor.path.address
if (addressToWorker.contains(workerAddress)) {
val oldWorker = addressToWorker(workerAddress)
if (oldWorker.state == WorkerState.UNKNOWN) {
// A worker registering from UNKNOWN implies that the worker was restarted during recovery.
// The old worker must thus be dead, so we will remove it and accept the new worker.
removeWorker(oldWorker)
} else {
logInfo("Attempted to re-register worker at same address: " + workerAddress)
return false
}
}
workers += worker
idToWorker(worker.id) = worker
addressToWorker(workerAddress) = worker
Finally, record the WorkerInfo in workers, idToWorker, and addressToWorker.
4. Startup Complete
At this point, the Master startup process is complete.
The Master now waits for message requests from workers and drivers.