Hadoop-2.4.1学习之Map任务源码分析(上)

众所周知,Mapper是MapReduce编程模式中最重要的环节之一(另一个当然是Reducer了)。在Hadoop-2.x版本中虽然不再有JobTracker和TaskTracker,但Mapper任务的功能却没有变化,本篇文章将结合源代码深入分析Mapper任务时如何执行的,包括处理InputSplit,mapper的输出、对输出分类等。在进行分析之前先明确几个概念:作业、任务、任务的阶段和任务的状态,可以将作业理解为要最终实现的功能或目的,比如统计单词的数量,而任务就是对该作业的拆分,只负责一部分作业,比如在统计单词数量的例子中,将一个作业交由10个任务去完成。任务的阶段指的是当前任务在执行什么功能,比如map和分类功能,在hadoop中一个任务的阶段由枚举类Phase定义,具体有6个阶段:STARTING、MAP、SHUFFLE、SORT、REDUCE、CLEANUP。任务的状态指的是该任务所处于的状态,比如运行中,失败等,具体由枚举类State定义:RUNNING、SUCCEEDED、FAILED、UNASSIGNED、KILLED、COMMIT_PENDING、FAILED_UNCLEAN、KILLED_UNCLEAN。

在hadoop中map任务是由类MapTask表示的,该类提供了众多的内部类用于完成map任务,比如读取输入,收集输出等。在该类的开头语句块中定义了map任务的阶段:

{   // set phase for this task
    setPhase(TaskStatus.Phase.MAP);
    getProgress().setStatus("map");
}

该语句将任务的阶段设置为MAP,任务的初始阶段为STARTING。MapTask中的run方法用于执行任务,该方法中与map任务相关的源代码为(省略了作业清理、作业安装、任务清理的源代码):

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;
    if (isMapTask()) {
      // If there are no reducers then there won‘t be any sort. Hence the map
      // phase will govern the entire attempt‘s progress.
      if (conf.getNumReduceTasks() == 0) {
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt‘s progress will be
        // split between the map phase (67%) and the sort phase (33%).
        mapPhase = getProgress().addPhase("map", 0.667f);
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
}
//实现了Runnable和Report接口,用于报告进度、更新计数器和状态信息等
    TaskReporter reporter = startReporter(umbilical);
boolean useNewApi = job.getUseNewMapper();
//初始化OutputFormat、OutputCommitter,创建任务临时输出目录
    initialize(job, getJobID(), reporter, useNewApi);
    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

该方法的TaskUmbilicalProtocol参数是用于任务子进程与其父进程的通信协议。在该方法体中,首先判断该任务是不是map任务(总是true),然后根据Reducer的数量决定是否将整个map任务进度分割为MAP(66.7%)和SORT(33.3%)阶段,二者的和必须为1,也就是说在整个map任务的进行过程中,66.7%的时间用于MAP阶段,33.3%的时间用于SORT阶段(将任务执行的进度与任务执行的时间对等考虑)。接着根据是否使用新版本的API调用不同的方法(hadoop-1.x和hadoop-2.x使用的是新版本的API,hadoop-0.x使用的旧版本的API),此处将只考虑使用新版本API的情况,也就是将调用runNewMapper方法,该方法执行实际的任务工作,前半部分用于实例化TaskAttemptContext taskContext、Mapper mapper、InputFormat inputFormat、InputSplit split、RecordReader input、Mapper.Context mapperContext等对象,且根据Reducer的数量创建不同的output对象,源代码如下:

org.apache.hadoop.mapreduce.RecordWriter output = null;
   if (job.getNumReduceTasks() == 0) {
      //直接输出map的结果
      output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

在上述对象中,mapper和inputFormat为由作业调用setMapperClass和setInputFormatClass设置的类,split表示数据块,input用于从InputSplit中读取键值对,mapperContext为map方法中Context对象。在实例化完毕上述对象后,将进行实际的map过程,源代码如下:

try {
      input.initialize(split, mapperContext);
      //调用Mapper任务的run方法
      mapper.run(mapperContext);
      mapPhase.complete();
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }

其中的mapper.run(mapperContext)的源代码如下所示。由上面的分析可知,mapper为自定义的Mapper类,而在mapper的run方法中将调用该类的map方法以完成map任务,在该方法中对分配给该mapper的InputSplit中的数据循环调用map方法。

setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }

至此完成了MAP阶段的任务,并将任务的阶段设置为SORT,并更新任务状态,然后关闭输入和输出。由于SORT阶段的代码比较长(占整个MapTask的一半以上),所以将在后续的文章中进行学习。

时间: 2025-01-06 19:23:01

Hadoop-2.4.1学习之Map任务源码分析(上)的相关文章

Hadoop-2.4.1学习之Map任务源码分析(下)

在Map任务源码分析(上)中,对MAP阶段的代码进行了学习,这篇文章文章将学习Map任务的SORT阶段.如果Reducer的数量不为0,则还需要进行SORT阶段,但从上面的学习中并未发现与MAP阶段执行完毕调用mapPhase.complete()类似的在SORT阶段执行完毕调用sortPhase.complete()的源码,那SORT阶段是在什么时候启动的?对于Map任务来说,有输入就有输出,输入由RecordReader负责,输出则由RecordWriter负责,当Reducer的数量不为0

memcached学习笔记——存储命令源码分析上

原创文章,转载请标明,谢谢. 上一篇分析过memcached的连接模型,了解memcached是如何高效处理客户端连接,这一篇分析memcached源码中的process_update_command函数,探究memcached客户端的set命令,解读memcached是如何解析客户端文本命令,剖析memcached的内存管理,LRU算法是如何工作等等. 解析客户端文本命令 客户端向memcached server发出set操作,memcached server读取客户端的命令,客户端的连接状态

u-boot学习(三):u-boot源码分析

建立域模型和关系数据模型有着不同的出发点: 域模型: 由程序代码组成, 通过细化持久化类的的粒度可提高代码的可重用性, 简化编程 在没有数据冗余的情况下, 应该尽可能减少表的数目, 简化表之间的参照关系, 以便提高数据的访问速度 Hibernate 把持久化类的属性分为两种: 值(value)类型: 没有 OID, 不能被单独持久化, 生命周期依赖于所属的持久化类的对象的生命周期 实体(entity)类型: 有 OID, 可以被单独持久化, 有独立的生命周期(如果实体类型包含值类型,这个值类型就

EasyUI学习总结(三)——easyloader源码分析

EasyUI学习总结(三)--easyloader源码分析 easyloader模块是用来加载jquery easyui的js和css文件的,而且它可以分析模块的依赖关系,先加载依赖项.模块加载好了会调用parse模块来解析页面.把class是easyui开头的标签都转化成easyui的控件. 先看Demo1例子,再分析源代码. 1 <!DOCTYPE html> 2 <html> 3 <head> 4 <title>easyloader范例</tit

EasyUI学习总结(四)——parser源码分析

EasyUI学习总结(四)--parser源码分析 parser模块是easyloader第一个加载的模块,它的主要作用,就是扫描页面上easyui开头的class标签,然后初始化成easyui控件. 1 /** 2 * parser模块主要是解析页面中easyui的控件 3 */ 4 $.parser = { 5 // 是否自动解析 6 auto: true, 7 8 // 可以被解析的控件 9 plugins:['linkbutton','menu','menubutton','splitb

memcached学习笔记——存储命令源码分析下篇

上一篇回顾:<memcached学习笔记——存储命令源码分析上篇>通过分析memcached的存储命令源码的过程,了解了memcached如何解析文本命令和mencached的内存管理机制. 本文是延续上一篇,继续分析存储命令的源码.接上一篇内存分配成功后,本文主要讲解:1.memcached存储方式:2.add和set命令的区别. memcached存储方式 哈希表(HashTable) 哈希表在实践中使用的非常广泛,例如编译器通常会维护的一个符号表来保存标记,很多高级语言中也显式的支持哈希

Hadoop-2.4.1学习之NameNode -format源码分析

在Hadoop-2.X中,使用hdfs namenode –format对文件系统进行格式化.虽然无论是在生产环境还是测试环境中,已经使用了该命令若干次了,也大体了解该命令就是在本地文件系统中创建几个文件夹,但具体是如何执行的则需要通过阅读源代码来了解了. 要想看到该命令的源代码,需要看看hdfs脚本中是如何执行相应的java类的.在hdfs脚本中,下面的语句说明了实际执行的java类和参数,其中类为org.apache.hadoop.hdfs.server.namenode.NameNode,

正式学习React(四) ----Redux源码分析

今天看了下Redux的源码,竟然出奇的简单,好吧.简单翻译做下笔记: 喜欢的同学自己可以去github上看:点这里 createStore.js 1 import isPlainObject from 'lodash/isPlainObject' 2 import $$observable from 'symbol-observable' 3 4 /** 5 * These are private action types reserved by Redux. 6 * For any unkno

正式学习React(五) react-redux源码分析

磨刀不误砍柴工,咱先把react-redux里的工具函数分析一下: 源码点这里  shallowEqual.js 1 export default function shallowEqual(objA, objB) { 2 if (objA === objB) { 3 return true 4 } 5 6 const keysA = Object.keys(objA) 7 const keysB = Object.keys(objB) 8 9 if (keysA.length !== keys