MapReduce job在JobTracker初始化源码级分析

  mapreduce
job提交流程源码级分析(三)
中已经说明用户最终调用JobTracker.submitJob方法来向JobTracker提交作业。而这个方法的核心提交方法是JobTracker.addJob(JobID
jobId, JobInProgress
job)方法,这个addJob方法会把Job提交到调度器(默认是JobQueueTaskScheduler)的监听器JobQueueJobInProgressListener和EagerTaskInitializationListener(本文只讨论默认调度器)中,使用方法jobAdded(JobInProgress
job),JobQueueJobInProgressListener任务是监控各个JobInProcess生命周期中的变化;EagerTaskInitializationListener是发现有新Job后对其初始化的。

  一、JobQueueJobInProgressListener.jobAdded(JobInProgress
job)方法。就一句代码jobQueue.put(new JobSchedulingInfo(job.getStatus()),
job),先构建一个JobSchedulingInfo对象,然后和JobInProgress对应起来放入jobQueue中。JobSchedulingInfo类维护这调度这个job必备的一些信息,比如优先级(默认是NORMAL)、JobID以及开始时间startTime。

  二、EagerTaskInitializationListener.jobAdded(JobInProgress
job)方法。  


 1 /**
2 * We add the JIP to the jobInitQueue, which is processed
3 * asynchronously to handle split-computation and build up
4 * the right TaskTracker/Block mapping.
5 */
6 @Override
7 public void jobAdded(JobInProgress job) {
8 synchronized (jobInitQueue) {
9 jobInitQueue.add(job);  //添加进List<JobInProgress> jobInitQueue
10 resortInitQueue();
11 jobInitQueue.notifyAll();  //唤醒阻塞的进程
12 }
13
14 }

  上面方法中resortInitQueue()方法主要是对jobInitQueue中JobInProcess进行排序,先按照优先级排序,相同的再按开始时间。EagerTaskInitializationListener.start()在调度器初始化时JobQueueTaskScheduler.start()就调用了,所以先于jobAdded方法调用。EagerTaskInitializationListener.start()代码如下:

1 public void start() throws IOException {
2 this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
3 jobInitManagerThread.setDaemon(true);
4 this.jobInitManagerThread.start();
5 }

  start()方法会启动一个线程:JobInitManager。


 1 /////////////////////////////////////////////////////////////////
2 // Used to init new jobs that have just been created
3 /////////////////////////////////////////////////////////////////
4 class JobInitManager implements Runnable {
5
6 public void run() {
7 JobInProgress job = null;
8 while (true) {
9 try {
10 synchronized (jobInitQueue) {
11 while (jobInitQueue.isEmpty()) {
12 jobInitQueue.wait();
13 }
14 job = jobInitQueue.remove(0);
15 }
16 threadPool.execute(new InitJob(job));
17 } catch (InterruptedException t) {
18 LOG.info("JobInitManagerThread interrupted.");
19 break;
20 }
21 }
22 LOG.info("Shutting down thread pool");
23 threadPool.shutdownNow();
24 }
25 }
26
27 class InitJob implements Runnable {
28
29 private JobInProgress job;
30
31 public InitJob(JobInProgress job) {
32 this.job = job;
33 }
34
35 public void run() {
36 ttm.initJob(job);//对应JobTracker的对应方法
37 }
38 }

  JobInitManager线程的run方法是一个死循环始终监控jobInitQueue是否为空,不为空的话就取出0位置的JobInProgress,在InitJob线程中初始化:TaskTrackerManager.initJob(job)对应JobTracker的initJob方法。这里为什么会另起线程来初始化Job呢?原因很简单,就是可能jobInitQueue中同时会有很多JobInProgress,一个一个的初始化会比较慢,所以采用多线程的方式初始化。来看initJob方法的代码:


 1   public void initJob(JobInProgress job) {
2 if (null == job) {
3 LOG.info("Init on null job is not valid");
4 return;
5 }
6
7 try {
8 JobStatus prevStatus = (JobStatus)job.getStatus().clone();
9 LOG.info("Initializing " + job.getJobID());
10 job.initTasks(); //调用该实例的initTasks方 法,对job进行初始化
11 // Inform the listeners if the job state has changed
12 // Note : that the job will be in PREP state.
13 JobStatus newStatus = (JobStatus)job.getStatus().clone();
14 if (prevStatus.getRunState() != newStatus.getRunState()) {
15 JobStatusChangeEvent event =
16 new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
17 newStatus);
18 synchronized (JobTracker.this) {
19 updateJobInProgressListeners(event);
20 }
21 }
22 } catch (KillInterruptedException kie) {
23 // If job was killed during initialization, job state will be KILLED
24 LOG.error("Job initialization interrupted:\n" +
25 StringUtils.stringifyException(kie));
26 killJob(job);
27 } catch (Throwable t) {
28 String failureInfo =
29 "Job initialization failed:\n" + StringUtils.stringifyException(t);
30 // If the job initialization is failed, job state will be FAILED
31 LOG.error(failureInfo);
32 job.getStatus().setFailureInfo(failureInfo);
33 failJob(job);
34 }
35 }

  首先是获取初始化前的状态prevStatus;然后是job.initTasks()初始化;在获取初始化的后的状态newStatus;

  job.initTasks()方法代码比较多,主要的工作是检查之后获取输入数据的分片信息TaskSplitMetaInfo[] splits =
createSplits(jobId)这是去读的上传到HDFS中的文件job.splitmetainfo和job.split,要确保numMapTasks ==
splits.length,然后构建numMapTasks个TaskInProgress作为MapTask,

MapReduce job在JobTracker初始化源码级分析,布布扣,bubuko.com

时间: 2024-09-29 21:12:20

MapReduce job在JobTracker初始化源码级分析的相关文章

监听器初始化Job、JobTracker相应TaskTracker心跳、调度器分配task源码级分析

JobTracker和TaskTracker分别启动之后(JobTracker启动流程源码级分析,TaskTracker启动过程源码级分析),taskTracker会通过心跳与JobTracker通信,并获取分配它的任务.用户将作业提交到JobTracker之后,放入相应的数据结构中,静等被分配.mapreduce job提交流程源码级分析(三)这篇文章已经分析了用户提交作业的最后步骤,主要是构造作业对应的JobInProgress并加入jobs,告知所有的JobInProgressListen

TableInputFormat分片及分片数据读取源码级分析

我们在MapReduce中TextInputFormat分片和读取分片数据源码级分析 这篇中以TextInputFormat为例讲解了InputFormat的分片过程以及RecordReader读取分片数据的过程.接下来咱们分析TableInputFormat的分片信息和数据读取过程. TableInputFormat这是专门处理基于HBase的MapReduce的输入数据的格式类.我们可以看看继承结构:(1)public class TableInputFormat extends Table

Flume-NG内置计数器(监控)源码级分析

Flume的内置监控怎么整?这个问题有很多人问.目前了解到的信息是可以使用Cloudera Manager.Ganglia有图形的监控工具,以及从浏览器获取json串,或者自定义向其他监控系统汇报信息.那监控的信息是什么呢?就是各个组件的统计信息,比如成功接收的Event数量.成功发送的Event数量,处理的Transaction的数量等等.而且不同的组件有不同的Countor来做统计,目前直到1.5版本仍然只对三大组件:source.sink.channel进行统计分别是SourceCount

Flume-NG(1.5版本)中SpillableMemoryChannel源码级分析

SpillableMemoryChannel是1.5版本新增的一个channel.这个channel优先将evnet放在内存中,一旦内存达到设定的容量就使用file channel写入磁盘.然后读的时候会按照顺序读取:会通过一个DrainOrderQueue来保证不管是内存中的还是溢出(本文的“溢出”指的是内存channel已满,需要使用file channel存储数据)文件中的顺序.这个Channel是memory channel和file channel的一个折中,虽然在内存中的数据仍然可能

Shell主要逻辑源码级分析(1)——SHELL运行流程

版权声明:本文由李航原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/109 来源:腾云阁 https://www.qcloud.com/community 本文的目的:分享一下在学校的时候分析shell源码的一些收获,帮助大家了解shell的一个工作流程,从软件设计的角度,看看shell这样一个历史悠久的软件的一些设计优点和缺陷.本文重点不是讲SHELL语法,相信很多同事玩shell都很熟了. 本文的局限:限于本人技术水

源码级强力分析Hadoop的RPC机制

分析对象: hadoop版本:hadoop 0.20.203.0 必备技术点: 1. 动态代理(参考 :http://weixiaolu.iteye.com/blog/1477774 )2. Java NIO(参考 :http://weixiaolu.iteye.com/blog/1479656 )3. Java网络编程 目录: 一.RPC协议二.ipc.RPC源码分析三.ipc.Client源码分析四.ipc.Server源码分析 分析:  一.RPC协议 在分析协议之前,我觉得我们很有必要先

MapReduce中TextInputFormat分片和读取分片数据源码级分析

InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能: (1)数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split: (2)为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,供mapper使用. InputFormat有两个比较重要的方法:(1)List<InputSp

JobTracker启动流程源码级分析

org.apache.hadoop.mapred.JobTracker类是个独立的进程,有自己的main函数.JobTracker是在网络环境中提交及运行MR任务的核心位置. main方法主要代码有两句: 1 //创建jobTracker对象 2 JobTracker tracker = startTracker(new JobConf()); 3 //启动各个服务,包括JT内部一些重要的服务或者线程 4 tracker.offerService(); 一.startTracker(new Jo

mapreduce job提交流程源码级分析(三)

mapreduce job提交流程源码级分析(二)(原创)这篇文章说到了jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials())提交job,最终调用的是JobTracker.submitJob:而这篇文章JobTracker启动流程源码级分析则是分析的JobTracker的启动过程,JobTracker启动之后就会等待提交作业管理作业等. 接下来看看JobTracker.submitJo