Spark中的Scheduler

Spark中的Scheduler

scheduler分成两个类型,一个是TaskScheduler与其实现,一个是DAGScheduler。

TaskScheduler:主要负责各stage中传入的task的执行与调度。

DAGScheduler:主要负责对JOB中的各种依赖进行解析,根据RDD的依赖生成stage并通知TaskScheduler执行。

实例生成

TaskScheduler实例生成:

scheduler实例生成,我目前主要是针对onyarn的spark进行的相关分析,

在appmaster启动后,通过调用startUserClass()启动线程来调用用户定义的spark分析程序。

传入的第一个参数为appmastername(master),可传入的如:yarn-cluster等。

在用户定义的spark分析程序中,生成SparkContext实例。

通过SparkContext.createTaskScheduler函数。如果是yarn-cluster,生成YarnClusterScheduler实例。

此部分生成的scheduler为TaskScheduler实例。

defthis(sc:SparkContext)
= this(sc,newConfiguration())

同时YarnClusterSchduler实现TaskSchedulerImpl。

defthis(sc:SparkContext)
= this(sc,sc.conf.getInt("spark.task.maxFailures",4))

生成TaskScheduler中的SchedulerBackend属性引用,yarn-cluster为CoarseGrainedSchedulerBackend

valbackend =newCoarseGrainedSchedulerBackend(scheduler,sc.env.actorSystem)

scheduler.initialize(backend)

DAGScheduler实例生成:

classDAGScheduler(

taskSched: TaskScheduler,

mapOutputTracker:MapOutputTrackerMaster,

blockManagerMaster:BlockManagerMaster,

env: SparkEnv)

extendsLogging {

defthis(taskSched:TaskScheduler){

this(taskSched,SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],

SparkEnv.get.blockManager.master,SparkEnv.get)

}

taskSched.setDAGScheduler(this)

scheduler调度过程分析

1.rdd执行action操作,如saveAsHadoopFile

2.调用SparkContext.runJob

3.调用DAGScheduler.runJob-->此函数调用submitJob,并等job执行完成。

Waiter.awaitResult()中通过_jobFinished检查job运行是否完成,如果完成,此传为true,否则为false.

_jobFinished的值通过resultHandler函数,每调用一次finishedTasks的值加一,

如果finishedTasks的个数等于totalTasks的个数时,表示完成。或者出现exception.

defrunJob[T, U: ClassTag](

rdd: RDD[T],

func: (TaskContext, Iterator[T])=> U,

partitions: Seq[Int],

callSite: String,

allowLocal: Boolean,

resultHandler: (Int, U) =>Unit,

properties: Properties =
null)

{

valwaiter
=submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler,properties)

waiter.awaitResult()match{

caseJobSucceeded => {}

caseJobFailed(exception:Exception,
_) =>

logInfo("Failedto run " + callSite)

throwexception

}

}

4.调用DAGScheduler.submitJob函数,

部分代码:生成JobWaiter实例,并传入此实例,发送消息,调用JobSubmitted事件。并返回waiter实例。

JobWaiter是JobListener的实现。

valwaiter
=newJobWaiter(this,jobId,partitions.size,
resultHandler)

eventProcessActor! JobSubmitted(

jobId,rdd,
func2,partitions.toArray, allowLocal, callSite,
waiter,properties)

waiter

5.处理DAGScheduler的JobSubmitted事件消息,通过processEvent处理消息接收的事件。

defreceive = {

caseevent:DAGSchedulerEvent
=>

logTrace("Gotevent of type " +event.getClass.getName)

if(!processEvent(event)){

submitWaitingStages()

}
else{

resubmissionTask.cancel()

context.stop(self)

}

}

}))

6.processEvent函数中处理JobSubmitted事件部分代码:

caseJobSubmitted(jobId,rdd,
func,partitions,allowLocal,callSite,listener,properties)=>

varfinalStage:Stage
= null

try{

生成stage实例,stage的id通过nextStageId的值加一得到,task的个数就是partitions的分区个数,

根据job对应的rdd,得到如果parentrdd是shuffle的rdd时生成ShuffleMapStage,通过getParentStages函数,

此处去拿到parentrdd时,如果currentrdd的parentrdd不是shuffle,递归调用parentrdd,

如果parendrdd中没有shuffle的rdd,不生成新的stage,否则有多少个,生成多少个。此处是处理DAG类的依赖

finalStage= newStage(rdd,partitions.size,None,
jobId,Some(callSite))

}
catch{

casee:Exception
=>

logWarning("Creatingnew stage failed due to exception - job: "+
jobId,
e)

listener.jobFailed(e)

returnfalse

}

生成ActiveJob实例。设置numFinished的值为0,表示job中有0个完成的task.

设置所有task个数的arrayfinished.并把所有元素的值设置为false.把JobWaiter当listener传入ActiveJob.

valjob
= newActiveJob(jobId,finalStage,func,partitions,callSite,listener,properties)

对已经cache过的TaskLocation进行清理。

clearCacheLocs()

logInfo("Gotjob " +
job.jobId+
" ("+
callSite+ ") with "+
partitions.length+

"output partitions (allowLocal=" +allowLocal+
")")

logInfo("Finalstage: " +
finalStage+
" ("+
finalStage.name+
")")

logInfo("Parentsof final stage: " +finalStage.parents)

logInfo("Missingparents: " +getMissingParentStages(finalStage))

如果runJob时传入的allowLocal的值为true,同时没有需要shuffle的rdd,同时partitions的长度为1,

也就是task只有一个,直接在local运行此job..通过runLocallyWithinThread生成一个线程来执行。

if(allowLocal&&
finalStage.parents.size==
0 &&partitions.length==
1) {

//Compute very short actions like first() or take() with no parentstages locally.

listenerBus.post(SparkListenerJobStart(job,Array(),
properties))

通过ActiveJob中的func函数来执行job的运行,此函数在rdd的action调用时生成定义,

如saveAsHadoopFile(saveAsHadoopDataset)中的定义的内部func,writeToFile函数。

完成函数执行后,调用上面提到的生成的JobWaiter.taskSucceeded函数。

runLocally(job)

}
else{

否则有多个partition也就是有多个task,或者有shuffle的情况,

idToActiveJob(jobId)=
job

activeJobs+=
job

resultStageToJob(finalStage)=
job

listenerBus.post(SparkListenerJobStart(job,jobIdToStageIds(jobId).toArray,properties))

调用DAGScheduler.submitStage函数。

submitStage(finalStage)

}

7.DAGScheduler.submitStage函数:递归函数调用,

如果stage包含parentstage(shuffle的情况)把stage设置为waiting状态,等待parentstage执行完成才进行执行。

privatedefsubmitStage(stage:
Stage) {

valjobId
=activeJobForStage(stage)

if(jobId.isDefined){

logDebug("submitStage("+ stage +
")")

如果RDD的Dependency的RDD还没有执行完成,等待Dependency执行完成后当前的RDD才能进行执行操作。

if(!waiting(stage)&&
!running(stage)&& !failed(stage)){

根据stage中rdd的Dependency,检查是否需要生成新的stage,如果是ShuffleDependency,会生成新的ShuffleMapStage

此处去拿到parentrdd时,如果currentrdd的parentrdd不是shuffle,递归调用parentrdd,

如果parendrdd中没有shuffle的rdd,不生成新的stage,否则有多少个,生成多少个。此处是处理DAG类的依赖

valmissing
=getMissingParentStages(stage).sortBy(_.id)

logDebug("missing:" +
missing)

如果没有RDD中的shuffle的Dependency,也就是RDD之间都是NarrowDependency的Dependency

表示所有的Dependency都在map端本地执行。

if(missing
==Nil) {

logInfo("Submitting" + stage +
"(" + stage.rdd+
"), which has no missingparents")

submitMissingTasks(stage,jobId.get)

running+= stage

}
else{

如果RDD有Dependency,先执行parentrdd的stage操作。此处是递归函数调用

for(parent
<-missing) {

submitStage(parent)

}

waiting+= stage

}

}

}else{

abortStage(stage,
"Noactive job for stage " + stage.id)

}

}

    8.DAGScheduler.submitMissingTask的执行流程:

    privatedefsubmitMissingTasks(stage:
    Stage, jobId: Int) {

logDebug("submitMissingTasks("+ stage +
")")

//Get our pending tasks and remember them in our pendingTasks entry

valmyPending
=pendingTasks.getOrElseUpdate(stage,newHashSet)

myPending.clear()

vartasks
=ArrayBuffer[Task[_]]()

如果stage是shuffle的rdd,迭代stage下的的所有partition,根据partition与对应的TaskLocation

生成ShuffleMapTask.添加到task列表中。

if(stage.isShuffleMap){

for(p
<- 0until stage.numPartitionsifstage.outputLocs(p)==
Nil) {

vallocs
=getPreferredLocs(stage.rdd,p)

tasks+=
newShuffleMapTask(stage.id,stage.rdd,stage.shuffleDep.get,p,
locs)

}

}else{

否则表示stage是非shuffle的rdd,此是是执行完成后直接返回结果的stage,生成ResultTask实例。

由于是ResultTask,因此需要传入定义的func,也就是如何处理结果返回

//This is a final stage; figure out its job‘s missing partitions

valjob
=resultStageToJob(stage)

for(id
<- 0until
job.numPartitionsif!job.finished(id)){

valpartition
=job.partitions(id)

vallocs
=getPreferredLocs(stage.rdd,partition)

tasks+=
newResultTask(stage.id,stage.rdd,job.func,partition,locs,
id)

}

}

valproperties=
if(idToActiveJob.contains(jobId)){

idToActiveJob(stage.jobId).properties

}else{

//thisstage will be assigned to "default" pool

null

}

//must be run listener before possible NotSerializableException

//should be "StageSubmitted" first and then "JobEnded"

listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage),properties))

if(tasks.size>
0) {

//Preemptively serialize a task to make sure it can be serialized. Weare catching this

//exception here because it would be fairly hard to catch thenon-serializableexception

//down the road, where we have several different implementations forlocal scheduler and

//cluster schedulers.

try{

SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)

}
catch{

casee:NotSerializableException
=>

abortStage(stage,
"Tasknot serializable: " +
e.toString)

running-= stage

return

}

logInfo("Submitting" +
tasks.size+
" missing tasks from "+ stage +
" ("+ stage.rdd+
")")

myPending++=
tasks

logDebug("Newpending tasks: " +
myPending)

生成TaskSet实例,把stage中要执行的Task列表传入,同时把stage对应的ActiveJob也传入。

通过TaskScheduler的实现,调用submitTasks函数,YarnClusterScheduler(TaskSchedulerImpl)

taskSched.submitTasks(

newTaskSet(tasks.toArray,stage.id,stage.newAttemptId(),
stage.jobId,properties))

stageToInfos(stage).submissionTime=
Some(System.currentTimeMillis())

}else{

logDebug("Stage" + stage +
"is actually done; %b %d %d".format(

stage.isAvailable,stage.numAvailableOutputs,stage.numPartitions))

running-= stage

}

}

9.TaskSchedulerImpl.submitTasks函数流程分析:

通过传入的TaskSet,得到要执行的tasks列表,并生成TaskSetmanager实例,

同时把实例添加到的schedulableBuilder(FIFOSchedulableBuilder/FairSchedulableBuilder)队列中。

关于TaskSetManager实例可参见后面的分析。

overridedefsubmitTasks(taskSet:
TaskSet) {

valtasks
=taskSet.tasks

logInfo("Addingtask set " + taskSet.id+
" with "+
tasks.length+
" tasks")

this.synchronized{

valmanager
=newTaskSetManager(this,taskSet,
maxTaskFailures)

activeTaskSets(taskSet.id)=
manager

schedulableBuilder.addTaskSetManager(manager,manager.taskSet.properties)

taskSetTaskIds(taskSet.id)=
newHashSet[Long]()

定期检查task的执行消息是否被生成执行。如果task被分配执行,关闭此线程。否则一直给出提示.

if(!isLocal && !hasReceivedTask){

starvationTimer.scheduleAtFixedRate(newTimerTask()
{

overridedefrun()
{

if(!hasLaunchedTask){

logWarning("Initialjob has not accepted any resources; "+

"checkyour cluster UI to ensure that workers are registered "+

"andhave sufficient memory")

}
else{

this.cancel()

}

}

},
STARVATION_TIMEOUT,STARVATION_TIMEOUT)

}

hasReceivedTask=
true

}

通过SchedulerBackend的实现CoarseGrainedSchedulerBackend.reviceOffers发起执行处理操作。

backend.reviveOffers()

}

9.1TaskSetManager的实例生成:

private[spark]classTaskSetManager(

sched: TaskSchedulerImpl,

valtaskSet:TaskSet,

valmaxTaskFailures:Int,

clock: Clock = SystemClock)

extendsSchedulablewithLogging

...........................

for(i
<- (0until
numTasks).reverse){

addPendingTask(i)

}

关于addPendingTask的定义:此睦传入的readding的值为false.

privatedefaddPendingTask(index:
Int, readding: Boolean = false){

//Utility method that adds `index` to a list only if
readding=falseor it‘s not already there

内部定义的addTo方法。

defaddTo(list:ArrayBuffer[Int])
{

if(!readding || !list.contains(index))
{

list += index

}

}

varhadAliveLocations=
false

迭代所有的要执行的task,并通过task的TaskLocation检查执行的节点级别。添加到相应的pendingTask容器中

for(loc
<-tasks(index).preferredLocations){

for(execId
<-loc.executorId){

检查TaskSchedulerImpl.activeExecutorIds的活动的worker的executor是否存在,

如果是第一个执行的RDD时,此时activeExecutorIds容器的的值为空,当第一个RDD中有TASK在此executor中执行过后,

会把executor的id添加到activeExecutorIds容器中。

第一个RDD的stage执行时,此部分不执行,但第二个stage执行时,可最大可能的保证task在PROCESS_LOCAL的执行。

if(sched.isExecutorAlive(execId)){

addTo(pendingTasksForExecutor.getOrElseUpdate(execId,newArrayBuffer))

hadAliveLocations=
true

}

}

if(sched.hasExecutorsAliveOnHost(loc.host)){

如果在TaskSchedulerImpl的executorsByHost容器中包含此host,在pendingTasksForHost中添加对应的task.

TaskSchedulerImpl.executorsByHost容器的值在每一个worker注册时

通过向CoarseGrainedSchedulerBackend.DriverActor发送RegisterExecutor事件消息。

通过makeOffers()-->TaskSchedulerImpl.resourceOffers把host添加到executorsByHost容器中。

addTo(pendingTasksForHost.getOrElseUpdate(loc.host,newArrayBuffer))

通过调用YarnClusterScheduler.getRackForHost得到host对应的rack,

并在rack的pending容器中添加对应的task个数和。

for(rack
<-sched.getRackForHost(loc.host)){

addTo(pendingTasksForRack.getOrElseUpdate(rack,newArrayBuffer))

}

hadAliveLocations=
true

}

}

如果上面两种情况都没有添加到容器中pendingTasksWithNoPrefs。

if(!hadAliveLocations){

//Even though the task might‘ve had preferred locations, all of thosehosts or executors

//are dead; put it in the no-prefslist
so we can schedule it elsewhere right away.

addTo(pendingTasksWithNoPrefs)

}

在TaskSetManager实例生成是,把所有task的个数都添加到allPendingTasks容器中

if(!readding) {

allPendingTasks+= index
// No point scanning thiswhole list to find the old task there

}

}

.............................

得到可选择的LocalityLevel级别。

valmyLocalityLevels=
computeValidLocalityLevels()

vallocalityWaits=
myLocalityLevels.map(getLocalityWait)// Time to wait at each level

以下代码是computeValidLocalityLevels的定义,主要根据各种locality中pending的容器中是否有值。

生成当前stage中的task执行可选择的Locality级别。

privatedefcomputeValidLocalityLevels():
Array[TaskLocality.TaskLocality] = {

importTaskLocality.{PROCESS_LOCAL,NODE_LOCAL,RACK_LOCAL,ANY}

vallevels
=newArrayBuffer[TaskLocality.TaskLocality]

if(!pendingTasksForExecutor.isEmpty&&
getLocalityWait(PROCESS_LOCAL)!=
0) {

levels+=
PROCESS_LOCAL

}

if(!pendingTasksForHost.isEmpty&&
getLocalityWait(NODE_LOCAL)!=
0) {

levels+=
NODE_LOCAL

}

if(!pendingTasksForRack.isEmpty&&
getLocalityWait(RACK_LOCAL)!=
0) {

levels+=
RACK_LOCAL

}

levels+=
ANY

logDebug("Validlocality levels for " +
taskSet+
": "+ levels.mkString(","))

levels.toArray

}

}

以下代码是getLocalityWait的定义代码:此函数主要是定义每一个Task在此Locality级别中执行的等待时间。

也就是scheduler调度在传入的Locality级别时所花的时间是否超过指定的等待时间,

如果超过表示需要放大Locality的查找级别。

privatedefgetLocalityWait(level:
TaskLocality.TaskLocality): Long = {

valdefaultWait=
conf.get("spark.locality.wait","3000")

level
match{

caseTaskLocality.PROCESS_LOCAL=>

conf.get("spark.locality.wait.process",defaultWait).toLong

caseTaskLocality.NODE_LOCAL=>

conf.get("spark.locality.wait.node",defaultWait).toLong

caseTaskLocality.RACK_LOCAL=>

conf.get("spark.locality.wait.rack",defaultWait).toLong

caseTaskLocality.ANY=>

0L

}

}

10.SchedulerBackend.reviveOffers()的调度处理流程:

SchedulerBackend的实现为CoarseGrainedSchedulerBackend。

overridedefreviveOffers()
{

driverActor! ReviveOffers

}

以上代码发CoarseGrainedSchedulerBackend内部的DriverActor发送消息,处理ReviveOffers事件。

caseReviveOffers =>

makeOffers()

................

defmakeOffers() {

见下面的launchTasks与resourceOffers函数

launchTasks(scheduler.resourceOffers(

executorHost.toArray.map{case(id,
host)=>
newWorkerOffer(id,host,freeCores(id))}))

}

调用TaskSchedulerImpl.resourceOffers并传入注册的worker中executorid与host的kvarray.

defresourceOffers(offers: Seq[WorkerOffer]):
Seq[Seq[TaskDescription]] =synchronized {

SparkEnv.set(sc.env)

//Mark each slave as alive and remember its
hostname

for(o
<-offers) {

executorIdToHost(o.executorId)=
o.host

此部分主要是在worker注册时executorsByHost中还不存在时会执行,

if(!executorsByHost.contains(o.host)){

executorsByHost(o.host)=
newHashSet[String]()

executorGained(o.executorId,o.host)

}

}

offers表示有多少个注册的worker的executor,根据每一个worker中可能的cpucore个数生成可执行的task个数。

//Build a list of tasks to assign to each worker

valtasks
=offers.map(o => newArrayBuffer[TaskDescription](o.cores))

可分配的cpu个数,由此处可以看出每一个任务分配时最好按每个worker能分配的最大cpucore个数来分配。

valavailableCpus=
offers.map(o => o.cores).toArray

得到队列中的所有的TaskSetManager列表。

valsortedTaskSets=
rootPool.getSortedTaskSetQueue()

for(taskSet
<-sortedTaskSets){

logDebug("parentName:%s, name: %s, runningTasks: %s".format(

taskSet.parent.name,taskSet.name,taskSet.runningTasks))

}

计算task的Locality级别,launchedTask=false表示需要放大Locality的级别。

//Take each TaskSet in our scheduling order, and then offer it eachnode in increasing order

//of locality levels so that it gets a chance to launch local tasks onall of them.

varlaunchedTask=
false

计算task的Locality,此处是一个for的迭代调用,先从taskset列表中拿出一个tasetset,

子迭代是从PROCESS_LOCAL开始迭代locality的级别。

for(taskSet
<-sortedTaskSets;maxLocality<- TaskLocality.values) {

do{

launchedTask=
false

迭代调用每一个worker的值,从每一个worker中在taskset中选择task的执行级别,生成TaskDescription

for(i
<- 0until offers.size) {

得到迭代出的worker的executorid与host

valexecId
=offers(i).executorId

valhost
=offers(i).host

通过TaskSetManager.resourceOffer选择一个执行级别,通过此函数选择Locality级别时,

不能超过传入的maxLocality,每次生成一个task,

for(task
<-taskSet.resourceOffer(execId,host,availableCpus(i),maxLocality)){

每次生成一个task,把生成的task添加到上面的tasks列表中。

tasks(i)+=
task

valtid
=task.taskId

taskIdToTaskSetId(tid)=
taskSet.taskSet.id

taskSetTaskIds(taskSet.taskSet.id)+=
tid

taskIdToExecutorId(tid)=
execId

设置当前executorid设置到activeExecutorIds列表中,当有多个依赖的stage执行时,

第二个stage在submitTasks时,生成TaskSetManager时,会根据的activeExecutorIds值,

在pendingTasksForExecutor中生成等执行的PROCESS_LOCAL的pendingtasks.

activeExecutorIds+=
execId

把executor对应的host记录到executorsByHost容器中。

executorsByHost(host)+=
execId

当前worker中可用的cpucore的值需要减去一,这样能充分保证一个cpucore执行一个task

availableCpus(i) -=
1

这个值用来检查是否在当前的Locality级别中接着执行其它的task的分配,

如果这个值为true,不放大maxLocality的级别,从下一个worker中接着分配剩余的task

launchedTask=
true

}

}

}
while(launchedTask)

}

if(tasks.size>
0) {

设置hasLaunchedTask的值为true,表示task的执行分配完成,在上面提到过的检查线程中对线程执行停止操作。

hasLaunchedTask=
true

}

returntasks

}

10.1TaskSetManager.resourceOffer流程分析

defresourceOffer(

execId: String,

host:String,

availableCpus: Int,

maxLocality:TaskLocality.TaskLocality)

:Option[TaskDescription] =

{

如果完成的task个数小于要生成的总task个数,同时当前cpu可用的core个数和大于或等于一个配置的,默认1

if(tasksSuccessful<
numTasks&& availableCpus >=
CPUS_PER_TASK){

valcurTime
=clock.getTime()

通过现在执行task分配的时间减去上一次并从currentLocalityIndex的下标开始,

取出locality对应的task分配等待时间,如果时间超过了此配置,把下标值加一,

找到下一个locality的配置时间,按这方式找,直到找到ANY的值,具体可见下面的此方法说明

varallowedLocality=
getAllowedLocalityLevel(curTime)

如果通过的locality的级别超过了传入的最大locality级别,把级别设置为传入的最大级别

if(allowedLocality>
maxLocality) {

allowedLocality= maxLocality
// We‘re not allowed tosearch for farther-away tasks

}

findTask主要是从对应的pending的列表中根据对应的Locality拿到对应的task的下标,在TaskSet.tasks中的下标。

findTask(execId, host,allowedLocality)match{

caseSome((index,taskLocality))=>
{

//Found a task; do some bookkeeping and return a task description

valtask
=tasks(index)

valtaskId
=sched.newTaskId()

//Figure out whether this should count as a preferred launch

logInfo("Startingtask %s:%d as TID %s on executor %s: %s (%s)".format(

taskSet.id,index,taskId,execId,
host, taskLocality))

//Do various bookkeeping

copiesRunning(index) +=
1

valinfo
= newTaskInfo(taskId,index,curTime,execId,
host, taskLocality)

taskInfos(taskId)=
info

taskAttempts(index)=
info ::taskAttempts(index)

把分配此task的locality级别拿到对应的下标,并重新设置下标的值。

//Update our locality level for delay scheduling

currentLocalityIndex= getLocalityIndex(taskLocality)

把这次的task的分配时间设置成最后一次分配时间。

lastLaunchTime=
curTime

//Serialize and return the task

valstartTime
=clock.getTime()

//We rely on the DAGScheduler to catch non-serializableclosures
and RDDs, so in here

//we assume the task can be serialized without exceptions.

valserializedTask=
Task.serializeWithDependencies(

task,sched.sc.addedFiles,sched.sc.addedJars,ser)

valtimeTaken
=clock.getTime() - startTime

addRunningTask(taskId)

logInfo("Serializedtask %s:%d as %d bytes in %d ms".format(

taskSet.id,index,serializedTask.limit,timeTaken))

valtaskName
="task %s:%d".format(taskSet.id,index)

如果是第一次执行,通过DAGScheduler.taskStarted发送BeginEvent事件。

if(taskAttempts(index).size==
1)

taskStarted(task,info)

returnSome(newTaskDescription(taskId,execId,
taskName,index,serializedTask))

}

case_ =>

}

}

None

}

根据超时时间配置,如果这次分配task的时间减去上次task分配的时间超过了locality分配等待的配置时间,

把locality的级别向上移动一级,并重新比对时间,拿到不超时的locality级别或ANY的级别。

privatedefgetAllowedLocalityLevel(curTime:
Long): TaskLocality.TaskLocality = {

while(curTime -
lastLaunchTime>=
localityWaits(currentLocalityIndex)&&

currentLocalityIndex<
myLocalityLevels.length-
1)

{

下标值加一,也就是把当前的Locality的级别向上放大一级。

//Jump to the next locality level, and remove our waiting time for thecurrent one since

//we don‘t want to count it again on the next one

lastLaunchTime+=
localityWaits(currentLocalityIndex)

currentLocalityIndex+=
1

}

myLocalityLevels(currentLocalityIndex)

}

DAGScheduler中处理BeginEvent事件:

caseBeginEvent(task,taskInfo)=>

for(

job<-
idToActiveJob.get(task.stageId);

stage<-
stageIdToStage.get(task.stageId);

stageInfo<-
stageToInfos.get(stage)

) {

if(taskInfo.serializedSize>
TASK_SIZE_TO_WARN*
1024 &&

!stageInfo.emittedTaskSizeWarning){

stageInfo.emittedTaskSizeWarning=
true

logWarning(("Stage%d (%s) contains a task of very large "+

"size(%d KB). The maximum recommended task size is %d KB.").format(

task.stageId,stageInfo.name,taskInfo.serializedSize/
1024,TASK_SIZE_TO_WARN))

}

}

listenerBus.post(SparkListenerTaskStart(task,taskInfo))

    11.CoarseGrainedSchedulerBackend.launchTasks流程

执行task的执行,发送LaunchTask事件处理消息

    deflaunchTasks(tasks: Seq[Seq[TaskDescription]])
    {

for(task
<-tasks.flatten) {

freeCores(task.executorId) -=
1

根据worker注册时的actor,向此actor发送LaunchTask事件。

executorActor(task.executorId)!
LaunchTask(task)

}

}

12.启动task,由于是onyarn的模式,worker的actor在CoarseGrainedExecutorBackend.

处理代码如下:

caseLaunchTask(taskDesc)=>

logInfo("Gotassigned task " +
taskDesc.taskId)

if(executor==
null){

logError("ReceivedLaunchTask command but executor was null")

System.exit(1)

}
else{

executor.launchTask(this,taskDesc.taskId,taskDesc.serializedTask)

}

.............................

通过Executor启动task的执行。

其它actor的消息处理与task的具体执行与shuffle后面分析,这里先不做细的说明。

吐槽一把scala,这玩意编写代码是方便,但看起来有点麻烦呀。

Spark中的Scheduler

时间: 2024-12-19 15:23:35

Spark中的Scheduler的相关文章

Tachyon在Spark中的作用(Tachyon: Reliable, Memory Speed Storage for Cluster Computing Frameworks 论文阅读翻译)

摘要: Tachyon是一种分布式文件系统,能够借助集群计算框架使得数据以内存的速度进行共享.当今的缓存技术优化了read过程,可是,write过程由于须要容错机制,就须要通过网络或者是磁盘进行复制操作.Tachyon通过将"血统"技术引入到存储层进而消除了这个瓶颈.创建一个长期的以"血统机制"为基础的存储系统的关键挑战是失败情况发生的时候及时地进行数据恢复.Tachyon通过引入一种检查点的算法来解决问题,这样的方法保证了恢复过程的有限开销以及通过资源调度器下进行

Spark中资源与任务的关系

在介绍Spark中的任务和资源之前先解释几个名词: Dirver Program:运行Application的main函数(用户提交的jar包中的main函数)并新建SparkContext实例的程序,称为驱动程序,通常用SparkContext代表驱动程序(任务的驱动程序). Cluster Manager:集群管理器是集群资源管理的外部服务.Spark上现在主要有Standalone.YARN.Mesos3种集群资源管理器.Spark自带的Standalone模式能满足绝大部分 Spark计

初解,Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Scala语言的Actor而产生的消息驱动框架Akka的使用,

Scala深入浅出实战中级--进阶经典(第66讲:Scala并发编程实战初体验及其在Spark源码中应用解析)内容介绍和视频链接 2015-07-24 DT大数据梦工厂 从明天起,做一个勤奋的人 看视频.下视频,分享视频 DT大数据梦工厂-Scala深入浅出实战中级--进阶经典:第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析 本期视频通过代码实战详解了Java语言基于加锁的并发编程模型的弊端以及Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Sc

spark中的广播变量broadcast

Spark中的Broadcast处理 首先先来看一看broadcast的使用代码: val values = List[Int](1,2,3) val broadcastValues = sparkContext.broadcast(values) rdd.mapPartitions(iter => { broadcastValues.getValue.foreach(println) }) 在上面的代码中,首先生成了一个集合变量,把这个变量通过sparkContext的broadcast函数进

spark中的RDD以及DAG

今天,我们就先聊一下spark中的DAG以及RDD的相关的内容 1.DAG:有向无环图:有方向,无闭环,代表着数据的流向,这个DAG的边界则是Action方法的执行 2.如何将DAG切分stage,stage切分的依据:有宽依赖的时候要进行切分(shuffle的时候, 也就是数据有网络的传递的时候),则一个wordCount有两个stage, 一个是reduceByKey之前的,一个事reduceByKey之后的(图1), 则我们可以这样的理解,当我们要进行提交上游的数据的时候, 此时我们可以认

Spark中GraphX图运算pregel详解

由于本人文字表达能力不足,还是多多以代码形式表述,首先展示测试代码,然后解释: package com.txq.spark.test import org.apache.spark.graphx.util.GraphGeneratorsimport org.apache.spark.graphx._import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext, SparkException, gra

Spark中的键值对操作-scala

1.PairRDD介绍     Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为PairRDD.PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口.例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD. 2.创建Pair RDD 程序示例:对一个英语单词组成的文本行,提取其中的第一个单词作为key,将整个句子作为value,建立 PairR

Spark 中 map 与 flatMap 的区别

通过一个实验来看Spark 中 map 与 flatMap 的区别. 步骤一:将测试数据放到hdfs上面 hadoopdfs -put data1/test1.txt /tmp/test1.txt 该测试数据有两行文本: 步骤二:在Spark中创建一个RDD来读取hdfs文件/tmp/test1.txt 步骤三:查看map函数的返回值 得到map函数返回的RDD: 查看map函数的返回值--文件中的每一行数据返回了一个数组对象 步骤四:查看flatMap函数的返回值 得到flatMap函数返回的

为什么spark中只有ALS

WRMF is like the classic rock of implicit matrix factorization. It may not be the trendiest, but it will never go out of style --Ethan Rosenthal 前言 spark平台推出至今已经地带到2.1的版本了,很多地方都有了重要的更新,加入了很多新的东西.但是在协同过滤这一块却一直以来都只有ALS一种算法.同样是大规模计算平台,Hadoop中的机器学习算法库Mah