An Analysis of Delay Scheduling in the Fair Scheduler

  The main purpose of delay scheduling is to improve data locality and reduce data transfer over the network. For MapTasks whose input data is not on the local node, the scheduler delays scheduling them and instead hands the slot to MapTasks that do have local data.

  The overall idea of delay scheduling is as follows:

  If the job can find a node-local MapTask, return that task; if not, delay scheduling, i.e. within the nodeLocalityDelay window, keep trying to find a node-local MapTask to return.

  Otherwise, once the wait exceeds nodeLocalityDelay, look for a rack-local MapTask to return; if none is found, delay again, i.e. within the rackLocalityDelay window, keep trying to find a rack-local MapTask to return.

  Otherwise, once the wait exceeds nodeLocalityDelay + rackLocalityDelay, look for an off-switch MapTask and return it.

  The main delay-scheduling fields in FairScheduler.java:

long nodeLocalityDelay;  // how long to wait for a node-local task before falling back to rack-local
long rackLocalityDelay;  // how long to wait for a rack-local task before falling back to off-switch
boolean skippedAtLastHeartbeat;  // whether this job was skipped (delayed) at the last heartbeat
long timeWaitedForLocalMap;  // time waited since the last MapTask was assigned to this job
LocalityLevel lastMapLocalityLevel;  // locality level of the last MapTask assigned to this job

nodeLocalityDelay = rackLocalityDelay =
    Math.min(15000, (long) (1.5 * jobTracker.getNextHeartbeatInterval()));
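The last line caps both delays at 15 seconds or 1.5 heartbeat intervals, whichever is smaller. A minimal sketch of how this formula behaves (the class and helper name here are hypothetical, not part of FairScheduler.java):

```java
public class LocalityDelayDemo {
    // Mirrors the FairScheduler formula: 1.5 heartbeat intervals, capped at 15s.
    static long computeDelay(long heartbeatIntervalMs) {
        return Math.min(15000L, (long) (1.5 * heartbeatIntervalMs));
    }

    public static void main(String[] args) {
        // Small cluster: 3s heartbeat -> 4.5s delay per locality level
        System.out.println(computeDelay(3000));   // 4500
        // Large cluster: heartbeats slow down, delay hits the 15s cap
        System.out.println(computeDelay(30000));  // 15000
    }
}
```

In other words, on small clusters a job waits roughly one to two heartbeats at each locality level before relaxing its requirement.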

  

  In the fair scheduler, each job maintains two variables used for delay scheduling: the locality level of the last MapTask it scheduled (lastMapLocalityLevel) and the time it has waited since it was last skipped (timeWaitedForLocalMap). The workflow is as follows (the actual work is done in the getAllowedLocalityLevel() method of FairScheduler.java):

/**
 * Get the maximum locality level at which a given job is allowed to
 * launch tasks, based on how long it has been waiting for local tasks.
 * This is used to implement the "delay scheduling" feature of the Fair
 * Scheduler for optimizing data locality.
 * If the job has no locality information (e.g. it does not use HDFS), this
 * method returns LocalityLevel.ANY, allowing tasks at any level.
 * Otherwise, the job can only launch tasks at its current locality level
 * or lower, unless it has waited at least nodeLocalityDelay or
 * rackLocalityDelay milliseconds, depending on the current level. If it
 * has waited (nodeLocalityDelay + rackLocalityDelay) milliseconds,
 * it can go to any level.
 */
protected LocalityLevel getAllowedLocalityLevel(JobInProgress job,
    long currentTime) {
  JobInfo info = infos.get(job);
  if (info == null) { // Job not in infos (shouldn't happen)
    LOG.error("getAllowedLocalityLevel called on job " + job
        + ", which does not have a JobInfo in infos");
    return LocalityLevel.ANY;
  }
  if (job.nonLocalMaps.size() > 0) { // Job doesn't have locality information
    return LocalityLevel.ANY;
  }
  // Don't wait for locality if the job's pool is starving for maps
  Pool pool = poolMgr.getPool(job);
  PoolSchedulable sched = pool.getMapSchedulable();
  long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool.getName());
  long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
  if (currentTime - sched.getLastTimeAtMinShare() > minShareTimeout ||
      currentTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
    eventLog.log("INFO", "No delay scheduling for "
        + job.getJobID() + " because it is being starved");
    return LocalityLevel.ANY;
  }
  // In the common case, compute locality level based on time waited
  switch (info.lastMapLocalityLevel) {
  case NODE: // Last task launched was node-local
    if (info.timeWaitedForLocalMap >=
        nodeLocalityDelay + rackLocalityDelay)
      return LocalityLevel.ANY;
    else if (info.timeWaitedForLocalMap >= nodeLocalityDelay)
      return LocalityLevel.RACK;
    else
      return LocalityLevel.NODE;
  case RACK: // Last task launched was rack-local
    if (info.timeWaitedForLocalMap >= rackLocalityDelay)
      return LocalityLevel.ANY;
    else
      return LocalityLevel.RACK;
  default: // Last task was non-local; can launch anywhere
    return LocalityLevel.ANY;
  }
}

getAllowedLocalityLevel()

1. If lastMapLocalityLevel is NODE:

1) if timeWaitedForLocalMap >= nodeLocalityDelay + rackLocalityDelay, MapTasks at the off-switch level and below may be scheduled;

2) if timeWaitedForLocalMap >= nodeLocalityDelay, MapTasks at the rack-local level and below may be scheduled;

3) otherwise, only node-local MapTasks may be scheduled.

2. If lastMapLocalityLevel is RACK:

1) if timeWaitedForLocalMap >= rackLocalityDelay, MapTasks at the off-switch level and below may be scheduled;

2) otherwise, MapTasks at the rack-local level and below may be scheduled.

3. Otherwise, MapTasks at the off-switch level and below may be scheduled.
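The branching above can be condensed into a small self-contained sketch. The method name allowedLevel and its parameters are hypothetical stand-ins, and the enum mirrors only the three levels used here, not the full FairScheduler LocalityLevel:

```java
enum LocalityLevel { NODE, RACK, ANY }

public class AllowedLevelDemo {
    // Mirrors the switch in getAllowedLocalityLevel(): the longer a job has
    // waited since its last map assignment, the farther it may stray from
    // its last locality level.
    static LocalityLevel allowedLevel(LocalityLevel lastLevel, long waited,
                                      long nodeDelay, long rackDelay) {
        switch (lastLevel) {
        case NODE:
            if (waited >= nodeDelay + rackDelay) return LocalityLevel.ANY;
            if (waited >= nodeDelay) return LocalityLevel.RACK;
            return LocalityLevel.NODE;
        case RACK:
            if (waited >= rackDelay) return LocalityLevel.ANY;
            return LocalityLevel.RACK;
        default: // last task was already off-switch
            return LocalityLevel.ANY;
        }
    }

    public static void main(String[] args) {
        long d = 4500; // nodeLocalityDelay == rackLocalityDelay == 4.5s
        System.out.println(allowedLevel(LocalityLevel.NODE, 0, d, d));     // NODE
        System.out.println(allowedLevel(LocalityLevel.NODE, 5000, d, d));  // RACK
        System.out.println(allowedLevel(LocalityLevel.NODE, 9000, d, d));  // ANY
    }
}
```

Note that a job whose last task was rack-local never goes back to demanding node-local: the allowed level only relaxes until a better-placed task is actually launched and the state is reset.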

  The concrete workflow of delay scheduling is as follows (the actual work is done in the assignTasks() method of FairScheduler.java):

@Override
public synchronized List<Task> assignTasks(TaskTracker tracker)
    throws IOException {
  if (!initialized) // Don't try to assign tasks if we haven't yet started up
    return null;
  String trackerName = tracker.getTrackerName();
  eventLog.log("HEARTBEAT", trackerName);
  long currentTime = clock.getTime();

  // Compute total runnable maps and reduces, and currently running ones
  int runnableMaps = 0;
  int runningMaps = 0;
  int runnableReduces = 0;
  int runningReduces = 0;
  for (Pool pool: poolMgr.getPools()) {
    runnableMaps += pool.getMapSchedulable().getDemand();
    runningMaps += pool.getMapSchedulable().getRunningTasks();
    runnableReduces += pool.getReduceSchedulable().getDemand();
    runningReduces += pool.getReduceSchedulable().getRunningTasks();
  }

  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  // Compute total map/reduce slots
  // In the future we can precompute this if the Scheduler becomes a
  // listener of tracker join/leave events.
  int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
  int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);

  eventLog.log("RUNNABLE_TASKS",
      runnableMaps, runningMaps, runnableReduces, runningReduces);

  // Update time waited for local maps for jobs skipped on last heartbeat
  // Note 1
  updateLocalityWaitTimes(currentTime);

  // Check for JT safe-mode
  if (taskTrackerManager.isInSafeMode()) {
    LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
    return null;
  }

  TaskTrackerStatus tts = tracker.getStatus();

  int mapsAssigned = 0; // loop counter for map in the below while loop
  int reducesAssigned = 0; // loop counter for reduce in the below while
  int mapCapacity = maxTasksToAssign(TaskType.MAP, tts);
  int reduceCapacity = maxTasksToAssign(TaskType.REDUCE, tts);
  boolean mapRejected = false; // flag used for ending the loop
  boolean reduceRejected = false; // flag used for ending the loop

  // Keep track of which jobs were visited for map tasks and which had tasks
  // launched, so that we can later mark skipped jobs for delay scheduling
  Set<JobInProgress> visitedForMap = new HashSet<JobInProgress>();
  Set<JobInProgress> visitedForReduce = new HashSet<JobInProgress>();
  Set<JobInProgress> launchedMap = new HashSet<JobInProgress>();

  ArrayList<Task> tasks = new ArrayList<Task>();
  // Scan jobs to assign tasks until neither maps nor reduces can be assigned
  // Note 2
  while (true) {
    // Computing the ending conditions for the loop
    // Reject a task type if one of the following conditions happens
    // 1. number of assigned tasks reaches the per-heartbeat limit
    // 2. number of running tasks reaches runnable tasks
    // 3. task is rejected by the LoadManager.canAssign
    if (!mapRejected) {
      if (mapsAssigned == mapCapacity ||
          runningMaps == runnableMaps ||
          !loadMgr.canAssignMap(tts, runnableMaps,
              totalMapSlots, mapsAssigned)) {
        eventLog.log("INFO", "Can't assign another MAP to " + trackerName);
        mapRejected = true;
      }
    }
    if (!reduceRejected) {
      if (reducesAssigned == reduceCapacity ||
          runningReduces == runnableReduces ||
          !loadMgr.canAssignReduce(tts, runnableReduces,
              totalReduceSlots, reducesAssigned)) {
        eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName);
        reduceRejected = true;
      }
    }
    // Exit while (true) loop if
    // 1. neither maps nor reduces can be assigned
    // 2. assignMultiple is off and we already assigned one task
    if (mapRejected && reduceRejected ||
        !assignMultiple && tasks.size() > 0) {
      break; // This is the only exit of the while (true) loop
    }

    // Determine which task type to assign this time
    // First try choosing a task type which is not rejected
    TaskType taskType;
    if (mapRejected) {
      taskType = TaskType.REDUCE;
    } else if (reduceRejected) {
      taskType = TaskType.MAP;
    } else {
      // If both types are available, choose the task type with fewer running
      // tasks on the task tracker to prevent that task type from starving
      if (tts.countMapTasks() + mapsAssigned <=
          tts.countReduceTasks() + reducesAssigned) {
        taskType = TaskType.MAP;
      } else {
        taskType = TaskType.REDUCE;
      }
    }

    // Get the map or reduce schedulables and sort them by fair sharing
    List<PoolSchedulable> scheds = getPoolSchedulables(taskType);
    // Sort the schedulables (jobs)
    Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
    boolean foundTask = false;
    // Note 3
    for (Schedulable sched: scheds) { // This loop will assign only one task
      eventLog.log("INFO", "Checking for " + taskType +
          " task in " + sched.getName());
      // Note 4
      Task task = taskType == TaskType.MAP ?
                  sched.assignTask(tts, currentTime, visitedForMap) :
                  sched.assignTask(tts, currentTime, visitedForReduce);
      if (task != null) {
        foundTask = true;
        JobInProgress job = taskTrackerManager.getJob(task.getJobID());
        eventLog.log("ASSIGN", trackerName, taskType,
            job.getJobID(), task.getTaskID());
        // Update running task counts, and the job's locality level
        if (taskType == TaskType.MAP) {
          launchedMap.add(job);
          mapsAssigned++;
          runningMaps++;
          // Note 5
          updateLastMapLocalityLevel(job, task, tts);
        } else {
          reducesAssigned++;
          runningReduces++;
        }
        // Add task to the list of assignments
        tasks.add(task);
        break; // This break makes this loop assign only one task
      } // end if (task != null)
    } // end for (Schedulable sched: scheds)

    // Reject the task type if we cannot find a task
    if (!foundTask) {
      if (taskType == TaskType.MAP) {
        mapRejected = true;
      } else {
        reduceRejected = true;
      }
    }
  } // end while (true)

  // Mark any jobs that were visited for map tasks but did not launch a task
  // as skipped on this heartbeat
  for (JobInProgress job: visitedForMap) {
    if (!launchedMap.contains(job)) {
      infos.get(job).skippedAtLastHeartbeat = true;
    }
  }

  // If no tasks were found, return null
  return tasks.isEmpty() ? null : tasks;
}

assignTasks()

  Note 1: updateLocalityWaitTimes(). For every job that was skipped at the last heartbeat, this adds the time elapsed since that heartbeat to its timeWaitedForLocalMap and resets its skippedAtLastHeartbeat flag to false. The code is as follows:

/**
 * Update locality wait times for jobs that were skipped at last heartbeat.
 */
private void updateLocalityWaitTimes(long currentTime) {
  long timeSinceLastHeartbeat =
    (lastHeartbeatTime == 0 ? 0 : currentTime - lastHeartbeatTime);
  lastHeartbeatTime = currentTime;
  for (JobInfo info: infos.values()) {
    if (info.skippedAtLastHeartbeat) {
      info.timeWaitedForLocalMap += timeSinceLastHeartbeat;
      info.skippedAtLastHeartbeat = false;
    }
  }
}

updateLocalityWaitTimes()
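Taken together with the skip-marking at the end of assignTasks(), the wait clock only advances for jobs that were passed over. A toy simulation of this, with a stripped-down JobInfo stand-in (only the two relevant fields; not the real Hadoop class):

```java
import java.util.ArrayList;
import java.util.List;

public class WaitClockDemo {
    // Minimal stand-in for the relevant JobInfo fields.
    static class JobInfo {
        long timeWaitedForLocalMap = 0;
        boolean skippedAtLastHeartbeat = false;
    }

    static long lastHeartbeatTime = 0;

    // Mirrors updateLocalityWaitTimes(): only jobs skipped at the previous
    // heartbeat accumulate wait time, and the flag is cleared each time.
    static void updateLocalityWaitTimes(List<JobInfo> infos, long now) {
        long elapsed = (lastHeartbeatTime == 0 ? 0 : now - lastHeartbeatTime);
        lastHeartbeatTime = now;
        for (JobInfo info : infos) {
            if (info.skippedAtLastHeartbeat) {
                info.timeWaitedForLocalMap += elapsed;
                info.skippedAtLastHeartbeat = false;
            }
        }
    }

    public static void main(String[] args) {
        JobInfo job = new JobInfo();
        List<JobInfo> infos = new ArrayList<>();
        infos.add(job);

        updateLocalityWaitTimes(infos, 1000);  // first heartbeat: clock starts
        job.skippedAtLastHeartbeat = true;     // job passed over this heartbeat
        updateLocalityWaitTimes(infos, 4000);  // +3000ms of waiting
        updateLocalityWaitTimes(infos, 7000);  // not skipped: clock frozen
        System.out.println(job.timeWaitedForLocalMap);  // 3000
    }
}
```

Once the accumulated wait crosses nodeLocalityDelay (or the combined delays), getAllowedLocalityLevel() returns a more permissive level at the next heartbeat.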

  Note 2: the while (true) loop keeps assigning MapTasks and ReduceTasks until none can be assigned. On each iteration it sorts all the jobs, and then the actual MapTask assignment happens in a for () loop. (Schedulable has two subclasses, PoolSchedulable and JobSchedulable; the Schedulable here can be thought of as a job.)

  Notes 3 and 4: inside the for () loop, JobSchedulable's assignTask() method is called to pick a suitable MapTask or ReduceTask. When picking a MapTask, it first calls FairScheduler.getAllowedLocalityLevel() to determine the locality level at which a MapTask may be scheduled (see the analysis of that method above), and then picks a MapTask of the corresponding level based on the return value. The assignTask() code is as follows:

@Override
public Task assignTask(TaskTrackerStatus tts, long currentTime,
    Collection<JobInProgress> visited) throws IOException {
  if (isRunnable()) {
    visited.add(job);
    TaskTrackerManager ttm = scheduler.taskTrackerManager;
    ClusterStatus clusterStatus = ttm.getClusterStatus();
    int numTaskTrackers = clusterStatus.getTaskTrackers();

    // check with the load manager whether it is safe to
    // launch this task on this taskTracker.
    LoadManager loadMgr = scheduler.getLoadManager();
    if (!loadMgr.canLaunchTask(tts, job, taskType)) {
      return null;
    }
    if (taskType == TaskType.MAP) {
      // Determine the locality level at which tasks may be scheduled
      LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel(
          job, currentTime);
      scheduler.getEventLog().log(
          "ALLOWED_LOC_LEVEL", job.getJobID(), localityLevel);
      switch (localityLevel) {
        case NODE:
          return job.obtainNewNodeLocalMapTask(tts, numTaskTrackers,
              ttm.getNumberOfUniqueHosts());
        case RACK:
          return job.obtainNewNodeOrRackLocalMapTask(tts, numTaskTrackers,
              ttm.getNumberOfUniqueHosts());
        default:
          return job.obtainNewMapTask(tts, numTaskTrackers,
              ttm.getNumberOfUniqueHosts());
      }
    } else {
      return job.obtainNewReduceTask(tts, numTaskTrackers,
          ttm.getNumberOfUniqueHosts());
    }
  } else {
    return null;
  }
}

assignTask()

  As shown above, this method in turn calls the corresponding JobInProgress methods to obtain a MapTask at the allowed locality level.

  Note 5: finally, updateLastMapLocalityLevel() updates the job's bookkeeping: lastMapLocalityLevel is set to the level of the task just launched, and timeWaitedForLocalMap is reset to 0.

/**
 * Update a job's locality level and locality wait variables given that
 * it has just launched a map task on a given task tracker.
 */
private void updateLastMapLocalityLevel(JobInProgress job,
    Task mapTaskLaunched, TaskTrackerStatus tracker) {
  JobInfo info = infos.get(job);
  boolean isNodeGroupAware = conf.getBoolean(
      "net.topology.nodegroup.aware", false);
  LocalityLevel localityLevel = LocalityLevel.fromTask(
      job, mapTaskLaunched, tracker, isNodeGroupAware);
  info.lastMapLocalityLevel = localityLevel;
  info.timeWaitedForLocalMap = 0;
  eventLog.log("ASSIGNED_LOC_LEVEL", job.getJobID(), localityLevel);
}

updateLastMapLocalityLevel()

  This article is based on Hadoop 1.2.1. Corrections are welcome.

  References: 《Hadoop技术内幕:深入理解MapReduce架构设计与实现原理》 (Hadoop Internals: In-Depth Study of MapReduce Architecture Design and Implementation), Dong Xicheng (董西成)

    https://issues.apache.org/jira/secure/attachment/12457515/fair_scheduler_design_doc.pdf

  Please credit the source when reposting: http://www.cnblogs.com/gwgyk/p/4568270.html
