上一篇文章 spark 源码理解1 从spark启动脚本开始 是分析执行start_all.sh时,集群中启动了哪些进程,下面我们再深入一点看看这些进程都是做什么用的,它们之间又是如何通信的?
一、Master进程的启动
Master进程,它主要负责对Worker、Driver、App等资源的管理并与它们进行通信,这篇文章中我打算着重讲一下它与Worker的通信,其它的部分放在以后的章节再加以描述。
spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
前面说过,我们最终是通过上面的命令来启动Master进程的,其中org.apache.spark.deploy.master.Master是类名,其余为参数:1 表示实例个数,--ip 主机地址,--port 主机端口,默认7077,--webui-port web界面端口,默认8080,它们的解析是由org.apache.spark.deploy.master.MasterArguments 完成的。
在接下来深入之前,简单介绍一下Spark重要的通信协议Akka,它是基于消息传递的机制,scala原生支持的,具有一下特点:
Actors
Actors为你提供:
对并发/并行程序的简单的、高级别的抽象。
异步、非阻塞、高性能的事件驱动编程模型。
非常轻量的事件驱动处理(1G内存可容纳约270万个actors)。
容错性
使用“let-it-crash”语义和监管者树形结构来实现容错。非常适合编写永不停机、自愈合的高容错系统。监管者树形结构可以跨多个JVM来提供真正的高容错系统。
位置透明性
Akka的所有元素都为分布式环境而设计:所有actor都仅通过发送消息进行互操作,所有操作都是异步的。
事务性actors
事务性Actor是actor与STM(Software Transactional Memory)的组合。它使你能够使用自动重试和回滚来组合出原子消息流。
有一个很好的入门级教程可以参考 过忘记忆 的 Akka学习笔记 。
继续回到Master.scala这个文件,它由Master类和Master对象组成。在scala中,这个对象被叫做伴生对象(object Companion),是一个Singleton(注:类和单例对象存在于同一个文件中,才能被称之为Singleton,否则为孤立对象),对应的类叫做伴生类(class Companion),它们之间可以彼此访问对方的私有成员。
程序的入口是Master对象的main函数,它主要调用startSystemAndActor方法,启动Master并返回一个Tuple4的对象。
val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
/** * Start the Master and return a four tuple of: * (1) The Master actor system * (2) The bound port * (3) The web UI bound port * (4) The REST server bound port, if any */ def startSystemAndActor( host: String, port: Int, webUiPort: Int, conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = { val securityMgr = new SecurityManager(conf) val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, securityManager = securityMgr) val actor = actorSystem.actorOf( Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName) val timeout = AkkaUtils.askTimeout(conf) val portsRequest = actor.ask(BoundPortsRequest)(timeout) val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse] (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort) }
生成Master实例时,会创建 Master Metrics System, Application Metrics System,并启动它们,然后根据RECOVERY_MODE来指定Master的容错恢复模式,这里可供选择的有ZOOKEEPER,FILESYSTEM,CUSTOM和其它BlackHole。
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match { case "ZOOKEEPER" => logInfo("Persisting recovery state to ZooKeeper") val zkFactory = new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system)) (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this)) case "FILESYSTEM" => val fsFactory = new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system)) (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this)) case "CUSTOM" => val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory")) val factory = clazz.getConstructor(conf.getClass, Serialization.getClass) .newInstance(conf, SerializationExtension(context.system)) .asInstanceOf[StandaloneRecoveryModeFactory] (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this)) case _ => (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this)) }
Master进程启来后,开始监听其它进程发来的消息,并在receiveWithLogging中对它们进行相应的处理。
二、Worker进程的启动及与Master的通信
同样地,当启动脚本执行到
spark-daemon.sh start org.apache.spark.deploy.worker.Worker "[email protected]"
会启动Worker进程,启动过程跟Master的启动很相似,先是通过org.apache.spark.deploy.worker.WorkerArguments 进行参数解析,得到端口、内核数、内存、web ui 端口、工作目录等信息,然后生成Worker Actor并启动它。
Worker会初始化如下信息:
1. 发送心跳时间 默认 60*1000/4 毫秒
2. 重连次数为 6 + 10
3. 前6次 间隔5-15秒
4. 后10次 间隔30-90秒
5. 清理应用程序文件夹时间间隔为 60*30/1000 毫秒
启动时,向Master发送RegisterWorker消息:
val actor = context.actorSelection(masterAkkaUrl) actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
Master通过以下代码来响应此消息:
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { // ignore, don‘t send response } else if (idToWorker.contains(id)) { sender ! RegisterWorkerFailed("Duplicate worker ID") } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, sender, workerUiPort, publicAddress) if (registerWorker(worker)) { persistenceEngine.addWorker(worker) sender ! RegisteredWorker(masterUrl, masterWebUiUrl) schedule() } else { val workerAddress = worker.actor.path.address logWarning("Worker registration failed. Attempted to re-register worker at same " + "address: " + workerAddress) sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress) } } }
上面代码大致理解为:如果当前Master为Active状态(HA模式下),并且目标work没有成功注册过,此时将work信息添加到workers, idToWorker, addressToWorker, 并向Worker Actor 发送RegisteredWorker消息,否则发送 RegisterWorkerFailed 消息。
当Worker 收到 RegisteredWorker 消息后,更新Master信息,并向Master按心跳时间间隔不停的发送 Heartbeat 消息,以告诉当前Worker 是“活”的。
另外,Worker 还会创建 Worker Metrics System。
至此,Master、Worker(s)都已在各自主机启动,并连接成功。