Spark启动流程(Standalone)- master源码

Master源码

 1 package org.apache.spark.deploy.master
 2 //伴生类
 3 private[deploy] class Master(
 4                                 override val rpcEnv: RpcEnv,
 5                                 address: RpcAddress,
 6                                 webUiPort: Int,
 7                                 val securityMgr: SecurityManager,
 8                                 val conf: SparkConf)
 9     extends ThreadSafeRpcEndpoint with Logging with LeaderElectable
10 {
11 ...
12 }
13 //伴生对象
14 private[deploy] object Master extends Logging{
15     val SYSTEM_NAME = "sparkMaster"
16     val ENDPOINT_NAME = "Master"
17
18     // 启动 Master 的入口函数
19     def main(argStrings: Array[String]) {
20         Utils.initDaemon(log)
21         val conf = new SparkConf
22         // 构建用于参数解析的实例
23         //--host hadoop201 --port 7077 --webui-port 8080
24         val args = new MasterArguments(argStrings, conf)
25         // 启动 RPC 通信环境和 MasterEndPoint(通信终端)
26        //<<1>>
27         val (rpcEnv, _, _): (RpcEnv, Int, Option[Int]) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
28         rpcEnv.awaitTermination()
29     }
30    ...
31 }

<< 1 >>、启动Mater返回一个三元组

 1     /**
 2       * Start the Master and return a three tuple of:
 3       * 启动 Master, 并返回一个三元组
 4       * (1) The Master RpcEnv
 5       * (2) The web UI bound port
 6       * (3) The REST server bound port, if any
 7       */
 8     def startRpcEnvAndEndpoint(
 9                                   host: String,
10                                   port: Int,
11                                   webUiPort: Int,
12                                   conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
13         val securityMgr = new SecurityManager(conf)
14         // 创建 Master 端的 RpcEnv 环境, 并启动 RpcEnv
15         // 参数: sparkMaster hadoop201 7077 conf securityMgr
16         // 返回值  的实际类型是: NettyRpcEnv
17         //<< 1.1 >>
18         val rpcEnv: RpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
19         // 创建 Master对象, 该对象就是一个 RpcEndpoint, 在 RpcEnv 中注册这个 RpcEndpoint
20         // 返回该 RpcEndpoint 的引用, 使用该引用来接收信息和发送信息
21         //<< 1.2 >>
22         val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,
23             new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
24         // 向 Master 的通信终端发法请求,获取 BoundPortsResponse 对象
25         // BoundPortsResponse 是一个样例类包含三个属性: rpcEndpointPort webUIPort restPort
26         val portsResponse: BoundPortsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
27         (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
28     }

<< 1.1 >> RpcEnv的创建

 1 def create(
 2               name: String,
 3               bindAddress: String,
 4               advertiseAddress: String,
 5               port: Int,
 6               conf: SparkConf,
 7               securityManager: SecurityManager,
 8               clientMode: Boolean): RpcEnv = {
 9     // 保存 RpcEnv 的配置信息
10     val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
11         clientMode)
12     // 创建 NettyRpcEvn
13     //<< 1.1.1 >>
14     new NettyRpcEnvFactory().create(config)
15 }

真正的创建是调用NettyRpcEnvFactory 的 create 方法创建

创建NettyRpcEnv的时候,会创建消息分发器,收件箱和存储远程地址与发件箱的Map

RpcEnv.scala

<< 1.1.1 >> NettyRpcEnvFactory ( NettyRpcEnv .scala)

 1 private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
 2     /*
 3     创建 NettyRpcEnv, 并且启动为后台程序
 4      */
 5     def create(config: RpcEnvConfig): RpcEnv = {
 6         val sparkConf: SparkConf = config.conf
 7         // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
 8         // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
 9         // 用于 Rpc传输对象时的序列化
10         val javaSerializerInstance: JavaSerializerInstance = new JavaSerializer(sparkConf)
11             .newInstance()
12             .asInstanceOf[JavaSerializerInstance]
13         // 实例化 NettyRpcEnv
14         val nettyEnv = new NettyRpcEnv(
15             sparkConf,
16             javaSerializerInstance,
17             config.advertiseAddress,
18             config.securityManager)
19         if (!config.clientMode) {
20             // 定义 NettyRpcEnv 的启动函数
21             val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
22                 nettyEnv.startServer(config.bindAddress, actualPort)
23                 (nettyEnv, nettyEnv.address.port)
24             }
25             try {
26                 // 启动 NettyRpcEnv
27                 Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
28             } catch {
29                 case NonFatal(e) =>
30                     nettyEnv.shutdown()
31                     throw e
32             }
33         }
34         nettyEnv
35     }
36 }

<< 1.2 >> Master伴生类(Master 端的 RpcEndpoint 启动)

Master是一个RpcEndpoint.

他的生命周期方法是: constructor -> onStart -> receive* -> onStop

onStart 主要代码片段

 1  // 创建 WebUI 服务器
 2  webUi = new MasterWebUI(this, webUiPort)
 3
 4
 5 // 按照固定的频率去启动线程来检查 Worker 是否超时. 其实就是给自己发信息: CheckForWorkerTimeOut
 6 // 默认是每分钟检查一次.
 7  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
 8     override def run(): Unit = Utils.tryLogNonFatalError {
 9 // 在 receive 方法中对 CheckForWorkerTimeOut 进行处理
10        //<< 1.2.1 >>
11         self.send(CheckForWorkerTimeOut)
12     }
13 }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
14
15
16 private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000

<< 1.2.1 >> 检查并移除超时的worker

 1    /** Check for, and remove, any timed-out workers */
 2     private def timeOutDeadWorkers() {
 3         // Copy the workers into an array so we don‘t modify the hashset while iterating through it
 4         val currentTime = System.currentTimeMillis()
 5         //  把超时的 Worker 从 workers 中移除
 6 //过滤出来要移除的worker:(上次心跳时间 小于 当前时间 减去 超时时间 )即为超时
 7         val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
 8         for (worker <- toRemove) {
 9             // 如果 worker 的状态不是 DEAD
10             if (worker.state != WorkerState.DEAD) {
11                 logWarning("Removing %s because we got no heartbeat in %d seconds".format(
12                     worker.id, WORKER_TIMEOUT_MS / 1000))
13                 removeWorker(worker) //
14             } else {
15                 if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
16                     workers -= worker // we‘ve seen this DEAD worker in the UI, etc. for long enough; cull it
17                 }
18             }
19         }
20     }

原文地址:https://www.cnblogs.com/hyunbar/p/12079466.html

时间: 2024-08-27 03:54:28

Spark启动流程(Standalone)- master源码的相关文章

《深入理解SPARK:核心思想与源码分析》——SparkContext的初始化(中)

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟三篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一

Spark技术内幕:Worker源码与架构解析

首先通过一张Spark的架构图来了解Worker在Spark中的作用和地位: Worker所起的作用有以下几个: 1. 接受Master的指令,启动或者杀掉Executor 2. 接受Master的指令,启动或者杀掉Driver 3. 报告Executor/Driver的状态到Master 4. 心跳到Master,心跳超时则Master认为Worker已经挂了不能工作了 5. 向GUI报告Worker的状态 说白了,Worker就是整个集群真正干活的.首先看一下Worker重要的数据结构: v

Spark之SQL解析(源码阅读十)

如何能更好的运用与监控sparkSQL?或许我们改更深层次的了解它深层次的原理是什么.之前总结的已经写了传统数据库与Spark的sql解析之间的差别.那么我们下来直切主题~ 如今的Spark已经支持多种多样的数据源的查询与加载,兼容了Hive,可用JDBC的方式或者ODBC来连接Spark SQL.下图为官网给出的架构.那么sparkSql呢可以重用Hive本身提供的元数据仓库(MetaStore).HiveQL.以及用户自定义函数(UDF)及序列化和反序列化的工具(SerDes). 下来我们来

转 深入浅出Mybatis系列(十)---SQL执行流程分析(源码篇)

深入浅出Mybatis系列(十)---SQL执行流程分析(源码篇) 最近太忙了,一直没时间继续更新博客,今天忙里偷闲继续我的Mybatis学习之旅.在前九篇中,介绍了mybatis的配置以及使用, 那么本篇将走进mybatis的源码,分析mybatis 的执行流程, 好啦,鄙人不喜欢口水话,还是直接上干活吧: 1. SqlSessionFactory 与 SqlSession. 通过前面的章节对于mybatis 的介绍及使用,大家都能体会到SqlSession的重要性了吧, 没错,从表面上来看,

Spark MLlib机器学习算法、源码及实战讲解pdf电子版下载

Spark MLlib机器学习算法.源码及实战讲解pdf电子版下载 链接:https://pan.baidu.com/s/1ruX9inG5ttOe_5lhpK_LQg 提取码:idcb <Spark MLlib机器学习:算法.源码及实战详解>书中讲解由浅入深慢慢深入,解析讲解了MLlib的底层原理:数据操作及矩阵向量计算操作,该部分是MLlib实现的基础:并对此延伸机器学习的算法,循序渐进的讲解其中的原理,是读者一点一点的理解和掌握书中的知识. 目录 · · · · · · 第一部分 Spa

Activiti 流程启动及节点流转源码分析

作者:jiankunking 出处:http://blog.csdn.net/jiankunking 本文主要是以activiti-study中的xiaomage.xml流程图为例进行跟踪分析 具体的流程图如下: 流程图对应的XML文件如下: <?xml version="1.0" encoding="UTF-8"?> <definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MO

Spark Core 1.3.1源码解析及个人总结

本篇源码基于赵星对Spark 1.3.1解析进行整理.话说,我不认为我这下文源码的排版很好,不能适应的还是看总结吧. 虽然1.3.1有点老了,但对于standalone模式下的Master.Worker和划分stage的理解是很有帮助的.=====================================================总结: master和worker都要创建ActorSystem来创建自身的Actor对象,master内部维护了一个保存workerinfo的hashSe

tomcat的启动过程(Tomcat源码解析(三))

Tomcat组件生命周期管理 在Tomcat总体结构 (Tomcat源代码解析之二)中,我们列出了Tomcat中Server,Service,Connector,Engine,Host,Context的继承关系图,你会发现它们都实现了org.apache.catalina.Lifecycle接口,而org.apache.catalina.util.LifecycleBase采用了模板方法模式来对所有支持生命周期管理的组件的生命周期各个阶段进行了总体管理,每个需要生命周期管理的组件只需要继承这个基

深入浅出Mybatis系列(十)---SQL执行流程分析(源码篇)

原文地址:http://www.cnblogs.com/dongying/p/4142476.html 最近太忙了,一直没时间继续更新博客,今天忙里偷闲继续我的Mybatis学习之旅.在前九篇中,介绍了mybatis的配置以及使用, 那么本篇将走进mybatis的源码,分析mybatis 的执行流程, 好啦,鄙人不喜欢口水话,还是直接上干活吧: 1. SqlSessionFactory 与 SqlSession. 通过前面的章节对于mybatis 的介绍及使用,大家都能体会到SqlSession