Spark Job的提交与task本地化分析(源码阅读八)

  我们又都知道,Spark中任务的处理也要考虑数据的本地性(locality),Spark目前支持PROCESS_LOCAL(本地进程)NODE_LOCAL(本地节点)NODE_PREF、RACK_LOCAL(本地机架)ANY(任何)几种。其他都很好理解,NODE_LOCAL会在spark日志中执行拉取数据所执行的task时,打印出来,因为Spark是移动计算,而不是移动数据的嘛。

  那么什么是NODE_PREF

  当Driver应用程序刚刚启动,Driver分配获得的Executor很可能还没有初始化,所以有一部分任务的本地化级别被设置为NO_PREF.如果是ShuffleRDD,其本地性始终为NO_PREF。这两种本地化级别是NO_PREF的情况,在任务分配时会被优先分配到非本地节点执行,达到一定的优化效果。

  那么下来我们从job的任务提交开始玩起~

   

  getMissingParentStages方法用来找到Stage的所有不可用的父Stage.从代码可以到这里是个递归的调用,submitWaitingStages实际上循环waitingStages中的stage并调用submitStaghe:

  

  那么下来开始提交task,提交task的入口是submitMissingTasks,此函数在Stage没有可用的父stage时,被调用处理当前Stage未提交的任务。

  1、那么在没有父stage时,会首先调用paendingPartitions.clear 用于清空pendingTasks.由于当前Stage的任务刚开始提交,所以需要清空,便于记录需要计算的任务。

  2、将当前Stage加入运行中的Stage集合,是用HashSet进行构造的。

  3、找出位计算的partition,如果Stage是map任务,那么outputLocs中partition对应的List为Nil,说明此partition还未计算。如果Stage不是map任务,那么需要获取stage的finalJob,调用finished方法判断每个partition的任务是否完成。

  

  4、然后通过stage.makeNewStageAttemp,使用StageInfo.fromStage方法创建当前Stage的_latestInfo:

  

  5、如果是Stage Map任务,那么序列化Stage的RDD及ShuffleDependency,如果Stage不是map任务,那么序列化Stage的RDD及resultOfJob的处理函数。最终这些序列化得到的字节数组需要用sc.broadcast进行广播。

  

  6、最后,创建所有Task、当前stage的id、jobId等信息创建TaskSet,并调用taskScheduler的submitTasks,批量提交Stage及其所有Task.

  

  有可能同时有多个任务提交,所以就有了调度策略FIFO,那么下来调用LocalBackendreviveOffers方法,向local-Actor发送ReviveOffers消息。localActor对ReviveOffers消息的匹配执行reviveOffers方法。调用TaskSchedulerImpl的resourceOffers方法分配资源,最后调用Executor的launchTask方法运行任务。

  

  同时你会发现,这里有段代码,shuffleOffers = Random.shuffle(offers),是为了计算资源的分配与计算,对所有WorkerOffer随机洗牌,避免将任务总是分配给同样的WorkerOffer。

  好了,知道了整个流程,下来我们来看一下本地化问题:

  

  myLocalityLevles:当前TaskSetManager允许使用的本地化级别。那么这里的computeValidLocalityLevels方法是用于计算有效的本地化缓存级别。如果存在Executor中的有待执行的任务,且PROCESS_LOCAL本地化的等待时间不为0,且存在Executor已被激活,那么允许的本地化级别里包括PROCESS_LOCAL.

  

  这里又发现新大陆,获取各个本地化级别的等待时间。

  spark.locality.wait 本地化级别的默认等待时间

  spark.locality.wait.process 本地进程的等待时间

  spark.locality.wait.node 本地节点的等待时间

  spark.locality.wait.rack 本地机架的等待时间

  这些参数呢,在任务的运行很长且数量很多的情况下,适当调高这些参数可以显著提高性能,然而当这些参数值都已经超过任务的运行时长时,则需要调小这些参数。任何任务都希望被分配到可以从本地读取数据的节点上以得到最大的性能提升,然而每个任务的运行时长时不可预计的。当一个任务在分配时,如果没有满足最佳本地化(PROCESS_LOCAL)的资源时,如果固执的期盼得到最佳的资源,很可能被已经占用最佳资源但是运行时间很长的任务耽误,所以这些代码实现了当没有最佳本地化时,选择稍差点的资源。

参考文献:《深入理解Spark:核心思想与源码分析》

时间: 2024-11-05 06:08:10

Spark Job的提交与task本地化分析(源码阅读八)的相关文章

Spark修炼之道(高级篇)——Spark源码阅读:第一节 Spark应用程序提交流程

作者:摇摆少年梦 微信号: zhouzhihubeyond spark-submit 脚本应用程序提交流程 在运行Spar应用程序时,会将spark应用程序打包后使用spark-submit脚本提交到Spark中运行,执行提交命令如下: root@sparkmaster:/hadoopLearning/spark-1.5.0-bin-hadoop2.4/bin# ./spark-submit --master spark://sparkmaster:7077 --class SparkWordC

Spark大师之路:广播变量(Broadcast)源码分析

概述 最近工作上忙死了--广播变量这一块其实早就看过了,一直没有贴出来. 本文基于Spark 1.0源码分析,主要探讨广播变量的初始化.创建.读取以及清除. 类关系 BroadcastManager类中包含一个BroadcastFactory对象的引用.大部分操作通过调用BroadcastFactory中的方法来实现. BroadcastFactory是一个Trait,有两个直接子类TorrentBroadcastFactory.HttpBroadcastFactory.这两个子类实现了对Htt

[Apache Spark源码阅读]天堂之门——SparkContext解析

稍微了解Spark源码的人应该都知道SparkContext,作为整个Project的程序入口,其重要性不言而喻,许多大牛也在源码分析的文章中对其做了很多相关的深入分析和解读.这里,结合自己前段时间的阅读体会,与大家共同讨论学习一下Spark的入口对象—天堂之门—SparkContex. SparkContex位于项目的源码路径\spark-master\core\src\main\scala\org\apache\spark\SparkContext.scala中,源文件包含Classs Sp

vuex源码阅读分析

这几天忙啊,有绝地求生要上分,英雄联盟新赛季需要上分,就懒着什么也没写,很惭愧.这个vuex,vue-router,vue的源码我半个月前就看的差不多了,但是懒,哈哈.下面是vuex的源码分析在分析源码的时候我们可以写几个例子来进行了解,一定不要闭门造车,多写几个例子,也就明白了在vuex源码中选择了example/counter这个文件作为例子来进行理解counter/store.js是vuex的核心文件,这个例子比较简单,如果比较复杂我们可以采取分模块来让代码结构更加清楚,如何分模块请在vu

Spark学习三:Spark Schedule以及idea的安装和导入源码

Spark学习三:Spark Schedule以及idea的安装和导入源码 标签(空格分隔): Spark Spark学习三Spark Schedule以及idea的安装和导入源码 一RDD操作过程中的数据位置 二Spark Schedule 三Idea导入spark源码 一,RDD操作过程中的数据位置 [hadoop001@xingyunfei001 spark-1.3.0-bin-2.5.0]$ bin/spark-shell --master local[2] val rdd = sc.t

easyloader [easyui_1.4.2] 分析源码,妙手偶得之

用easyui很久了,但是很少去看源码. 有解决不了的问题就去百度... 今日发现,easyui的源码不难懂. 而且结合 easyloader 可以非常方便的逐个研究easyui的组件. 但是, easyloader 的官方API介绍非常简略. easyloader.base = '../'; // 设置 easyui 基础目录 easyloader.load('messager', function(){ // 加载指定模块 $.messager.alert('Title', 'load ok

ConcurrentHashMap源码阅读以及底层实现的简单分析

ConcurrentHashMap 是可以实现多线程并发的HashMap,它是线程安全的. 前面分析过 HashMap的源码,它和HashMap有很多的相同点一样,比如它也有 initialCapacity 以及负载因子 loadFactor 属性.而且他们的默认值也是16和0.75. static final int DEFAULT_INITIAL_CAPACITY =16; static final float DEFAULT_LOAD_FACTOR =0.75f; 和HashMap不同的是

spark.mllib源码阅读-分类算法4-DecisionTree

本篇博文主要围绕Spark上的决策树来讲解,我将分为2部分来阐述这一块的知识.第一部分会介绍一些决策树的基本概念.Spark下决策树的表示与存储.结点分类信息的存储.结点的特征选择与分类:第二部分通过一个Spark自带的示例来看看Spark的决策树的训练算法.另外,将本篇与上一篇博文"spark.mllib源码阅读bagging方法"的bagging子样本集抽样方法结合,也就理解了Spark下的决策森林树的实现过程. 第一部分: 决策树模型 分类决策树模型是一种描述对实例进行分类的树形

【Spark】配置Spark源码阅读环境

Scala构建工具(SBT)的使用 SBT介绍 SBT是Simple Build Tool的简称,如果读者使用过Maven,那么可以简单将SBT看做是Scala世界的Maven,虽然二者各有优劣,但完成的工作基本是类似的. 虽然Maven同样可以管理Scala项目的依赖并进行构建,但SBT的某些特性却让人如此着迷,比如: 使用Scala作为DSL来定义build文件(one language rules them all); 通过触发执行(trigger execution)特性支持持续的编译与