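Part 3 of the source-level analysis of the MapReduce job submission flow showed that the user ultimately calls JobTracker.submitJob to submit a job to the JobTracker. The core of that method is JobTracker.addJob(JobID jobId, JobInProgress job). addJob hands the job to the listeners registered by the scheduler (JobQueueTaskScheduler by default; this article only discusses the default scheduler), namely JobQueueJobInProgressListener and EagerTaskInitializationListener, by calling jobAdded(JobInProgress job) on each of them. JobQueueJobInProgressListener tracks the changes in each JobInProgress over its life cycle; EagerTaskInitializationListener initializes new jobs as they arrive.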
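1. The JobQueueJobInProgressListener.jobAdded(JobInProgress job) method. It consists of a single statement, jobQueue.put(new JobSchedulingInfo(job.getStatus()), job): it builds a JobSchedulingInfo object and stores it in jobQueue as the key for the corresponding JobInProgress. The JobSchedulingInfo class holds the information needed to schedule the job: its priority (NORMAL by default), its JobID, and its start time, startTime.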
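To make the role of the key concrete, here is a minimal sketch of such a queue. It assumes the queue is a sorted map ordered by priority, then start time, then job ID. SchedulingKey, JobQueueSketch, the integer priority and the String job values are hypothetical stand-ins for illustration, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for JobSchedulingInfo: only the three fields named in the text.
final class SchedulingKey {
    final int priority;     // assumption of this sketch: a lower value means more urgent
    final long startTime;
    final String jobId;

    SchedulingKey(int priority, long startTime, String jobId) {
        this.priority = priority;
        this.startTime = startTime;
        this.jobId = jobId;
    }
}

final class JobQueueSketch {
    // Priority first, then start time, then job ID so two distinct jobs never compare as equal.
    static final Comparator<SchedulingKey> ORDER =
        Comparator.<SchedulingKey>comparingInt(k -> k.priority)
                  .thenComparingLong(k -> k.startTime)
                  .thenComparing(k -> k.jobId);

    // The String value stands in for the JobInProgress object.
    private final Map<SchedulingKey, String> jobQueue = new TreeMap<>(ORDER);

    // Mirrors the single put(...) call described above.
    void jobAdded(SchedulingKey key, String job) {
        jobQueue.put(key, job);
    }

    // Jobs in the order the sorted map holds them.
    List<String> jobsInOrder() {
        return new ArrayList<>(jobQueue.values());
    }
}
```

Because the map is sorted by its keys, iterating over it visits jobs in scheduling order without any extra sorting step.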
2. The EagerTaskInitializationListener.jobAdded(JobInProgress job) method.
    /**
     * We add the JIP to the jobInitQueue, which is processed
     * asynchronously to handle split-computation and build up
     * the right TaskTracker/Block mapping.
     */
    @Override
    public void jobAdded(JobInProgress job) {
      synchronized (jobInitQueue) {
        jobInitQueue.add(job);      // append to the List<JobInProgress> jobInitQueue
        resortInitQueue();          // re-sort the queue
        jobInitQueue.notifyAll();   // wake any thread waiting on the queue
      }
    }
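The resortInitQueue() call in this method orders the queue by priority first and by start time second. A minimal sketch of that ordering follows. PendingJob and InitQueueSketch are hypothetical names, the enum only assumes that priorities are declared from most to least urgent, and the real JobInProgress carries far more state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, pared-down job record used only for this sketch.
final class PendingJob {
    // Declared from most to least urgent, so natural enum order puts urgent jobs first.
    enum Priority { VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW }

    final String id;
    final Priority priority;
    final long startTime;

    PendingJob(String id, Priority priority, long startTime) {
        this.id = id;
        this.priority = priority;
        this.startTime = startTime;
    }
}

final class InitQueueSketch {
    // Priority decides first; ties are broken by the earlier start time.
    static final Comparator<PendingJob> INIT_ORDER =
        Comparator.comparing((PendingJob j) -> j.priority)
                  .thenComparingLong(j -> j.startTime);

    private final List<PendingJob> jobInitQueue = new ArrayList<>();

    void jobAdded(PendingJob job) {
        synchronized (jobInitQueue) {
            jobInitQueue.add(job);
            jobInitQueue.sort(INIT_ORDER);  // the re-sort step
            jobInitQueue.notifyAll();       // wake any waiting consumer thread
        }
    }

    // Snapshot of the job IDs in their current queue order.
    List<String> idsInOrder() {
        synchronized (jobInitQueue) {
            List<String> ids = new ArrayList<>();
            for (PendingJob j : jobInitQueue) {
                ids.add(j.id);
            }
            return ids;
        }
    }
}
```

Re-sorting on every insertion keeps index 0 pointing at the job that should be initialized next, which is what lets the consumer simply remove the first element.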
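In the method above, resortInitQueue() sorts the JobInProgress objects in jobInitQueue, first by priority and then, for equal priorities, by start time. EagerTaskInitializationListener.start() is called from JobQueueTaskScheduler.start() when the scheduler is initialized, so it runs before any call to jobAdded. The code of EagerTaskInitializationListener.start() is as follows: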
    public void start() throws IOException {
      this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
      jobInitManagerThread.setDaemon(true);
      this.jobInitManagerThread.start();
    }
start() launches a single daemon thread that runs the JobInitManager:
    /////////////////////////////////////////////////////////////////
    // Used to init new jobs that have just been created
    /////////////////////////////////////////////////////////////////
    class JobInitManager implements Runnable {

      public void run() {
        JobInProgress job = null;
        while (true) {
          try {
            synchronized (jobInitQueue) {
              // block until jobAdded() puts a job in the queue
              while (jobInitQueue.isEmpty()) {
                jobInitQueue.wait();
              }
              job = jobInitQueue.remove(0);
            }
            threadPool.execute(new InitJob(job));
          } catch (InterruptedException t) {
            LOG.info("JobInitManagerThread interrupted.");
            break;
          }
        }
        LOG.info("Shutting down thread pool");
        threadPool.shutdownNow();
      }
    }

    class InitJob implements Runnable {

      private JobInProgress job;

      public InitJob(JobInProgress job) {
        this.job = job;
      }

      public void run() {
        ttm.initJob(job);  // delegates to JobTracker.initJob(job)
      }
    }
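The run method of JobInitManager is an infinite loop. It blocks while jobInitQueue is empty; once a job is available, it removes the JobInProgress at index 0 and submits it to the thread pool as an InitJob task, which calls TaskTrackerManager.initJob(job), implemented by JobTracker.initJob. Why are jobs initialized on separate threads? Because jobInitQueue may hold many JobInProgress objects at the same time and initializing them one by one would be slow, so they are initialized concurrently. The code of initJob is as follows: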
    public void initJob(JobInProgress job) {
      if (null == job) {
        LOG.info("Init on null job is not valid");
        return;
      }

      try {
        JobStatus prevStatus = (JobStatus)job.getStatus().clone();
        LOG.info("Initializing " + job.getJobID());
        job.initTasks();  // initialize the job through its own initTasks() method
        // Inform the listeners if the job state has changed
        // Note : that the job will be in PREP state.
        JobStatus newStatus = (JobStatus)job.getStatus().clone();
        if (prevStatus.getRunState() != newStatus.getRunState()) {
          JobStatusChangeEvent event =
            new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
                newStatus);
          synchronized (JobTracker.this) {
            updateJobInProgressListeners(event);
          }
        }
      } catch (KillInterruptedException kie) {
        // If job was killed during initialization, job state will be KILLED
        LOG.error("Job initialization interrupted:\n" +
            StringUtils.stringifyException(kie));
        killJob(job);
      } catch (Throwable t) {
        String failureInfo =
          "Job initialization failed:\n" + StringUtils.stringifyException(t);
        // If the job initialization is failed, job state will be FAILED
        LOG.error(failureInfo);
        job.getStatus().setFailureInfo(failureInfo);
        failJob(job);
      }
    }
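initJob first captures the status before initialization, prevStatus, then calls job.initTasks(), and then captures the status after initialization, newStatus. If the run state differs between the two, it builds a JobStatusChangeEvent and passes it to updateJobInProgressListeners so that the registered listeners are notified.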
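The snapshot, initialize, compare and notify sequence can be sketched independently of Hadoop as follows. RunState, SketchJob and InitSketch are hypothetical names, and failure handling is reduced to setting a FAILED state, whereas the real method calls killJob or failJob.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

enum RunState { PREP, RUNNING, FAILED }

// Hypothetical minimal job: a run state plus an initialization step that may change it.
final class SketchJob {
    volatile RunState state = RunState.PREP;
    final Consumer<SketchJob> initTasks;

    SketchJob(Consumer<SketchJob> initTasks) {
        this.initTasks = initTasks;
    }
}

final class InitSketch {
    // Each listener receives the state before and after initialization.
    private final List<BiConsumer<RunState, RunState>> listeners = new ArrayList<>();

    void addListener(BiConsumer<RunState, RunState> listener) {
        listeners.add(listener);
    }

    void initJob(SketchJob job) {
        if (job == null) {
            return;
        }
        try {
            RunState before = job.state;   // snapshot before initialization
            job.initTasks.accept(job);     // the initialization itself
            RunState after = job.state;    // snapshot after initialization
            if (before != after) {         // notify only on an actual change
                synchronized (this) {
                    for (BiConsumer<RunState, RunState> l : listeners) {
                        l.accept(before, after);
                    }
                }
            }
        } catch (RuntimeException e) {
            // Simplification of this sketch: mark the job as failed and stop.
            job.state = RunState.FAILED;
        }
    }
}
```

Comparing snapshots rather than asking the job to report its own changes keeps the notification logic in one place, outside the initialization code.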
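The job.initTasks() method is fairly long. After some checks, its main work is to obtain the split information for the input data, TaskSplitMetaInfo[] splits = createSplits(jobId), which reads the files uploaded to HDFS at submission time, job.splitmetainfo and job.split. It must hold that numMapTasks == splits.length. The method then builds numMapTasks TaskInProgress objects to serve as the map tasks.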
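The consistency check and the one-task-per-split construction can be illustrated with a small sketch. SplitMeta, MapTaskSketch and InitTasksSketch are hypothetical stand-ins for TaskSplitMetaInfo, TaskInProgress and JobInProgress, and the exception type is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TaskSplitMetaInfo: where a split lives and how large it is.
final class SplitMeta {
    final String[] hosts;
    final long length;

    SplitMeta(String[] hosts, long length) {
        this.hosts = hosts;
        this.length = length;
    }
}

// Hypothetical stand-in for a map TaskInProgress: one task per split.
final class MapTaskSketch {
    final int partition;
    final SplitMeta split;

    MapTaskSketch(int partition, SplitMeta split) {
        this.partition = partition;
        this.split = split;
    }
}

final class InitTasksSketch {
    // Refuse to continue if the configured map count disagrees with the splits read back.
    static List<MapTaskSketch> buildMapTasks(int numMapTasks, SplitMeta[] splits) {
        if (numMapTasks != splits.length) {
            throw new IllegalStateException(
                "Number of map tasks (" + numMapTasks
                + ") does not match number of splits (" + splits.length + ")");
        }
        List<MapTaskSketch> maps = new ArrayList<>(numMapTasks);
        for (int i = 0; i < numMapTasks; i++) {
            maps.add(new MapTaskSketch(i, splits[i])); // task i processes split i
        }
        return maps;
    }
}
```

Failing fast on a mismatch avoids scheduling map tasks that have no input split, or leaving splits that no task would ever read.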