spark内核揭秘-12-AppClient注册Masters

注册Master有两种,一种是registerWithMaster方法,一种是tryRegisterAllMasters方法,前者是单Master的情况,后者是多Master,一般情况下是满足HA机制,我们看一下registerWithMaster方法:

此时会调用tryRegisterAllMasters方法:

此时通过Akka通过消息机制发送消息给Master来注册程序,RegisterApplication是一个case class,来封装消息:

我们进入Master的源代码:

看一下接受客户端发送过来消息RegisterApplication的代码如下所示:

此时首先使用ApplicationInfo构建一些准备信息,然后会导致registerApplication代码的调用:

1、createAppliction:

2、registerApplication:

代码中就是一个注册应用的过程。

3、sender ! RegisteredApplication(app.id, masterUrl)方法是回复给发送者消息:

3.1、上面代码中的changeMaster(masterUrl):

3.2、进入listener.connected(appId)方法:

3.2.1、进入实现类SparkDeploySchedulerBackend的connected方法:

进入notifyContext()方法:

4、接着在Master的消息响应中会调用schedule方法:

可以看到schedule方法中首先要启动Driver程序,也就是有main函数的程序,然后在schedule中会调度Worker的过程:

上面代码分析

1、进入launchDriver(worker, driver)方法:

1.1、进入worker.actor ! LaunchDriver(driver.id, driver.desc)方法:

1.1.1、进入 driver.start()方法:

2、进入 launchExecutor(worker, exec)方法:

PS:代码分析的好乱呀,我这个笨猪

时间: 2024-10-12 13:40:41

spark内核揭秘-12-AppClient注册Masters的相关文章

spark内核揭秘-11-Driver中AppClient源码分析

首先从SparkContext中TaskScheduler实例的创建开始: 进入taskScheduler.start()方法内部: 进入其实现者TaskSchedulerImpl内部: 可以发现在start具体实现的内部首先是有个backend.start方法: 其最终具体的实现类为: 从代码中可以看出,我们把CoarseGrainedExecutorBackend封装成command,然后交给appDesc,接着交给了Appclient,此时的AppClient就是客户端程序! AppCli

spark内核揭秘-01-spark内核核心术语解析

Application: Application是创建了SparkContext实例对象的spark用户,包含了Driver程序: Spark-shell是一个应用程序,因为spark-shell在启动的时候创建了一个SparkContext对象,其名称为sc: Job: 和Spark的action相对应,每一个action例如count.saveAsTextFile等都会对应一个job实例,该job实例包含多任务的并行计算. Driver Program: 运行main函数并且创建SparkC

spark内核揭秘-03-spark核心组件

spark核心组件如下所示: 在SparkContext初始化的时候,会初始化一系列内容: 查看内存使用情况: 创建和启动scheduler: 集群核心组件中的Block tracker是用于block和partition对应关系的管理. 集群核心组件中的shuffle tracker是用于记录shuffle操作的过程细节. 从集群中也可以看出,Executor在执行任务的时候是采用多线程的方式执行的并能够在HDFS或者HBase等系统上读取数据. 而在实际的Driver Program运行的时

spark内核揭秘-02-spark集群概览

Spark集群预览: 官方文档对spark集群的初步描述如下,这是一个典型的主从结构: 官方文档对spark集群中的一些关键点给出详细的指导: 其Worker的定义如下所示: 需要注意的是Spark Driver所在的集群需要和Spark集群最好位于同一个网络环境中,因为Driver中的SparkContext实例需发送任务给不同Worker Node的Executor并接受Executor的一些执行结果信息,一般而言,在企业实际的生产环境中Driver所在的机器是的配置往往都是比较不错的,尤其

spark内核揭秘-06-TaskSceduler启动源码解析初体验

TaskScheduler实例对象启动源代码如下所示: 从上面代码可以看出来,taskScheduler的启动是在SparkContext 找到TaskSchedulerImpl实现类中的start方法实现: 1.从上代码看到,先启动CoarseGrainedSchedulerBackend, 从上面CoarseGrainedSchedulerBackend类的代码,可以看出spark启动了DriverActor,名称为CoarseGrainedScheduler,这是一个akka消息通信类,会

spark内核揭秘-04-spark任务调度系统个人理解

spark的任务调度系统如下所示: 从上图中科院看出来由RDDObject产生DAG,然后进入了DAGScheduler阶段,DAGScheduler是面向state的高层次的调度器,DAGScheduler把DAG拆分成很多的tasks,每组的tasks都是一个state,每当遇到shuffle就会产生新的state,可以看出上图一共有三个state:DAGScheduler需要记录那些RDD被存入磁盘等物化动作,同时需勋勋task的最优化调度,例如数据本地性等:DAGScheduler还要监

spark内核揭秘-14-Spark性能优化的10大问题及其解决方案

问题1:reduce task数目不合适 解决方案: 需要根据实际情况调整默认配置,调整方式是修改参数spark.default.parallelism.通常的,reduce数目设置为core数目的2-3倍.数量太大,造成很多小任务,增加启动任务的开销:数目太小,任务运行缓慢.所以要合理修改reduce的task数目即spark.default.parallelism 问题2:shuffle磁盘IO时间长 解决方案: 设置spark.local.dir为多个磁盘,并设置磁盘的IO速度快的磁盘,通

spark内核揭秘-10-RDD源码分析

RDD的核心方法: 首先看一下getPartitions方法的源码: getPartitions返回的是一系列partitions的集合,即一个Partition类型的数组 我们就想进入HadoopRDD实现: 1.getJobConf():用来获取job Configuration,获取配置方式有clone和非clone方式,但是clone方式 是not thread-safe,默认是禁止的,非clone方式可以从cache中获取,如cache中没有那就创建一个新的,然后再放到cache中 2

spark内核揭秘-08-spark的Web监控页面

在SparkContext中可以看到初始化UI代码: // Initialize the Spark UI private[spark] val ui: Option[SparkUI] = if (conf.getBoolean("spark.ui.enabled", true)) { Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener, env.securityManager,appName))