MapReduce的InputFormat学习过程

昨天,经过几个小时的学习。该MapReduce学习的某一位的方法的第一阶段。即当大多数文件的开头的Data至key-value制图。那是,InputFormat的过程。虽说过程不是非常难,可是也存在非常多细节的。

也非常少会有人对此做比較细腻的研究。学习。今天。就让我来为大家剖析一下这段代码的原理。

我还为此花了一点时间做了几张结构图。便于大家理解。

在这里先声明一下。我研究的MapReduce主要研究的是旧版的API,也就是mapred包下的。

InputFormat最最原始的形式就是一个接口。后面出现的各种Format都是他的衍生类。结构例如以下,仅仅包括最重要的2个方法:

public interface InputFormat<K, V> {

  /**
   * Logically split the set of input files for the job.
   *
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i><input-file-path, start, offset></i> tuple.
   *
   * @param job job configuration.
   * @param numSplits the desired number of splits, a hint.
   * @return an array of {@link InputSplit}s for the job.
   */
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  /**
   * Get the {@link RecordReader} for the given {@link InputSplit}.
   *
   * <p>It is the responsibility of the <code>RecordReader</code> to respect
   * record boundaries while processing the logical split to present a
   * record-oriented view to the individual task.</p>
   *
   * @param split the {@link InputSplit}
   * @param job the job that this split belongs to
   * @return a {@link RecordReader}
   */
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job,
                                     Reporter reporter) throws IOException;
}

所以后面解说,我也仅仅是会环绕这2个方法进行分析。当然我们用的最多的是从文件里获得输入数据,也就是FileInputFormat这个类。继承关系例如以下:

public abstract class FileInputFormat<K, V> implements InputFormat<K, V>

我们看里面的1个主要方法:

public InputSplit[] getSplits(JobConf job, int numSplits)

返回的类型是一个InputSpilt对象。这是一个抽象的输入Spilt分片概念。结构例如以下:

public interface InputSplit extends Writable {

  /**
   * Get the total number of bytes in the data of the <code>InputSplit</code>.
   *
   * @return the number of bytes in the input split.
   * @throws IOException
   */
  long getLength() throws IOException;

  /**
   * Get the list of hostnames where the input split is located.
   *
   * @return list of hostnames where data of the <code>InputSplit</code> is
   *         located as an array of <code>String</code>s.
   * @throws IOException
   */
  String[] getLocations() throws IOException;
}

提供了与数据相关的2个方法。后面这个返回的值会被用来传递给RecordReader里面去的。在想理解getSplits方法之前另一个类须要理解,FileStatus,里面包装了一系列的文件基本信息方法:

public class FileStatus implements Writable, Comparable {

  private Path path;
  private long length;
  private boolean isdir;
  private short block_replication;
  private long blocksize;
  private long modification_time;
  private long access_time;
  private FsPermission permission;
  private String owner;
  private String group;

.....

看到这里你预计会有点晕了,以下是我做的一张小小类图关系:

能够看到,FileSpilt为了兼容新老版本号,继承了新的抽象类InputSpilt。同一时候附上旧的接口形式的InputSpilt。以下我们看看里面的getspilt核心过程:

/** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big.*/
  @SuppressWarnings("deprecation")
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
	//获取全部的状态文件
    FileStatus[] files = listStatus(job);

    // Save the number of input files in the job-conf
    //在job-cof中保存文件的数量
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;
    // compute total size,计算文件总的大小
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDir()) {
    	  //假设是文件夹不是纯文件的直接抛异常
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    //用户期待的划分大小。总大小除以spilt划分数目
    long goalSize = totalSize / (numSplits == 0 ?

1 : numSplits);
    //获取系统的划分最小值
    long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
                            minSplitSize);

    // generate splits
    //创建numSplits个FileSpilt文件划分量
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job);
      long length = file.getLen();
      //获取此文件的block的位置列表
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      //假设文件系统可划分
      if ((length != 0) && isSplitable(fs, path)) {
    	//计算此文件的总的block块的大小
        long blockSize = file.getBlockSize();
        //依据期待大小。最小大小。得出终于的split分片大小
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        long bytesRemaining = length;
        //假设剩余待划分字节倍数为划分大小超过1.1的划分比例,则进行拆分
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          //获取提供数据的splitHost位置
          String[] splitHosts = getSplitHosts(blkLocations,
              length-bytesRemaining, splitSize, clusterMap);
          //加入FileSplit
          splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
              splitHosts));
          //数量降低splitSize大小
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          //加入刚刚剩下的没划分完的部分。此时bytesRemaining已经小于splitSize的1.1倍了
          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkLocations.length-1].getHosts()));
        }
      } else if (length != 0) {
    	//不划分。直接加入Spilt
        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
        splits.add(new FileSplit(path, 0, length, splitHosts));
      } else {
        //Create empty hosts array for zero length files
        splits.add(new FileSplit(path, 0, length, new String[0]));
      }
    }

    //最后返回FileSplit数组
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }

里面有个computerSpiltSize方法非常特殊,考虑了非常多情况。总之最小值不能小于系统设定的最小值。要与期待值,块大小,系统同意最小值:

protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

上述过程的对应流程图例如以下:

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvQW5kcm9pZGx1c2hhbmdkZXJlbg==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" >

3种情况3中年运行流程。

处理完getSpilt方法然后,也就是说已经把数据从文件里转划到InputSpilt中了,接下来就是给RecordRead去取出里面的一条条的记录了。当然这在FileInputFormat是抽象方法,必须由子类实现的,我在这里挑出了2个典型的子类SequenceFileInputFormat,和TextInputFormat。

他们的实现RecordRead方法例如以下:

public RecordReader<K, V> getRecordReader(InputSplit split,
                                      JobConf job, Reporter reporter)
    throws IOException {

    reporter.setStatus(split.toString());

    return new SequenceFileRecordReader<K, V>(job, (FileSplit) split);
  }
public RecordReader<LongWritable, Text> getRecordReader(
                                          InputSplit genericSplit, JobConf job,
                                          Reporter reporter)
    throws IOException {

    reporter.setStatus(genericSplit.toString());
    return new LineRecordReader(job, (FileSplit) genericSplit);
  }

能够看到里面的差别就在于LineRecordReader和SequenceFileRecordReader的不同了,这也就表明2种方式相应于数据的读取方式可能会不一样。继续往里深入看:

/** An {@link RecordReader} for {@link SequenceFile}s. */
public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {

  private SequenceFile.Reader in;
  private long start;
  private long end;
  private boolean more = true;
  protected Configuration conf;

  public SequenceFileRecordReader(Configuration conf, FileSplit split)
    throws IOException {
    Path path = split.getPath();
    FileSystem fs = path.getFileSystem(conf);
    //从文件系统中读取数据输入流
    this.in = new SequenceFile.Reader(fs, path, conf);
    this.end = split.getStart() + split.getLength();
    this.conf = conf;

    if (split.getStart() > in.getPosition())
      in.sync(split.getStart());                  // sync to start

    this.start = in.getPosition();
    more = start < end;
  }

  ......

  /**
   * 获取下一个键值对
   */
  public synchronized boolean next(K key, V value) throws IOException {
	//推断还有无下一条记录
    if (!more) return false;
    long pos = in.getPosition();
    boolean remaining = (in.next(key) != null);
    if (remaining) {
      getCurrentValue(value);
    }
    if (pos >= end && in.syncSeen()) {
      more = false;
    } else {
      more = remaining;
    }
    return more;
  }

我们能够看到SequenceFileRecordReader是从输入流in中一个键值。一个键值的读取,另外一个的实现方式例如以下:

/**
 * Treats keys as offset in file and value as line.
 */
public class LineRecordReader implements RecordReader<LongWritable, Text> {
  private static final Log LOG
    = LogFactory.getLog(LineRecordReader.class.getName());

  private CompressionCodecFactory compressionCodecs = null;
  private long start;
  private long pos;
  private long end;
  private LineReader in;
  int maxLineLength;

  ....

  /** Read a line. */
  public synchronized boolean next(LongWritable key, Text value)
    throws IOException {

    while (pos < end) {
      //设置key
      key.set(pos);

      //依据位置一行一行读取,设置value
      int newSize = in.readLine(value, maxLineLength,
                                Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
                                         maxLineLength));
      if (newSize == 0) {
        return false;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        return true;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
    }

    return false;
  }

实现的方式为通过读的位置,从输入流中逐行读取key-value。

通过这2种方法,就能得到新的key-value。就会用于后面的map操作。

InputFormat的我忽略了一个事实,整个过程非常详细。通常该过程如上所述。

版权声明:本文博主原创文章。博客,未经同意不得转载。

时间: 2024-10-13 01:15:12

MapReduce的InputFormat学习过程的相关文章

MapReduce之InputFormat和OutFormat

InputFormat中的Splits集合的获取: InputFormat是一个接口,该接口有2个成员函数: InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException; 而FileInputFormat是继

MapReduce的InputFormat过程的学习

昨天经过几个小时的学习,把MapReduce的第一个阶段的过程学习了一下,也就是最最开始的时候从文件中的Data到key-value的映射,也就是InputFormat的过程.虽说过程不是很难,但是也存在很多细节的.也很少会有人对此做比较细腻的研究,学习.今天,就让我来为大家剖析一下这段代码的原理.我还为此花了一点时间做了几张结构图,便于大家理解.在这里先声明一下,我研究的MapReduce主要研究的是旧版的API,也就是mapred包下的. InputFormat最最原始的形式就是一个接口.后

hadoop mapreduce 自定义InputFormat

很久以前为了满足公司的需求写过一些自定义InputFormat,今天有时间拿出来记一下     需求是这样的,如果如果使用FileInputFormat作为输入,是按照行来读取日志的,也就是按照\n来区分每一条日志的,而由于一条日志中记录着一些错误信息,比如java的异常信息,这些信息本身就带有换行符,如果还是按照\n进行区分每一条日志的话明显是错误的,由于我们日志的特殊性,将以"]@\n"作为区分日志的标识.     接下来就来看看如何自定义InputFormat,还是不画类图了,我

MapReduce实现WordCount

MapReduce采用的是“分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个从节点共同完成,然后通过整合各个节点的中间结果,得到最终结果.简单来说,MapReduce就是”任务的分解与结果的汇总“. MapReduce的工作原理 在分布式计算中,MapReduce框架负责处理了并行编程里分布式存储.工作调度,负载均衡.容错处理以及网络通信等复杂问题,现在我们把处理过程高度抽象为Map与Reduce两个部分来进行阐述,其中Map部分负责把任务分解成多个子任务,Reduce部分

Hadoop实例之利用MapReduce实现Wordcount单词统计 (附源代码)

大致思路是将hdfs上的文本作为输入,MapReduce通过InputFormat会将文本进行切片处理,并将每行的首字母相对于文本文件的首地址的偏移量作为输入键值对的key,文本内容作为输入键值对的value,经过在map函数处理,输出中间结果<word,1>的形式,并在reduce函数中完成对每个单词的词频统计.整个程序代码主要包括两部分:Mapper部分和Reducer部分. Mapper代码 public static class doMapper extends Mapper<O

一、MapReduce基本原理

一.MapReduce基本概述 1.定义 是一个分布式运算程序编程框架.核心功能是将用户编写的业务逻辑代码和自带的默认组件整合成一个完整的分布式程序,并发运行在一个hadoop集群上. 2.优缺点 (1)优点1>易于编程:以普通程序的编程方法加上使用MapReduce提供的接口,可以快速完成分布式程序的编写.2>良好的扩展性:计算资源得不到满足时,可以通过简单的增加计算机器来扩展计算能力3>高容错性:如果一个任务所在计算节点挂了,上面的计算任务可以自动转移到另外的节点上执行,即故障自动转

Hadoop大实验——MapReduce的操作

日期:2019.10.30 博客期:114 星期三 实验6:Mapreduce实例——WordCount   实验说明: 1.          本次实验是第六次上机,属于验证性实验.实验报告上交截止日期为2018年11月16日上午12点之前. 2.          实验报告命名为:信1605-1班学号姓名实验六.doc. 实验目的 1.准确理解Mapreduce的设计原理 2.熟练掌握WordCount程序代码编写 3.学会自己编写WordCount程序进行词频统计 实验原理 MapRedu

实验6:Mapreduce实例——WordCount

实验目的 1.准确理解Mapreduce的设计原理 2.熟练掌握WordCount程序代码编写 3.学会自己编写WordCount程序进行词频统计 实验原理 MapReduce采用的是“分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个从节点共同完成,然后通过整合各个节点的中间结果,得到最终结果.简单来说,MapReduce就是”任务的分解与结果的汇总“. 1.MapReduce的工作原理 在分布式计算中,MapReduce框架负责处理了并行编程里分布式存储.工作调度,负载均

一文读懂MapReduce 附流量解析实例

1.MapReduce是什么 Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集.这个定义里面有着这些关键词, 一是软件框架,二是并行处理,三是可靠且容错,四是大规模集群,五是海量数据集. 2 MapReduce做什么 MapReduce擅长处理大数据,它为什么具有这种能力呢?这可由MapReduce的设计思想发觉.MapReduce的思想就是“