MapReduce获取分片数目

问题

MapReduce Application中mapper的数目和分片的数目是一样的,但是分片数目和什么有关呢?

  1. 默认情况下,分片和输入文件的分块数是相等的。也不完全相等,如果block size大小事128M,文件大小为128.1M,文件的block数目为2,但是application运行过程中,你会发现分片数目是1,而不是2,其中的机理,后面会分析
  2. 有的程序会设置map的数目,那么map数目是怎样影响分片的数目的呢?
  3. 如果文件大小为0,是否会作为一个分片传给map任务?

流程

FileInputFormat.getSplits返回文件的分片数目,这部分将介绍其运行流程,后面将粘贴其源码并给出注释

  1. 通过listStatus()获取输入文件列表files,其中会遍历输入目录的子目录,并过滤掉部分文件,如文件_SUCCESS
  2. 获取所有的文件大小totalSIze
  3. goalSIze=totalSize/numMaps。numMaps是用户指定的map数目
  4. files中取出一个文件file
  5. 计算splitSize。splitSize=max(minSplitSize,min(file.blockSize,goalSize)),其中minSplitSize是允许的最小分片大小,默认为1B
  6. 后面根据splitSize大小将file分片。在分片的时候,如果剩余的大小不大于splitSize*1.1,且大于0B的时候,会将该区域整个作为一个分片。这样做是为了防止一个mapper处理的数据太小
  7. 将file的分片加入到splits中
  8. 返回4,直到将files遍历完
  9. 结束,返回splits

源码

 public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
	  //获取输入文件列表files,其中会遍历输入目录的子目录,并过滤掉部分文件,如文件_SUCCESS
    FileStatus[] files = listStatus(job);

    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    /*
     * numSplits为设置的map数目
     * 期待的分片大小
     */
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    /*
     * FileInputFormat.SPLIT_MINSIZE为参数值:mapreduce.input.fileinputformat.split.minsize,默认为0
     * minSplitSize默认为1
     */
     long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
          long blockSize = file.getBlockSize();
          /*
           * 计算分片的大小,每一个文件都要计算一次
           *computeSplitSize的计算公式为 Math.max(minSize, Math.min(goalSize, blockSize));
           */
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[] splitHosts = getSplitHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[] splitHosts = getSplitHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts));
          }
        } else {
          String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }

总结

看源码还是很有用的。很多时候,博客或者书介绍的不是很中肯,或者会有错误,看源码就不会出现这些问题。

MapReduce获取分片数目

时间: 2024-10-14 22:57:12

MapReduce获取分片数目的相关文章

实训任务05 MapReduce获取成绩表的最高分记录

实训任务05  MapReduce获取成绩表的最高分记录 实训1:统计用户纺问次数 任务描述: 统计用户在2016年度每个自然日的总访问次数.原始数据文件中提供了用户名称与访问日期.这个任务就是要获取以每个自然日为单位的所有用户访问次数的累加值.如果通过MapReduce编程实现这个任务,首先要考虑的是,Mapper与Reducer各自的处理逻辑是怎样的:然后根据处理逻辑编写出核心代码:最后在Eclipse中编写完整代码,编译打包后提交给集群运行. 分析思路和逻辑 (1)       输入/输出

MapReduce中TextInputFormat分片和读取分片数据源码级分析

InputFormat主要用于描述输入数据的格式(我们只分析新API,即org.apache.hadoop.mapreduce.lib.input.InputFormat),提供以下两个功能: (1)数据切分:按照某个策略将输入数据切分成若干个split,以便确定MapTask个数以及对应的split: (2)为Mapper提供输入数据:读取给定的split的数据,解析成一个个的key/value对,供mapper使用. InputFormat有两个比较重要的方法:(1)List<InputSp

C#ListView控件添加Checkbox复选框并获取选中的数目,检查checkbox是否勾选

原地址:http://blog.csdn.net/lucky51222/article/details/41892429 1.添加复选框:listView1.CheckBoxes = true; 2.选中listview并获取选中的数目: [csharp] view plain copy private void listView1_ItemChecked(object sender, ItemCheckedEventArgs e) { e.Item.Selected = e.Item.Chec

C#ListView控件添加复选框并获取选中的数目

1.添加复选框:listView1.CheckBoxes = true; 2.选中listview并获取选中的数目: private void listView1_ItemChecked(object sender, ItemCheckedEventArgs e) { e.Item.Selected = e.Item.Checked; m = listView1.CheckedItems.Count; label1.Text = "当前选中数:" + m.ToString(); } 说

mapreduce工作原理

转自:http://www.cnblogs.com/z1987/p/5055565.html MapReduce模型主要包含Mapper类和Reducer类两个抽象类.Mapper类主要负责对数据的分析处理,最终转化为key-value数据对:Reducer类主要获取key-value数据对,然后处理统计,得到结果.MapReduce实现了存储的均衡,但没有实现计算的均衡. 一. MapReduce框架组成 MapReduce主要包括JobClient.JobTracker.TaskTracke

Mapreduce执行过程分析(基于Hadoop2.4)——(一)

1 概述 该瞅瞅MapReduce的内部运行原理了,以前只知道个皮毛,再不搞搞,不然怎么死的都不晓得.下文会以2.4版本中的WordCount这个经典例子作为分析的切入点,一步步来看里面到底是个什么情况. 2 为什么要使用MapReduce Map/Reduce,是一种模式,适合解决并行计算的问题,比如TopN.贝叶斯分类等.注意,是并行计算,而非迭代计算,像涉及到层次聚类的问题就不太适合了. 从名字可以看出,这种模式有两个步骤,Map和Reduce.Map即数据的映射,用于把一组键值对映射成另

Elasticsearch 分片交互过程分析

1.Elasticseach如何将数据存储到分片中 问题:当我们要在ES中存储数据的时候,数据应该存储在主分片和复制分片中的哪一个中去:当我们在ES中检索数据的时候,又是怎么判断要查询的数据是属于哪一个分片. 数据存储到分片的过程是一定规则的,并不是随机发生的. 规则:shard = hash(routing) % number_of_primary_shards Routing值可以是一个任意的字符串,默认情况下,它的值为存数数据对应文档 _id 值,也可以是用户自定义的值.Routing这个

NoSQL生态系统——hash分片和范围分片两种分片

13.4 横向扩展带来性能提升 很多NoSQL系统都是基于键值模型的,因此其查询条件也基本上是基于键值的查询,基本不会有对整个数据进行查询的时候.由于基本上所有的查询操作都是基本键值形式的,因此分片通常也基于数据的键来做:键的一些属性会决定这个键值对存储在哪台机器上.下面我们将会对hash分片和范围分片两种分片方式进行描述. 3.4.2 通过协调器进行数据分片 由于CouchDB专注于单机性能,没有提供类似的横向扩展方案,于是出现了两个项目:Lounge 和 BigCouch,他们通过提供一个p

Hadoop Mapreduce 中的Partitioner

Partitioner的作用的对Mapper产生的中间结果进行分片,以便将同一分组的数据交给同一个Reduce处理,Partitioner直接影响Reduce阶段的负载均衡. MapReduce提供了两个Partitioner实现:HashPartitioner和TotalOederPartitioner. HashPartitioner是默认实现,实现了一种基于哈希值的分片方法,代码如下: public int getPartition(K2 key, V2 value, int numRed