JobTracker最重要的功能之一是状态监控,包括TaskTracker、Job和Task等运行时状态的监控,其中TaskTracker状态监控比较简单,只要记录其最近心跳汇报时间和健康状况(由TaskTracker端的监控脚本检测,并通过心跳将结果发送给JobTracker)即可。
作业描述模型
如下图所示
JobTracker在其内部以“三层多叉树”的方式描述和跟踪每个作业的运行状态。JobTracker为每个作业创建一个JobInProgress对象以跟踪和监控其运行状态。该对象存在于作业的整个运行过程中:它在作业提交时创建,作业运行完成时销毁。同时,为了采用分而治之的策略解决问题,JobTracker会将每个作业拆分成若干个任务,并为每个任务创建一个TaskInProgress对象以跟踪和监控其运行状态,而任务在运行过程中,可能会因为软件Bug、硬件故障等原因运行失败,此时JobTracker会按照一定的策略重新运行该任务,也就是说,每个任务可能会尝试运行多次,直到运行成功或者因超过尝试次数而失败。JobTracker将每运行一次任务称为一次“任务运行尝试”,即Task Attempt。对于某个任务,只要有一个Task Attempt运行成功,则相应的TaskInProgress对象会标注该任务运行成功,而当所有的TaskInProgress均标注其对应的任务运行成功后,JobInProgress对象会标识整个作业运行成功。
为了区分各个作业,如图所示
JobTracker会赋予每个作业一个唯一的ID。该ID由三部分组成:作业前缀字符串、JobTracker启动时间和作业提交顺序,各部分通过“_”连接起来组成一个完整的作业ID,比如 job_201208071706_0009,对应的三部分分别是“job”、“201208071706”和“009”(JobTracker运行以来第9个作业)。每个任务的ID继承了作业的ID,并在此基础上进行了扩展,它由三部分组成:作业ID(其中前缀字符串变为“task”)、任务类型(map还是reduce)和任务编号(从000000开始,一直到999999)。比如,task_201208071706_0009_m_000000,表示它的作业ID为task_201208071706_0009,任务类型为map,任务编号为000000。每个Task Attempt的ID继承了任务的ID,它由两部分组成:任务ID(其中前缀字符串变为“attempt”)和运行尝试次数(从0开始),比如,attempt_201208071706_0009_m_000000_0表示任务task_201208071706_0009_m_000000的第0次尝试。
JobInProgress
JobInProgress类主要用于监控和跟踪作业运行状态,并为调度器提供最底层的调度接口。
JobInProgress维护了两种作业信息:一种是静态信息,这些信息是作业提交之时就已经确定好的;另一种是动态信息,这些信息随着作业的运行而动态变化。
(1)作业静态信息
作业静态信息是指作业提交之时就已经确定好的属性信息,主要包括以下几项:
org.apache.hadoop.mapred.JobInProgress.java
//map task, reduce task , cleanup task和setup task对应的TaskInProgress
TaskInProgress maps[] = new TaskInProgress[0];
TaskInProgress reduces[] = new TaskInProgress[0];
TaskInProgress cleanup[] = new TaskInProgress[0];
TaskInProgress setup[] = new TaskInProgress[0];
int numMapTasks = 0;//Map Task个数
int numReduceTasks = 0;//Reduce Task个数
final long memoryPerMap;//每个Map Task需要的内存量
final long memoryPerReduce;//每个Reduce Task需要的内存量
volatile int numSlotsPerMap = 1;//每个Map Task需要的slot个数
volatile int numSlotsPerReduce = 1;//每个Reduce Task需要的slot个数
/*允许每个TaskTracker上失败的Task个数,默认是4,通过参数mapred.max.tracker.failures设置。当该作业在某个TaskTracker上失败的个数超过该值时,会将该节点添加到该作业的黑名单中,调度器不再为该节点分配该作业的任务*/
final int maxTaskFailuresPerTracker;
private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;//当有5%的Map Task完成后,才可以调度Reduce Task
int completedMapsForReduceSlowstart = 0;//多少Map Task完成后开始调度Reduce Task
final int mapFailuresPercent;//允许的Map Task失败比例上限,通过参数mapred.max.map.failures.percent设置
final int reduceFailuresPercent;//允许的Reduce Task失败比例上限,通过参数mapred.max.reduce.failures.percent设置
JobPriority priority = JobPriority.NORMAL;//作业优先级
(2)作业动态信息
作业动态信息是指作业运行过程中会动态更新的信息。这些信息对于发现TaskTracker/Job/Task故障非常有用,也可以为调度器进行任务调度提供决策依据。
int runningMapTasks = 0;//正在运行的Map Task数目
int runningReduceTasks = 0;//正在运行的Reduce Task数目
int finishedMapTasks = 0;//运行完成的Map Task数目
int finishedReduceTasks = 0;//运行完成的Reduce Task数目
int failedMapTasks = 0; //失败的Map Task Attempt数目
int failedReduceTasks = 0;//失败的Reduce Task Attempt数目
int speculativeMapTasks = 0;//正在运行的备份任务(MAP)数目
int speculativeReduceTasks = 0;//正在运行的备份任务(REDUCE)数目
int failedMapTIPs = 0;//失败的TaskInProgress(MAP)数目,这意味着对应的输入数据将被丢弃,不会产生最终结果
int failedReduceTIPs = 0;//失败的TaskInProgress(REDUCE)数目
private volatile boolean launchedCleanup = false;//是否已启动Cleanup Task
private volatile boolean launchedSetup = false;//是否已启动Setup Task
private volatile boolean jobKilled = false;//作业是否已被杀死
private volatile boolean jobFailed = false;//作业是否已失败
// NetworkTopology Node to the set of TIPs
Map<Node, List<TaskInProgress>> nonRunningMapCache;//节点与TaskInProgress的映射关系,即TaskInProgress输入数据位置与节点对应关系
// Map of NetworkTopology Node to set of running TIPs
Map<Node, Set<TaskInProgress>> runningMapCache;//节点及其上面正在运行的Task映射关系
// A list of non-local, non-running maps
final List<TaskInProgress> nonLocalMaps;//不需要考虑数据本地性的Map Task,如果一个Map Task的InputSplit Location为空,则进行任务调度时不需要考虑本地性
// Set of failed, non-running maps sorted by #failures
final SortedSet<TaskInProgress> failedMaps;//按照失败次数进行排序的TIP集合
// A set of non-local running maps
Set<TaskInProgress> nonLocalRunningMaps;//未运行的Map Task集合
// A list of non-running reduce TIPs
Set<TaskInProgress> nonRunningReduces;//未运行的Reduce Task集合
// A set of running reduce TIPs
Set<TaskInProgress> runningReduces;//正在运行的Reduce Task集合
// A list of cleanup tasks for the map task attempts, to be launched
List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();//待清理的Map Task列表,比如用户直接通过命令“bin/hadoop job -kill”杀死的Task
// A list of cleanup tasks for the reduce task attempts, to be launched
List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
long startTime;//作业提交时间
long launchTime;//作业开始执行时间
long finishTime;//作业完成时间
TaskInProgress
TaskInProgress类维护了一个Task运行过程中的全部信息。在Hadoop中,由于一个任务可能会推测执行或者重新执行,所以会存在多个Task Attempt,且同一时刻,可能有多个处理相同的任务尝试同时在执行,而这些任务被同一个TaskInProgress对象管理和跟踪,只要任何一个任务尝试运行成功,TaskInProgress就会标注该任务执行成功。
private final TaskSplitMetaInfo splitInfo;//Task要处理的Split信息
private int numMaps;//Map Task数目,只对Reduce Task有用
private int partition;//该Task在task列表中的索引
private JobTracker jobtracker;//JobTracker对象,用于获取全局时钟
private TaskID id;//task ID,其后面加下标构成Task Attempt ID
private JobInProgress job;//该TaskInProgress所在的JobInProgress
private final int numSlotsRequired;//运行该Task需要的slot数目
// Status of the TIP
private int successEventNumber = -1;
private int numTaskFailures = 0;//Task Attempt失败次数
private int numKilledTasks = 0;//Task Attempt被杀死次数
private double progress = 0;//任务运行进度
private String state = "";//运行状态
private long startTime = 0;//TaskInProgress对象创建时间
private long execStartTime = 0;//第一个Task Attempt开始运行时间
private long execFinishTime = 0;//最后一个运行成功的Task Attempt完成时间
private int completes = 0;//Task Attempt运行完成数目,实际只有两个值:0和1
private boolean failed = false;//该TaskInProgress是否运行失败
private boolean killed = false;//该TaskInProgress是否被杀死
private boolean jobCleanup = false; //该TaskInProgress是否为Cleanup Task
private boolean jobSetup = false;//该TaskInProgress是否为Setup Task
// The ‘next‘ usable taskid of this tip
int nextTaskId = 0;//该TaskInProgress的下一个可用Task Attempt ID
// The taskid that took this TIP to SUCCESS
private TaskAttemptID successfulTaskId;//使得该TaskInProgress运行成功的那个Task ID
// The first taskid of this tip
private TaskAttemptID firstTaskId;//第一个运行的Task Attemp的ID
// Map from task Id -> TaskTracker Id, contains tasks that are
// currently runnings
private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();//正在运行的Task ID与TaskTracker ID之间的映射关系
// All attempt Ids of this TIP
private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();//该TaskInProgress已运行的所有TaskAttempt ID,包括已经运行完成的和正在运行的
/**
* Map from taskId -> TaskStatus
*/
private TreeMap<TaskAttemptID,TaskStatus> taskStatuses =
new TreeMap<TaskAttemptID,TaskStatus>();//Task ID与TaskStatus映射关系
// Map from taskId -> TaskTracker Id,
// contains cleanup attempts and where they ran, if any
private TreeMap<TaskAttemptID, String> cleanupTasks =
new TreeMap<TaskAttemptID, String>();//Cleanup Task ID与TaskTracker ID映射关系
private TreeSet<String> machinesWhereFailed = new TreeSet<String>();//所有已经运行失败的Task所在的节点列表
private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();//某个Task Attempt运行成功后,其他所有正在运行的Task Attempt保存在该集合中
//list of tasks to kill, <taskid> -> <shouldFail>
private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();//待杀死的Task列表
//task to commit, <taskattemptid>
private TaskAttemptID taskToCommit;//等待被提交的Task Attempt,该Task Attempt最终使得TaskInProgress运行成功