Hadoop-1.2.1学习之Job创建和提交源代码分析

在Hadoop中,MapReduce的Java作业通常由编写Mapper和Reducer开始,接着创建Job对象,然后使用该对象的set方法设置Mapper和Reducer以及诸如输入输出等参数,最后调用Job对象的waitForCompletion(true)方法提交作业并等待作业的完成。尽管使用了寥寥数语就描述了作业的创建和提交,但实际情况要复杂的多,本篇文章将通过分析源代码来深入学习该过程。

通常使用public Job(Configuration conf, String jobName)创建Job作业对象,都会指定作业名称,hadoop代码只是将jobName设置为参数mapred.job.name的值。除了设置作业名称外,Job的构造函数还会使用Configuration对象初始化org.apache.hadoop.mapred.JobConf对象conf,以及使用UserGroupInformation.getCurrentUser()获取当前用户ugi。其中JobConf是描述MapReduce作业的主要接口,包括设置作业名称在内的许多方法都是由该类完成的。UserGroupInformation类用包含了用户和组的信息,该类封装了JAAS(Java
Authentication AuthorizationService,Java认证和授权服务),并提供方法确定用户名和组。

当创建了Job对象后通常会设置Mapper和Reducer,比如job.setMapperClass,正像上面提到的,该操作实际是由JobConf对象完成的,具体代码如下,其它的设置方法类似:

public void setMapperClass(Class<? extends Mapper> cls) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
}

在设置完作业运行需要的参数后,执行job.waitForCompletion(true)向集群提交作业并等待作业执行完成,其中的boolean类型的参数用于决定是否向用户打印作业的执行进度。该方法的具体代码如下:

public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      jobClient.monitorAndPrintJob(conf, info);
    } else {
      info.waitForCompletion();
    }
    return isSuccessful();
}

当新创建一个作业时,该作业的JobState state = JobState.DEFINE,所以上面的代码中会执行submit方法,当在submit返回后会根据参数verbose为true或false执行不同的方法。现在具体submit的实现:

public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();//默认使用新版本中的API,除非显示设置了老版本的API

    // Connect to the JobTracker and submit the job
    connect();
    info = jobClient.submitJobInternal(conf);
    super.setJobID(info.getID());
    state = JobState.RUNNING;
}

在submit中,先确认Job的state为JobState.DEFINE,并最后在将作业提交后设置为JobState.RUNNING。connect方法用于打开到JobTracker的连接,该方法的代码为:

private void connect() throws IOException, InterruptedException {
    ugi.doAs(new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        jobClient = new JobClient((JobConf) getConfiguration());
        return null;
      }
    });
}

在进一步分析之前,需要先了解两个对象,分别是JobClient jobClient和RunningJobinfo,其中jobClient是用户作业与JobTracker交互的主要接口,该类具有提交作业,跟踪作业进度,访问任务日志和获取MapReduce集群状态信息等功能。RunningJob是接口,用于查询正在运行的MapReduce作业的细节,当调用jobClient的submitJobInternal时,返回的是jobClient的内部类NetworkedJob(该类实现了RunningJob)。在connect方法中,主要是实例化了jobClient对象,而ugi的doAs方法的返回值为run方法的返回值,后面还会使用该方法(实际情况是该方法被大量使用)。在JobClient的构造方法中,主要完成了连接JobTracker的工作,该工作又交给了init方法,该方法的具体实现为:

public void init(JobConf conf) throws IOException {
String tracker = conf.get("mapred.job.tracker", "local");
// mapreduce.client.tasklog.timeout
    tasklogtimeout = conf.getInt(
      TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
    this.ugi = UserGroupInformation.getCurrentUser();
    if ("local".equals(tracker)) {
      conf.setNumMapTasks(1);
      this.jobSubmitClient = new LocalJobRunner(conf);
    } else {
      this.rpcJobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
      this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
    }
}

在该方法中着重分析非单机模式下的情况,即mapred.job.tracker的值不是local,也即else语句中的代码。rpcJobSubmitClient和jobSubmitClient是类型为JobSubmissionProtocol的两个对象,JobClient和JobTracker使用该接口通信,JobClient使用该接口的方法提交作业及了解当前系统的状态。方法createRPCProxy和createProxy用于创建实现JobSubmissionProtocol的客户端对象。

在连接到JobTracker后,接着使用jobClient的submitJobInternal向JobTracker提交作业。在该方法中首先确定存放作业文件的路径,该路径为${mapreduce.jobtracker.staging.root.dir}/{user-name}/.staging设置,若未设置mapreduce.jobtracker.staging.root.dir则使用/tmp/hadoop/mapred/staging/${user-name}/.staging。然后在上述目录创建名为作业Id的目录,并将参数mapreduce.job.dir设置为该值,即${mapreduce.jobtracker.staging.root.dir}/{user-name}/.staging/jobId,上面的目录均是相对于fs.default.name设置的值。接下来将作业的jar文件拷贝到${mapreduce.jobtracker.staging.root.dir}/{user-name}/.staging/jobId中,并重命名为job.jar文件,该工作由copyAndConfigureFiles方法完成。接着需要在上述目录中创建job.xml文件,获取Reduce任务的数量,分割输入文件并根据分割所得块数设置Map任务的数量。做完上述工作后,使用下面的代码提交作业:

status = jobSubmitClient.submitJob( jobId, submitJobDir.toString(), jobCopy.getCredentials());

当将作业提交到JobTracker后,作业的执行将由JobTracker负责,而做为提交作业的客户端可以选择是否打印作业执行进度。

综上在Hadoop-1.2.1中作业的创建和提交包括如下的一些过程:

  • 设置作业的输入输出参数
  • 拷贝作业文件和配置文件到特定目录中
  • 计算作业的分片并设置Map任务的数量
  • 向JobTracker提交作业并可选的监控作业运行进度
时间: 2024-11-09 09:38:10

Hadoop-1.2.1学习之Job创建和提交源代码分析的相关文章

SQL Server 2008从基础开始学习历程(1)------创建表与查询

[by:yy] 无论我们学什么呢,都要讲究一个Why,一个How.那么我们为什么要学SQL呢?无非就那么几点. 1.为了适应其他技术,和其他技术配对而学. 我个人的理解呢,只要在IT行业,无论你学什么,或者做什么工作.都离不开数据库.而学习数据库呢,又太文字化了,看的心里就烦.我是很抵触文字的.看着一大排一大排的字,就怕.可能是大天朝的教育所致.已经怕了学生生涯了. 2.为了适应社会而学. 随便找个招聘信息,都会有标注需要会SQL语言啊.了解MySql呀.会搞Oracle呀.所以呢,为了能在找工

PHP学习之中数组--创建数组【1】

在PHP中数组的定义有三种写法,分别是: <?php //第一种方式是用关键字 array来创建的 $username = array("demo1","demo2","demo3","demo4"); <?php //第二种方法是用直接声明一个变量,在变量的右边加上中括号的方式来创建数组 $usrename = array(); $username[0]="aaa"; $username[1]

Oracle学习(九):创建和管理表

1.知识点:可以对照下面的录屏进行阅读 SQL> --创建表 SQL> create table test1 2 (tid number, 3 tname varchar2(20), 4 hidatedate date default sysdate); SQL> --使用as和子查询快速建表 SQL> --创建表:包含员工号 姓名 月薪 年薪 部门名称 SQL> create table empincome 2 as 3 select empno,ename,sal,sal

我的Android 4 学习系列之创建应用程序和Activity:Manifest、Application、Activity

目录 介绍Android应用程序组件,以及使用这些组件构建的各种Android应用程序 Android应用程序的生命周期 如何创建应用程序Manifest 如何使用外部资源提供对位置.语言和硬件配置的支持 如何实现和使用自己的Application类 如何创建新的Activity 理解Activity的状态转换和生命周期 Android应用程序的组成部分 Android应用程序是由松散耦合的组件构成的,并使用应用程序Manifest绑定到一起. Manifest描述了每一个组件以及他们之间的交互

springmvc学习笔记---idea创建springmvc项目

前言: 真的是很久没搞java的web服务开发了, 最近一次搞还是读研的时候, 想来感慨万千. 英雄没落, Eclipse的盟主地位隐隐然有被IntelliJ IDEA超越的趋势. Spring从2.x到现在4.x, 一眨眼已是二代的积淀. 本文想借助Idea, 来简单搭建一个基于springmvc的web程序, 一为体验, 二为技术积累. 环境配置: idea版本为14.0.2, tomcat版本为8.0.23, idea创建的springmvc为4.1.1.RELEASE. 网上有很多图文并

Sharepoint2013搜索学习笔记之创建搜索服务(二)

第一步,进入管理中心,点击管理服务器上的服务 第二步,在服务器上选择需要承载搜索服务的服务器,并启动服务列表上的sharepoint server search 第三步,从管理中心进入管理服务应用程序 第四步,新建search service application 第五步,在弹出的新建窗口分别填好相应信息点击确定,主要注意的是 应用程序池可以选择已经有的,也可以自己填一个新的名称,选择填写新的之后,程序会在稍后新建一个应用程序池,一般推荐新建应用程序池. 默认情况,爬网组件会用配置好的搜索服务

跟着刚哥学习Spring框架--创建HelloWorld项目(一)

1.Spring框架简介 Spring是一个开源框架,Spring是在2003年兴起的一个轻量级的开源框架,由Rod johnson创建.主要对JavaBean的生命周期进行管理的轻量级框架,Spring给JavaEE带来了春天. 2.Spring框架特点 √ 轻量级:不是说他的文件大小很小,指的Spring是非侵入性. 知识点:轻量级框架和重量级框架的区别 轻量级和重量级的框架是以启动程序所需要的资源所决定,比如EJB在启动程序的时候需要消耗大量的资源,内存和CPU,所以是重量级.√ 依赖注入

Cloudera Spark 及 Hadoop 开发员培训学习【北京上海】

Spark 及 Hadoop 开发员培训 学习如何将数据导入到 Apache Hadoop 机群并使用 Spark.Hive.Flume.Sqoop.Impala 及其他 Hadoop 生态系统工具对数据进行各种操作和处理分析 在为期四天的培训中,学员将学习关键概念和掌握使用最新技术和工具将数据采集到 Hadoop 机群并进行处理.通过学习掌握诸如 Spark.Hive.Flume.Sqoop 和 Impala 这样的 Hadoop 生态系统工具和技术,Hadoop 开发员将具备解决实际大数据问

好程序员Java学习路线分享创建Java class

好程序员Java学习路线分享创建Java class,首先通过Transport Client获取ES的连接 private Client client; //通过Transport Client获取ES的连接 @Before public void getClient() throws Exception{ ????//ES服务的JavaAPI的port为9300 ????//注意:如果请求一个ES集群,可以多添几个节点 ????//为了避免在一个节点出现网络问题导致的请求失败问题,可以自动切