mapreduce job提交流程源码级分析(三)

  mapreduce
job提交流程源码级分析(二)(原创)
这篇文章说到了jobSubmitClient.submitJob(jobId,
submitJobDir.toString(),
jobCopy.getCredentials())提交job,最终调用的是JobTracker.submitJob;而这篇文章JobTracker启动流程源码级分析则是分析的JobTracker的启动过程,JobTracker启动之后就会等待提交作业管理作业等。

  接下来看看JobTracker.submitJob方法,调用这个方法之前已经将相关的资源分片信息、配置信息、外部文件、第三方jar包、一些归档文件以及job.jar上传到HDFS中了。


 1  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
2 throws IOException {
3 JobInfo jobInfo = null;
4 UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
5 synchronized (this) {
6 if (jobs.containsKey(jobId)) {
7 // job already running, don‘t start twice
8 return jobs.get(jobId).getStatus();
9 }
10 jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),
11 new Path(jobSubmitDir));
12 }
13
14 // Create the JobInProgress, do not lock the JobTracker since
15 // we are about to copy job.xml from HDFS
16 //当JobTracker接收到新的job请求(即submitJob()函数被调用)后,
17 //会创建一个JobInProgress对象并通过它来管理和调度任务。
18 //JobInProgress在创建的时候会初始化一系列与任务有关的参数,调用到FileSystem,
19 //把在JobClient端上传的所有任务文件下载到本地的文件系统中的临时目录里。这其中包括上传的*.jar文件包、
20 //记录配置信息的xml、记录分割信息的文件。
21 JobInProgress job = null;
22 try {
23 job = new JobInProgress(this, this.conf, jobInfo, 0, ts);
24 } catch (Exception e) {
25 throw new IOException(e);
26 }
27
28 synchronized (this) {
29 // check if queue is RUNNING
30 String queue = job.getProfile().getQueueName();
31 if (!queueManager.isRunning(queue)) {
32 throw new IOException("Queue \"" + queue + "\" is not running");
33 }
34 try {
35 aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);
36 } catch (IOException ioe) {
37 LOG.warn("Access denied for user " + job.getJobConf().getUser()
38 + ". Ignoring job " + jobId, ioe);
39 job.fail();
40 throw ioe;
41 }
42
43 // Check the job if it cannot run in the cluster because of invalid memory
44 // requirements.
45 try {
46 checkMemoryRequirements(job);
47 } catch (IOException ioe) {
48 throw ioe;
49 }
50 boolean recovered = true; // TODO: Once the Job recovery code is there,
51 // (MAPREDUCE-873) we
52 // must pass the "recovered" flag accurately.
53 // This is handled in the trunk/0.22
54 if (!recovered) {
55 // Store the information in a file so that the job can be recovered
56 // later (if at all)
57 Path jobDir = getSystemDirectoryForJob(jobId);
58 FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
59 FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
60 jobInfo.write(out);
61 out.close();
62 }
63
64 // Submit the job
65 JobStatus status;
66 try {
67 status = addJob(jobId, job);
68 } catch (IOException ioe) {
69 LOG.info("Job " + jobId + " submission failed!", ioe);
70 status = job.getStatus();
71 status.setFailureInfo(StringUtils.stringifyException(ioe));
72 failJob(job);
73 throw ioe;
74 }
75
76 return status;
77 }
78 }

  一、首先看看jobs众有无要提交的Job,jobs是一个Map<JobID, JobInProgress>
这里存储着所有已知的Job及其对应的JobInProgress信息。如果已经存在这个Job则直接返回这个Job的状态;如果不存在则利用JobID和jobSubmitDir构造一个JobInfo对象,JobInfo类实现了Writable可以被序列化,而且存储三个字段JobID、user、以及上传资源的目录jobSubmitDir;

  二、创建一个JobInProgress对象,JobInProgress类主要用于监控和跟踪作业运行状态,存在于作业的整个运行过程中,并未调度器提供最底层的调度接口,维护了两部分信息:一种是静态信息这些在作业提交之时就确定好了;另一种是动态的会随着作业的运行而动态变化的。job
= new JobInProgress(this, this.conf, jobInfo, 0,
ts),这里会创建一个JobProfile一直跟踪作业的运行,不管作业作业活着还是死了;

  三、checkMemoryRequirements(job)检查Job是否有无效的内存需求而不能运行,检查JobTracker的配置有无问题,再检查Job的内存配置有无问题;

  四、是否存储作业信息以备恢复。在1.0.0版本中这还没实现(在这就是没存储信息),要存的信息是一个JobInfo对象存储着作业的存储目录、ID以及user。

  五、status = addJob(jobId,
job)这是核心的提交方法。会将此Job放入jobs中,jobs保存着JobTracker所有运行作业的对应关系<jobID,JobInProgress>;然后让所有的JobInProgressListener监听这个Job,根据JobTracker启动流程源码级分析
中可以知道这些JobInProgressListener实例都是通过调度器初始化(JobQueueTaskScheduler.start()方法)时,有俩线程一个是监控Job生命周期的,一个是对新加入的Job初始化的;然后加入监控统计中,返回job状态job.getStatus()。

  这样Job的提交过程就完了,剩下的就是作业的调度分配及监控了。后续再讲吧

参考:

  董西成,《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》

mapreduce job提交流程源码级分析(三),布布扣,bubuko.com

时间: 2024-10-24 13:18:34

mapreduce job提交流程源码级分析(三)的相关文章

JobTracker启动流程源码级分析

org.apache.hadoop.mapred.JobTracker类是个独立的进程,有自己的main函数.JobTracker是在网络环境中提交及运行MR任务的核心位置. main方法主要代码有两句: 1 //创建jobTracker对象 2 JobTracker tracker = startTracker(new JobConf()); 3 //启动各个服务,包括JT内部一些重要的服务或者线程 4 tracker.offerService(); 一.startTracker(new Jo

TaskTracker启动过程源码级分析

TaskTracker也是作为一个单独的JVM来运行的,其main函数就是TaskTracker的入口函数,当运行start-all.sh时,脚本就是通过SSH运行该函数来启动TaskTracker的. TaskTracker是JobTracker和Task之间的桥梁:一方面,从JobTracker接收并执行各种命令:运行任务.提交任务.杀死任务等:另一方面,将本地节点上各个任务的状态通过心跳周期性汇报给JobTracker.TaskTracker与JobTracker和Task之间采用了RPC

MapReduce job在JobTracker初始化源码级分析

mapreduce job提交流程源码级分析(三)中已经说明用户最终调用JobTracker.submitJob方法来向JobTracker提交作业.而这个方法的核心提交方法是JobTracker.addJob(JobID jobId, JobInProgress job)方法,这个addJob方法会把Job提交到调度器(默认是JobQueueTaskScheduler)的监听器JobQueueJobInProgressListener和EagerTaskInitializationListen

监听器初始化Job、JobTracker相应TaskTracker心跳、调度器分配task源码级分析

JobTracker和TaskTracker分别启动之后(JobTracker启动流程源码级分析,TaskTracker启动过程源码级分析),taskTracker会通过心跳与JobTracker通信,并获取分配它的任务.用户将作业提交到JobTracker之后,放入相应的数据结构中,静等被分配.mapreduce job提交流程源码级分析(三)这篇文章已经分析了用户提交作业的最后步骤,主要是构造作业对应的JobInProgress并加入jobs,告知所有的JobInProgressListen

MapReduce中TextInputFormat分片和读取分片数据源码级分析

InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能: (1)数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split: (2)为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,供mapper使用. InputFormat有两个比较重要的方法:(1)List<InputSp

TableInputFormat分片及分片数据读取源码级分析

我们在MapReduce中TextInputFormat分片和读取分片数据源码级分析 这篇中以TextInputFormat为例讲解了InputFormat的分片过程以及RecordReader读取分片数据的过程.接下来咱们分析TableInputFormat的分片信息和数据读取过程. TableInputFormat这是专门处理基于HBase的MapReduce的输入数据的格式类.我们可以看看继承结构:(1)public class TableInputFormat extends Table

嵌入式linux开发uboot移植(二)——uboot工程源码目录分析

嵌入式linux开发uboot移植(二)--uboot工程源码目录分析 本文分析的uboot为uboot_smdkv210,是三星官方发布的基于S5PV210评估开发板对应的uboot. 一.uboot源码目录结构解析 1.cpu 本文件夹下的子文件与处理器相关,每个文件夹代表一种CPU系列.每个子目录中都包括cpu.c.interrupts.c.start.S文件. cpu.c主要用于初始化CPU.设置指令Cache和数据Cache等 interrupt.c主要用于设置系统的各种中断和异常 s

Flume-NG(1.5版本)中SpillableMemoryChannel源码级分析

SpillableMemoryChannel是1.5版本新增的一个channel.这个channel优先将evnet放在内存中,一旦内存达到设定的容量就使用file channel写入磁盘.然后读的时候会按照顺序读取:会通过一个DrainOrderQueue来保证不管是内存中的还是溢出(本文的“溢出”指的是内存channel已满,需要使用file channel存储数据)文件中的顺序.这个Channel是memory channel和file channel的一个折中,虽然在内存中的数据仍然可能

Flume-NG内置计数器(监控)源码级分析

Flume的内置监控怎么整?这个问题有很多人问.目前了解到的信息是可以使用Cloudera Manager.Ganglia有图形的监控工具,以及从浏览器获取json串,或者自定义向其他监控系统汇报信息.那监控的信息是什么呢?就是各个组件的统计信息,比如成功接收的Event数量.成功发送的Event数量,处理的Transaction的数量等等.而且不同的组件有不同的Countor来做统计,目前直到1.5版本仍然只对三大组件:source.sink.channel进行统计分别是SourceCount