Task运行过程分析2

上篇文章Task运行过程1讲到脚本会运行org.apache.hadoop.mapred.Child类。。。

Child类包含一个入口主方法main,在运行的时候需要传递对应的参数,来运行MapTask和ReduceTask,通过命令行输入如下5个参数:

host:表示TaskTracker节点的主机名称

port:表示TaskTracker节点RPc端口号

taskID:表示启动的Task对应的TaskAttemptID,标识一个Task的一个运行实例

log location:表示该Task运行实例对应的日志文件的路径

JVM ID:表示该Task实例对应的JVMId信息,包括JobID、Task类型(MapTask/ReduceTask)、JVM编号(标识该JVM实例对应的id)

代码如下:

  public static void main(String[] args) throws Throwable {
    LOG.debug("Child starting");

    final JobConf defaultConf = new JobConf();
    String host = args[0];
    int port = Integer.parseInt(args[1]);
    final InetSocketAddress address = NetUtils.makeSocketAddr(host, port);
    final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
    final String logLocation = args[3];
    final int SLEEP_LONGER_COUNT = 5;
    int jvmIdInt = Integer.parseInt(args[4]);
    JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
    String prefix = firstTaskid.isMap() ? "MapTask" : "ReduceTask";

    cwd = System.getenv().get(TaskRunner.HADOOP_WORK_DIR);
    if (cwd == null) {
      throw new IOException("Environment variable " +
                             TaskRunner.HADOOP_WORK_DIR + " is not set");
    }

    // file name is passed thru env
    String jobTokenFile =
      System.getenv().get(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
    Credentials credentials =
      TokenCache.loadTokens(jobTokenFile, defaultConf);
    LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() +
        "; from file=" + jobTokenFile);

    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);
    SecurityUtil.setTokenService(jt, address);
    UserGroupInformation current = UserGroupInformation.getCurrentUser();
    current.addToken(jt);

    UserGroupInformation taskOwner
     = UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
    taskOwner.addToken(jt);

    // Set the credentials
    defaultConf.setCredentials(credentials);

    final TaskUmbilicalProtocol umbilical =
      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
        @Override
        public TaskUmbilicalProtocol run() throws Exception {
          return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
              TaskUmbilicalProtocol.versionID,
              address,
              defaultConf);
        }
    });

    int numTasksToExecute = -1; //-1 signifies "no limit"
    int numTasksExecuted = 0;
    Runtime.getRuntime().addShutdownHook(new Thread() {
      public void run() {
        try {
          if (taskid != null) {
            TaskLog.syncLogs
              (logLocation, taskid, isCleanup, currentJobSegmented);
          }
        } catch (Throwable throwable) {
        }
      }
    });
    Thread t = new Thread() {
      public void run() {
        //every so often wake up and syncLogs so that we can track
        //logs of the currently running task
        while (true) {
          try {
            Thread.sleep(5000);
            if (taskid != null) {
              TaskLog.syncLogs
                (logLocation, taskid, isCleanup, currentJobSegmented);
            }
          } catch (InterruptedException ie) {
          } catch (IOException iee) {
            LOG.error("Error in syncLogs: " + iee);
            System.exit(-1);
          }
        }
      }
    };
    t.setName("Thread for syncLogs");
    t.setDaemon(true);
    t.start();

    String pid = "";
    if (!Shell.WINDOWS) {
      pid = System.getenv().get("JVM_PID");
    }
    JvmContext context = new JvmContext(jvmId, pid);
    int idleLoopCount = 0;
    Task task = null;

    UserGroupInformation childUGI = null;

    final JvmContext jvmContext = context;
    try {
      while (true) {
        taskid = null;
        currentJobSegmented = true;

        JvmTask myTask = umbilical.getTask(context);
        if (myTask.shouldDie()) {
          break;
        } else {
          if (myTask.getTask() == null) {
            taskid = null;
            currentJobSegmented = true;

            if (++idleLoopCount >= SLEEP_LONGER_COUNT) {
              //we sleep for a bigger interval when we don‘t receive
              //tasks for a while
              Thread.sleep(1500);
            } else {
              Thread.sleep(500);
            }
            continue;
          }
        }
        idleLoopCount = 0;
        task = myTask.getTask();
        task.setJvmContext(jvmContext);
        taskid = task.getTaskID();

        // Create the JobConf and determine if this job gets segmented task logs
        final JobConf job = new JobConf(task.getJobFile());
        currentJobSegmented = logIsSegmented(job);

        isCleanup = task.isTaskCleanupTask();
        // reset the statistics for the task
        FileSystem.clearStatistics();

        // Set credentials
        job.setCredentials(defaultConf.getCredentials());
        //forcefully turn off caching for localfs. All cached FileSystems
        //are closed during the JVM shutdown. We do certain
        //localfs operations in the shutdown hook, and we don‘t
        //want the localfs to be "closed"
        job.setBoolean("fs.file.impl.disable.cache", false);

        // set the jobTokenFile into task
        task.setJobTokenSecret(JobTokenSecretManager.
            createSecretKey(jt.getPassword()));

        // setup the child‘s mapred-local-dir. The child is now sandboxed and
        // can only see files down and under attemtdir only.
        TaskRunner.setupChildMapredLocalDirs(task, job);

        // setup the child‘s attempt directories
        localizeTask(task, job, logLocation);

        //setupWorkDir actually sets up the symlinks for the distributed
        //cache. After a task exits we wipe the workdir clean, and hence
        //the symlinks have to be rebuilt.
        TaskRunner.setupWorkDir(job, new File(cwd));

        //create the index file so that the log files
        //are viewable immediately
        TaskLog.syncLogs
          (logLocation, taskid, isCleanup, logIsSegmented(job));

        numTasksToExecute = job.getNumTasksToExecutePerJvm();
        assert(numTasksToExecute != 0);

        task.setConf(job);

        // Initiate Java VM metrics
        initMetrics(prefix, jvmId.toString(), job.getSessionId());

        LOG.debug("Creating remote user to execute task: " + job.get("user.name"));
        childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));
        // Add tokens to new user so that it may execute its task correctly.
        for(Token<?> token : UserGroupInformation.getCurrentUser().getTokens()) {
          childUGI.addToken(token);
        }

        // Create a final reference to the task for the doAs block
        final Task taskFinal = task;
        childUGI.doAs(new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            try {
              // use job-specified working directory
              FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
              taskFinal.run(job, umbilical);        // run the task
            } finally {
              TaskLog.syncLogs
                (logLocation, taskid, isCleanup, logIsSegmented(job));
              TaskLogsTruncater trunc = new TaskLogsTruncater(defaultConf);
              trunc.truncateLogs(new JVMInfo(
                  TaskLog.getAttemptDir(taskFinal.getTaskID(),
                    taskFinal.isTaskCleanupTask()), Arrays.asList(taskFinal)));
            }

            return null;
          }
        });
        if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
          break;
        }
      }
    } catch (FSError e) {
      LOG.fatal("FSError from child", e);
      umbilical.fsError(taskid, e.getMessage(), jvmContext);
    } catch (Exception exception) {
      LOG.warn("Error running child", exception);
      try {
        if (task != null) {
          // do cleanup for the task
          if(childUGI == null) {
            task.taskCleanup(umbilical);
          } else {
            final Task taskFinal = task;
            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
              @Override
              public Object run() throws Exception {
                taskFinal.taskCleanup(umbilical);
                return null;
              }
            });
          }
        }
      } catch (Exception e) {
        LOG.info("Error cleaning up", e);
      }
      // Report back any failures, for diagnostic purposes
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      exception.printStackTrace(new PrintStream(baos));
      if (taskid != null) {
        umbilical.reportDiagnosticInfo(taskid, baos.toString(), jvmContext);
      }
    } catch (Throwable throwable) {
      LOG.fatal("Error running child : "
                + StringUtils.stringifyException(throwable));
      if (taskid != null) {
        Throwable tCause = throwable.getCause();
        String cause = tCause == null
                       ? throwable.getMessage()
                       : StringUtils.stringifyException(tCause);
        umbilical.fatalError(taskid, cause, jvmContext);
      }
    } finally {
      RPC.stopProxy(umbilical);
      shutdownMetrics();
      // Shutting down log4j of the child-vm...
      // This assumes that on return from Task.run()
      // there is no more logging done.
      LogManager.shutdown();
    }
  }

Child类的核心代码框架如下:

public static void main(String args[]){
    //创建RPC Client,启动日志同步线程
    ...
    while(true){//不断询问TaskTracker,以获取新任务
        JvmTask myTask = umbilical.getTask(context) ;//获取新任务
        if(myTask.shouldDie()){//JVM所属作业不存在或者被杀死
            break ;
        }else{
            if(myTask.getTask() == null){//暂时没有新任务
                //等待一段时间继续询问TaskTracker
                ...
                continue ;
            }
        }
        //有新任务,进行任务本地化
        ...
        taskFinal.run(job,umbilical) ;//启动该任务
        ...
        //如果JVM复用次数达到上限数目,则直接退出
        if(numTasksToExecute > 0 && ++numTaskExecuted == numTasksToExecute){
            break ;
        }
    }
}

然后会调用org.apache.hadoop.mapred.Task.java中的run方法,Task是一个抽象类,两个子类分别是MapTask和ReduceTask。先关注MapTask的run方法。

  @Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    // start thread that will handle communication with parent
    TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
        jvmContext);
    reporter.startCommunicationThread();
    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

runJobCleanupTask():清理Job对应的相关目录和文件

runJobSetupTask():创建Job运行所需要的相关目录和文件

runTaskCleanupTask():清理一个Task对应的工作目录下与Task相关的目录或文件

runNewMapper()/runOldMapper():调用用户编写的MapReduce程序中的Mapper中的处理逻辑

在runNewMapper()中,有如下代码

    // make a mapper
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

通过反射,执行用户编写的mapper函数。。。。

2、Map Task的输入从哪里来

这首先要了解两个文件job.split和job.splitmetainfo

a)org.apache.hadoop.mapreduce.Job.java中的submit()提交job之后,会调用org.apache.hadoop.mapred.JobClient中的submitJobInternal();

b)在submitJobInternal函数中会给job创建分片

int maps = writeSplits(job,submitJobDir) ;

在该函数中会调用writeNewSplits() ;

c)在org.apache.hadoop.mapred.JobClient的writeNewSplits函数中,通过反射获得InputFormat对象,会调用该对象中的getSplits()方法来进行分片,从而得到InputSplit[]数组,然后会通过JobSplitWriter.createSplitFiles方法将数组内容写出,writeNewSplits方法返回的是分片数目,决定了会创建多少个map task

d)在org.apache.hadoop.mapreduce.split.JobSplitWriter.java的JobSplitWriter.createSplitFiles方法中,会打开一个输出流out,输出文件名是({jobSubmitDir}/job.splitInfo),其中调用writeNewSplits()来完成写出操作, 与此同时,该函数会返回一个SplitMetaInfo的数组。

e)最后调用org.apache.hadoop.mapreduce.split.JobSplitWriter.java的writeJobSplitMetaInfo()方法,将上一步中的SplitMetaInfo数组写入到另一个文件中,文件名是(${jobSubmitDir}/job.splitInfo)

上述步骤中最终会输出两个文件job.split和job.splitmetainfo

如下图所示,job.split存储了所有划分出来的InputSplit,而每个InputSplit记录如下信息:

1、该Split的类型(ClassName, mostly org.apache.hadoop.mapreduce.lib.input.FileSplit)

2、该Split所属文件的路径(FilePath)

3、该Split在所属文件中的起始位置(FileOffset)

4、该Split的字节长度(Length)

job.splitmetainfo存储了有关InputSplit的元数据:

1、该Split在哪些Node上是local data(Location)

2、该Split对应的InputSplit在job.split文件中的位置(SplitFileOffset)

3、该Split的字节长度(Length, the same as that in job.split)

到此为之,将分片信息记录完成,写入到HDFS中相应的文件中。

对于job.splitmetainfo、job.split文件使用,splitmetainfo只是保存了split的元数据信息,如果想读取数据,还是要用到job.split

调度器调用JobTracker.initJob()函数对新作业进行初始化,会调用org.apache.hadoop.mapred.JobInProgress.java中的initTasks()方法

  //
    // read input splits and create a map per a split
    //
    TaskSplitMetaInfo[] splits = createSplits(jobId);

在createSplits函数中会通过SplitMetaInfoReader.readSplitMetaInfo函数从job.splitmetainfo文件中读出相应的信息,首先会先验证META_SPLIT_VERSION和读取numSplits,然后会依次读取出每个splitMetaInfo,根据splitMetaInfo再从job.split中读取相应数据,构建出TaskSplitIndex对象,然后得到TaskSplitMetaInfo对象,最后返回TaskSplitMetaInfo[]数组

TaskSplitMetaInfo:用于保存InputSplit元信息的数据结构,包括以下三项内容:

private TaskSplitIndex splitIndex ;//保存读取job.split文件中的split内容
private long inputDataLength ; //InputSplit的数据长度
private String[] locations ; //InputSplit所在的host列表

这些信息是在作业初始化时,JobTracker从文件job.splitmetainfo中获取。其中,host列表信息是任务调度器判断任务是否具有本地性的最重要因素,而splitIndex信息保存了新任务需处理的数据位置信息在文件job.split中的索引,TaskTracker(从JobTracker端)收到该信息后,便可以从job.split文件中读取InputSplit信息,进而运行一个任务。

TaskSplitIndex:JobTracker向TaskTracker分配新任务时,TaskSplitIndex用于指定新任务待处理数据位置信息在文件job.split中的索引,主要包括两项内容:

private String splitLocation ;//job.split文件的位置(目录)
private long startOffset ; //InputSplit信息在job.split文件中的位置

在org.apache.hadoop.mapred.MapTask.java中的runNewMapper()/runOldMapper()方法最终会调用用户编写的MapReduce程序中的Mapper中的处理逻辑,它们所传的参数中都含有TaskSplitIndex对象,也就可以找到job.split文件读取数据。。。

对于job.split、job.splitmetainfo、TaskSplitMetaInfo、TaskSplitIndex在生成它们时保存的信息

job.split内容

FileSplit={file=hdfs://server1:9000/user/admin/in/yellow.txt,length=67108864,start=0}

FileSplit={file=hdfs://server1:9000/user/admin/in/yellow.txt,length=67108864,start= 67108864}

FileSplit={file=hdfs://server1:9000/user/admin/in/yellow.txt,length= 66782272,start=134217728}

job.splitmetainfo内容

JobSplit$SplitMetaInfo={data-size : 67108864,start-offset: 7,locations:[server3, server2]

}

JobSplit$SplitMetaInfo={data-size : 67108864,start-offset: 116,locations:[server3, server2]

}

JobSplit$SplitMetaInfo={data-size : 66782272,start-offset: 225,locations:[server3, server2]

对比job.split和job.splitmetainfo内容:

job.splitmetainfo的data-size即job.split的length,

job.splitmetainfo的locations即所对应的job.split的数据所在块的主机列表,

job.splitmetainfo的start-offset意思是job.split中某条FileSplit记录的起始地址。

TaskSplitMetaInfo、TaskSplitIndex的内容

TaskSplitMetaInfo[0]={inputDataLength=67108864,locations=[server3,server2], splitIndex=JobSplit$TaskSplitIndex{splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404200521_0001/job.split", startOffset=7}

}

TaskSplitMetaInfo[1]={inputDataLength=67108864,locations=[server3,server2], splitIndex=JobSplit$TaskSplitIndex{splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404200521_0001/job.split", startOffset=116}

}

TaskSplitMetaInfo[2]={inputDataLength=66782272, locations=[server3,server2], splitIndex=JobSplit$TaskSplitIndex{splitLocation="hdfs://server1:9000/tmp/hadoop-admin/mapred/staging/admin/.staging/job_201404200521_0001/job.split", startOffset=225}

好了,现在我们已经知道map函数如何输入了,接下来再思考一个问题,在org.apache.hadoop.mapred.JobClient.java中的getSplits方法中会得到分片的主机列表,如果一个分片在一个block内,直接返回该block的主机列表即可,但是存在一种情况,该分片跨越多个block,这个时候如何返回主机列表呢?

可以参考文章http://blog.csdn.net/chlaws/article/details/22900141

MapReduce中如何处理跨InputSplit的行,可以参考这篇文章

http://my.oschina.net/xiangchen/blog/99653

时间: 2024-07-29 23:43:52

Task运行过程分析2的相关文章

Task运行过程分析3——Map Task内部实现

Map Task内部实现 在Task运行过程分析2中提到,MapTask分为4种,分别是Job-setup Task.Job-cleanup Task.Task-cleanup Task和Map Task.其中,Job-setup Task和Job-cleanup Task分别是作业运行时启动的第一个任务和最后一个任务,主要工作分别是进行一些作业初始化和收尾工作,比如创建和删除作业临时输出目录:而Task-cleanup Task则是任务失败或者被杀死后,用于清理已写入临时目录中数据的任务.本文

Task运行过程分析4——Map Task内部实现2

在Task运行过程分析3--MapTask内部实现中,我们分析了MapTask的Collect阶段,并且解读了环形缓冲区使得MapTask的Collect阶段和Spill阶段可并行执行...接下来分析Spill阶段和Combine阶段... Spill过程分析 Spill过程由SpillThread线程完成,SpillThread线程实际上是缓冲区kvbuffer的消费者 protected class SpillThread extends Thread { @Override public

Task运行过程分析1

1.Task运行过程概述 在MapReduce计算框架中,一个应用程序被划分成Map和Reduce两个计算阶段,它们分别由一个或者多个Map Task和Reduce Task组成.其中,每个Map Task处理输入数据集合中的一片数据(InputSplit),并将产生的若干个数据片段写到本地磁盘上,而Reduce Task则从每个Map Task上远程拷贝相应的数据片段,经分组聚集和归约后,将结果写到HDFS上作为最终结果,如下图所示: 总体上看,Map Task与Reduce Task之间的数

Spark分析之Standalone运行过程分析

一.集群启动过程--启动Master $SPARK_HOME/sbin/start-master.sh start-master.sh脚本关键内容: spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT 日志信息:$SPARK_HOME/logs/ 14/0

Spark源码分析之八:Task运行(二)

在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤: Step1:Task及其运行时需要的辅助对象构造,主要包括: 1.当前线程设置上下文类加载器: 2.获取序列化器ser: 3.更新任务状态TaskState: 4.计算垃圾回收时间: 5.反

Spark源码分析之七:Task运行(一)

在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[Tas

MFC的运行过程分析

MFC程序的运行细节剖析 MFC程序也是Windows程序,所以它应该也有一个WinMain,但是在程序中看不到它的踪影.其实在程序进入点之前,还有一个(而且仅有一个)全局对象(theApp),这就是所谓的Application object,当操作系统将程序加载并激活时,这个全局对象获得配置,其构造函数会先执行,比WinMain更早. 一 CWinApp取代WinMain CWinApp的派生对象被称为application object,可以想见,CWinApp本身就代表一个程序本体.所谓程

第三十六课 Spark之TaskScheduler Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详

</pre></h2><div><p>本节课内容:</p><p>1.     TaskSchedulerBackend与SchedulerBackend</p><p>2.     FIFO与FAIR两种调度模式</p><p>3.     Task数据本地性资源的分配</p></div><h3>一.Scheduler运行过程(Spark-shell角度)

Job和Task运行时信息的维护

JobTracker最重要的功能之一是状态监控,包括TaskTracker.Job和Task等运行时状态的监控,其中TaskTracker状态监控比较简单,只要记录其最近心跳汇报时间和健康状况(由TaskTracker端的监控脚本检测,并通过心跳将结果发送给JobTracker)即可. 作业描述模型 如下图所示 JobTracker在其内部以"三层多叉树"的方式描述和跟踪每个作业的运行状态.JobTracker为每个作业创建一个JobInProgress对象以跟踪和监控其运行状态.该对