Hadoop2.6.0运行mapreduce之Uber模式验证

前言

在有些情况下,运行于Hadoop集群上的一些mapreduce作业本身的数据量并不是很大,如果此时的任务分片很多,那么为每个map任务或者reduce任务频繁创建Container,势必会增加Hadoop集群的资源消耗,并且因为创建分配Container本身的开销,还会增加这些任务的运行时延。如果能将这些小任务都放入少量的Container中执行,将会解决这些问题。好在Hadoop本身已经提供了这种功能,只需要我们理解其原理,并应用它。

Uber运行模式就是解决此类问题的现成解决方案。本文旨在通过测试手段验证Uber运行模式的效果,在正式的生成环境下,还需要大家具体情况具体对待。

Uber运行模式

Uber运行模式对小作业进行优化,不会给每个任务分别申请分配Container资源,这些小任务将统一在一个Container中按照先执行map任务后执行reduce任务的顺序串行执行。那么什么样的任务,mapreduce框架会认为它是小任务呢?

  • 地图任务的数量不大于mapreduce.job.ubertask.maxmaps参数(默认值是9)的值;
  • 减少任务的数量不大于mapreduce.job.ubertask.maxreduces参数(默认值是1)的值;
  • 输入文件大小不大于mapreduce.job.ubertask.maxbytes参数(默认为1个Block的字节大小)的值;
  • map任务和reduce任务需要的资源量不能大于MRAppMaster(mapreduce作业的ApplicationMaster)可用的资源总量;

我们可以使用在《Hadoop2.6.0配置参数查看小工具》一文中制作的小工具,查看Uber相关参数及其默认值:

上面显示的参数mapreduce.job.ubertask.enable用来控制是否开启Uber运行模式,默认为false。

优化

为简单起见,我们还是以WordCount例子展开。输入数据及输出结果目录的构造过程可以参照《Hadoop2.6.0的FileInputFormat的任务切分原理分析》一文,本文不再赘述。

限制任务划分数量

我们知道WordCount例子中的reduce任务的数量通过Job.setNumReduceTasks(int)方法已经设置为1,因此满足mapreduce.job.ubertask.maxreduces参数的限制。所以我们首先控制下map任务的数量,我们通过设置mapreduce.input.fileinputformat.split.maxsize参数来限制。看看在满足小任务前提,但是不开启Uber运行模式时的执行情况。执行命令如下:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize = 30 /wordcount/input /wordcount/output/result1

观察执行结果,可以看到没有启用Uber模式,作业划分为6个分片,如下图:


还可以看到一共是6个地图任务和1个减少任务,如下图:

我在任务执行过程中,在web界面对于分配的Container进行截图,可以看到一共分配了7个Container:

如果阅读了《Hadoop2.6.0的FileInputFormat的任务切分原理分析》一文,你会知道输入源/wordcount/input目录下2个文件的大小总和为177字节,为了这么小的数据量和简单的WordCount而分配这么多资源的确很不划算。

开启Uber模式

现在我们开启mapreduce.job.ubertask.enable参数并使用Uber运行模式,命令如下:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize = 30 -D mapreduce.job.ubertask.enable = true /wordcount/input /wordcount/output/result2

然后观察执行结果,可以看到已经启用了Uber模式,如下图:

依然是6个地图任务和1个减少任务,但是之前的数据本地地图任务= 6一行信息已经变为当地的其他地图tasks=6。此外还增加了TOTAL_LAUNCHED_UBERTASKS、NUM_UBER_SUBMAPS、NUM_UBER_SUBREDUCES等信息,如下图所示:

以下列出这几个信息的含义:

输出字段 描述
TOTAL_LAUNCHED_UBERTASKS 启动的Uber任务数
NUM_UBER_SUBMAPS Uber任务中的地图任务数
NUM_UBER_SUBREDUCES Uber中减少任务数

因此我们知道这7个任务都在Uber模式下运行,其中包含6个map任务和1个reduce任务。

即便如此,有人依然会担心真正分配了多少Container资源,请看我在web界面的截图:

其它测试

由于我主动控制了分片大小,导致分片数量是6,这小于mapreduce.job.ubertask.maxmaps参数的默认值9。按照之前的介绍,当map任务数量大于9时,那么这个作业就不会被认为小任务。所以我们先将分片大小调整为20字节,使得map任务的数量刚好等于9,然后执行以下命令:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize = 20 -D mapreduce.job.ubertask.enable = true /wordcount/input /wordcount/output/result3

任务划分相关的信息截图如下:


。我们看到的确将输入数据划分为9份了其它信息如下:

我们看到一共10个Uber模式运行的任务,其中包括9个地图任务和1个减少任务。

最后,我们再将分片大小调整为19字节,使得map任务数量等于10,然后执行以下命令:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar wordcount -D mapreduce.input.fileinputformat.split.maxsize = 19 -D mapreduce.job.ubertask.enable = true /wordcount/input /wordcount/output/result4

任务划分相关的信息截图如下:

。我们看到的确将输入数据划分为10份了其它信息如下:

可以看到又重新显示了数据的本地地图 地图

此外,还可以通过调整reduce任务数量或者输入数据大小等方式,使得Uber失效,有兴趣的同学可以自行测试。

源码分析

本文的最后,我们从源码实现的角度来具体分析下Uber运行机制。有经验的Hadoop工程师,想必知道当mapreduce任务提交给ResourceManager后,由RM负责向NodeManger通信启动一个Container用于执行MRAppMaster。启动MRAppMaster实际也是通过调用其main方法,最终会调用MRAppMaster实例的serviceStart方法,其实现如下:

  protected void serviceStart() throws Exception {

    // 省略无关代码
    job = createJob(getConfig(), forcedState, shutDownMessage);

    // 省略无关代码
    if (!errorHappenedShutDown) {
      JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);

      jobEventDispatcher.handle(initJobEvent);

      // 省略无关代码

      if (job.isUber()) {
        speculatorEventDispatcher.disableSpeculation();
      } else {
        dispatcher.getEventHandler().handle(
            new SpeculatorEvent(job.getID(), clock.getTime()));
      }

    }

serviceStart方法的执行步骤如下:

  1. 调用createJob方法创建JobImpl实例。
  2. 发送JOB_INIT事件,然后处理此事件。
  3. 使用Uber运行模式的一个附加动作——即一旦满足Uber运行的四个条件,那么将不会进行推断执行优化。

createJob方法的代码实现如下:

  protected Job createJob(Configuration conf, JobStateInternal forcedState,
      String diagnostic) {

    // create single job
    Job newJob =
        new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
            taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,
            completedTasksFromPreviousRun, metrics,
            committer, newApiCommitter,
            currentUser.getUserName(), appSubmitTime, amInfos, context,
            forcedState, diagnostic);
    ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);

    dispatcher.register(JobFinishEvent.Type.class,
        createJobFinishEventHandler());
    return newJob;
  }

从以上代码可以看到创建了一个JobImpl对象,此对象自身维护了一个状态机(有关状态机转换的实现原理可以参阅《Hadoop2.6.0中YARN底层状态机实现分析》一文的内容),用于在接收到事件之后进行状态转移并触发一些动作。JobImpl新建后的状态forcedState是JobStateInternal.NEW。最后将此JobImpl对象放入AppContext接口的实现类RunningAppContext的类型为Map<JobId,工作>的缓存上下文中。

JobEventDispatcher的handle方法用来处理JobEvent。之前说到serviceStart方法主动创建了一个类型是JobEventType.JOB_INIT的JobEvent,并且交由JobEventDispatcher的handle方法处理。handle方法的实现如下:

  private class JobEventDispatcher implements EventHandler<JobEvent> {
    @SuppressWarnings("unchecked")
    @Override
    public void handle(JobEvent event) {
      ((EventHandler<JobEvent>)context.getJob(event.getJobId())).handle(event);
    }
  }

处理方法从AppContext的实现类RunningAppContext中获取JobImpl对象,代码如下:

    @Override
    public Job getJob(JobId jobID) {
      return jobs.get(jobID);
    }

最后调用JobImpl实例的句柄方法,其实现如下:

  public void handle(JobEvent event) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Processing " + event.getJobId() + " of type "
          + event.getType());
    }
    try {
      writeLock.lock();
      JobStateInternal oldState = getInternalState();
      try {
         getStateMachine().doTransition(event.getType(), event);
      } catch (InvalidStateTransitonException e) {
        LOG.error("Can‘t handle this event at current state", e);
        addDiagnostic("Invalid event " + event.getType() +
            " on Job " + this.jobId);
        eventHandler.handle(new JobEvent(this.jobId,
            JobEventType.INTERNAL_ERROR));
      }
      //notify the eventhandler of state change
      if (oldState != getInternalState()) {
        LOG.info(jobId + "Job Transitioned from " + oldState + " to "
                 + getInternalState());
        rememberLastNonFinalState(oldState);
      }
    }

    finally {
      writeLock.unlock();
    }
  }

处理方法的处理步骤如下:

  1. 获取修改JobImpl实例的锁;
  2. 获取JobImpl实例目前所处的状态
  3. 状态机状态转换;
  4. 释放修改JobImpl实例的锁。

getInternalState方法用于获取JobImpl实例当前的状态,其实现如下:

  @Private
  public JobStateInternal getInternalState() {
    readLock.lock();
    try {
      if(forcedState != null) {
        return forcedState;
      }
     return getStateMachine().getCurrentState();
    } finally {
      readLock.unlock();
    }
  }

我们之前介绍过,在创建JobImpl实例时,其forcedState字段应当是JobStateInternal.NEW。
JobImpl状态机转移时,处理的JobEvent的类型是JobEventType.JOB_INIT,因此经过状态机转换最终会调用InitTransition的transition方法。有关状态机转换的实现原理可以参阅《Hadoop2.6.0中YARN底层状态机实现分析》一文的内容。
InitTransition的transition方法处理Uber运行模式的关键代码是

    @Override
    public JobStateInternal transition(JobImpl job, JobEvent event) {
        // 省略无关代码
        job.makeUberDecision(inputLength);

        // 省略无关代码
    }

最后我们看看JobImpl实例的makeUberDecision方法的实现:

  private void makeUberDecision(long dataInputLength) {
    //FIXME:  need new memory criterion for uber-decision (oops, too late here;
    // until AM-resizing supported,
    // must depend on job client to pass fat-slot needs)
    // these are no longer "system" settings, necessarily; user may override
    int sysMaxMaps = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 9);

    int sysMaxReduces = conf.getInt(MRJobConfig.JOB_UBERTASK_MAXREDUCES, 1);

    long sysMaxBytes = conf.getLong(MRJobConfig.JOB_UBERTASK_MAXBYTES,
        fs.getDefaultBlockSize(this.remoteJobSubmitDir)); // FIXME: this is wrong; get FS from
                                   // [File?]InputFormat and default block size
                                   // from that

    long sysMemSizeForUberSlot =
        conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
            MRJobConfig.DEFAULT_MR_AM_VMEM_MB);

    long sysCPUSizeForUberSlot =
        conf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
            MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);

    boolean uberEnabled =
        conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
    boolean smallNumMapTasks = (numMapTasks <= sysMaxMaps);
    boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
    boolean smallInput = (dataInputLength <= sysMaxBytes);
    // ignoring overhead due to UberAM and statics as negligible here:
    long requiredMapMB = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0);
    long requiredReduceMB = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0);
    long requiredMB = Math.max(requiredMapMB, requiredReduceMB);
    int requiredMapCores = conf.getInt(
            MRJobConfig.MAP_CPU_VCORES,
            MRJobConfig.DEFAULT_MAP_CPU_VCORES);
    int requiredReduceCores = conf.getInt(
            MRJobConfig.REDUCE_CPU_VCORES,
            MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
    int requiredCores = Math.max(requiredMapCores, requiredReduceCores);
    if (numReduceTasks == 0) {
      requiredMB = requiredMapMB;
      requiredCores = requiredMapCores;
    }
    boolean smallMemory =
        (requiredMB <= sysMemSizeForUberSlot)
        || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);

    boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot;
    boolean notChainJob = !isChainJob(conf);

    // User has overall veto power over uberization, or user can modify
    // limits (overriding system settings and potentially shooting
    // themselves in the head).  Note that ChainMapper/Reducer are
    // fundamentally incompatible with MR-1220; they employ a blocking
    // queue between the maps/reduces and thus require parallel execution,
    // while "uber-AM" (MR AM + LocalContainerLauncher) loops over tasks
    // and thus requires sequential execution.
    isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks
        && smallInput && smallMemory && smallCpu
        && notChainJob;

    if (isUber) {
      LOG.info("Uberizing job " + jobId + ": " + numMapTasks + "m+"
          + numReduceTasks + "r tasks (" + dataInputLength
          + " input bytes) will run sequentially on single node.");

      // make sure reduces are scheduled only after all map are completed
      conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
                        1.0f);
      // uber-subtask attempts all get launched on same node; if one fails,
      // probably should retry elsewhere, i.e., move entire uber-AM:  ergo,
      // limit attempts to 1 (or at most 2?  probably not...)
      conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
      conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);

      // disable speculation
      conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
      conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
    } else {
      StringBuilder msg = new StringBuilder();
      msg.append("Not uberizing ").append(jobId).append(" because:");
      if (!uberEnabled)
        msg.append(" not enabled;");
      if (!smallNumMapTasks)
        msg.append(" too many maps;");
      if (!smallNumReduceTasks)
        msg.append(" too many reduces;");
      if (!smallInput)
        msg.append(" too much input;");
      if (!smallCpu)
        msg.append(" too much CPU;");
      if (!smallMemory)
        msg.append(" too much RAM;");
      if (!notChainJob)
        msg.append(" chainjob;");
      LOG.info(msg.toString());
    }
  }

如果你认真阅读以上代码的实现,就知道这正是我在本文一开始说的Uber运行模式判断mapreduce作业是否采用Uber模式执行的4个条件,缺一不可。一旦判定为Uber运行模式,那么还告诉我们以下几点:

  1. 设置当map任务全部运行结束后才开始reduce任务(参数mapreduce.job.reduce.slowstart.completedmaps设置为1.0)。
  2. 将当前Job的最大map任务尝试执行次数(参数mapreduce.map.maxattempts)和最大reduce任务尝试次数(参数mapreduce.reduce.maxattempts)都设置为1。
  3. 取消当前Job的map任务的推断执行(参数mapreduce.map.speculative设置为false)和reduce任务的推断执行(参数mapreduce.reduce.speculative设置为false)。

后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。

京东:http://item.jd.com/11846120.html

当当:http://product.dangdang.com/23838168.html

时间: 2024-12-10 10:31:47

Hadoop2.6.0运行mapreduce之Uber模式验证的相关文章

Hadoop2.6.0运行mapreduce之推断(speculative)执行(一)

前言 当一个应用向YARN集群提交作业后,此作业的多个任务由于负载不均衡.资源分布不均等原因都会导致各个任务运行完成的时间不一致,甚至会出现一个任务明显慢于同一作业的其它任务的情况.如果对这种情况不加优化,最慢的任务最终会拖慢整个作业的整体执行进度.好在mapreduce框架提供了任务推断执行机制,当有必要时就启动一个备份任务.最终会采用备份任务和原任务中率先执行完的结果作为最终结果. 由于具体分析推断执行机制,篇幅很长,所以我会分成几篇内容陆续介绍. 推断执行测试 本文在我自己搭建的集群(集群

Hadoop2 使用 YARN 运行 MapReduce 的过程源码分析

Hadoop 使用 YARN 运行 MapReduce 的过程如下图所示: 总共分为11步. 这里以 WordCount 为例, 我们在客户端终端提交作业: # 把本地的 /home/hadoop/test.txt 文件上传到 HDFS 的 /input 下, 之后 HDFS 会对文件分块等 hadoop-2.7.3/bin/hadoop fs -put /home/hadoop/test.txt /input/ # 我们以 hadoop 自带测试例子 wordcount 为例 hadoop-2

Ubuntu12.04 64bit搭建Hadoop-2.2.0

一.准备工作: 集群安装ubuntu12.04 64bit系统,配置各结点IP地址 开启ssh服务,方便以后远程登录,命令sudo apt-get install openssh-server(无需重启) 使用命令:ssh [email protected]测试服务连接是否正常 设置无密钥登录: 修改主机名:sudo vim /etc/hostname将各主机设置成相应的名字,如mcmaster.node1.node2... 修改/etc/hosts文件:sudo vim /etc/hosts,

搭建hadoop2.6.0 HDFS HA及YARN HA

最终结果:[[email protected] ~]$ jps12723 ResourceManager12995 Jps12513 NameNode12605 DFSZKFailoverController [[email protected] ~]$ jps12137 ResourceManager12233 Jps12009 DFSZKFailoverController11930 NameNode [[email protected] ~]$ jps12196 DataNode12322

使用命令行编译打包运行自己的MapReduce程序 Hadoop2.6.0

使用命令行编译打包运行自己的MapReduce程序 Hadoop2.6.0 网上的 MapReduce WordCount 教程对于如何编译 WordCount.java 几乎是一笔带过… 而有写到的,大多又是 0.20 等旧版本版本的做法,即 javac -classpath /usr/local/hadoop/hadoop-1.0.1/hadoop-core-1.0.1.jar WordCount.java,但较新的 2.X 版本中,已经没有 hadoop-core*.jar 这个文件,因此

windows下使用Eclipse编译运行MapReduce程序 Hadoop2.6.0/Ubuntu(二)

在上篇文章中eclipse已经能访问HDFS目录,但并不能进行Mapreduce编程,在这里小编将常见错误和处理办法进行总结,希望对大家有所帮助 错误1:ERROR [main] util.Shell (Shell.java:getWinUtilsPath(303)) - Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable n

Win7下面安装hadoop2.x插件及Win7/Linux运行MapReduce程序

一.win7下 (一).安装环境及安装包 win7 32 bit jdk7 eclipse-java-juno-SR2-win32.zip hadoop-2.2.0.tar.gz hadoop-eclipse-plugin-2.2.0.jar hadoop-common-2.2.0-bin.rar (二).安装 默认已经安装好了jdk.eclipse以及配置好了hadoop伪分布模式 1.拷贝hadoop-eclipse-plugin-2.2.0.jar插件到Eclipse安装目录的子目录plu

Hadoop-2.6.0分布式单机环境搭建HDFS讲解Mapreduce示例

Hadoop安装使用 1.1 Hadoop简介 1.2 HDFS分布式存储系统 1.3 单机安装 1.4 Mapreduce 案例 1.5 伪分布式安装 1.6 课后作业 1.1 Hadoop简介 在文章的时候已经讲解了Hadoop的简介以及生态圈,有什么不懂的可以"出门右转" http://dwz.cn/4rdSdU 1.2 HDFS分布式存储系统(Hadoop Distributed File System) HDFS优点 高容错性 数据自动保存多个副本 副本都时候会自动恢复 适合

Hadoop2.2.0伪分布式之MapReduce简介

一概念. mapReduce是分布式计算模型.注:在hadoop2.x中MapReduce运行在yarn上,yarn支持多种运算模型.storm.spark等等,任何运行在JVM上的程序都可以运行在yarn上. MR有两个阶段组成,Map和Reduce,用户只需要实现Map()和reduce()两个函数(且这两个函数的输入和输出均是key -value的形式)即可实现分布式计算.代码示例略. MapReduce设计框架: 在1.0中:,管理者:Job Tracker:被管理者:Task Trac