


public interface InputFormat<K, V> {

   * Logically split the set of input files for the job.
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i><input-file-path, start, offset></i> tuple.
   * @param job job configuration.
   * @param numSplits the desired number of splits, a hint.
   * @return an array of {@link InputSplit}s for the job.
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

   * Get the {@link RecordReader} for the given {@link InputSplit}.
   * <p>It is the responsibility of the <code>RecordReader</code> to respect
   * record boundaries while processing the logical split to present a
   * record-oriented view to the individual task.</p>
   * @param split the {@link InputSplit}
   * @param job the job that this split belongs to
   * @return a {@link RecordReader}
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job,
                                     Reporter reporter) throws IOException;


public abstract class FileInputFormat<K, V> implements InputFormat<K, V>


public InputSplit[] getSplits(JobConf job, int numSplits)


public interface InputSplit extends Writable {

   * Get the total number of bytes in the data of the <code>InputSplit</code>.
   * @return the number of bytes in the input split.
   * @throws IOException
  long getLength() throws IOException;

   * Get the list of hostnames where the input split is located.
   * @return list of hostnames where data of the <code>InputSplit</code> is
   *         located as an array of <code>String</code>s.
   * @throws IOException
  String[] getLocations() throws IOException;


public class FileStatus implements Writable, Comparable {

  private Path path;
  private long length;
  private boolean isdir;
  private short block_replication;
  private long blocksize;
  private long modification_time;
  private long access_time;
  private FsPermission permission;
  private String owner;
  private String group;




/** Splits files returned by {@link #listStatus(JobConf)} when
   * they're too big.*/
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    FileStatus[] files = listStatus(job);

    // Save the number of input files in the job-conf
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;
    // compute total size,计算文件总的大小
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDir()) {
        throw new IOException("Not a file: "+ file.getPath());
      totalSize += file.getLen();

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong("mapred.min.split.size", 1),

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job);
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      if ((length != 0) && isSplitable(fs, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          String[] splitHosts = getSplitHosts(blkLocations,
              length-bytesRemaining, splitSize, clusterMap);
          splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
          bytesRemaining -= splitSize;

        if (bytesRemaining != 0) {
          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
      } else if (length != 0) {
        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
        splits.add(new FileSplit(path, 0, length, splitHosts));
      } else {
        //Create empty hosts array for zero length files
        splits.add(new FileSplit(path, 0, length, new String[0]));

    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);


protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));




public RecordReader<K, V> getRecordReader(InputSplit split,
                                      JobConf job, Reporter reporter)
    throws IOException {


    return new SequenceFileRecordReader<K, V>(job, (FileSplit) split);
public RecordReader<LongWritable, Text> getRecordReader(
                                          InputSplit genericSplit, JobConf job,
                                          Reporter reporter)
    throws IOException {

    return new LineRecordReader(job, (FileSplit) genericSplit);


/** An {@link RecordReader} for {@link SequenceFile}s. */
public class SequenceFileRecordReader<K, V> implements RecordReader<K, V> {

  private SequenceFile.Reader in;
  private long start;
  private long end;
  private boolean more = true;
  protected Configuration conf;

  public SequenceFileRecordReader(Configuration conf, FileSplit split)
    throws IOException {
    Path path = split.getPath();
    FileSystem fs = path.getFileSystem(conf);
    //从文件系统中读取数据输入流 = new SequenceFile.Reader(fs, path, conf);
    this.end = split.getStart() + split.getLength();
    this.conf = conf;

    if (split.getStart() > in.getPosition())
      in.sync(split.getStart());                  // sync to start

    this.start = in.getPosition();
    more = start < end;


   * 获取下一个键值对
  public synchronized boolean next(K key, V value) throws IOException {
    if (!more) return false;
    long pos = in.getPosition();
    boolean remaining = ( != null);
    if (remaining) {
    if (pos >= end && in.syncSeen()) {
      more = false;
    } else {
      more = remaining;
    return more;


 * Treats keys as offset in file and value as line.
public class LineRecordReader implements RecordReader<LongWritable, Text> {
  private static final Log LOG
    = LogFactory.getLog(LineRecordReader.class.getName());

  private CompressionCodecFactory compressionCodecs = null;
  private long start;
  private long pos;
  private long end;
  private LineReader in;
  int maxLineLength;


  /** Read a line. */
  public synchronized boolean next(LongWritable key, Text value)
    throws IOException {

    while (pos < end) {

      int newSize = in.readLine(value, maxLineLength,
                                Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
      if (newSize == 0) {
        return false;
      pos += newSize;
      if (newSize < maxLineLength) {
        return true;

      // line too long. try again"Skipped line of size " + newSize + " at pos " + (pos - newSize));

    return false;



预备知识:什么是hadoop,HDFS? Hadoop是一个开源框架,它允许在整个集群使用简单编程模型计算机的分布式环境存储并处理大数据.它的目的是从单一的服务器到上千台机器的扩展,每一个台机都可以提供本地计算和存储. HDFS全称为Hadoop Distributed File System(分布式文件系统),可以粗浅得理解成将很大的文件分成固定大小的小片,存储在多个计算机上,更详细的参考: 1.


MapReduce的Shuffle过程介绍 Shuffle的本义是洗牌.混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好.MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据. 为什么MapReduce计算模型需要Shuffle过程?我们都知道MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发:Reduce是规约,负责数据的计算归并.Reduce的数据来源于Map,Map的输出即是Reduce


InputFormat中的Splits集合的获取: InputFormat是一个接口,该接口有2个成员函数: InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException; 而FileInputFormat是继


[toc] 手动实现一个单词统计MapReduce程序与过程原理分析 前言 我们知道,在搭建好hadoop环境后,可以运行wordcount程序来体验一下hadoop的功能,该程序在hadoop目录下的share/hadoop/mapreduce目录中,通过下面的命令: yarn jar $HADOOP_HOME/share/hadoop/mapreducehadoop-mapreduce-examples-2.6.4.jar wordcount inputPath outPath 即可对输入文

第2节 mapreduce深入学习:7、MapReduce的规约过程combiner

每一个 map 都可能会产生大量的本地输出,Combiner 的作用就是对 map 端的输出先做一次合并,以减少在 map 和 reduce 节点之间的数据传输量,以提高网络IO 性能,是 MapReduce 的一种优化手段之一. ?   combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件 ?   combiner 组件的父类就是 Reducer ?   combiner 和 reducer 的区别在于运行的位置: Combiner 是在每一个 maptask


昨天,经过几个小时的学习.该MapReduce学习的某一位的方法的第一阶段.即当大多数文件的开头的Data至key-value制图.那是,InputFormat的过程.虽说过程不是非常难,可是也存在非常多细节的. 也非常少会有人对此做比較细腻的研究.学习.今天.就让我来为大家剖析一下这段代码的原理. 我还为此花了一点时间做了几张结构图.便于大家理解. 在这里先声明一下.我研究的MapReduce主要研究的是旧版的API,也就是mapred包下的. InputFormat最最原始的形式就是一个接口


本文是学习时的自我总结,用于日后温习.如有错误还望谅解,不吝赐教. 此处附上一篇个人认为写的比较好的博客,转自枝叶飞扬的博文: 将Map的输出作为Reduce的输入的过程就是Shuffle了,这个是MapReduce优化的重点地方 Shuffle 过程 ①   Map在内存中开启一个默认大小100MB的环形内存缓冲区用于输出 ②   当缓冲区内存达到默认阈值 80% 时,Map 会启动守


转自 Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方.要想理解MapReduce, Shuffle是必须要了解的.我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越混.前段时间在做MapReduce job 性能调优的工作,需要深入代码研究MapReduce的运行机制,这才对Shuffle探了个究竟.考虑到之前我在看相关资料而看不懂时很恼火,所以在这 里我尽最大的可能试着把S


MapReduce中的Shuffle过程分为Map端和Reduce端两个过程. Map端: 1.(Hash Partitioner)执行完Map函数后,根据key进行hash,并对该结果进行Reduce的数量取模(该键值对将会由某个reduce端处理)得到一个分区号. 2.(Sort Combiner)将该键值对和分区号序列化之后的字节写入到内存缓存区(大小为100M,装载因子为0.8)中,当内存缓冲区的大小超过100*0.8 = 80M的时候,将会spill(溢出):在溢出之前会在内存缓冲区中