MR-Job提交流程

1.一个标准 MR-Job 的执行入口:

//参数 true 表示检查并打印 Job 和 Task 的运行状况
System.exit(job.waitForCompletion(true) ? 0 : 1);

2.job.waitForCompletion(true)方法的内部实现

//job.waitForCompletion()方法的内部实现
public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit(); //此方法的核心在于submit()
    }
    if (verbose) { //根据传入的参数,决定是否打印Job运行的详细过程
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
}

3. Job 类 submit()方法的内部实现

public void submit()
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();//使用MapReduce新的API
    connect();//返回一个【客户端代理对象 Cluster】(属于 Job 类),用于和服务端RM建立RPC通信
    final JobSubmitter submitter =
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,
      ClassNotFoundException {

        //提交Job
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;//设置 JobStatus 为 Running
    LOG.info("The url to track the job: " + getTrackingURL());
}

3.1.1.查看Connect()方法的内部实现

private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException,
                                 ClassNotFoundException {
                     //返回一个Cluster对象,并将此对象作为 Job 类的一个成员变量                     //即 Job 类持有 Cluster 的引用。
                     return new Cluster(getConfiguration());
                   }
                 });
    }
}

3.1.2.查看new Cluster()的实现过程

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);//重点在于此方法的内部实现
}

3.1.3.客户端代理对象Cluster实例化过程有两种实现:LocalClientProtocolProvider(本地模式)和YarnClientProtocolProvider(Yarn模式)。

synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());

        //ClientProtocol 是Client端和NN通信的RPC协议,根据RPC通信原理,此协议接口中必定包含一个 versionID 字段。
         ClientProtocol clientProtocol = null;
        try {
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {  //初始化Cluster内部成员变量
            clientProtocolProvider = provider;
            client = clientProtocol;     //创建Cluster类的客户端代理对象client
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        }
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
     }
 }

3.1.4.ClientProtocol接口中包含的versionID 字段

//Version 37: More efficient serialization format for framework counters
public static final long versionID = 37L;

3.2.1.查看 JobSubmitter 类中 submitJobInternal()方法的实现:

时间: 2024-10-27 07:44:00

MR-Job提交流程的相关文章

Spark修炼之道(高级篇)——Spark源码阅读:第一节 Spark应用程序提交流程

作者:摇摆少年梦 微信号: zhouzhihubeyond spark-submit 脚本应用程序提交流程 在运行Spar应用程序时,会将spark应用程序打包后使用spark-submit脚本提交到Spark中运行,执行提交命令如下: root@sparkmaster:/hadoopLearning/spark-1.5.0-bin-hadoop2.4/bin# ./spark-submit --master spark://sparkmaster:7077 --class SparkWordC

图文讲解:iOS App提交流程

原地址:http://www.toceansoft.com/ios/3287.jhtml 摘要: 由于苹果的机制,在非越狱机器上安装应用必须通过官方的App Store,开发者开发好应用后上传App Store,也需要通过审核等环节.AppCan作为一个跨主流平台的一个开发平台,也对ipa包上传App Store作了支持.本文从三个流程来介绍如何实现AppCan在线编译出ipa包,以及上传到苹果App Store. 一.证书的导出 1.1.前期工作 首先你需要有一个苹果的开发者帐号,一个Mac系

GIT(2)-从文件生命周期看提交流程

GIT的上一篇文章比较枯燥无味,理论性较强,也是难以引起共鸣!所以今天从实在操作方面说一下GIT使用过程中,使用最多的流程-提交到仓库. 开始说明提交流程之前,先看一下上篇提到的GIT整体架构图. 对!还是这张画的比较丑的图![捂脸]! 本章要说的内容,就是上面这张图的前面部分,如下: 只有提交本地仓库的流程,并没有涉及到远程仓库.关于提交流程,常用的命令:add和commit,add负责提交到暂存区,commit提交到仓库.但并不是只有这两个命令就足够了.下面开始正文. 准备你的环境 操作系统

科研论文提交流程与常见问题(EDAS 系统提交)

第一步 注册文章(Registering your Paper) 如上图,点击菜单中的submit paper按钮,会列出所有的会议和期刊,选择一个你要投稿的期刊或者会议,例如选择第一个2013 IEEE CSUDET. 接下来,点击2013 IEEE CSUDET这一排最后一个submit 按钮,填写论文题目,关键字和摘要如下图.然后提交 第二步为论文添加作者(Adding Authors) 按照要求可以输入姓名,邮箱或者ID都行,因为可能存在名字重复,在输入名字后,会显示所有重名的作者,包含

Apache Flink流作业提交流程分析

提交流程调用的关键方法链 用户编写的程序逻辑需要提交给Flink才能得到执行.本文来探讨一下客户程序如何提交给Flink.鉴于用户将自己利用Flink的API编写的逻辑打成相应的应用程序包(比如Jar)然后提交到一个目标Flink集群上去运行是比较主流的使用场景,因此我们的分析也基于这一场景进行. Flink的API针对不同的执行环境有不同的Environment对象,这里我们主要基于常用的RemoteStreamEnvironment和RemoteEnvironment进行分析 在前面我们谈到

Git向开源代码提交流程

Git向开源代码提交流程: 1. 先fork到自己的repository. 2. 将自己的repository clone到本地, 3. 创建feature branch 4. 从主repository更新代码: 1) Add主repository:  git remote add upstream https://location/of/generic.git 2) 更新代码:git pull upstream master 3) 更新 push url 防止checkin 到主reposit

Yarn的基础介绍以及job的提交流程

1.YARN的基础理论 1)关于YARN的介绍: ?? YARN 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而 MapReduce 等运算程序则相当于运行于操作系统之上的应用程序. 2)hadoop1.x中YARN的不足: ?? - JobTracker是集群的事务的集中处理,存在单点故障?? - JobTracker需要完成得任务太多,既要维护job的状态又要维护job的task的状态,造成资源消耗过多?? - 在 TaskTracker 端,用Ma

[转]Hadoop YARN任务提交流程

Yarn是随着hadoop发展而催生的新框架,全称是Yet Another Resource Negotiator,可以翻译为“另一个资源管理器”.yarn取代了以前hadoop中jobtracker(后面简写JT)的角色,因为以前JT的 任务过重,负责任务的调度.跟踪.失败重启等过程,而且只能运行mapreduce作业,不支持其他编程模式,这也限制了JT使用范围,而yarn应运而 生,解决了这两个问题. 为了表述清楚,大家可以先看hadoop版本说明这篇文章,我这里要说的是hadoop2.0,

Yarn任务提交流程(源码分析)

Based on Hadoop 2.7.1 JobSubmitter addMRFrameworkToDistributedCache(Configuration conf) : mapreduce.application.framework.path, 用于指定其他framework的hdfs 路径配置,默认yarn的可以不管 Token相关的方法:读取认证信息(支持二进制.json),并将其添加至相应的fileSystem中,以便以同样权限访问文件系统 copyAndConfigureFil