Hadoop任务提交过程

Hadoop任务提交分析


分析工具和环境

下载一份hadoop的源码,这里以hadoop-1.1.2为例。本地IDE环境为eclipse,导入整个目录,然后可以在IDE里面看到目录结构了,要分析任务提交过程,需要找到入口代码,很明显,对于熟悉Hadoop应用程序开发的同学来说很容易的知道任务的提交是从job的配置开始的,所以需要这样一份提交Job的代码,在src/examples里面你可以找到一些例子,这里从最简单的wordcount进入去分析提交过程。

核心的一些类

Configuration Configuration类是整个框架的配置信息的集合类,里面主要包含了overlaypropertyresources,而主要的方法主要看getset,下面具体看一下这几个变量和方法overlayprivate
Properties overlay;
,其实overlay是Properties对象,用来存储key-value pair,但是它的主要作用还在于在添加Resource后,保证Resources重载后用户通过set函数设置的属性对会覆盖Resource中相同key的属性对。 properties:
用来存储key-value pair finalParameters:是一个set,用来保存final的配置信息,即不允许覆盖
addResourceObject会添加资源并调用reloadConfiguration,在reloadConfiguration中会清空properties对象和finalParameter,然后下次通过get去读某个属性的时候,会由于properties为null而,重新载入resources,这样资源得到更新,并且,在这之后立马会将overlay里面的内容覆盖进入properties,就是将之前通过set函数设置的属性对去覆盖resource中的默认属性对。

  private synchronized void addResourceObject(Object resource) {
    resources.add(resource);                      // add to resources
    reloadConfiguration();
  }

  public synchronized void reloadConfiguration() {
    properties = null;                            // trigger reload
    finalParameters.clear();                      // clear site-limits
  }

  public String get(String name) {
    return substituteVars(getProps().getProperty(name)); //这里先是getProps得到properties对象然后调用getProperty得到value,然后通过
                                                                                 //substitudeVars函数去替换value中的${..},其实就是引用变量
  }

  private synchronized Properties getProps() {
    if (properties == null) {    //properties为null会重新实例化properties,然后载入资源,并将overlay里面的内容去覆盖properties里面相同key
      properties = new Properties();          //的内容
      loadResources(properties, resources, quietmode);
      if (overlay!= null) {
        properties.putAll(overlay);
        for (Map.Entry<Object,Object> item: overlay.entrySet()) {
          updatingResource.put((String) item.getKey(), UNKNOWN_RESOURCE);
        }
      }
    }
    return properties;
  }

Job

Job类是hadoop任务配置的对象,继承于JobContext,而他们的构造函数都其实是以Configuration对象作为参数,然后提供了一些设置Job配置信息或者获取Job配置信息的一些函数。

上述的配置Job信息的函数并不是直接使用了Configuration对象,而是使用了Configuration对象的子类JobConf,JobContext以conf = JobConf(conf)作为内置成员变量。JobConf是在Confguration的基础上提供了Job配置的一些函数

Job类配置好类以及输入输出等之后,会调用Job.waitforcompletion当然也可以直接调用submit函数,其中waitForCompletion会调用submit然后jobClient.monitorAndPrintJob打印和监视Job的运行进度和情况。

submit函数分析

public void submit() throws IOException, InterruptedException,
                              ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();

    // Connect to the JobTracker and submit the job
    connect(); //主要是生成了jobClient对象,而在该对象的初始化过程中,主要生成 了jobSubmitClient的一个动态代理,能够RPC调用
                        //JobTracker中的一些函数(分布式的情况,本地情况先不做分析)
    info = jobClient.submitJobInternal(conf); // 任务提交的过程
    super.setJobID(info.getID());
    state = JobState.RUNNING;
   }

jobClient构造函数分析

//jobClient的构造函数,设置了conf,然后就调用了init函数
public JobClient(JobConf conf) throws IOException {
    setConf(conf);
    init(conf);
  }

  /**
   * Connect to the default {@link JobTracker}.
   * @param conf the job configuration.
   * @throws IOException
   */
  public void init(JobConf conf) throws IOException {
    String tracker = conf.get("mapred.job.tracker", "local");
    tasklogtimeout = conf.getInt(
      TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
    this.ugi = UserGroupInformation.getCurrentUser();
    if ("local".equals(tracker)) {
      conf.setNumMapTasks(1);
      this.jobSubmitClient = new LocalJobRunner(conf); //本地模式下,会生成一个LocalJobRunner对象
    } else {
      this.rpcJobSubmitClient =
          createRPCProxy(JobTracker.getAddress(conf), conf); // 分布式模式下,会生成了一个RPC代理
      this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf); //对这个代理再次封装了一下,其实里面主要是增加了retry的一些代码
    }
  }

jobClient.submitJobInternal函数分析

public
  RunningJob submitJobInternal(final JobConf job
                               ) throws FileNotFoundException,
                                        ClassNotFoundException,
                                        InterruptedException,
                                        IOException {
    /*
     * configure the command line options correctly on the submitting dfs
     */
    return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
      public RunningJob run() throws FileNotFoundException,
      ClassNotFoundException,
      InterruptedException,
      IOException{
        JobConf jobCopy = job;
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
            jobCopy); //获得任务提交的目录,其实是作为参数的JobClient去通过RPC从JobTracker那里获取得到jobStagingArea
                            // ${hadoop.tmp.dir}/mapred/staging/user-name/.staging目录
        JobID jobId = jobSubmitClient.getNewJobId();  //从JobTracker那里获得分配的jobid
        Path submitJobDir = new Path(jobStagingArea, jobId.toString()); //submitJobDir = ${jobStagingArea}/job-id
        jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
        JobStatus status = null;
        try {
          populateTokenCache(jobCopy, jobCopy.getCredentials());

          copyAndConfigureFiles(jobCopy, submitJobDir); // 这个函数主要是负责处理通过参数--libjars,--archives,--files引入的jar包,以及要使用分布式缓存的文件和archive,并且将job.jar也拷贝到分布式文件系统上

          // get delegation token for the dir
          TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                              new Path [] {submitJobDir},
                                              jobCopy);

          Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          int reduces = jobCopy.getNumReduceTasks();
          InetAddress ip = InetAddress.getLocalHost();
          if (ip != null) {
            job.setJobSubmitHostAddress(ip.getHostAddress());
            job.setJobSubmitHostName(ip.getHostName());
          }
          JobContext context = new JobContext(jobCopy, jobId);

        //检查output,如果已经存在则会抛出异常
          // Check the output specification
          if (reduces == 0 ? jobCopy.getUseNewMapper() :
            jobCopy.getUseNewReducer()) {
            org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
              ReflectionUtils.newInstance(context.getOutputFormatClass(),
                  jobCopy);
            output.checkOutputSpecs(context);
          } else {
            jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
          }

          jobCopy = (JobConf)context.getConfiguration();

        //利用设置好的inputformat的来对输入进行分片,然后将分片写入到job.split中,然后也会将分片的元信息写入到job.splitmetainfo中
        //
          // Create the splits for the job
          FileSystem fs = submitJobDir.getFileSystem(jobCopy);
          LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
          int maps = writeSplits(context, submitJobDir);
          jobCopy.setNumMapTasks(maps);

          // write "queue admins of the queue to which job is being submitted"
          // to job file.
          String queue = jobCopy.getQueueName();
          AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
          jobCopy.set(QueueManager.toFullPropertyName(queue,
              QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

            //将job配置相关的信息写入到job.xml中
          // Write job file to JobTracker‘s fs
          FSDataOutputStream out =
            FileSystem.create(fs, submitJobFile,
                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

          try {
            jobCopy.writeXml(out);  //正式写入
          } finally {
            out.close();
          }
          //
          // Now, actually submit the job (using the submit name)
          //
          printTokens(jobId, jobCopy.getCredentials());
          status = jobSubmitClient.submitJob(
              jobId, submitJobDir.toString(), jobCopy.getCredentials()); //通过RPC提交任务给JobTracker,让其去管理任务,在JobTrakcker的
                                                                                                //subnitJob里面会实例化JobInProgress并保存在一个jobs的Map<jobID,JobInProgress>的map里面
          JobProfile prof = jobSubmitClient.getJobProfile(jobId); //获得任务的概要信息,主要包括了jobname,jobid,提交的队列名,job配置信息的路径,用户,web接口的url等
          if (status != null && prof != null) {
            return new NetworkedJob(status, prof, jobSubmitClient); //对status,prof,以及jobSubmitClient的封装,主要用来可以查询进度信息和profile信息。而status里面提供了getMapProgress和setMapProgress类型的接口函数
          } else {
            throw new IOException("Could not launch job");
          }
        } finally {
          if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (fs != null && submitJobDir != null)
              fs.delete(submitJobDir, true);
          }
        }
      }
    });
  }

JobClient

从上面的一些代码也可以看出真正提交Job是在JobClient做的,它主要处理了一些参数像libjars files archives把他们指定的文件拷贝到HDFS的submitJobDir,然后把JOb的配置信息job.xml , 分片信息以及job.jar也都拷贝到HDFS上。然后利用jobClient里面的一个代理类jobSubmitClient来提交任务,其实就是一个RPC请求,向JObTracker发送请求,然后返回的是一个JobStatus类型的对象,该对象可以查询map reduce进度等。



下一篇讲一下JObTracker相关

时间: 2024-10-25 14:30:12

Hadoop任务提交过程的相关文章

Hadoop MapReduce执行过程详解(带hadoop例子)

https://my.oschina.net/itblog/blog/275294 摘要: 本文通过一个例子,详细介绍Hadoop 的 MapReduce过程. 分析MapReduce执行过程 MapReduce运行的时候,会通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的方法,处理数据,最后输出.Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS的文件中.整个流程如图: Mapper任务的执行过程详解 每个Mapper任

Hadoop的配置过程(虚拟机中的伪分布模式)

1引言 hadoop如今已经成为大数据处理中不可缺少的关键技术,在如今大数据爆炸的时代,hadoop给我们处理海量数据提供了强有力的技术支撑.因此,了解hadoop的原理与应用方法是必要的技术知识. hadoop的基础原理可参考如下的三篇论文: The Google File System, 2003 MapReduce: Simplified Data Processing on Large Clusters, 2004 Bigtable: A Distributed Storage Syst

去除hadoop的启动过程中警告信息

如何去除hadoop的启动过程中警告信息1.由于警告是在执行start-all.sh启动Hadoop时出现的,所以应该查看start-all.sh,执行more start-all.sh可以看到下面代码:if [ -e "$bin/../libexec/hadoop-config.sh" ]; then  . "$bin"/../libexec/hadoop-config.shelse  . "$bin/hadoop-config.sh"fi根据

4.事务提交过程,事务基本概念,Oracle中的事务生命周期,保存点savepoint,数据库的隔离级别

 事务提交过程 事务 基本概念 概念:一个或者多个DML语言组成 特点:要么都成功,要么都失败 事务的隔离性:多个客户端同时操作数据库的时候,要隔离它们的操作, 否则出现:脏读  不可重复读  幻读 Oracle默认情况下,事务是打开的 commit案例: SQL> create table t1(tid int,tname varchar2(20)); 表已创建. SQL> select * from tab; TNAME                          TABTYPE

MySQL事务提交过程(一)

MySQL作为一种关系型数据库,已被广泛应用到互联网中的诸多项目中.今天我们来讨论下事务的提交过程. MySQL体系结构 由于mysql插件式存储架构,导致开启binlog后,事务提交实质是二阶段提交,通过两阶段提交,来保证存储引擎和二进制日志的一致. 本文仅讨论binlog未打卡状态下的提交流程,后续会讨论打开binlog选项后的提交逻辑. 测试环境 OS:WIN7 ENGINE: bin-log:off DB: 测试条件 set autocommit=0; -- --------------

MySQL事务提交过程(二)

上一篇文章我们介绍了在关闭binlog的情况下,事务提交的大概流程.之所以关闭binlog,是因为开启binlog后事务提交流程会变成两阶段提交,这里的两阶段提交并不涉及分布式事务,当然mysql把它称之为内部xa事务(Distributed Transactions),与之对应的还有一个外部xa事务. 这里所谓的两阶段提交分别是prepare阶段和commit阶段. 内部xa事务主要是mysql内部为了保证binlog与redo log之间数据的一致性而存在的,这也是由其架构决定的(binlo

Hadoop 源码分析——Job提交过程

1.在客户端,我们进行Job相关属性设定后,最后使用job.waitForCompletion(true);提交任务到集群中,并等待集群作业完成 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(con

[Hadoop]MapReducer工作过程

1. 从输入到输出 一个MapReducer作业经过了input,map,combine,reduce,output五个阶段,其中combine阶段并不一定发生,map输出的中间结果被分到reduce的过程成为shuffle(数据清洗). 在shuffle阶段还会发生copy(复制)和sort(排序). 在MapReduce的过程中,一个作业被分成Map和Reducer两个计算阶段,它们由一个或者多个Map任务和Reduce任务组成.如下图所示,一个MapReduce作业从数据的流向可以分为Ma

hadoop作业map过程调优使用到的参数笔记

参数:io.sort.mb(default 100) 当map task开始运算,并产生中间数据时,其产生的中间结果并非直接就简单的写入磁盘. 而是会利用到了内存buffer来进行已经产生的部分结果的缓存, 并在内存buffer中进行一些预排序来优化整个map的性能. 每一个map都会对应存在一个内存buffer,map会将已经产生的部分结果先写入到该buffer中, 这个buffer默认是100MB大小, 但是这个大小是可以根据job提交时的参数设定来调整的, 当map的产生数据非常大时,并且