Hadoop CombineFileInputFormat: Principles and Source Code Analysis

Introduction

Quoting from Hadoop: The Definitive Guide:

Hadoop works better with a small number of large files than a large number of small files. One reason for this is that FileInputFormat generates splits in such a way that each split is all or part of a single file. If the file is very small (“small” means significantly smaller than an HDFS block) and there are a lot of them, each map task will process very little input, and there will be a lot of them (one per file), each of which imposes extra bookkeeping overhead. Compare a 1 GB file broken into sixteen 64 MB blocks and 10,000 or so 100 KB files. The 10,000 files use one map each, and the job time can be tens or hundreds of times slower than the equivalent one with a single input file and 16 map tasks.

The situation is alleviated somewhat by CombineFileInputFormat, which was designed to work well with small files. Where FileInputFormat creates a split per file, CombineFileInputFormat packs many files into each split so that each mapper has more to process. Crucially, CombineFileInputFormat takes node and rack locality into account when deciding which blocks to place in the same split, so it does not compromise the speed at which it can process the input in a typical MapReduce job.

As the excerpt makes clear, running a MapReduce job over a large number of "small files" is very unfriendly to Hadoop, and CombineFileInputFormat exists to alleviate exactly this problem. CombineFileInputFormat packs multiple small files into a single split (taking node-level and rack-level data locality into account while forming the splits), so that each mapper has more data to process and the MapReduce job runs faster.

Approach

 

CombineFileInputFormat relies on three important configuration properties (a configuration sketch follows the list):

mapred.max.split.size: the maximum size of a split formed from blocks on the same node or the same rack;

mapred.min.split.size.per.node: the minimum size required to form a split from the blocks left on a single node;

mapred.min.split.size.per.rack: the minimum size required to form a split from the blocks left on a single rack.
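
These are ordinary job configuration properties, specified in bytes. The snippet below is a minimal sketch of setting them through the old mapred API's JobConf; the class name and the 128 MB / 32 MB / 64 MB values are illustrative only (as the source code later shows, a max size of 0 disables the upper bound):

import org.apache.hadoop.mapred.JobConf;

public class CombineSplitSizeConfig {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // upper bound on a combined split; a value of 0 disables this limit
    job.setLong("mapred.max.split.size", 128L * 1024 * 1024);
    // leftover blocks on a node form a node-local split only if they total at least this much
    job.setLong("mapred.min.split.size.per.node", 32L * 1024 * 1024);
    // leftover blocks on a rack form a rack-local split only if they total at least this much
    job.setLong("mapred.min.split.size.per.rack", 64L * 1024 * 1024);
    System.out.println("max split size = " + job.getLong("mapred.max.split.size", 0));
  }
}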

The split formation process is as follows (a short worked example follows the steps):

(1) Form splits node by node:

a. Iterate over the blocks on the node, accumulating their sizes. Whenever the accumulated size reaches mapred.max.split.size, turn the accumulated blocks into a split; continue this until the remaining blocks on the node total less than mapred.max.split.size, then go to the next step.

b. If the remaining blocks total at least mapred.min.split.size.per.node, turn them into a split; otherwise leave them for later processing.

(2) Form splits rack by rack:

a. Iterate over the blocks on the rack (i.e., the blocks left over from the previous step), accumulating their sizes. Whenever the accumulated size reaches mapred.max.split.size, turn the accumulated blocks into a split; continue this until the remaining blocks on the rack total less than mapred.max.split.size, then go to the next step.

b. If the remaining blocks total at least mapred.min.split.size.per.rack, turn them into a split; otherwise leave them for later processing.

(3) Iterate over all blocks that are still left, accumulating their sizes; whenever the accumulated size reaches mapred.max.split.size, turn the accumulated blocks into a split; continue this until the remaining blocks total less than mapred.max.split.size, then go to the next step.

(4) Turn whatever blocks remain into one final split.
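
To make the thresholds concrete, consider an assumed example: mapred.max.split.size = 256 MB, mapred.min.split.size.per.node = 128 MB, mapred.min.split.size.per.rack = 192 MB, and a node holding four 100 MB blocks. Accumulating the first three blocks reaches 300 MB, which meets the 256 MB maximum, so they form one node-local split. The remaining 100 MB block falls below the 128 MB per-node minimum, so it is returned to the pool; in step (2) it may be combined with leftover blocks from other nodes on the same rack, and failing that it ends up in the overflow handling of steps (3) and (4).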

Core Implementation

 

// mapping from a rack name to the list of blocks it has
HashMap<String, List<OneBlockInfo>> rackToBlocks =
                          new HashMap<String, List<OneBlockInfo>>();

// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo, String[]> blockToNodes =
                          new HashMap<OneBlockInfo, String[]>();

// mapping from a node to the list of blocks that it contains
HashMap<String, List<OneBlockInfo>> nodeToBlocks =
                          new HashMap<String, List<OneBlockInfo>>();

Before split formation begins, three important mappings need to be initialized:

rackToBlocks: the rack-to-blocks mapping, i.e., which blocks reside on a given rack;

blockToNodes: the block-to-nodes mapping, i.e., which nodes hold a replica of a given block;

nodeToBlocks: the node-to-blocks mapping, i.e., which blocks reside on a given node.

The initialization is shown below: each Path (one input file) becomes a OneFileInfo object, and the three mappings are populated while each OneFileInfo is constructed.

// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
  files[i] = new OneFileInfo(paths[i], job,
                             rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
  totLength += files[i].getLength();
}

(1) Forming splits node by node; the code is as follows:

    // blocks that make up the current split
    ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
    // nodes that the blocks in the current split belong to
    ArrayList<String> nodes = new ArrayList<String>();
    // accumulated size of the current split
    long curSplitSize = 0;

    // process all nodes and create splits that are local to a node.
    // process the blocks on each node in turn
    for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); iter.hasNext();) {
      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
      nodes.add(one.getKey());
      List<OneBlockInfo> blocksInNode = one.getValue();

      // for each block, copy it into validBlocks. Delete it from blockToNodes so that the same block does not appear in
      // two different splits.
      // process each block in turn; note the role of blockToNodes: it guarantees that a block never ends up in two splits
      for (OneBlockInfo oneblock : blocksInNode) {
        if (blockToNodes.containsKey(oneblock)) {
          validBlocks.add(oneblock);
          blockToNodes.remove(oneblock);
          curSplitSize += oneblock.length;

          // if the accumulated split size exceeds the maximum, then create this split.
          // if the accumulated block size reaches maxSize, form a split
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(job, splits, nodes, validBlocks);
            curSplitSize = 0;
            validBlocks.clear();
          }
        }
      }
      // if there were any blocks left over and their combined size is
      // larger than minSplitNode, then combine them into one split.
      // Otherwise add them back to the unprocessed pool. It is likely
      // that they will be combined with other blocks from the same rack later on.
      // if the leftover blocks total at least minSizeNode, form a split from them;
      // otherwise hand them back to blockToNodes so that the later rack-level pass can deal with them
      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
        // create an input split and add it to the splits array
        addCreatedSplit(job, splits, nodes, validBlocks);
      } else {
        for (OneBlockInfo oneblock : validBlocks) {
          blockToNodes.put(oneblock, oneblock.hosts);
        }
      }
      validBlocks.clear();
      nodes.clear();
      curSplitSize = 0;
    }

(2) Forming splits rack by rack; the code is as follows:

    // if blocks in a rack are below the specified minimum size, then keep them
    // in 'overflow'. After the processing of all racks is complete, these overflow
    // blocks will be combined into splits.
    // overflowBlocks holds the blocks left over after the rack-level pass
    ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
    ArrayList<String> racks = new ArrayList<String>();

    // Process all racks over and over again until there is no more work to do.
    while (blockToNodes.size() > 0) {
      // Create one split for this rack before moving over to the next rack.
      // Come back to this rack after creating a single split for each of the
      // remaining racks.
      // Process one rack location at a time, Combine all possible blocks that
      // reside on this rack as one split. (constrained by minimum and maximum
      // split size).

      // iterate over all racks
      // process each rack in turn
      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
           rackToBlocks.entrySet().iterator(); iter.hasNext();) {
        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
        racks.add(one.getKey());
        List<OneBlockInfo> blocks = one.getValue();

        // for each block, copy it into validBlocks. Delete it from
        // blockToNodes so that the same block does not appear in
        // two different splits.
        boolean createdSplit = false;
        // process each block on this rack in turn
        for (OneBlockInfo oneblock : blocks) {
          if (blockToNodes.containsKey(oneblock)) {
            validBlocks.add(oneblock);
            blockToNodes.remove(oneblock);
            curSplitSize += oneblock.length;

            // if the accumulated split size exceeds the maximum, then create this split.
            // if the accumulated block size reaches maxSize, form a split
            if (maxSize != 0 && curSplitSize >= maxSize) {
              // create an input split and add it to the splits array
              addCreatedSplit(job, splits, getHosts(racks), validBlocks);
              createdSplit = true;
              break;
            }
          }
        }

        // if we created a split, then just go to the next rack
        if (createdSplit) {
          curSplitSize = 0;
          validBlocks.clear();
          racks.clear();
          continue;
        }

        if (!validBlocks.isEmpty()) {
          // if the leftover blocks total at least minSizeRack, form a split from them
          if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
            // if there is a minimum size specified, then create a single split
            // otherwise, store these blocks into overflow data structure
            addCreatedSplit(job, splits, getHosts(racks), validBlocks);
          } else {
            // There were a few blocks in this rack that remained to be processed.
            // Keep them in 'overflow' block list. These will be combined later.
            // if the leftover blocks total less than minSizeRack, add them to overflowBlocks
            overflowBlocks.addAll(validBlocks);
          }
        }
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
      }
    }

(3) Iterating over and accumulating the leftover (overflow) blocks; the code is as follows:

    // Process all overflow blocks
    for (OneBlockInfo oneblock : overflowBlocks) {
      validBlocks.add(oneblock);
      curSplitSize += oneblock.length;

      // This might cause an existing rack location to be re-added,
      // but it should be ok.
      for (int i = 0; i < oneblock.racks.length; i++) {
        racks.add(oneblock.racks[i]);
      }

      // if the accumulated split size exceeds the maximum, then
      // create this split.
      // if the accumulated block size reaches maxSize, form a split from these blocks
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split and add it to the splits array
        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
      }
    }

(4) Forming one final split from the remaining blocks; the code is as follows:

    // Process any remaining blocks, if any.
    if (!validBlocks.isEmpty()) {
      addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    }

Summary

When forming splits, CombineFileInputFormat takes data locality into account (same node, same rack): it first processes blocks that share a node, then blocks that share a rack, and finally whatever blocks remain, so the locality guarantee weakens step by step. Also note that CombineFileInputFormat is an abstract class; to use it you have to implement the getRecordReader method yourself, as sketched below.
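
Below is a minimal sketch of such a subclass, written against the old mapred API that matches the mapred.* property names used above. It assumes Hadoop's CombineFileRecordReader wrapper, which instantiates the per-chunk reader reflectively through a (CombineFileSplit, Configuration, Reporter, Integer) constructor; the class names MyCombineTextInputFormat and SingleFileLineReader are illustrative only, not part of Hadoop.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

// A CombineFileInputFormat that reads every file in a combined split as lines of text.
public class MyCombineTextInputFormat extends CombineFileInputFormat<LongWritable, Text> {

  @Override
  @SuppressWarnings({"unchecked", "rawtypes"})
  public RecordReader<LongWritable, Text> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    // CombineFileRecordReader hands each chunk of the CombineFileSplit to a
    // fresh instance of the per-chunk reader class given below.
    return new CombineFileRecordReader<LongWritable, Text>(
        job, (CombineFileSplit) split, reporter, (Class) SingleFileLineReader.class);
  }

  // Reads one chunk (one file range) of the combined split with a plain LineRecordReader.
  public static class SingleFileLineReader implements RecordReader<LongWritable, Text> {
    private final LineRecordReader delegate;

    // CombineFileRecordReader constructs this reflectively; the
    // (CombineFileSplit, Configuration, Reporter, Integer) signature is required.
    public SingleFileLineReader(CombineFileSplit split, Configuration conf,
                                Reporter reporter, Integer idx) throws IOException {
      FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx),
                                          split.getLength(idx), split.getLocations());
      delegate = new LineRecordReader(conf, fileSplit);
    }

    public boolean next(LongWritable key, Text value) throws IOException {
      return delegate.next(key, value);
    }
    public LongWritable createKey() { return delegate.createKey(); }
    public Text createValue() { return delegate.createValue(); }
    public long getPos() throws IOException { return delegate.getPos(); }
    public float getProgress() throws IOException { return delegate.getProgress(); }
    public void close() throws IOException { delegate.close(); }
  }
}

With such a subclass in place, a job would set it via JobConf.setInputFormat and tune the three split-size properties shown earlier to control how aggressively small files are packed together.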
