The previous article (Task execution process, part 1) explained that the launch script ends up running the org.apache.hadoop.mapred.Child class.
The Child class has a main() entry point that runs MapTasks and ReduceTasks; when launched it receives the following five command-line arguments:
host: the hostname (address) of the TaskTracker node
port: the RPC port of the TaskTracker node
taskID: the TaskAttemptID of the task being launched, identifying one execution attempt of a task
log location: the path of the log files for this task attempt
JVM ID: the JVM number; together with the JobID and the task type (MapTask/ReduceTask) it forms the JVMId identifying this JVM instance
The code is as follows:
public static void main(String[] args) throws Throwable {
LOG.debug("Child starting");
final JobConf defaultConf = new JobConf();
String host = args[0];
int port = Integer.parseInt(args[1]);
final InetSocketAddress address = NetUtils.makeSocketAddr(host, port);
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
final String logLocation = args[3];
final int SLEEP_LONGER_COUNT = 5;
int jvmIdInt = Integer.parseInt(args[4]);
JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
String prefix = firstTaskid.isMap() ? "MapTask" : "ReduceTask";
cwd = System.getenv().get(TaskRunner.HADOOP_WORK_DIR);
if (cwd == null) {
throw new IOException("Environment variable " +
TaskRunner.HADOOP_WORK_DIR + " is not set");
}
// file name is passed thru env
String jobTokenFile =
System.getenv().get(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
Credentials credentials =
TokenCache.loadTokens(jobTokenFile, defaultConf);
LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() +
"; from file=" + jobTokenFile);
Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
SecurityUtil.setTokenService(jt, address);
UserGroupInformation current = UserGroupInformation.getCurrentUser();
current.addToken(jt);
UserGroupInformation taskOwner
= UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
taskOwner.addToken(jt);
// Set the credentials
defaultConf.setCredentials(credentials);
final TaskUmbilicalProtocol umbilical =
taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
@Override
public TaskUmbilicalProtocol run() throws Exception {
return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
TaskUmbilicalProtocol.versionID,
address,
defaultConf);
}
});
int numTasksToExecute = -1; //-1 signifies "no limit"
int numTasksExecuted = 0;
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
if (taskid != null) {
TaskLog.syncLogs
(logLocation, taskid, isCleanup, currentJobSegmented);
}
} catch (Throwable throwable) {
}
}
});
Thread t = new Thread() {
public void run() {
//every so often wake up and syncLogs so that we can track
//logs of the currently running task
while (true) {
try {
Thread.sleep(5000);
if (taskid != null) {
TaskLog.syncLogs
(logLocation, taskid, isCleanup, currentJobSegmented);
}
} catch (InterruptedException ie) {
} catch (IOException iee) {
LOG.error("Error in syncLogs: " + iee);
System.exit(-1);
}
}
}
};
t.setName("Thread for syncLogs");
t.setDaemon(true);
t.start();
String pid = "";
if (!Shell.WINDOWS) {
pid = System.getenv().get("JVM_PID");
}
JvmContext context = new JvmContext(jvmId, pid);
int idleLoopCount = 0;
Task task = null;
UserGroupInformation childUGI = null;
final JvmContext jvmContext = context;
try {
while (true) {
taskid = null;
currentJobSegmented = true;
JvmTask myTask = umbilical.getTask(context);
if (myTask.shouldDie()) {
break;
} else {
if (myTask.getTask() == null) {
taskid = null;
currentJobSegmented = true;
if (++idleLoopCount >= SLEEP_LONGER_COUNT) {
//we sleep for a bigger interval when we don't receive
//tasks for a while
Thread.sleep(1500);
} else {
Thread.sleep(500);
}
continue;
}
}
idleLoopCount = 0;
task = myTask.getTask();
task.setJvmContext(jvmContext);
taskid = task.getTaskID();
// Create the JobConf and determine if this job gets segmented task logs
final JobConf job = new JobConf(task.getJobFile());
currentJobSegmented = logIsSegmented(job);
isCleanup = task.isTaskCleanupTask();
// reset the statistics for the task
FileSystem.clearStatistics();
// Set credentials
job.setCredentials(defaultConf.getCredentials());
//forcefully turn off caching for localfs. All cached FileSystems
//are closed during the JVM shutdown. We do certain
//localfs operations in the shutdown hook, and we don't
//want the localfs to be "closed"
job.setBoolean("fs.file.impl.disable.cache", false);
// set the jobTokenFile into task
task.setJobTokenSecret(JobTokenSecretManager.
createSecretKey(jt.getPassword()));
// setup the child's mapred-local-dir. The child is now sandboxed and
// can only see files down and under the attempt dir only.
TaskRunner.setupChildMapredLocalDirs(task, job);
// setup the child's attempt directories
localizeTask(task, job, logLocation);
//setupWorkDir actually sets up the symlinks for the distributed
//cache. After a task exits we wipe the workdir clean, and hence
//the symlinks have to be rebuilt.
TaskRunner.setupWorkDir(job, new File(cwd));
//create the index file so that the log files
//are viewable immediately
TaskLog.syncLogs
(logLocation, taskid, isCleanup, logIsSegmented(job));
numTasksToExecute = job.getNumTasksToExecutePerJvm();
assert(numTasksToExecute != 0);
task.setConf(job);
// Initiate Java VM metrics
initMetrics(prefix, jvmId.toString(), job.getSessionId());
LOG.debug("Creating remote user to execute task: " + job.get("user.name"));
childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));
// Add tokens to new user so that it may execute its task correctly.
for(Token<?> token : UserGroupInformation.getCurrentUser().getTokens()) {
childUGI.addToken(token);
}
// Create a final reference to the task for the doAs block
final Task taskFinal = task;
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
try {
// use job-specified working directory
FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
taskFinal.run(job, umbilical); // run the task
} finally {
TaskLog.syncLogs
(logLocation, taskid, isCleanup, logIsSegmented(job));
TaskLogsTruncater trunc = new TaskLogsTruncater(defaultConf);
trunc.truncateLogs(new JVMInfo(
TaskLog.getAttemptDir(taskFinal.getTaskID(),
taskFinal.isTaskCleanupTask()), Arrays.asList(taskFinal)));
}
return null;
}
});
if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
break;
}
}
} catch (FSError e) {
LOG.fatal("FSError from child", e);
umbilical.fsError(taskid, e.getMessage(), jvmContext);
} catch (Exception exception) {
LOG.warn("Error running child", exception);
try {
if (task != null) {
// do cleanup for the task
if(childUGI == null) {
task.taskCleanup(umbilical);
} else {
final Task taskFinal = task;
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
taskFinal.taskCleanup(umbilical);
return null;
}
});
}
}
} catch (Exception e) {
LOG.info("Error cleaning up", e);
}
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
exception.printStackTrace(new PrintStream(baos));
if (taskid != null) {
umbilical.reportDiagnosticInfo(taskid, baos.toString(), jvmContext);
}
} catch (Throwable throwable) {
LOG.fatal("Error running child : "
+ StringUtils.stringifyException(throwable));
if (taskid != null) {
Throwable tCause = throwable.getCause();
String cause = tCause == null
? throwable.getMessage()
: StringUtils.stringifyException(tCause);
umbilical.fatalError(taskid, cause, jvmContext);
}
} finally {
RPC.stopProxy(umbilical);
shutdownMetrics();
// Shutting down log4j of the child-vm...
// This assumes that on return from Task.run()
// there is no more logging done.
LogManager.shutdown();
}
}
The core code skeleton of the Child class is as follows:
public static void main(String args[]){
//create the RPC client and start the log-sync thread
...
while(true){//keep asking the TaskTracker for a new task
JvmTask myTask = umbilical.getTask(context);//fetch a task
if(myTask.shouldDie()){//the job this JVM belongs to no longer exists or has been killed
break;
}else{
if(myTask.getTask() == null){//no new task available for now
//wait a while and then ask the TaskTracker again
...
continue;
}
}
//a new task has arrived: localize it
...
taskFinal.run(job,umbilical);//run the task
...
//if the JVM-reuse limit has been reached, exit
if(numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute){
break;
}
}
}
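The reuse limit tested at the end of this loop comes from job.getNumTasksToExecutePerJvm() (see the full code above). Below is a minimal sketch of how a job would enable JVM reuse, assuming the standard Hadoop 1.x configuration property mapred.job.reuse.jvm.num.tasks (default 1, i.e. one task per child JVM; -1 means no limit):

import org.apache.hadoop.mapred.JobConf;

public class JvmReuseExample {
  public static void main(String[] args) {
    JobConf conf = new JobConf();
    // Let each child JVM run up to 10 tasks of this job before it exits;
    // equivalent to conf.setInt("mapred.job.reuse.jvm.num.tasks", 10).
    conf.setNumTasksToExecutePerJvm(10);
    // This is the value Child.main() reads via job.getNumTasksToExecutePerJvm().
    System.out.println(conf.getNumTasksToExecutePerJvm());
  }
}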
main() then calls the run() method of org.apache.hadoop.mapred.Task.java. Task is an abstract class with two subclasses, MapTask and ReduceTask; let us look at MapTask's run() method first.
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
// start thread that will handle communication with parent
TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
jvmContext);
reporter.startCommunicationThread();
boolean useNewApi = job.getUseNewMapper();
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
}
runJobCleanupTask(): cleans up the directories and files associated with the job
runJobSetupTask(): creates the directories and files the job needs in order to run
runTaskCleanupTask(): cleans up the task-related directories and files under a task's working directory
runNewMapper()/runOldMapper(): invoke the processing logic of the Mapper written by the user in the MapReduce program
runNewMapper() contains the following code:
// make a mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
In other words, the user's Mapper class is instantiated via reflection and its map logic is then executed.
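To make the reflection step concrete: taskContext.getMapperClass() returns whatever class the user registered with job.setMapperClass(), and the instance created above is later driven by Mapper.run(), which calls setup(), then map() once per input record, then cleanup(). A hypothetical user mapper (the class name and logic below are made up for illustration) that would be instantiated and invoked this way:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical user mapper: this is the kind of class returned by
// taskContext.getMapperClass() and instantiated by ReflectionUtils.newInstance()
// inside runNewMapper().
public class LineLengthMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private final IntWritable length = new IntWritable();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Called once per record by Mapper.run() after runNewMapper() has built
    // the RecordReader and MapContext for this task's InputSplit.
    length.set(value.getLength());
    context.write(value, length);
  }
}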
2. Where does a Map Task's input come from?
To answer this we first need to understand two files: job.split and job.splitmetainfo.
a) After submit() in org.apache.hadoop.mapreduce.Job.java submits the job, it calls submitJobInternal() in org.apache.hadoop.mapred.JobClient.
b) submitJobInternal() creates the input splits for the job:
int maps = writeSplits(job, submitJobDir);
writeSplits() in turn calls writeNewSplits() (when the new API is used).
c) In JobClient's writeNewSplits(), the InputFormat object is obtained via reflection and its getSplits() method is called to compute the splits, yielding an InputSplit[] array. The array is then written out via JobSplitWriter.createSplitFiles(). The value writeNewSplits() returns is the number of splits, which determines how many map tasks will be created.
d) JobSplitWriter.createSplitFiles() (in org.apache.hadoop.mapreduce.split.JobSplitWriter.java) opens an output stream on the file ${jobSubmitDir}/job.split and calls its internal writeNewSplits() helper to perform the actual writing; at the same time it returns a SplitMetaInfo array.
e) Finally, JobSplitWriter.writeJobSplitMetaInfo() writes that SplitMetaInfo array to a second file, ${jobSubmitDir}/job.splitmetainfo.
These steps ultimately produce the two files job.split and job.splitmetainfo. A simplified sketch of the client-side split computation follows.
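The sketch below performs the same getSplits() call that writeNewSplits() makes and prints what would end up in job.split and job.splitmetainfo. It is a standalone illustration, not the JobClient code itself: the class name and input path are made up, TextInputFormat is instantiated directly instead of via ReflectionUtils, and the sorting of splits by size and the call to JobSplitWriter.createSplitFiles() are omitted.

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Standalone sketch: compute splits the way the client does and show the
// information that goes into job.split (the split itself) and
// job.splitmetainfo (its length and locations). One map task per split.
public class SplitInspector {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "split-inspector");            // hypothetical job
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));  // e.g. /user/admin/in

    TextInputFormat format = new TextInputFormat();        // real code uses reflection
    List<InputSplit> splits = format.getSplits(job);       // the call writeNewSplits() makes

    for (InputSplit split : splits) {
      System.out.println(split
          + " length=" + split.getLength()
          + " locations=" + java.util.Arrays.toString(split.getLocations()));
    }
  }
}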
job.split stores every InputSplit produced above; each InputSplit record contains:
1. the split's class name (ClassName, usually org.apache.hadoop.mapreduce.lib.input.FileSplit)
2. the path of the file the split belongs to (FilePath)
3. the start position of the split within that file (FileOffset)
4. the length of the split in bytes (Length)
job.splitmetainfo stores metadata about each InputSplit:
1. the nodes on which the split's data is local (Location)
2. the offset of the corresponding InputSplit record within the job.split file (SplitFileOffset)
3. the length of the split in bytes (Length, the same value as in job.split)
At this point the split information has been recorded and written to the corresponding files in HDFS.
As for how the two files are used: job.splitmetainfo only holds the splits' metadata; to actually read the input data, job.split is still needed.
When the scheduler calls JobTracker.initJob() to initialize a new job, it invokes initTasks() in org.apache.hadoop.mapred.JobInProgress.java, which contains:
//
// read input splits and create a map per a split
//
TaskSplitMetaInfo[] splits = createSplits(jobId);
createSplits() uses SplitMetaInfoReader.readSplitMetaInfo() to read job.splitmetainfo: it first checks META_SPLIT_VERSION and reads numSplits, then reads each SplitMetaInfo in turn, builds for it a TaskSplitIndex pointing at the corresponding record in job.split (the record itself is read from job.split later, when the task runs), wraps everything into a TaskSplitMetaInfo, and finally returns the TaskSplitMetaInfo[] array.
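A small sketch of this read path, using the same SplitMetaInfoReader.readSplitMetaInfo() call that JobInProgress.createSplits() makes (the class name and command-line arguments are made up; the accessor names assume the Hadoop 1.x JobSplit API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;

// Sketch: read job.splitmetainfo the same way JobInProgress.createSplits() does
// and print, for each future map task, its input length, preferred hosts and
// the TaskSplitIndex that points into job.split.
public class SplitMetaInfoDump {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    JobID jobId = JobID.forName(args[0]);   // e.g. job_201404200521_0001
    Path jobSubmitDir = new Path(args[1]);  // the .staging/<job-id> directory

    TaskSplitMetaInfo[] metaInfos =
        SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);

    for (TaskSplitMetaInfo info : metaInfos) {
      System.out.println("length=" + info.getInputDataLength()
          + " locations=" + java.util.Arrays.toString(info.getLocations())
          + " splitFile=" + info.getSplitIndex().getSplitLocation()
          + " startOffset=" + info.getSplitIndex().getStartOffset());
    }
  }
}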
TaskSplitMetaInfo: the data structure that holds an InputSplit's metadata, consisting of three fields:
private TaskSplitIndex splitIndex; //where to find this split's record in the job.split file
private long inputDataLength; //length of the InputSplit's data
private String[] locations; //list of hosts on which the InputSplit's data resides
The JobTracker reads this information from job.splitmetainfo during job initialization. The host list is the most important factor the task scheduler uses to decide whether a task can run with data locality, while splitIndex records where in job.split the information describing the new task's input data is stored; once the TaskTracker receives it (from the JobTracker), it can read the InputSplit from job.split and run the task.
TaskSplitIndex: when the JobTracker assigns a new task to a TaskTracker, TaskSplitIndex tells the task where to find the record describing its input data in job.split. It has two fields:
private String splitLocation; //path of the job.split file
private long startOffset; //offset of the InputSplit record inside job.split
runNewMapper()/runOldMapper() in org.apache.hadoop.mapred.MapTask.java eventually invoke the processing logic of the user's Mapper, and the parameters passed to them include this TaskSplitIndex, so the task can locate job.split and read its input data.
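A simplified sketch of what the map side does with that index: open the file named by splitLocation, seek to startOffset, read the split's class name, then instantiate and deserialize the split. The real code (MapTask.getSplitDetails()) goes through a SerializationFactory; the sketch below assumes a Writable split such as FileSplit, and the class name and arguments are made up:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

// Sketch: recover one InputSplit from job.split given a TaskSplitIndex
// (splitLocation + startOffset).
public class JobSplitPeek {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path splitFile = new Path(args[0]);          // splitLocation: .../job.split
    long startOffset = Long.parseLong(args[1]);  // startOffset, e.g. 7

    FileSystem fs = splitFile.getFileSystem(conf);
    FSDataInputStream in = fs.open(splitFile);
    in.seek(startOffset);

    String className = Text.readString(in);      // e.g. ...lib.input.FileSplit
    Writable split = (Writable)
        ReflectionUtils.newInstance(conf.getClassByName(className), conf);
    split.readFields(in);                        // for FileSplit: path, start, length
    in.close();

    System.out.println(className + " -> " + split);
  }
}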
For a concrete job, here is the information stored in job.split, job.splitmetainfo, TaskSplitMetaInfo and TaskSplitIndex when they are generated.
Contents of job.split:
FileSplit={file=hdfs://server1:9000/user/admin/in/yellow.txt,length=67108864,start=0}
FileSplit={file=hdfs://server1:9000/user/admin/in/yellow.txt,length=67108864,start=67108864}
FileSplit={file=hdfs://server1:9000/user/admin/in/yellow.txt,length=66782272,start=134217728}
Contents of job.splitmetainfo:
JobSplit$SplitMetaInfo={data-size: 67108864, start-offset: 7, locations: [server3, server2]}
JobSplit$SplitMetaInfo={data-size: 67108864, start-offset: 116, locations: [server3, server2]}
JobSplit$SplitMetaInfo={data-size: 66782272, start-offset: 225, locations: [server3, server2]}
Comparing job.split with job.splitmetainfo:
data-size in job.splitmetainfo equals length in job.split;
locations in job.splitmetainfo is the host list of the block holding the data described by the corresponding job.split record;
start-offset in job.splitmetainfo is the byte offset at which the corresponding FileSplit record starts inside job.split (the first record starts at offset 7, presumably right after the job.split header, i.e. the 3-byte "SPL" magic plus a 4-byte version number).
Contents of TaskSplitMetaInfo / TaskSplitIndex:
TaskSplitMetaInfo[0]={inputDataLength=67108864, locations=[server3, server2], splitIndex=JobSplit$TaskSplitIndex{splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404200521_0001/job.split", startOffset=7}}
TaskSplitMetaInfo[1]={inputDataLength=67108864, locations=[server3, server2], splitIndex=JobSplit$TaskSplitIndex{splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404200521_0001/job.split", startOffset=116}}
TaskSplitMetaInfo[2]={inputDataLength=66782272, locations=[server3, server2], splitIndex=JobSplit$TaskSplitIndex{splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404200521_0001/job.split", startOffset=225}}
Now we know where the map function gets its input. One more question remains: the getSplits() call made from org.apache.hadoop.mapred.JobClient.java produces a host list for each split. If a split falls entirely within one block, that block's host list can be returned directly; but what if the split spans several blocks, which hosts should be returned then?
See http://blog.csdn.net/chlaws/article/details/22900141 for a detailed discussion; a simplified sketch of the host-assignment logic follows.
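In short, in the Hadoop 1.x FileInputFormat.getSplits() implementation each split simply inherits the host list of the block that contains the split's starting byte, even when the split spans several blocks. The following standalone sketch (made-up class name and block layout, and ignoring details such as the SPLIT_SLOP handling of the last small split and unsplittable compressed files) reproduces just that host-assignment logic:

import java.util.Arrays;

// Hypothetical, simplified re-implementation of the host-assignment part of
// FileInputFormat.getSplits(): each split records the hosts of the HDFS block
// that contains the split's starting byte, even if the split spans blocks.
public class SplitHostSketch {

  static class Block {
    final long offset;
    final long length;
    final String[] hosts;
    Block(long offset, long length, String... hosts) {
      this.offset = offset;
      this.length = length;
      this.hosts = hosts;
    }
  }

  // Index of the block containing 'offset' (mirrors FileInputFormat.getBlockIndex()).
  static int blockIndex(Block[] blocks, long offset) {
    for (int i = 0; i < blocks.length; i++) {
      if (offset >= blocks[i].offset && offset < blocks[i].offset + blocks[i].length) {
        return i;
      }
    }
    throw new IllegalArgumentException("offset " + offset + " is outside the file");
  }

  public static void main(String[] args) {
    long blockSize = 64L * 1024 * 1024;
    Block[] blocks = {
        new Block(0, blockSize, "server3", "server2"),
        new Block(blockSize, blockSize, "server2", "server4"),
        new Block(2 * blockSize, blockSize, "server3", "server4"),
    };
    long fileLength = 3 * blockSize;
    long splitSize = 96L * 1024 * 1024; // splits deliberately larger than a block

    long remaining = fileLength;
    while (remaining > 0) {
      long start = fileLength - remaining;
      long length = Math.min(splitSize, remaining);
      // The split may cover parts of two blocks, but its locations are taken
      // from the block that holds byte 'start' only.
      Block home = blocks[blockIndex(blocks, start)];
      System.out.println("split start=" + start + " length=" + length
          + " hosts=" + Arrays.toString(home.hosts));
      remaining -= length;
    }
  }
}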
For how MapReduce handles a line of text that crosses an InputSplit boundary, see http://my.oschina.net/xiangchen/blog/99653.