Spark Study Notes 2: Worker Startup Flow

1. Startup Script

sbin/start-slaves.sh

    # Launch the slaves
    if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
      exec "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin/start-slave.sh" 1 "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"
    else
      if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
        SPARK_WORKER_WEBUI_PORT=8081
      fi
      for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
        "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin/start-slave.sh" $(( $i + 1 )) "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT" --webui-port $(( $SPARK_WORKER_WEBUI_PORT + $i ))
      done
    fi

Assume each node starts a single Worker.

The command actually executed is:

    exec "$sbin/slaves.sh" cd "$SPARK_HOME" \; "$sbin/start-slave.sh" 1 "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"

This statement consists of two parts:

(1)

    exec "$sbin/slaves.sh" cd "$SPARK_HOME"

Log in to each worker server (via slaves.sh) and cd into the SPARK_HOME directory.

(2)

    "$sbin/start-slave.sh" 1 "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"

Run the sbin/start-slave.sh script on the worker server.

The argument "1" is the worker number, used to distinguish the log and pid files of different worker instances on the same node. For example:

    spark-xxx-org.apache.spark.deploy.worker.Worker-1-CentOS-02.out
    spark-xxx-org.apache.spark.deploy.worker.Worker-1.pid

The "1" in "Worker-1" is this worker number.

This argument is not passed to the Worker class. The argument that is passed to the Worker class is:

spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT

2. Worker.main

    def main(argStrings: Array[String]) {
      SignalLogger.register(log)
      val conf = new SparkConf
      val args = new WorkerArguments(argStrings, conf)
      val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
        args.memory, args.masters, args.workDir)
      actorSystem.awaitTermination()
    }

Responsibilities of the main function:

(1) Create a WorkerArguments object and initialize its members;

(2) Call the startSystemAndActor method to create the ActorSystem and start the Worker actor.

2.1. WorkerArguments

    var cores = inferDefaultCores()
    var memory = inferDefaultMemory()

(1) Compute the default number of cores;

(2) Compute the default memory size (a sketch of both is shown below).
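Roughly speaking, inferDefaultCores returns the number of processors visible to the JVM, and inferDefaultMemory takes the machine's physical memory and reserves about 1 GB for the operating system. Below is a simplified sketch of that idea, not the exact Spark source (the real code also handles IBM JVMs via reflection and falls back to a fixed value if the query fails):

    import java.lang.management.ManagementFactory

    def inferDefaultCores(): Int = Runtime.getRuntime.availableProcessors()

    def inferDefaultMemory(): Int = {
      // Assumption: a HotSpot JVM, so the OS bean can be cast to the com.sun variant
      val bean = ManagementFactory.getOperatingSystemMXBean
        .asInstanceOf[com.sun.management.OperatingSystemMXBean]
      val totalMb = (bean.getTotalPhysicalMemorySize / 1024 / 1024).toInt
      // Leave roughly 1 GB for the OS, but never report less than 512 MB
      math.max(totalMb - 1024, 512)
    }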

    parse(args.toList)
    // This mutates the SparkConf, so all accesses to it must be made after this line
    propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

(1) The parse method parses the command-line arguments passed by the startup script;

(2) loadDefaultSparkProperties loads Spark runtime properties from the properties file; the default file is spark-defaults.conf.

2.2. startSystemAndActor

    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
      conf = conf, securityManager = securityMgr)
    val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))
    actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
      masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)

(1) Create the ActorSystem via AkkaUtils.createActorSystem;

(2) Create and start the Worker actor.

3. Worker Actor

3.1. Key Data Members

    val executors = new HashMap[String, ExecutorRunner]          // running executors, keyed by "appId/execId"
    val finishedExecutors = new HashMap[String, ExecutorRunner]  // executors that have exited
    val drivers = new HashMap[String, DriverRunner]              // running drivers, keyed by driver id
    val finishedDrivers = new HashMap[String, DriverRunner]      // drivers that have exited
    val appDirectories = new HashMap[String, Seq[String]]        // local work directories per application
    val finishedApps = new HashSet[String]                       // ids of finished applications

3.2. Worker.preStart

    createWorkDir()
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    shuffleService.startIfEnabled()
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()
    registerWithMaster()

(1) Create the Worker node's work directory;

(2) Subscribe to RemotingLifecycleEvent events; RemotingLifecycleEvent is a trait:

    sealed trait RemotingLifecycleEvent extends Serializable {
      def logLevel: Logging.LogLevel
    }

Among these events, the Worker only handles the DisassociatedEvent message. A rough sketch of that handler follows.
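For reference, the handler looks roughly like this (a sketch of the Spark 1.x code; masterDisconnected marks the connection as lost and triggers re-registration):

    case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
      logInfo(s"$x Disassociated !")
      // The link to the active master was lost: mark the Worker as disconnected
      // and start the registration retry cycle again
      masterDisconnected()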

(3) Create and bind the WorkerWebUI;

(4) Register with the Master: registerWithMaster calls the tryRegisterAllMasters method to send a registration message to each Master node.

3.3. Worker.registerWithMaster

    registrationRetryTimer match {
      case None =>
        registered = false
        tryRegisterAllMasters()
        connectionAttemptCount = 0
        registrationRetryTimer = Some {
          context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
            INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
        }
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }

(1) Call tryRegisterAllMasters to send registration messages to the Masters;

(2) Create the registration retry timer, which periodically sends ReregisterWithMaster messages to the Worker actor itself.

3.3.1. Worker.tryRegisterAllMasters

    for (masterAkkaUrl <- masterAkkaUrls) {
      logInfo("Connecting to master " + masterAkkaUrl + "...")
      val actor = context.actorSelection(masterAkkaUrl)
      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
    }

(1) Create a remote reference to each Master actor;

(2) Send a RegisterWorker message to each Master; if registration succeeds, the Master replies to the Worker with a RegisteredWorker message.
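For context, RegisterWorker is one of the deploy messages exchanged between Worker and Master. Judging from the call above, its definition in DeployMessages looks roughly like this (field names inferred from the call site; the exact shape may differ by version):

    case class RegisterWorker(
        id: String,
        host: String,
        port: Int,
        cores: Int,
        memory: Int,
        webUiPort: Int,
        publicAddress: String)
      extends DeployMessage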

workerId is a string, defined as:

    val workerId = generateWorkerId()
    ...
    def generateWorkerId(): String = {
      "worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
    }

Format: worker-<timestamp>-<host>-<port>.
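For example, assuming createDateFormat uses a pattern such as yyyyMMddHHmmss, a generated id looks something like:

    worker-20150410093015-192.168.1.12-7078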

3.4. Worker Message Handling

3.4.1. The RegisteredWorker Message

This message indicates that the Worker registered successfully with the Master; the main purpose of handling it is to start the heartbeat timer.

    case RegisteredWorker(masterUrl, masterWebUiUrl) =>
      logInfo("Successfully registered with master " + masterUrl)
      registered = true
      changeMaster(masterUrl, masterWebUiUrl)
      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
      if (CLEANUP_ENABLED) {
        logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
        context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
          CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
      }

(1) Set the registered flag;

(2) Call the changeMaster method;

(3) Create the heartbeat timer, which sends SendHeartbeat messages to the Worker actor itself;

(4) If CLEANUP_ENABLED is set, create a cleanup timer that periodically sends WorkDirCleanup messages to delete old application directories.

3.4.1.1. Worker.changeMaster

    // activeMasterUrl is a valid Spark url since we receive it from master.
    activeMasterUrl = url
    activeMasterWebUiUrl = uiUrl
    master = context.actorSelection(
      Master.toAkkaUrl(activeMasterUrl, AkkaUtils.protocol(context.system)))
    masterAddress = Master.toAkkaAddress(activeMasterUrl, AkkaUtils.protocol(context.system))
    connected = true
    // Cancel any outstanding re-registration attempts because we found a new master
    registrationRetryTimer.foreach(_.cancel())
    registrationRetryTimer = None

Responsibilities:

(1) Create a remote reference to the Master and assign it to master;

(2) Set the connected flag to true;

(3) Cancel the registrationRetryTimer.

3.4.2. The SendHeartbeat Message

    case SendHeartbeat =>
      if (connected) { master ! Heartbeat(workerId) }

If connected, send a Heartbeat message to the Master.
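The heartbeat interval HEARTBEAT_MILLIS is derived from the master timeout; in Spark 1.x it is roughly a quarter of spark.worker.timeout (60 s by default), i.e. a heartbeat about every 15 seconds. A sketch of how it is computed (the exact expression may differ by version):

    val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4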

3.4.3. The ReregisterWithMaster Message

    case ReregisterWithMaster =>
      reregisterWithMaster()

Responsibilities of the reregisterWithMaster method:

(1) If registration has already succeeded, cancel the registrationRetryTimer;

(2) If not yet registered, resend the RegisterWorker message to the Master; the initial retry limit is 6 attempts and the total limit is 16.

    // The first six attempts to reconnect are in shorter intervals (between 5 and 15 seconds)
    // Afterwards, the next 10 attempts are between 30 and 90 seconds.
    // A bit of randomness is introduced so that not all of the workers attempt to reconnect at
    // the same time.
    val INITIAL_REGISTRATION_RETRIES = 6
    val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10

The first 6 attempts and the following 10 use different retry intervals; a condensed sketch of the method is shown below.
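The sketch below is simplified from the Spark 1.x source and relies on the Worker's own fields (connectionAttemptCount, registrationRetryTimer, PROLONGED_REGISTRATION_RETRY_INTERVAL, etc.); the real method additionally wraps this logic in Utils.tryOrExit:

    private def reregisterWithMaster(): Unit = {
      connectionAttemptCount += 1
      if (registered) {
        // Already registered: stop retrying
        registrationRetryTimer.foreach(_.cancel())
        registrationRetryTimer = None
      } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
        logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")
        if (master != null) {
          // We already know the active master: re-register with it directly
          master ! RegisterWorker(
            workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
        } else {
          // Still bootstrapping: try every configured master again
          tryRegisterAllMasters()
        }
        // After the first 6 attempts, switch to the longer (prolonged) retry interval
        if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
          registrationRetryTimer.foreach(_.cancel())
          registrationRetryTimer = Some {
            context.system.scheduler.schedule(PROLONGED_REGISTRATION_RETRY_INTERVAL,
              PROLONGED_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
          }
        }
      } else {
        logError("All masters are unresponsive! Giving up.")
        System.exit(1)
      }
    }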

4. Startup Complete

At this point the Worker node has finished starting up; from now on it periodically sends heartbeats to the Master node. When an application is submitted through SparkSubmit, the Worker receives launch-executor messages from the Master, and the launched Executors then communicate with the Driver.
