大数据:Mapper输出缓冲区MapOutputBuffer

Mapper的输出缓冲区MapOutputBuffer

现在我们知道了Map的输入端,紧接着我们看map的输出,这里重点就是context.write这个语句的内涵。获取视频中文档资料及完整视频的伙伴请加QQ群:947967114

搞清Mapper作为参数传给map的context,这里我们看Mapper的run被调用的时候作为了参数传递下来。调用Mapper.run的是MapTask. runNewMapper。到这里我们深究一下runNewMapper。我们看MapTask的run方法:我们重点看runNewMapper

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)

throws IOException, ClassNotFoundException, InterruptedException {

this.umbilical = umbilical;

if (isMapTask()) {

  // If there are no reducers then there won‘t be any sort. Hence the map

  // phase will govern the entire attempt‘s progress.

  if (conf.getNumReduceTasks() == 0) {

    mapPhase = getProgress().addPhase("map", 1.0f);

  } else {

    // If there are reducers then the entire attempt‘s progress will be

    // split between the map phase (67%) and the sort phase (33%).

    mapPhase = getProgress().addPhase("map", 0.667f);

    sortPhase  = getProgress().addPhase("sort", 0.333f);

  }

}

TaskReporter reporter = startReporter(umbilical);获取视频中文档资料及完整视频的伙伴请加QQ群:947967114

boolean useNewApi = job.getUseNewMapper();

initialize(job, getJobID(), reporter, useNewApi);

// check if it is a cleanupJobTask

if (jobCleanup) {

  runJobCleanupTask(umbilical, reporter);

  return;

}

if (jobSetup) {

  runJobSetupTask(umbilical, reporter);

  return;

}

if (taskCleanup) {

  runTaskCleanupTask(umbilical, reporter);

  return;

}

if (useNewApi) {

  runNewMapper(job, splitMetaInfo, umbilical, reporter);

} else {

  runOldMapper(job, splitMetaInfo, umbilical, reporter);

}

done(umbilical, reporter);

}

当我们点runNewMapper的时候就能进入真正实现:

private <INKEY,INVALUE,OUTKEY,OUTVALUE>

void runNewMapper(final JobConf job,final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical,TaskReporter reporter) throws IOException, ClassNotFoundException,

InterruptedException {

// make a task context so we can get the classes

org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =

  new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,                                                             getTaskID(),reporter);

// make a mapper    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =  (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)

ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

//确定该用哪一种具体的Mapper,然后创建。获取视频中文档资料及完整视频的伙伴请加QQ群:947967114

org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =

  (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)

ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);

//确定输入的文件格式

// rebuild the input split

org.apache.hadoop.mapreduce.InputSplit split = null;

split = getSplitDetails(new Path(splitIndex.getSplitLocation()),splitIndex.getStartOffset());//确定这个Mapper所用的输入是哪一个split

LOG.info("Processing split: " + split);

org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =

  new NewTrackingRecordReader<INKEY,INVALUE>

    (split, inputFormat, reporter, taskContext);

//创建和InputFormat相称的RecordReader

job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

org.apache.hadoop.mapreduce.RecordWriter output = null;

// get an output object

if (job.getNumReduceTasks() == 0) {

//如果设置的reduce个数是0,就直接输出。

  output =

    new NewDirectOutputCollector(taskContext, job, umbilical, reporter);

} else {

  output = new NewOutputCollector(taskContext, job, umbilical, reporter);

}

接下来我们看一下NewOutputCollector源码 获取视频中文档资料及完整视频的伙伴请加QQ群:947967114

NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,

                   JobConf job,

                   TaskUmbilicalProtocol umbilical,

                   TaskReporter reporter

                   ) throws IOException, ClassNotFoundException {

  collector = createSortingCollector(job, reporter);

//创建通向排序阶段的collecter

  partitions = jobContext.getNumReduceTasks();

//通过获取Reduce数量来获得partitions数量。两个数量一一对应

  if (partitions > 1) {

//获取的partitions 数量大于1

    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)

  ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);

//ReflectionUtils.newInstance创建用户设置的Partitioner,里边的参数jobContext.getPartitionerClass()是对抽象类的某种扩充,表示自己可以书写一个Partitioner类,通过这个方法来获取,如果没有自己写,就是用默认的HashPartitioner

  } else {

    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {

      @Override

      public int getPartition(K key, V value, int numPartitions) {

        return partitions - 1;

      }//只有一个partition就动态扩充抽象类Partitioner类

    };

  }

}

回到runNewMapper源码:

org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>

mapContext =

  new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split);

//创建一个用于Mapper的Context。

org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context

    mapperContext =

      new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);

//把上边创建的mapContext通过getMapContext获取过来最终传递给mapperContext ,我们继续看getMapContext源码

public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context

getMapContext(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {

return new Context(mapContext);

}

//这里返回了Context对象,在查看Context对象。获取视频中文档资料及完整视频的伙伴请加QQ群:947967114

public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {

  this.mapContext = mapContext;

}

//我们看到获取了mapContext 的值。所以我们知道WrappedMapper-->Context-->mapContext是一个MapContextImpl。

try {

  input.initialize(split, mapperContext);

//初始化input,input是recordReader对象,split和mapperContext作为参数

mapper.run(mapperContext);

//我们知道这个run方法运行的是Mapper的run方法,所以看一下这个run

public void run(Context context) throws IOException, InterruptedException {

setup(context);

//获取context

try {

  while (context.nextKeyValue()) {

//通过nextKeyValue来控制运行

    map(context.getCurrentKey(), context.getCurrentValue(), context);

//运行了map方法,给了recordReader提供过来的键值对。

  }

} finally {

  cleanup(context);

}

}

回到MapTask源码

  mapPhase.complete();

//上锁

  setPhase(TaskStatus.Phase.SORT);

//所有的task结果进行排序

  statusUpdate(umbilical);

//更新runNewMapper状态。

  input.close();

//关闭输入流

  input = null;

  output.close(mapperContext);

//关闭输出流

  output = null;

} finally {

  closeQuietly(input);

  closeQuietly(output, mapperContext);

}

}

对于输入格式和分片以前已经详细说过了,需要注意NewTrackingRecordReader。我们知道有了InputFormat之后需要创建与他对应的RecordReader。但是在RecordReader上是用NewTrackingRecordReader。不同之处在于Tracking,是一个跟踪,对RecordReader的跟踪,他这里有一个参数reporter,就是用来上报跟踪结果的,RecordReader则没有这个功能。

和输出有关的是collecter,是输出数据的收集器,context.write最后就通过RecodWriter落实到collector.collect上。RecordWriter和RecordReader是同一个层次。RecodWriter是hadoop定义个一个抽象类,具体的RecodWriter就是对这个抽象类的扩充。用于maptask的就是NewDrictDoutputCollecter和NewOutputCollecter。

这两个类叫做OutputCollecter,实际上都是RecordWriter。Collecter只是一种语意的描述。从Mapper的角度看是Writer,是输出。从框架或下游的角度看是Collect,是收集。

如果reducer数量是0,就是没有reducer,Mapper的输出就是整个MR的输出,这个时候用RecordWriter的NewDrictDoutputCollecter,直接输出。相反至少有一个Reducer,那么使用的就是RecordWriter的NewOutputCollecter。这是我们注重的重点内容。我们看NewOutputCollecter源码。定义了几个内容:

  collector = createSortingCollector(job, reporter);

//实现MapOutputCollector

  partitions = jobContext.getNumReduceTasks();

//负责Mapper输出的分区

    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)

//分发目标的个数,也就是Reducer的个数。

@Override

public void write(K key, V value) throws IOException, InterruptedException {

  collector.collect(key, value,

                    partitioner.getPartition(key, value, partitions));

}

//write只写不读。

@Override

public void close(TaskAttemptContext context

                  ) throws IOException,InterruptedException {

  try {

    collector.flush();

  } catch (ClassNotFoundException cnf) {

    throw new IOException("can‘t find class ", cnf);

  }

  collector.close();

}

}

NewOutputCollector分成两部分,一个是collecter还有一个是partitioner。collecter负责实际收集Mapper输出并交付给Reducer的工作,partitioner负责决定把具体的输出交给哪一个Reducer。

有多个Reducer存在,MR框架需要把每个Mapper的每项输出,也就是收集到的所有的KV对。按照某种条件(就是Partioner的实现方式,默认就是HashPartitioner)输出到不同的Reducer。这样就把Mapper的输出划分成了多个分区(Partition),有几个Reducer就把每个Mapper还分成几个Partition,Partitioner就是起到划分的作用。hash的方式。。。。。。。。。。。。

所以在创建NewOutputCollector的构造函数中,就要把具体的collector和partitioner创建好。

hadoop的源码中定义了MapOutputCollector。凡是实现了这个类,除了init和close方法外,还必须提供collect和flush这两个函数,从NewOutputCollector知道这两个函数的调用者是collector,创建collector的方式是通过createSortingCollector来完成的。并且还实现了对KV对的排序。从属关系如下:

YarnChild.main->PrivilegeExceptionAction.run->Maptask.run-->RunNewMapper->NewOutputCollector->MapTask.createSortingCollector

那么我们来看一下createSortingCollector源码。获取视频中文档资料及完整视频的伙伴请加QQ群:947967114

private <KEY, VALUE> MapOutputCollector<KEY, VALUE>

      createSortingCollector(JobConf job, TaskReporter reporter)

throws IOException, ClassNotFoundException {

MapOutputCollector.Context context =

  new MapOutputCollector.Context(this, job, reporter);

Class<?>[] collectorClasses = job.getClasses(

  JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);

//如果没有添加设置就默认使用MapOutputBuffer.class

int remainingCollectors = collectorClasses.length;

for (Class clazz : collectorClasses) {

//逐一实验设置的collectorClasses

  try {

    if (!MapOutputCollector.class.isAssignableFrom(clazz)) {

      throw new IOException("Invalid output collector class: " + clazz.getName() +

        " (does not implement MapOutputCollector)");

//这里告诉我们必须实现MapOutputCollector.class

    }

    Class<? extends MapOutputCollector> subclazz =

      clazz.asSubclass(MapOutputCollector.class);

    LOG.debug("Trying map output collector class: " + subclazz.getName());

//获取日志

    MapOutputCollector<KEY, VALUE> collector =

      ReflectionUtils.newInstance(subclazz, job);

//创建collector对象。

    collector.init(context);

//初始化collector,实际上初始化的是MapOutputBuffer对象

    LOG.info("Map output collector class = " + collector.getClass().getName());

    return collector;

//没有异常就成功了。

  } catch (Exception e) {

    String msg = "Unable to initialize MapOutputCollector " + clazz.getName();

    if (--remainingCollectors > 0) {

      msg += " (" + remainingCollectors + " more collector(s) to try)";

    }

    LOG.warn(msg, e);

  }

}

throw new IOException("Unable to initialize any output collector");

}

具体采用什么collector是可以在配置文件mapred-default.xml中设置的,这里的MAP_OUTPUT_COLLECTOR_CLASS_ATTR即mapreduce.job.output.collector.class.如果文件中没有设置就使用默认的MapOutputBuffer。所以实际创建的collcter就是Mapask的MapOutputBuffer。这个类是Maptask的内部类,实现了MapOutputCollector。

可想而知,如果我们另写一个实现了MapOutputCollectior的Collector,并修改配置文件mapred-default.xml中队配置项的设置。那么就可以创建不是MapTask.MapOutputBuffer。那样createSortingCollector创建的就是一个没有排序功能的collector。我们知道MapReduce框架之所以是工作流不是数据流的原因就是因为Mapper和Reducer之间的排序。因为Sort只有在所有数据到来之后才能完成。sort完之后所有数据才被Rducer拉取。那么没有了sort之后代表数据可以不断的流入而不是一次性的填充,MR给我们提供了这种可能性,就是通过写一个不排序的Collector来替代MapOutputBuffer。我们接下来还是把注意力放到runNewMapper上。

当创建了collector和partitioner之后就是Context,MapTask在调用mapper.run时作为参数的是mapperContext,这个对象的类型是WrappedMapper.Context,整个过程是MapContextImpl创建了mapContext对象,通过WrappedMapper对象(是对Mapper的扩充,根据名字就可以知道是对Mapper的包装区别就是在内部定义了Context类),把一个扩充的Mapper.Context包装在Mapper内部,这就是WrappedMapper.Context类对象。下面是部分代码;

public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context

getMapContext(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {

return new Context(mapContext);

}

@InterfaceStability.Evolving

public class Context

  extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {

protected MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext;

//MapContext类。被MapContextImpl实现

public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {

  this.mapContext = mapContext;

}

/**

 * Get the input split for this map.

 */

public InputSplit getInputSplit() {

  return mapContext.getInputSplit();

}

@Override

public KEYIN getCurrentKey() throws IOException, InterruptedException {

  return mapContext.getCurrentKey();

}

@Override

public VALUEIN getCurrentValue() throws IOException, InterruptedException {

  return mapContext.getCurrentValue();

}

@Override

public boolean nextKeyValue() throws IOException, InterruptedException {

  return mapContext.nextKeyValue();

}

WrappedMapper.Context是对Mapper.Context的扩充。内部mapContext,它的构造函数Context中的this.mapContext就设置成这个MapContextImpl类对象mapContext。WrappedMapper.Context扩充了Mapper.Context的write、getCurrentKey、nextKeyValue等。

传给mapper.run的context就是WrappedMapper.Context对象。里面的mapContext是MapContextImpl对象。

我们继续看Mapper.map的context.write

关系是:MapTask.run->runNewMapper->Mapper.run->Mapper.map

按照这个关系找到了一个没有做任何事的方法。

public void write(KEYOUT key, VALUEOUT value)

  throws IOException, InterruptedException;

我们需要找一个实现,这里找到的就是WrappedMapper.Context.write

就是这一段:

public void write(KEYOUT key, VALUEOUT value) throws IOException,

    InterruptedException {

  mapContext.write(key, value);

}

这里的调用的其实是MapContextImpl.write。所以我们找到MapContextImpl。当我们看到MapContextImpl源码是看到继承了TaskInputOutputContextImpl我们找到了

public void write(KEYOUT key, VALUEOUT value

                ) throws IOException, InterruptedException {

output.write(key, value);

}

找到这里我们还是没有找到真正的实现,这里的witer实际上调用的是,NewOutputCollector.writer。

public void write(K key, V value) throws IOException, InterruptedException {

  collector.collect(key, value,

                    partitioner.getPartition(key, value, partitions));

}

绕了一大圈之后我们发现最终回到了NewOutputCollector,这里的write和之前的有明显区别是collect实现的,里面有了分区。我们找的目的是一定要找到write中真正实现了分区写。

我们知道context是个WrappedMappe.Context对象,所以context.write其实就是就是Wrapped.Context.write,这个函数转而调用内部成分mapContext的write函数,而mapContext是个MapContextImpl对象,所以实际调用的是MoapCntextImpl.write。然而MapContextImpl中没有提供write函数,但是我们看到这个类继承了TaskInputOutputContextImpl。所以就继承他的write方法,然后这个write函数调用的是output的write,我们知道这个output参数类型是一个RecordReader,实际上这个output就是MapTask中定义的output,这个output是一个NewOutputCollector,也就是说是调用的NewOutputCollector的write方法,在这个write中我们看到调用了collector的collect,这个collecter就是Maptask.MapOutputBuffer。

在调用Maptask.MapOutputBuffer的collect时增加了一个参数partition,是指明KV去向的,这个值是有job.setPartitionerClass指定的,没有设置就使用了hashPartitioner。下面所有的工作就是由MapTask的MapOutputBuffer来完成了。获取视频中文档资料及完整视频的伙伴请加QQ群:947967114

原文地址:http://blog.51cto.com/14043271/2323207

时间: 2024-11-14 13:11:06

大数据:Mapper输出缓冲区MapOutputBuffer的相关文章

大数据运营之孕育:服务过程设计,卓有成效的管理者

[本文摘自:李福东<大数据运营>3.6?,了解更多请关注微信公号:李福东频道] 编制按 大数据服务过程包括:服务目录管理.容量管理.可用性管理.连续性管理.服务等级管理.信息安全管理.供应商管理等. 正文 在设计方法方面,大数据服务与支撑企业运营的服务既存在区别,又存在联系.不同之处是:大数据服务的设计主要以"数据"为参考点,"数据"类型越多.越丰富.越新鲜,则越有助于设计好的服务:两者的共同点是:大数据服务归根结底还是为企业运营服务的,是为了提升企业在

大数据:Map终结和Spill文件合并

当Mapper没有数据输入,mapper.run中的while循环会调用context.nextKeyValue就返回false,于是便返回到runNewMapper中,在这里程序会关闭输入通道和输出通道,这里关闭输出通道并没有关闭collector,必须要先flush一下. 获取更多大数据视频资料请加QQ群:947967114 代码结构: Maptask.runNewMapper->NewOutputCollector.close->MapOutputBuffer.flush 我们看flus

大数据-hadoop理论

前言 : 下面可能用的很多计算的词语,理解是计算不是单单1+1是计算,对于计算机而言,任何的程序执行就是一个计算过程. 1:计算过程区别(关键字:并行计算) 传统的计算方式: 一个文件数据->开始计算(整个文件有多少数据就计算多少,从头到尾)->计算结束 并行计算: 一个文件数据->拆分存储在一个集群中(每个计算机上存文件的一部分数据)->并行计算(每个计算机只要计算分给它的那部分数据, 且整个集群一起运行)->将运行之后的结果汇总->计算结束 2:计算方式(关键字:计

Spark应用开发之一:Hadoop分析大数据

要学会和使用一门技术的时候,首先要弄清楚该技术出现的背景和要解决的问题.要说spark首先要了解海量数据的处理和Hadoop技术. 一个系统在运行的过程中都会产生许多的日志数据,这些日志数据包含但不局限我们平常开发中使用log4j或者logback生成的记录系统运行情况的日志.例如对于网络服务提供商,他们的设备可能会记录着用户上下线时间,访问的网页地址,响应时长等数据,这些数据文件里面记录的某些信息经过抽取分析后可以得出许多的指标信息,从而为改善网络结构和提高服务等提供数据依据.但这些数据会很大

117道有关大数据面试题解析,希望对你有所帮助

一 .简述如何安装配置apache 的一个开源的hadoop 使用root账户登陆 2.修改ip 3.修改host主机名 4.配置ssh 免密登陆 5.关闭防火墙 6.安装JDK 7.解压hadoop安装包 8.配置hadoop的核心配置文件 hadoop-env.sh? core-site.xml? mapred-site.xml yarn-site.xml hdfs-site.xml 9.配置hadoop 的环境变量 10 .格式化hadoop namenode-format 启动节点sta

出一套高端大气上档次的大数据开发面试题(刷起来!!!)

一千个读者眼中有一千个哈姆雷特,一千名 大数据 程序员心目中就有一千套 大数据面试题.本文就是笔者认为可以用来面试大数据 程序员的面试题. 这套题的题目跟公司和业务都没有关系,而且也并不代表笔者本人可以把这些题回答得非常好,笔者只是将一部分觉得比较好的题从收集的面试题里面抽出来了而已. 收集的面试题有以下三个来源: 笔者在准备面试的过程中搜集并整理过的面试题. 笔者在准备面试的过程中自己思考过的新题. 笔者在面试过程中遇到的觉得比较好的题. 好了不说废话了 上~~~~~~~题~~~~~~~~~

117道有关大数据面试题的解析,希望对你有所帮助!

一 .简述如何安装配置apache 的一个开源的hadoop 使用root账户登陆 2.修改ip 3.修改host主机名 4.配置ssh 免密登陆 5.关闭防火墙 6.安装JDK 7.解压hadoop安装包 8.配置hadoop的核心配置文件 hadoop-env.sh? core-site.xml? mapred-site.xml yarn-site.xml hdfs-site.xml 9.配置hadoop 的环境变量 10 .格式化hadoop namenode-format 启动节点sta

大数据阶段划分及案例单词统计

大数据阶段的重要课程划分 离线分析 : hadoop生态圈 HDFS, MapReduce(概念偏多), hive(底层是MapReduce), 离线业务分析80%都是使用hive 实时分析 : spark 数据结构 : 二叉树(面试) 动态规划, redis数据库, SSM三大框架 1. spring 2. springMVC 3. mybatis HDFSAPI HDFS创建目录 @Test public void mkdirTest() throws IOException { //1 获

大数据 : Hadoop reduce阶段

Mapreduce中由于sort的存在,MapTask和ReduceTask直接是工作流的架构.而不是数据流的架构.在MapTask尚未结束,其输出结果尚未排序及合并前,ReduceTask是又有数据输入的,因此即使ReduceTask已经创建也只能睡眠等待MapTask完成.从而可以从MapTask节点获取数据.一个MapTask最终的数据输出是一个合并的spill文件,可以通过Web地址访问.所以reduceTask一般在MapTask快要完成的时候才启动.启动早了浪费container资源