MapReduce的InputFormat过程的学习

昨天经过几个小时的学习,把MapReduce的第一个阶段的过程学习了一下,也就是最最开始的时候从文件中的Data到key-value的映射,也就是InputFormat的过程。虽说过程不是很难,但是也存在很多细节的。也很少会有人对此做比较细腻的研究,学习。今天,就让我来为大家剖析一下这段代码的原理。我还为此花了一点时间做了几张结构图,便于大家理解。在这里先声明一下,我研究的MapReduce主要研究的是旧版的API,也就是mapred包下的。

InputFormat最最原始的形式就是一个接口。后面出现的各种Format都是他的衍生类。结构如下,只包含最重要的2个方法:

public interface InputFormat<K, V> {

  /**
   * Logically split the set of input files for the job.
   *
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i><input-file-path, start, offset></i> tuple.
   *
   * @param job job configuration.
   * @param numSplits the desired number of splits, a hint.
   * @return an array of {@link InputSplit}s for the job.
   */
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  /**
   * Get the {@link RecordReader} for the given {@link InputSplit}.
   *
   * <p>It is the responsibility of the <code>RecordReader</code> to respect
   * record boundaries while processing the logical split to present a
   * record-oriented view to the individual task.</p>
   *
   * @param split the {@link InputSplit}
   * @param job the job that this split belongs to
   * @return a {@link RecordReader}
   */
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job,
                                     Reporter reporter) throws IOException;
}

所以后面讲解,我也只是会围绕这2个方法进行分析。当然我们用的最多的是从文件中获得输入数据,也就是FileInputFormat这个类。继承关系如下:

public abstract class FileInputFormat<K, V> implements InputFormat<K, V>

我们看里面的1个主要方法:

public InputSplit[] getSplits(JobConf job, int numSplits)

返回的类型是一个InputSpilt对象,这是一个抽象的输入Spilt分片概念。结构如下:

public interface InputSplit extends Writable {

  /**
   * Get the total number of bytes in the data of the <code>InputSplit</code>.
   *
   * @return the number of bytes in the input split.
   * @throws IOException
   */
  long getLength() throws IOException;

  /**
   * Get the list of hostnames where the input split is located.
   *
   * @return list of hostnames where data of the <code>InputSplit</code> is
   *         located as an array of <code>String</code>s.
   * @throws IOException
   */
  String[] getLocations() throws IOException;
}

提供了与数据相关的2个方法。后面这个返回的值会被用来传递给RecordReader里面去的。在想理解getSplits方法之前还有一个类需要理解,FileStatus,里面包装了一系列的文件基本信息方法:

public class FileStatus implements Writable, Comparable {

  private Path path;
  private long length;
  private boolean isdir;
  private short block_replication;
  private long blocksize;
  private long modification_time;
  private long access_time;
  private FsPermission permission;
  private String owner;
  private String group;

.....

看到这里你估计会有点晕了,下面是我做的一张小小类图关系:

可以看到,FileSpilt为了兼容新老版本,继承了新的抽象类InputSpilt,同时附上旧的接口形式的InputSpilt。下面我们看看里面的getspilt核心过程:

/** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big.*/
  @SuppressWarnings("deprecation")
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
	//获取所有的状态文件
    FileStatus[] files = listStatus(job);

    // Save the number of input files in the job-conf
    //在job-cof中保存文件的数量
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;
    // compute total size,计算文件总的大小
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDir()) {
    	  //如果是目录不是纯文件的直接抛异常
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    //用户期待的划分大小,总大小除以spilt划分数目
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    //获取系统的划分最小值
    long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
                            minSplitSize);

    // generate splits
    //创建numSplits个FileSpilt文件划分量
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job);
      long length = file.getLen();
      //获取此文件的block的位置列表
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      //如果文件系统可划分
      if ((length != 0) && isSplitable(fs, path)) {
    	//计算此文件的总的block块的大小
        long blockSize = file.getBlockSize();
        //根据期待大小,最小大小,得出最终的split分片大小
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        long bytesRemaining = length;
        //如果剩余待划分字节倍数为划分大小超过1.1的划分比例,则进行拆分
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          //获取提供数据的splitHost位置
          String[] splitHosts = getSplitHosts(blkLocations,
              length-bytesRemaining, splitSize, clusterMap);
          //添加FileSplit
          splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
              splitHosts));
          //数量减少splitSize大小
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          //添加刚刚剩下的没划分完的部分,此时bytesRemaining已经小于splitSize的1.1倍了
          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkLocations.length-1].getHosts()));
        }
      } else if (length != 0) {
    	//不划分,直接添加Spilt
        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
        splits.add(new FileSplit(path, 0, length, splitHosts));
      } else {
        //Create empty hosts array for zero length files
        splits.add(new FileSplit(path, 0, length, new String[0]));
      }
    }

    //最后返回FileSplit数组
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }

里面有个computerSpiltSize方法很特殊,考虑了很多情况,总之最小值不能小于系统设定的最小值。要与期待值,块大小,系统允许最小值:

protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

上述过程的相应流程图如下:

3种情况3中年执行流程。

处理完getSpilt方法然后,也就是说已经把数据从文件中转划到InputSpilt中了,接下来就是给RecordRead去取出里面的一条条的记录了。当然这在FileInputFormat是抽象方法,必须由子类实现的,我在这里挑出了2个典型的子类SequenceFileInputFormat,和TextInputFormat。他们的实现RecordRead方法如下:

public RecordReader<K, V> getRecordReader(InputSplit split,
                                      JobConf job, Reporter reporter)
    throws IOException {

    reporter.setStatus(split.toString());

    return new SequenceFileRecordReader<K, V>(job, (FileSplit) split);
  }
public RecordReader<LongWritable, Text> getRecordReader(
                                          InputSplit genericSplit, JobConf job,
                                          Reporter reporter)
    throws IOException {

    reporter.setStatus(genericSplit.toString());
    return new LineRecordReader(job, (FileSplit) genericSplit);
  }

可以看到里面的区别就在于LineRecordReader和SequenceFileRecordReader的不同了,这也就表明2种方式对应于数据的读取方式可能会不一样,继续往里深入看:

/** An {@link RecordReader} for {@link SequenceFile}s. */
public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {

  private SequenceFile.Reader in;
  private long start;
  private long end;
  private boolean more = true;
  protected Configuration conf;

  public SequenceFileRecordReader(Configuration conf, FileSplit split)
    throws IOException {
    Path path = split.getPath();
    FileSystem fs = path.getFileSystem(conf);
    //从文件系统中读取数据输入流
    this.in = new SequenceFile.Reader(fs, path, conf);
    this.end = split.getStart() + split.getLength();
    this.conf = conf;

    if (split.getStart() > in.getPosition())
      in.sync(split.getStart());                  // sync to start

    this.start = in.getPosition();
    more = start < end;
  }

  ......

  /**
   * 获取下一个键值对
   */
  public synchronized boolean next(K key, V value) throws IOException {
	//判断还有无下一条记录
    if (!more) return false;
    long pos = in.getPosition();
    boolean remaining = (in.next(key) != null);
    if (remaining) {
      getCurrentValue(value);
    }
    if (pos >= end && in.syncSeen()) {
      more = false;
    } else {
      more = remaining;
    }
    return more;
  }

我们可以看到SequenceFileRecordReader是从输入流in中一个键值,一个键值的读取,另外一个的实现方式如下:

/**
 * Treats keys as offset in file and value as line.
 */
public class LineRecordReader implements RecordReader<LongWritable, Text> {
  private static final Log LOG
    = LogFactory.getLog(LineRecordReader.class.getName());

  private CompressionCodecFactory compressionCodecs = null;
  private long start;
  private long pos;
  private long end;
  private LineReader in;
  int maxLineLength;

  ....

  /** Read a line. */
  public synchronized boolean next(LongWritable key, Text value)
    throws IOException {

    while (pos < end) {
      //设置key
      key.set(pos);

      //根据位置一行一行读取,设置value
      int newSize = in.readLine(value, maxLineLength,
                                Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
                                         maxLineLength));
      if (newSize == 0) {
        return false;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        return true;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
    }

    return false;
  }

实现的方式为通过读的位置,从输入流中逐行读取key-value。通过这2种方法,就能得到新的key-value,就会用于后面的map操作。

InputFormat的整个流程其实我忽略了很多细节。大体流程如上述所说。

时间: 2024-10-07 16:37:21

MapReduce的InputFormat过程的学习的相关文章

MapReduce简介和过程浅析

预备知识:什么是hadoop,HDFS? Hadoop是一个开源框架,它允许在整个集群使用简单编程模型计算机的分布式环境存储并处理大数据.它的目的是从单一的服务器到上千台机器的扩展,每一个台机都可以提供本地计算和存储. HDFS全称为Hadoop Distributed File System(分布式文件系统),可以粗浅得理解成将很大的文件分成固定大小的小片,存储在多个计算机上,更详细的参考:https://www.cnblogs.com/codeOfLife/p/5375120.html 1.

MapReduce的Shuffle过程介绍

MapReduce的Shuffle过程介绍 Shuffle的本义是洗牌.混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好.MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据. 为什么MapReduce计算模型需要Shuffle过程?我们都知道MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发:Reduce是规约,负责数据的计算归并.Reduce的数据来源于Map,Map的输出即是Reduce

MapReduce之InputFormat和OutFormat

InputFormat中的Splits集合的获取: InputFormat是一个接口,该接口有2个成员函数: InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException; 而FileInputFormat是继

手动实现一个单词统计MapReduce程序与过程原理分析

[toc] 手动实现一个单词统计MapReduce程序与过程原理分析 前言 我们知道,在搭建好hadoop环境后,可以运行wordcount程序来体验一下hadoop的功能,该程序在hadoop目录下的share/hadoop/mapreduce目录中,通过下面的命令: yarn jar $HADOOP_HOME/share/hadoop/mapreducehadoop-mapreduce-examples-2.6.4.jar wordcount inputPath outPath 即可对输入文

第2节 mapreduce深入学习:7、MapReduce的规约过程combiner

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一. ?   combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件 ?   combiner 组件的父类就是 Reducer ?   combiner 和 reducer 的区别在于运行的位置: Combiner 是在每一个 maptask

MapReduce的InputFormat学习过程

昨天,经过几个小时的学习.该MapReduce学习的某一位的方法的第一阶段.即当大多数文件的开头的Data至key-value制图.那是,InputFormat的过程.虽说过程不是非常难,可是也存在非常多细节的. 也非常少会有人对此做比較细腻的研究.学习.今天.就让我来为大家剖析一下这段代码的原理. 我还为此花了一点时间做了几张结构图.便于大家理解. 在这里先声明一下.我研究的MapReduce主要研究的是旧版的API,也就是mapred包下的. InputFormat最最原始的形式就是一个接口

MapReduce的shuffle过程

本文是学习时的自我总结,用于日后温习.如有错误还望谅解,不吝赐教. 此处附上一篇个人认为写的比较好的博客,转自枝叶飞扬的博文:http://blog.sina.com.cn/s/blog_605f5b4f010188lp.html### 将Map的输出作为Reduce的输入的过程就是Shuffle了,这个是MapReduce优化的重点地方 Shuffle 过程 ①   Map在内存中开启一个默认大小100MB的环形内存缓冲区用于输出 ②   当缓冲区内存达到默认阈值 80% 时,Map 会启动守

【转】mapreduce的shuffle过程

转自http://langyu.iteye.com/blog/992916 Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方.要想理解MapReduce, Shuffle是必须要了解的.我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越混.前段时间在做MapReduce job 性能调优的工作,需要深入代码研究MapReduce的运行机制,这才对Shuffle探了个究竟.考虑到之前我在看相关资料而看不懂时很恼火,所以在这 里我尽最大的可能试着把S

MapReduce中Shuffle过程整理

MapReduce中的Shuffle过程分为Map端和Reduce端两个过程. Map端: 1.(Hash Partitioner)执行完Map函数后,根据key进行hash,并对该结果进行Reduce的数量取模(该键值对将会由某个reduce端处理)得到一个分区号. 2.(Sort Combiner)将该键值对和分区号序列化之后的字节写入到内存缓存区(大小为100M,装载因子为0.8)中,当内存缓冲区的大小超过100*0.8 = 80M的时候,将会spill(溢出):在溢出之前会在内存缓冲区中