Spark-源码-Spark-StartAll Master Worler启动流程

Spark start-all>>

"""Master启动流程"""

Master类
class Master(
    host: String,
    port: Int,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf) extends Actor with ActorLogReceive with Logging with LeaderElectable

Master端
def main(){
	val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
    actorSystem.awaitTermination()
}

Master端
def startSystemAndActor(System, Int, Int, Option[Int]) = {
	//调用AkkaUtils创建ActorSystem
	val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
	  securityManager = securityMgr)
	//创建属于Master的actor, 在创建actor的同时, 会使用classOf[Master]初始化Master
	val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
}

Master端
"""初始化Master时由于Master继承了 trait Actor 重写了preStart方法,
Actor的初始化会启动preStart方法 因此找到Master的 override def preStart()
preStart属于生命周期方法, 在构造器之后, receiver之前"""
override def preStart() {
	// 启动一个定时器, 定时检查超时的Worker, WORKER_TIMEOUT:每六十秒检查一次,
	// self:先对着自己来一下(检查)试试
	context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
 	// 调用 timeOutDeadWorkers() 方法,
 	override def receiveWithLogging = {
	 	case CheckForWorkerTimeOut => {
	      timeOutDeadWorkers()
	    }
	}

	// 用来检查并移除所有超时的workers
	def timeOutDeadWorkers(){
		// 事实上是移除了一个存有WorkInfo的HashSet[WrokInfo]中的对象
		val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
		for (worker <- toRemove) {
	      if (worker.state != WorkerState.DEAD) {
	        removeWorker(worker)
	      }
	    }
	}

	def removeWorker(worker: WorkerInfo){
		// 删除内存里的workInfo
		idToWorker -= worker.id
	    addressToWorker -= worker.endpoint.address
	}
}

"""之后执行receive方法(1.3版本), 在后来的1.6版本中叫 def receive: PartialFunction[Any, Unit]"""
Master端
override def receiveWithLogging () {}
会不断的接收actor发送过来的请求

"""Worker启动流程"""

Worker类
class Worker(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterAkkaUrls: Array[String],
    actorSystemName: String,
    actorName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends Actor

def preStart() => {
  registerWithMaster()
}

// 向Master注册的方法
def registerWithMaster() {
  	// 向所有的Master注册Worker
  	tryRegisterAllMasters()

  	// 其中内容
  	def tryRegisterAllMasters()=>{
	  	// 通过Master的Url获取Master的actor
		val actor = context.actorSelection(masterAkkaUrl)
		// 向Master发送注册信息
	    actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
	}
 }

Master端
// 接收Worker发送的注册信息
override def receiveWithLogging = {
	case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>{
		// 判断是否是StandBy状态, doNothing
		idToWorker.contains(id), 已经注册过, doNothing

		正常情况下(Active状态, 且没有注册过):{
			// 把发送来的 WorkerInfo 添加到 Master的 WorkerInfo中
			val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, sender, workerUiPort, publicAddress)
		}
		// 如果将Worker Info存入内存成功, 则调用持久化引擎, 将信息存入磁盘中,
		// 目的是防止数据丢失. 如果Master宕机, 内存中会丢失数据,
		// 切换状态(Standby和Active)后, 需要切换的节点拿不到WorkerInfo, Worker会再次注册, 非常消耗资源, 存在磁盘则可以直接去磁盘拿取数据不需要重新注册
		if (registerWorker(worker)) {
	      persistenceEngine.addWorker(worker)
	      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
	      schedule()
	    }

		// 向worker响应注册成功信息
		sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
		// 开始调度资源, 调度资源不仅仅是集群启动的时候调动资源, 运行Job的时候也会调度资源, 其有两种方式 一种是尽量分散, 一种是尽量集中
		schedule()
	}
}

Worker端
// 接收注册成功的信息, 其实是将 Active Master 的Url和rWebUiUrl传回并更新, 之后向他发送心跳~
def receiveWithLogging() = {
	case RegisteredWorker(masterUrl, masterWebUiUrl) =>{
		//更新MasterUrl
		changeMaster(masterUrl, masterWebUiUrl)
		//向Master发送心跳信息, HEARTBEAT_MILLIS =15秒, 每十五秒发送一次心跳信息, 发送逻辑为 SendHeartbeat
  		context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
	}

  	//向Master发送心跳信息, 实际上是将自己的WorkerId发送给Master
  	case SendHeartbeat =>
    	if (connected) { master ! Heartbeat(workerId) }
}

Master端
def receiveWithLogging() = {
	case Heartbeat(workerId) => {
		//正常情况下, 更新上次心跳时间
		workerInfo.lastHeartbeat = System.currentTimeMillis()
		//启动完成
	}
}

  

原文地址:https://www.cnblogs.com/chinashenkai/p/9977672.html

时间: 2024-10-16 10:38:49

Spark-源码-Spark-StartAll Master Worler启动流程的相关文章

Caddy源码阅读(二)启动流程与 Event 事件通知

Caddy源码阅读(二)启动流程与 Event 事件通知 Preface Caddy 是 Go 语言构建的轻量配置化服务器.https://github.com/caddyserver/caddy Caddy 整个软件可以说是由不同的 插件 堆砌起来的.自己本身仅提供 Plugin 的注册运行逻辑和 Server 的监听服务功能. 学习 caddy 的源码,实际上是学习 如何构建一个 松耦合的 抽象 Plugin 设计,即模块化插拔的做法. 所以我们的源码阅读,围绕 Caddy 为 Plugin

spark源码阅读(一) 启动代码阅读

spark启动代码阅读: spark使用一系列的shell脚本作为入口:其中bin目录下面是任务提交的脚本:sbin目录是master和worker启停相关的脚本. 而所有脚本最后都是通过调用bin/spark-class来实现对java(scala)代码的调用. ----------------------spark-class获取java参数分析 开始------------------------------------- spark-class的代码处理流程: 调用org.apache.

Spark源码分析之Sort-Based Shuffle读写流程

一 .概述 我们知道Spark Shuffle机制总共有三种: 1.未优化的Hash Shuffle:每一个ShuffleMapTask都会为每一个ReducerTask创建一个单独的文件,总的文件数是S * R,不仅文件数量很多,造成频繁的磁盘和网络I/O,而且内存负担也很大,GC频繁,经常出现OOM. 2.优化后Hash Shuffle:改进后的Shuffle,启用consolidation机制,Executor每一个core上的ShuffleMapTask共享文件,减少文件数目,比如Exe

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar

spark 源码理解2 进一步窥探Master、Worker通信机制

上一篇文章 spark 源码理解1 从spark启动脚本开始 是分析执行start_all.sh时,集群中启动了哪些进程,下面我们再深入一点看看这些进程都是做什么用的,它们之间又是如何通信的? 一.Master进程的启动 Master进程,它主要负责对Worker.Driver.App等资源的管理并与它们进行通信,这篇文章中我打算着重讲一下它与Worker的通信,其它的部分放在以后的章节再加以描述. spark-daemon.sh start org.apache.spark.deploy.ma

01 Spark源码编译

1.1设置机器名:hostname gedit /etc/sysconfig/network Scala http://www.scala-lang.org/ cd /opt mkdir scala cp /home/hserver1/desktop/scala-2.12.2.tgz /opt/scala cd /opt/scala tar -xvf scala-2.12.2.tgz 配置环境变量 gedit /etc/profile export SCALA_HOME=/opt/scala/s

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

欢迎转载,转载请注明出处,徽沪一郎. 楔子 Hive是基于Hadoop的开源数据仓库工具,提供了类似于SQL的HiveQL语言,使得上层的数据分析人员不用知道太多MapReduce的知识就能对存储于Hdfs中的海量数据进行分析.由于这一特性而收到广泛的欢迎. Hive的整体框架中有一个重要的模块是执行模块,这一部分是用Hadoop中MapReduce计算框架来实现,因而在处理速度上不是非常令人满意.由于Spark出色的处理速度,有人已经成功将HiveQL的执行利用Spark来运行,这就是已经非常

Spark源码系列(四)图解作业生命周期

这一章我们探索了Spark作业的运行过程,但是没把整个过程描绘出来,好,跟着我走吧,let you know! 我们先回顾一下这个图,Driver Program是我们写的那个程序,它的核心是SparkContext,回想一下,从api的使用角度,RDD都必须通过它来获得. 下面讲一讲它所不为认知的一面,它和其它组件是如何交互的. Driver向Master注册Application过程 SparkContext实例化之后,在内部实例化两个很重要的类,DAGScheduler和TaskSched

Apache Spark源码走读之5 -- DStream处理的容错性分析

欢迎转载,转载请注明出处,徽沪一郎,谢谢. 在流数据的处理过程中,为了保证处理结果的可信度(不能多算,也不能漏算),需要做到对所有的输入数据有且仅有一次处理.在Spark Streaming的处理机制中,不能多算,比较容易理解.那么它又是如何作到即使数据处理结点被重启,在重启之后这些数据也会被再次处理呢? 环境搭建 为了有一个感性的认识,先运行一下简单的Spark Streaming示例.首先确认已经安装了openbsd-netcat. 运行netcatnc -lk 9999 运行spark-s