Hadoop 2.2.0 Job Source Code Reading Notes

  Everything covered in this article is as it appears in version 2.2.0.

  Overview:

  From the point of view of the user who creates and submits it, a Job can be configured at creation time to control how it runs, and its running status can be queried afterwards. Once the Job has been submitted it can no longer be configured; any attempt to do so results in an IllegalStateException.

  Normally a user creates, describes, and submits a Job, and monitors its execution, through the Job class. Here is a simple example:

// Create a new Job
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");

// Job in the new API has no setInputPath()/setOutputPath(); the input and
// output paths are set via the lib.input/lib.output helper classes instead.
FileInputFormat.addInputPath(job, new Path("in"));
FileOutputFormat.setOutputPath(job, new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);
// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);
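
  The DEFINE/RUNNING rule from the overview can be seen directly in client code. Below is a minimal sketch (not taken from the Hadoop source; the job name and reduce count are arbitrary, and the surrounding method is assumed to declare throws IOException, InterruptedException, ClassNotFoundException):

Job job = Job.getInstance(new Configuration());
job.setJobName("state-demo");        // allowed: the job is still in the DEFINE state
job.submit();                        // the state changes to RUNNING
try {
  job.setNumReduceTasks(2);          // configuring a submitted job...
} catch (IllegalStateException e) {
  System.err.println("cannot configure a submitted job: " + e.getMessage());  // ...fails
}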

  Basic structure:

  The Job class lives in the org.apache.hadoop.mapreduce package; it extends JobContextImpl and implements the JobContext interface.
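
  In outline (audience/stability annotations and the class body omitted), the declaration is simply:

public class Job extends JobContextImpl implements JobContext {
  // constants, fields and methods discussed below
}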

  Static constants defined by Job:

private static final Log LOG = LogFactory.getLog(Job.class);

  @InterfaceStability.Evolving
  public static enum JobState {DEFINE, RUNNING};
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  /** Key in mapred-*.xml that sets completionPollInvervalMillis */
  public static final String COMPLETION_POLL_INTERVAL_KEY =
    "mapreduce.client.completion.pollinterval";

  /** Default completionPollIntervalMillis is 5000 ms. */
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
    "mapreduce.client.progressmonitor.pollinterval";
  /** Default progMonitorPollIntervalMillis is 1000 ms. */
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;

  public static final String USED_GENERIC_PARSER =
    "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =
    "mapreduce.client.submit.file.replication";
  private static final String TASKLOG_PULL_TIMEOUT_KEY =
           "mapreduce.client.tasklog.timeout";
  private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;

  Private fields defined by Job:

 private JobState state = JobState.DEFINE;
 private JobStatus status;
 private long statustime;
 private Cluster cluster;

  Static initializer that loads the configuration files when the Job class is first loaded:

static {
    ConfigUtil.loadResources();
 }

  The configuration files loaded are mapred-default.xml, mapred-site.xml, yarn-default.xml, and yarn-site.xml.
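
  For reference, ConfigUtil.loadResources() essentially registers these four files as default resources of Configuration. Paraphrased from memory (treat it as a sketch rather than a verbatim copy of the 2.2.0 source), it does roughly:

  public static void loadResources() {
    addDeprecatedKeys();   // maps the old mapred.* key names to the new mapreduce.* ones
    Configuration.addDefaultResource("mapred-default.xml");
    Configuration.addDefaultResource("mapred-site.xml");
    Configuration.addDefaultResource("yarn-default.xml");
    Configuration.addDefaultResource("yarn-site.xml");
  }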

  

  Job constructors:

  @Deprecated
  public Job() throws IOException {
    this(new Configuration());
  }

  @Deprecated
  public Job(Configuration conf) throws IOException {
    this(new JobConf(conf));
  }

  @Deprecated
  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  Job(JobConf conf) throws IOException {
    super(conf, null);
    // propagate existing user credentials to job
    this.credentials.mergeAll(this.ugi.getCredentials());
    this.cluster = null;
  }

  Job(JobStatus status, JobConf conf) throws IOException {
    this(conf);
    setJobID(status.getJobID());
    this.status = status;
    state = JobState.RUNNING;
  }

  Note that Hadoop discourages constructing a Job through the no-argument constructor or directly from a Configuration: those constructors are marked @Deprecated. The JobConf-based constructors are package-private, so application code should instead obtain a Job through the static getInstance() factory methods described next, which wrap the Configuration in a JobConf internally.

  

  Factory methods for obtaining a Job instance:

    Besides the constructors, the Job class also provides a set of static factory methods for obtaining a Job instance. Here are their definitions:

 /**
   * Creates a new {@link Job} with no particular {@link Cluster} .
   * A Cluster will be created with a generic {@link Configuration}.
   *
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a
   * given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * @param conf the configuration
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the configuration
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
           throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   */
  public static Job getInstance(JobStatus status, Configuration conf)
  throws IOException {
    return new Job(status, new JobConf(conf));
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance()}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored) throws IOException {
    return getInstance();
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param ignored
   * @param conf job configuration
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   * @deprecated Use {@link #getInstance(Configuration)}
   */
  @Deprecated
  public static Job getInstance(Cluster ignored, Configuration conf)
      throws IOException {
    return getInstance(conf);
  }

  /**
   * Creates a new {@link Job} with no particular {@link Cluster} and given
   * {@link Configuration} and {@link JobStatus}.
   * A Cluster will be created from the conf parameter only when it's needed.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param cluster cluster
   * @param status job status
   * @param conf job configuration
   * @return the {@link Job} , with no connection to a cluster yet.
   * @throws IOException
   */
  @Private
  public static Job getInstance(Cluster cluster, JobStatus status,
      Configuration conf) throws IOException {
    Job job = getInstance(status, conf);
    job.setCluster(cluster);
    return job;
  }

    As can be seen, obtaining a Job instance this way may also involve a Cluster: the deprecated overloads simply ignore the Cluster argument, while the @Private overload attaches the given Cluster to the returned Job.
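
    In practice the getInstance(Configuration, String) overload is the usual entry point for user code; for example (a sketch, the class and job names are arbitrary):

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wordcount");   // copies conf into a JobConf internally
job.setJarByClass(WordCount.class);
// ... further configuration, then job.waitForCompletion(true)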

    

    Polling-interval methods:

 /** The interval at which monitorAndPrintJob() prints status */
  public static int getProgressPollInterval(Configuration conf) {
    // Read progress monitor poll interval from config. Default is 1 second.
    int progMonitorPollIntervalMillis = conf.getInt(
      PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
    if (progMonitorPollIntervalMillis < 1) {
      LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
        " has been set to an invalid value; "
        + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
      progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
    }
    return progMonitorPollIntervalMillis;
  }

  /** The interval at which waitForCompletion() should check. */
  public static int getCompletionPollInterval(Configuration conf) {
    int completionPollIntervalMillis = conf.getInt(
      COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
    if (completionPollIntervalMillis < 1) {
      LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
       " has been set to an invalid value; "
       + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
      completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
    }
    return completionPollIntervalMillis;
  }

    The two methods above return, respectively, the interval at which monitorAndPrintJob() fetches and prints the job status, and the interval at which waitForCompletion() checks whether the job has finished.
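
    Both intervals can be overridden through the client configuration using the keys defined earlier (a sketch; the values are arbitrary, and anything below 1 ms is rejected and replaced by the defaults of 1000 ms / 5000 ms):

Configuration conf = new Configuration();
conf.setInt(Job.PROGRESS_MONITOR_POLL_INTERVAL_KEY, 2000);   // print progress every 2 s
conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 10000);        // check for completion every 10 s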

    

    Synchronized methods for refreshing the cached job status and connecting to the cluster (the remote calls run as the job's user via UserGroupInformation.doAs; ensureFreshStatus() only refreshes the status when it is older than MAX_JOBSTATUS_AGE, i.e. 2 seconds):

synchronized void ensureFreshStatus()
      throws IOException {
    if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
      updateStatus();
    }
  }

 /** Some methods need to update status immediately. So, refresh
   * immediately
   * @throws IOException
   */
  synchronized void updateStatus() throws IOException {
    try {
      this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException {
          return cluster.getClient().getJobStatus(status.getJobID());
        }
      });
    }
    catch (InterruptedException ie) {
      throw new IOException(ie);
    }
    if (this.status == null) {
      throw new IOException("Job status not available ");
    }
    this.statustime = System.currentTimeMillis();
  }

 private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException,
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  } 
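
    connect() is invoked lazily before submission. For context, submit() in 2.2.0 (quoted from memory, so treat it as a sketch rather than a verbatim copy) ties these pieces together roughly as follows:

  public void submit()
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();                                 // creates the Cluster if it does not exist yet
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
                                    ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;                  // from here on the job can no longer be configured
    LOG.info("The url to track the job: " + getTrackingURL());
  }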

    

    Configuration setter methods:

    

/**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
                  InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
                  OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    ensureState(JobState.DEFINE);
    conf.setJarByClass(cls);
  }

  /**
   * Set the job jar
   */
  public void setJar(String jar) {
    ensureState(JobState.DEFINE);
    conf.setJar(jar);
  }

  /**
   * Set the reported username for this job.
   *
   * @param user the username for this job.
   */
  public void setUser(String user) {
    ensureState(JobState.DEFINE);
    conf.setUser(user);
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                  Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * value class.
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Specify whether job-setup and job-cleanup is needed for the job
   *
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               considered from {@link OutputCommitter}
   *               else ignored.
   */
  public void setJobSetupCleanupNeeded(boolean needed) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
  }

  /**
   * Set the given set of archives
   * @param archives The list of archives that need to be localized
   */
  public void setCacheArchives(URI[] archives) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheArchives(archives, conf);
  }

  /**
   * Set the given set of files
   * @param files The list of files that need to be localized
   */
  public void setCacheFiles(URI[] files) {
    ensureState(JobState.DEFINE);
    DistributedCache.setCacheFiles(files, conf);
  }

  /**
   * Add a archives to be localized
   * @param uri The uri of the cache to be localized
   */
  public void addCacheArchive(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheArchive(uri, conf);
  }

  /**
   * Add a file to be localized
   * @param uri The uri of the cache to be localized
   */
  public void addCacheFile(URI uri) {
    ensureState(JobState.DEFINE);
    DistributedCache.addCacheFile(uri, conf);
  }

  /**
   * Add an file path to the current set of classpath entries It adds the file
   * to cache as well.
   *
   * Files added with this method will not be unpacked while being added to the
   * classpath.
   * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)}
   * method instead.
   *
   * @param file Path of the file to be added
   */
  public void addFileToClassPath(Path file)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf));
  }

  /**
   * Add an archive path to the current set of classpath entries. It adds the
   * archive to cache as well.
   *
   * Archive files will be unpacked and added to the classpath
   * when being distributed.
   *
   * @param archive Path of the archive to be added
   */
  public void addArchiveToClassPath(Path archive)
    throws IOException {
    ensureState(JobState.DEFINE);
    DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf));
  }

  /**
   * Originally intended to enable symlinks, but currently symlinks cannot be
   * disabled.
   */
  @Deprecated
  public void createSymlink() {
    ensureState(JobState.DEFINE);
    DistributedCache.createSymlink(conf);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * map task.
   *
   * @param n the number of attempts per map task.
   */
  public void setMaxMapAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxMapAttempts(n);
  }

  /**
   * Expert: Set the number of maximum attempts that will be made to run a
   * reduce task.
   *
   * @param n the number of attempts per reduce task.
   */
  public void setMaxReduceAttempts(int n) {
    ensureState(JobState.DEFINE);
    conf.setMaxReduceAttempts(n);
  }

  /**
   * Set whether the system should collect profiler information for some of
   * the tasks in this job? The information is stored in the user log
   * directory.
   * @param newValue true means it should be gathered
   */
  public void setProfileEnabled(boolean newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileEnabled(newValue);
  }

  /**
   * Set the profiler configuration arguments. If the string contains a '%s' it
   * will be replaced with the name of the profiling output file when the task
   * runs.
   *
   * This value is passed to the task child JVM on the command line.
   *
   * @param value the configuration string
   */
  public void setProfileParams(String value) {
    ensureState(JobState.DEFINE);
    conf.setProfileParams(value);
  }

  /**
   * Set the ranges of maps or reduces to profile. setProfileEnabled(true)
   * must also be called.
   * @param newValue a set of integer ranges of the map ids
   */
  public void setProfileTaskRange(boolean isMap, String newValue) {
    ensureState(JobState.DEFINE);
    conf.setProfileTaskRange(isMap, newValue);
  } 

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

    Note in particular that every setter first checks the job state: a Job can only be configured while it is in the DEFINE state.
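
    For completeness, ensureState() itself (again reproduced from memory as a sketch) just compares the required state with the current one, and additionally rejects a RUNNING job that is not attached to a cluster:

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state " + this.state +
                                      " instead of " + state);
    }
    if (state == JobState.RUNNING && cluster == null) {
      throw new IllegalStateException("Job in state " + this.state +
                                      ", but it isn't attached to any job tracker!");
    }
  }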

    

    Method for dumping job stats to the screen:

/**
   * Dump stats to screen.
   */
  @Override
  public String toString() {
    ensureState(JobState.RUNNING);
    String reasonforFailure = " ";
    int numMaps = 0;
    int numReduces = 0;
    try {
      updateStatus();
      if (status.getState().equals(JobStatus.State.FAILED))
        reasonforFailure = getTaskFailureEventString();
      numMaps = getTaskReports(TaskType.MAP).length;
      numReduces = getTaskReports(TaskType.REDUCE).length;
    } catch (IOException e) {
    } catch (InterruptedException ie) {
    }
    StringBuffer sb = new StringBuffer();
    sb.append("Job: ").append(status.getJobID()).append("\n");
    sb.append("Job File: ").append(status.getJobFile()).append("\n");
    sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
    sb.append("\n");
    sb.append("Uber job : ").append(status.isUber()).append("\n");
    sb.append("Number of maps: ").append(numMaps).append("\n");
    sb.append("Number of reduces: ").append(numReduces).append("\n");
    sb.append("map() completion: ");
    sb.append(status.getMapProgress()).append("\n");
    sb.append("reduce() completion: ");
    sb.append(status.getReduceProgress()).append("\n");
    sb.append("Job state: ");
    sb.append(status.getState()).append("\n");
    sb.append("retired: ").append(status.isRetired()).append("\n");
    sb.append("reason for failure: ").append(reasonforFailure);
    return sb.toString();
  }

  

  Methods for getting task progress:

 /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0.  When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getMapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getReduceProgress();
  }

  /**
   * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
   * and 1.0.  When all cleanup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's cleanup-tasks.
   * @throws IOException
   */
  public float cleanupProgress() throws IOException, InterruptedException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getCleanupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
   * and 1.0.  When all setup tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's setup-tasks.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    ensureFreshStatus();
    return status.getSetupProgress();
  }
