Hadoop任务提交分析
分析工具和环境
下载一份hadoop的源码,这里以hadoop-1.1.2为例。本地IDE环境为eclipse,导入整个目录,然后可以在IDE里面看到目录结构了,要分析任务提交过程,需要找到入口代码,很明显,对于熟悉Hadoop应用程序开发的同学来说很容易的知道任务的提交是从job的配置开始的,所以需要这样一份提交Job的代码,在src/examples里面你可以找到一些例子,这里从最简单的wordcount进入去分析提交过程。
核心的一些类
Configuration Configuration类是整个框架的配置信息的集合类,里面主要包含了overlay
和property
和resources
,而主要的方法主要看get
和set
,下面具体看一下这几个变量和方法overlay
: private
,其实overlay是Properties对象,用来存储key-value pair,但是它的主要作用还在于在添加Resource后,保证Resources重载后用户通过set函数设置的属性对会覆盖Resource中相同key的属性对。
Properties overlay;properties
:
用来存储key-value pair finalParameters
:是一个set,用来保存final的配置信息,即不允许覆盖
addResourceObject会添加资源并调用reloadConfiguration,在reloadConfiguration中会清空properties对象和finalParameter,然后下次通过get去读某个属性的时候,会由于properties为null而,重新载入resources,这样资源得到更新,并且,在这之后立马会将overlay里面的内容覆盖进入properties,就是将之前通过set函数设置的属性对去覆盖resource中的默认属性对。
private synchronized void addResourceObject(Object resource) {
resources.add(resource); // add to resources
reloadConfiguration();
}
public synchronized void reloadConfiguration() {
properties = null; // trigger reload
finalParameters.clear(); // clear site-limits
}
public String get(String name) {
return substituteVars(getProps().getProperty(name)); //这里先是getProps得到properties对象然后调用getProperty得到value,然后通过
//substitudeVars函数去替换value中的${..},其实就是引用变量
}
private synchronized Properties getProps() {
if (properties == null) { //properties为null会重新实例化properties,然后载入资源,并将overlay里面的内容去覆盖properties里面相同key
properties = new Properties(); //的内容
loadResources(properties, resources, quietmode);
if (overlay!= null) {
properties.putAll(overlay);
for (Map.Entry<Object,Object> item: overlay.entrySet()) {
updatingResource.put((String) item.getKey(), UNKNOWN_RESOURCE);
}
}
}
return properties;
}
Job
Job类是hadoop任务配置的对象,继承于JobContext,而他们的构造函数都其实是以Configuration对象作为参数,然后提供了一些设置Job配置信息或者获取Job配置信息的一些函数。
上述的配置Job信息的函数并不是直接使用了Configuration对象,而是使用了Configuration对象的子类JobConf,JobContext以conf = JobConf(conf)作为内置成员变量。JobConf是在Confguration的基础上提供了Job配置的一些函数
Job类配置好类以及输入输出等之后,会调用Job.waitforcompletion当然也可以直接调用submit函数,其中waitForCompletion会调用submit然后jobClient.monitorAndPrintJob打印和监视Job的运行进度和情况。
submit函数分析
public void submit() throws IOException, InterruptedException,
ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
// Connect to the JobTracker and submit the job
connect(); //主要是生成了jobClient对象,而在该对象的初始化过程中,主要生成 了jobSubmitClient的一个动态代理,能够RPC调用
//JobTracker中的一些函数(分布式的情况,本地情况先不做分析)
info = jobClient.submitJobInternal(conf); // 任务提交的过程
super.setJobID(info.getID());
state = JobState.RUNNING;
}
jobClient构造函数分析
//jobClient的构造函数,设置了conf,然后就调用了init函数
public JobClient(JobConf conf) throws IOException {
setConf(conf);
init(conf);
}
/**
* Connect to the default {@link JobTracker}.
* @param conf the job configuration.
* @throws IOException
*/
public void init(JobConf conf) throws IOException {
String tracker = conf.get("mapred.job.tracker", "local");
tasklogtimeout = conf.getInt(
TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
this.ugi = UserGroupInformation.getCurrentUser();
if ("local".equals(tracker)) {
conf.setNumMapTasks(1);
this.jobSubmitClient = new LocalJobRunner(conf); //本地模式下,会生成一个LocalJobRunner对象
} else {
this.rpcJobSubmitClient =
createRPCProxy(JobTracker.getAddress(conf), conf); // 分布式模式下,会生成了一个RPC代理
this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf); //对这个代理再次封装了一下,其实里面主要是增加了retry的一些代码
}
}
jobClient.submitJobInternal函数分析
public
RunningJob submitJobInternal(final JobConf job
) throws FileNotFoundException,
ClassNotFoundException,
InterruptedException,
IOException {
/*
* configure the command line options correctly on the submitting dfs
*/
return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
public RunningJob run() throws FileNotFoundException,
ClassNotFoundException,
InterruptedException,
IOException{
JobConf jobCopy = job;
Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
jobCopy); //获得任务提交的目录,其实是作为参数的JobClient去通过RPC从JobTracker那里获取得到jobStagingArea
// ${hadoop.tmp.dir}/mapred/staging/user-name/.staging目录
JobID jobId = jobSubmitClient.getNewJobId(); //从JobTracker那里获得分配的jobid
Path submitJobDir = new Path(jobStagingArea, jobId.toString()); //submitJobDir = ${jobStagingArea}/job-id
jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
JobStatus status = null;
try {
populateTokenCache(jobCopy, jobCopy.getCredentials());
copyAndConfigureFiles(jobCopy, submitJobDir); // 这个函数主要是负责处理通过参数--libjars,--archives,--files引入的jar包,以及要使用分布式缓存的文件和archive,并且将job.jar也拷贝到分布式文件系统上
// get delegation token for the dir
TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
new Path [] {submitJobDir},
jobCopy);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
int reduces = jobCopy.getNumReduceTasks();
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
job.setJobSubmitHostAddress(ip.getHostAddress());
job.setJobSubmitHostName(ip.getHostName());
}
JobContext context = new JobContext(jobCopy, jobId);
//检查output,如果已经存在则会抛出异常
// Check the output specification
if (reduces == 0 ? jobCopy.getUseNewMapper() :
jobCopy.getUseNewReducer()) {
org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
ReflectionUtils.newInstance(context.getOutputFormatClass(),
jobCopy);
output.checkOutputSpecs(context);
} else {
jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
}
jobCopy = (JobConf)context.getConfiguration();
//利用设置好的inputformat的来对输入进行分片,然后将分片写入到job.split中,然后也会将分片的元信息写入到job.splitmetainfo中
//
// Create the splits for the job
FileSystem fs = submitJobDir.getFileSystem(jobCopy);
LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
int maps = writeSplits(context, submitJobDir);
jobCopy.setNumMapTasks(maps);
// write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = jobCopy.getQueueName();
AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
jobCopy.set(QueueManager.toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());
//将job配置相关的信息写入到job.xml中
// Write job file to JobTracker‘s fs
FSDataOutputStream out =
FileSystem.create(fs, submitJobFile,
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
try {
jobCopy.writeXml(out); //正式写入
} finally {
out.close();
}
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, jobCopy.getCredentials());
status = jobSubmitClient.submitJob(
jobId, submitJobDir.toString(), jobCopy.getCredentials()); //通过RPC提交任务给JobTracker,让其去管理任务,在JobTrakcker的
//subnitJob里面会实例化JobInProgress并保存在一个jobs的Map<jobID,JobInProgress>的map里面
JobProfile prof = jobSubmitClient.getJobProfile(jobId); //获得任务的概要信息,主要包括了jobname,jobid,提交的队列名,job配置信息的路径,用户,web接口的url等
if (status != null && prof != null) {
return new NetworkedJob(status, prof, jobSubmitClient); //对status,prof,以及jobSubmitClient的封装,主要用来可以查询进度信息和profile信息。而status里面提供了getMapProgress和setMapProgress类型的接口函数
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (fs != null && submitJobDir != null)
fs.delete(submitJobDir, true);
}
}
}
});
}
JobClient
从上面的一些代码也可以看出真正提交Job是在JobClient做的,它主要处理了一些参数像libjars files archives把他们指定的文件拷贝到HDFS的submitJobDir,然后把JOb的配置信息job.xml , 分片信息以及job.jar也都拷贝到HDFS上。然后利用jobClient里面的一个代理类jobSubmitClient来提交任务,其实就是一个RPC请求,向JObTracker发送请求,然后返回的是一个JobStatus类型的对象,该对象可以查询map reduce进度等。
下一篇讲一下JObTracker相关