Spark源码剖析——SparkContext的初始化(四)_TaskScheduler的启动

7. TaskScheduler的启动

第五节介绍了TaskScheduler的创建,要想TaskScheduler发挥作用,必须要启动它,代码:

TaskScheduler在启动的时候,实际调用了backend的start方法,即同时启动了backend。local模式下,这里的backend是localSchedulerBackend。在TaskScheduler初始化时传入localSchedulerBackend。以LocalSchedulerBackend为例,启动LocalSchedulerBackend时向RpcEnv注册了LocalEndpoint。

7.1 创建LocalEndpoint

创建LocalEndpoint的过程主要是构建本地的Executor,见代码如下:

Executor的构建,主要包括以下步骤:

1) 创建并注册ExecutorSource。

2) 获取SparkEnv。如果是非local模式,Worker上的CoarseGrainedExecutorBackend向Driver上的CoarseGrainedExecutorBackend注册Executor时,则需要新建SparkEnv。可以修改属性spark.executor.port(默认为0,表示随机生成)来配置Executor中的RpcEnv的端口号。

3) urlClassLoader的创建。为什么需要创建这个ClassLoader?在非local模式中,Driver或者Worker上都会有多个Executor,每个Executor都设置自身的urlClassLoader,用于加载任务上传的jar包中的类,有效对任务的类加载环境进行隔离。

4) 创建Executor执行Task的线程池threadPool。此线程池用于执行任务。

5) 启动Executor的心跳线程heartbeater。此线程用于向Driver发送心跳。

此外,还包括Rpc发送消息的帧大小(10485760字节)、结果总大小的字节限制(1073741824字节)、正在运行的task的列表、设置serializer的默认ClassLoader为创建的ClassLoader等。

7.2 ExecutorSource的创建与注册

ExecutorSource用于测量系统。通过metricRegistry的register方法注册计量,这些计量信息包括threadpool.activeTasks、threadpool.completeTasks、threadpool.currentPool_size、threadpool.maxPool_size、filesystem.hdfs.write_bytes、filesystem.hdfs.read_ops、filesystem.file.write_bytes、filesystem.hdfs.largeRead_ops、filesystem.hdfs.write_ops等,ExecutorSource的实现见代码:

创建完ExecutorSource后,调用MetricsSystem的registerSource方法将ExecutorSource注册到MetricsSystem。registerSource方法使用MetricRegistry的register方法,将source注册到MetricRegistry,见代码:

7.3 Spark自身urlClassLoader的创建

获取要创建的ClassLoader的父加载器currentLoader,然后根据currentJars生成URL数组,spark.files.userClassPathFirst属性指定加载类时是否先从用户的classpath下加载,最后创建ExecutorURLClassLoader或者ChildExecutorURLClassLoader,见代码:

MutableURLClassLoader或者ChildFirstURLClassLoader实际上都继承了URLClassLoader,见代码:

如果需要REPL交互,还会调用addReplClassLoaderIfNeeded创建replClassLoader,见代码:

7.4 启动Executor的心跳线程

Executor的心跳由startDriverHeartbeater启动。Executor心跳线程的间隔由属性spark.executor.heartbeatInterval配置,默认是10000毫秒。此外,超时时间是30秒,超时重试次数是3次,重试间隔是3000毫秒。此线程从runningTasks获取最新的有关Task的测量信息,将其与executorId、blockManagerId封装为Heartbeat消息,向HearbeatReceiverRef发送Heartbeat消息。

这个心跳线程的作用是什么呢?其作用有两个:

  • 更新正在处理的任务的测量信息;
  • 通知BlockManagerMaster,此Executor上的BlockManager依然活着。

下面对心跳线程的实现详细分析下:

初始化TaskSchedulerImpl后会创建心跳接收器HeartbeatReceiver。HeartbeatReceiver接收所有分配给当前Driver Application的Executor的心跳,并将Task、Task计量信息、心跳等交给TaskSchedulerImpl和DAGScheduler作进一步处理。创建心跳接收器的代码如下:

HeartbeatReceiver在收到心跳信息后,会调用TaskScheduler的executorHeartbeatReceived方法,代码如下:

executorHeartbeatReceived的实现代码如下:

这段程序通过遍历accumUpdates,依据taskIdToTaskSetId找到TaskSetManager。然后将taskId、TaskSetManager.stageId、TaskSetManager.taskSet.stageAttemptId、accInfos封装到类型为Array[(Long, Int, Int,Seq[AccumulableInfo])]的数组accumUpdatesWithTaskIds中。最后调用了dagScheduler的executorHeartbeatReceived方法,其实现如下:

dagScheduler将executorId、accumUpdates封装为SparkListenerExecutorMetricsUpdate事件,并post到listenerBus中,此事件用于更新Stage的各种测量数据。最后给BlockManagerMaster持有的BlockManagerMasterEndpoint发送BlockManagerHeartbeat消息。BlockManagerMasterEndpoint在接收到消息后会匹配执行heartbeatReceived方法。heartbeatReceived最终更新BlockManagerMaster对BlockManager的最后可见时间(即更新BlockManagerId对应的BlockManagerInfo的_lastSeenMs)。

原文地址:https://www.cnblogs.com/swordfall/p/9314930.html

时间: 2024-08-01 17:58:08

Spark源码剖析——SparkContext的初始化(四)_TaskScheduler的启动的相关文章

Spark源码剖析——SparkContext的初始化(四)_Hadoop相关配置及Executor环境变量

4. Hadoop相关配置及Executor环境变量的设置 4.1 Hadoop相关配置信息 默认情况下,Spark使用HDFS作为分布式文件系统,所以需要获取Hadoop相关配置信息的代码如下: 获取的配置信息包括: 将Amazon S3文件系统的AccessKeyId和SecretAccessKey加载到Hadoop的Configuration: 将SparkConf中所有以spark.hadoop. 开头的属性都复制到Hadoop的Configuration: 将SparkConf的属性s

Spark源码剖析——SparkContext的初始化(六)_创建和启动DAGScheduler

6.创建和启动DAGScheduler DAGScheduler主要用于在任务正式交给TaskSchedulerImpl提交之前做一些准备工作,包括:创建Job,将DAG中的RDD划分到不同的Stage,提交Stage,等等.创建DAGScheduler的代码如下: DAGScheduler的数据结构主要维护jobId和stageId的关系.Stage.ActiveJob.以及缓存的RDD的partitions的位置信息,见代码: DAGSchedulerEventProcessLoop能处理的

《Apache Spark源码剖析》

Spark Contributor,Databricks工程师连城,华为大数据平台开发部部长陈亮,网易杭州研究院副院长汪源,TalkingData首席数据科学家张夏天联袂力荐1.本书全面.系统地介绍了Spark源码,深入浅出,细致入微2.提供给读者一系列分析源码的实用技巧,并给出一个合理的阅读顺序3.始终抓住资源分配.消息传递.容错处理等基本问题,抽丝拨茧4.一步步寻找答案,所有问题迎刃而解,使读者知其然更知其所以然 内容简介 书籍计算机书籍 <Apache Spark源码剖析>以Spark

菜鸟nginx源码剖析数据结构篇(四)红黑树ngx_rbtree_t[转]

菜鸟nginx源码剖析数据结构篇(四)红黑树ngx_rbtree_t Author:Echo Chen(陈斌) Email:[email protected] Blog:Blog.csdn.net/chen19870707 Date:October 27h, 2014 1.ngx_rbtree优势和特点 ngx_rbtree是一种使用红黑树实现的关联容器,关于红黑树的特性,在<手把手实现红黑树>已经详细介绍,这里就只探讨ngx_rbtree与众不同的地方:ngx_rbtree红黑树容器中的元素

Spark源码剖析(一):如何将spark源码导入到IDEA中

由于近期准备深入研究一下Spark的核心源码,所以开了这一系列用来记录自己研究spark源码的过程! 想要读源码,那么第一步肯定导入spark源码啦(笔者使用的是IntelliJ IDEA),在网上找了一圈,尝试了好几种方法都没有成功,最终通过自己摸索出了一种非常简单的方式(只需要两步即可!) 环境要求 IntelliJ IDEA(Community版本即可) maven(当然jdk是不可少的) 具体信息如下: C:\Users\Administrator>mvn -version Apache

spark源码解读-SparkContext初始化过程

sparkcontext是spark应用程序的入口,每个spark应用都会创建sparkcontext,用于连接spark集群来执行计算任务.在sparkcontext初始化过程中会创建SparkEnv,SparkUI,TaskSchedule,DAGSchedule等多个核心类,我们会逐个分析他们. 下面我们看一下sparkcontext的初始化过程,首先判断一些参数, try { _conf = config.clone() _conf.validateSettings() if (!_co

spark源码之SparkContext

SparkContext可以说是Spark应用的发动机引擎,Spark Drive的初始化围绕这SparkContext的初始化. SparkContext总览 sparkcontxt的主要组成部分 sparkEnv:spark运行环境,Executor是处理任务的执行器,依赖于SparkEnv的环境.Driver中也包含SparkEnv,为了保证Local模式下任务执行.此外,SparkEnv还包含serializerManager.RpcEnv.BlockManager.mapOutputT

Spark源码剖析(八):stage划分原理与源码剖析

引言 对于Spark开发人员来说,了解stage的划分算法可以让你知道自己编写的spark application被划分为几个job,每个job被划分为几个stage,每个stage包括了你的哪些代码,只有知道了这些之后,碰到某个stage执行特别慢或者报错,你才能快速定位到对应的代码,对其进行性能优化和排错. stage划分原理与源码 接着上期内核源码(五)的最后,每个action操作最终会调用SparkContext初始化时创建的DAGSchedule的runJob方法创建一个job: 那么

重温《STL源码剖析》笔记 第四章

源码之前,了无秘密  ——侯杰 第四章:序列式容器 C++语言本身提供了一个序列式容器array array:分配静态空间,一旦配置了就不能改变. vector: 分配动态空间.维护一个连续线性空间,迭代器类型为:Random Access Iterators 空间配置 typedef simple_alloc<value_type, Alloc> data_allocator 所谓动态增加空间大小,并不是在原空间之后接续新空间,而是以原大小的两倍另外配置一块较大 的空间,然后将原内容拷贝过来