spark内核揭秘-04-spark任务调度系统个人理解

spark的任务调度系统如下所示:

从上图中科院看出来由RDDObject产生DAG,然后进入了DAGScheduler阶段,DAGScheduler是面向state的高层次的调度器,DAGScheduler把DAG拆分成很多的tasks,每组的tasks都是一个state,每当遇到shuffle就会产生新的state,可以看出上图一共有三个state;DAGScheduler需要记录那些RDD被存入磁盘等物化动作,同时需勋勋task的最优化调度,例如数据本地性等;DAGScheduler还要监视因为shuffle输出导致的失败,如果发生这种失败,可能就需要重新提交该state:

DAGScheduler划分state后以TaskSet为单位把任务,把任务交给底层次的可插拔的调度器TaskScheduler来处理:

可以看出TaskScheduler是一个trait,在目前spark系统中TaskScheduler的实现类只有一个TaskSchedulerImpl:

一个TaskScheduler只为一个SparkContext实例服务,TaskScheduler接受来自DAGScheduler发送过来的分组的任务,DAGScheduler给TaskScheduler发送任务的时候是以Stage为单位来提交的,TaskScheduler收到任务后负责把任务分发到集群中Worker的Executor中去运行,如果某个task运行失败,TaskScheduler要负责重试;另外如果TaskScheduler发现某个Task一直未运行完,就可能启动同样的任务运行同一个Task,那个任务先运行完就用哪个任务的结果。

TaskScheduler发送的任务交给了Worker上的Executor以多线程的方式运行,每一个线程负责一个任务:

其中的存储系统的管理是BlockManager来负责的:

看一下TaskSet的源码:

从TaskSet源码的第一个参数tasks就可以看出其是一个Task的数组,包含一组Task。

时间: 2024-08-27 16:26:29

spark内核揭秘-04-spark任务调度系统个人理解的相关文章

spark内核揭秘-03-spark核心组件

spark核心组件如下所示: 在SparkContext初始化的时候,会初始化一系列内容: 查看内存使用情况: 创建和启动scheduler: 集群核心组件中的Block tracker是用于block和partition对应关系的管理. 集群核心组件中的shuffle tracker是用于记录shuffle操作的过程细节. 从集群中也可以看出,Executor在执行任务的时候是采用多线程的方式执行的并能够在HDFS或者HBase等系统上读取数据. 而在实际的Driver Program运行的时

spark内核揭秘-06-TaskSceduler启动源码解析初体验

TaskScheduler实例对象启动源代码如下所示: 从上面代码可以看出来,taskScheduler的启动是在SparkContext 找到TaskSchedulerImpl实现类中的start方法实现: 1.从上代码看到,先启动CoarseGrainedSchedulerBackend, 从上面CoarseGrainedSchedulerBackend类的代码,可以看出spark启动了DriverActor,名称为CoarseGrainedScheduler,这是一个akka消息通信类,会

spark内核揭秘-01-spark内核核心术语解析

Application: Application是创建了SparkContext实例对象的spark用户,包含了Driver程序: Spark-shell是一个应用程序,因为spark-shell在启动的时候创建了一个SparkContext对象,其名称为sc: Job: 和Spark的action相对应,每一个action例如count.saveAsTextFile等都会对应一个job实例,该job实例包含多任务的并行计算. Driver Program: 运行main函数并且创建SparkC

spark内核揭秘-02-spark集群概览

Spark集群预览: 官方文档对spark集群的初步描述如下,这是一个典型的主从结构: 官方文档对spark集群中的一些关键点给出详细的指导: 其Worker的定义如下所示: 需要注意的是Spark Driver所在的集群需要和Spark集群最好位于同一个网络环境中,因为Driver中的SparkContext实例需发送任务给不同Worker Node的Executor并接受Executor的一些执行结果信息,一般而言,在企业实际的生产环境中Driver所在的机器是的配置往往都是比较不错的,尤其

spark内核揭秘-14-Spark性能优化的10大问题及其解决方案

问题1:reduce task数目不合适 解决方案: 需要根据实际情况调整默认配置,调整方式是修改参数spark.default.parallelism.通常的,reduce数目设置为core数目的2-3倍.数量太大,造成很多小任务,增加启动任务的开销:数目太小,任务运行缓慢.所以要合理修改reduce的task数目即spark.default.parallelism 问题2:shuffle磁盘IO时间长 解决方案: 设置spark.local.dir为多个磁盘,并设置磁盘的IO速度快的磁盘,通

spark内核揭秘-10-RDD源码分析

RDD的核心方法: 首先看一下getPartitions方法的源码: getPartitions返回的是一系列partitions的集合,即一个Partition类型的数组 我们就想进入HadoopRDD实现: 1.getJobConf():用来获取job Configuration,获取配置方式有clone和非clone方式,但是clone方式 是not thread-safe,默认是禁止的,非clone方式可以从cache中获取,如cache中没有那就创建一个新的,然后再放到cache中 2

spark内核揭秘-08-spark的Web监控页面

在SparkContext中可以看到初始化UI代码: // Initialize the Spark UI private[spark] val ui: Option[SparkUI] = if (conf.getBoolean("spark.ui.enabled", true)) { Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener, env.securityManager,appName))

spark内核揭秘-05-SparkContext核心源码解析初体验

SparkContext在获得了一系列的初始化信息后开始创建并启动TaskScheduler实例: 进入createTaskScheduler方法: 我们看一下其Standalone的方式: 在上述代码中首先实例化一个TaskSchedulerImpl: 然后构建出了masterUrls: 接着创建出关键的backend: 进入SparkDeploySchedulerBackend实现: 从以上截图可以看出来,SparkDeploySchedulerBackend核心是为了启动CoarseGra

spark内核揭秘-12-AppClient注册Masters

注册Master有两种,一种是registerWithMaster方法,一种是tryRegisterAllMasters方法,前者是单Master的情况,后者是多Master,一般情况下是满足HA机制,我们看一下registerWithMaster方法: 此时会调用tryRegisterAllMasters方法: 此时通过Akka通过消息机制发送消息给Master来注册程序,RegisterApplication是一个case class,来封装消息: 我们进入Master的源代码: 看一下接受