原理图解:
Master类位置所在:spark-core_2.11-2.1.0.jar的org.apache.spark.deploy.master下的Master类
//截取了部分代码 //处理Application注册的请求 case RegisterApplication(description, driver) => //如果master的状态是standby,也就是当前这个master,是standby master,不是active master //那么Application来请求注册,什么都不会干 // TODO Prevent repeated registrations from some driver if (state == RecoveryState.STANDBY) { // ignore, don‘t send response } else { logInfo("Registering app " + description.name) //用ApplicationDescription信息,创建ApplicationInfo val app = createApplication(description, driver) //注册Application //将ApplicationInfo加入缓存,将Application加入等待调度的队列--waitingApps registerApplication(app)//详细代码见:代码1 logInfo("Registered app " + description.name + " with ID " + app.id) //使用持久化引擎,将ApplicationInfo进行持久化 persistenceEngine.addApplication(app) //反向,向SparkDeploySchedulerBackend的APPClient的ClientActor,发送消息,也就是RegisteredApplication driver.send(RegisteredApplication(app.id, self)) schedule()//schedule 重新调度,--至此Application注册完成 } 代码1 private def registerApplication(app: ApplicationInfo): Unit = { val appAddress = app.driver.address if (addressToApp.contains(appAddress)) { logInfo("Attempted to re-register application at same address: " + appAddress) return } applicationMetricsSystem.registerSource(app.appSource) //这里其实就是将APP的信息加入内存缓存中 apps += app idToApp(app.id) = app endpointToApp(app.driver) = app addressToApp(appAddress) = app //将APP加入等待调度的队列中 waitingApps就是一个ArrayBuffer waitingApps += app if (reverseProxy) { webUi.addProxyTargets(app.id, app.desc.appUiUrl) } }
原文地址:https://www.cnblogs.com/yzqyxq/p/11968009.html
时间: 2024-10-03 11:04:54