Spark分析之Master、Worker以及Application三者之间如何建立连接

Master.preStart(){

  webUi.bind()
  context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) //定时任务检测是否有DEAD WORKER需要移除

  case CheckForWorkerTimeOut => {
    timeOutDeadWorkers()
  }

  /** Check for, and remove, any timed-out workers */  
  def timeOutDeadWorkers() {
    ...
    if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) {
      workers -= worker
    }
  }

}
Worker.preStart(){

  override def preStart() {
    webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
    webUi.bind()
    registerWithMaster()  //注册该Worker到Master
  }

  def tryRegisterAllMasters() {
    for (masterUrl <- masterUrls) {
      logInfo("Connecting to master " + masterUrl + "...")
      val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
    }
  }

}
Master.scala

case RegisterWorker(){  

  persistenceEngine.addWorker(worker)
  sender ! RegisteredWorker(masterUrl, masterWebUiUrl)  //向Worker发送Worker注册成功事件

  schedule()  //调度部分后续章节分析  

}
Worker.scala

case RegisteredWorker(){

  registered = true
  context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)  //Worker注册成功后,定时向Master发送心跳信息

}

case SendHeartbeat =>
  masterLock.synchronized {
  if (connected) { master ! Heartbeat(workerId) }
}
Master.scala

case Heartbeat(workerId) => {
  idToWorker.get(workerId) match {
  case Some(workerInfo) =>
    workerInfo.lastHeartbeat = System.currentTimeMillis()  //更新该worker的上次发送心跳信息的时间
  case None =>
    logWarning("Got heartbeat from unregistered worker " + workerId)
  }
}

=================如上步骤完成了Worker到Master的连接===============================================

SparkContext启动时:

SparkContext.createTaskScheduler()

  ==>new SparkDeploySchedulerBackend()

    ==>创建AppClient并启动

      ==>ClientActor.preStart():registerWithMaster(){actor ! RegisterApplication(appDescription)}  //向Master发起RegisterApplication事件
Master.scala

case RegisterApplication(description) {

  val app = createApplication(description, sender)

  registerApplication(app)
  persistenceEngine.addApplication(app)
  sender ! RegisteredApplication(app.id, masterUrl)  //向Worker发起RegisteredApplication事件表示该Application已经注册成功
  schedule()

}

=======================如上步骤完成了Application到Master的连接===============================================

Spark分析之Master、Worker以及Application三者之间如何建立连接

时间: 2024-11-04 20:02:19

Spark分析之Master、Worker以及Application三者之间如何建立连接的相关文章

Spark分析之Master

override def preStart() { logInfo("Starting Spark master at " + masterUrl) webUi.bind() //绑定WEBUI masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort context.system.scheduler.schedule(0 millis, WORKER_TIM

Spark分析之Worker

override def preStart() { webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi.bind() //创建并绑定UI registerWithMaster() //注册到Master } def tryRegisterAllMasters() { for (masterUrl <- masterUrls) { logInfo("Connecting to master " + master

java执行spark查询hbase的jar包出现错误提示:ob aborted due to stage failure: Master removed our application: FAILED

执行java调用scala 打包后的jar时候出现异常 /14 23:57:08 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 15/04/14 23:57:23 WARN TaskSchedulerImpl: Initial job

大数据技术之_16_Scala学习_11_客户信息管理系统+并发编程模型 Akka+Akka 网络编程-小黄鸡客服案例+Akka 网络编程-Spark Master Worker 进程通讯项目

第十五章 客户信息管理系统15.1 项目的开发流程15.2 项目的需求分析15.3 项目的界面15.4 项目的设计-程序框架图15.5 项目的功能实现15.5.1 完成 Customer 类15.5.2 完成显示主菜单和退出软件功能15.5.3 完成显示客户列表的功能15.5.4 完成添加客户的功能15.5.5 完成删除客户的功能15.5.6 完善退出确认功能15.5.7 完善删除确认功能15.5.8 完成修改客户的功能第十六章 并发编程模型 Akka16.1 Akka 的介绍16.2 Acto

Spark分析之Standalone运行过程分析

一.集群启动过程--启动Master $SPARK_HOME/sbin/start-master.sh start-master.sh脚本关键内容: spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT 日志信息:$SPARK_HOME/logs/ 14/0

nginx进程模型 master/worker

nginx有两类进程,一类称为master进程(相当于管理进程),另一类称为worker进程(实际工作进程).启动方式有两种: (1)单进程启动:此时系统中仅有一个进程,该进程既充当master进程的角色,也充当worker进程的角色. (2)多进程启动:此时系统有且仅有一个master进程,至少有一个worker进程工作. master进程主要进行一些全局性的初始化工作和管理worker的工作:事件处理是在worker中进行的. 首先简要的浏览一下nginx的启动过程,如下图: 2.实现原理

Spark技术内幕:Worker源码与架构解析

首先通过一张Spark的架构图来了解Worker在Spark中的作用和地位: Worker所起的作用有以下几个: 1. 接受Master的指令,启动或者杀掉Executor 2. 接受Master的指令,启动或者杀掉Driver 3. 报告Executor/Driver的状态到Master 4. 心跳到Master,心跳超时则Master认为Worker已经挂了不能工作了 5. 向GUI报告Worker的状态 说白了,Worker就是整个集群真正干活的.首先看一下Worker重要的数据结构: v

Spark分析之DAGScheduler

DAGScheduler的主要功能1.接收用户提交的job;2.将job根据类型划分为不同的stage,并在每一个stage内产生一系列的task,并封装成TaskSet;3.向TaskScheduler提交TaskSet; 以如下示例描述Job提交过程: val sc = new SparkContext("local[2]", "WordCount", System.getenv("SPARK_HOME"), Seq(System.geten

Spark分析之Job Scheduling Process

经过前面文章的SparkContext.DAGScheduler.TaskScheduler分析,再从总体上了解Spark Job的调度流程 1.SparkContext将job的RDD DAG图提交给DAGScheduler: 2.DAGScheduler将job分解成Stage DAG,将每个Stage的Task封装成TaskSet提交给TaskScheduler:窄依赖以pipeline方式执行,效率高: 3.TaskScheduler将TaskSet中的一个个Task提交到集群中去运行: