hadoop InputFormat getSplits

 /** Splits files returned by {@link #listStatus(JobConf)} when
   * they‘re too big.*/
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    //计时器,
    StopWatch sw = new StopWatch().start();
    //
    FileStatus[] files = listStatus(job);

    // Save the number of input files for metrics/loadgen
    //设置配置中文件个数mapreduce.input.fileinputformat.numinputfiles
    job.setLong(NUM_INPUT_FILES, files.length);
    // 计算所有文件的大小总和
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }
    // 每个split目标大小,用总的文件大小 / (max(设置的split个数,1)),
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    // 每个split大小的最小值,读取mapreduce.input.fileinputformat.split.minsize配置,如果没有配置的话那么
    // 取minSplitSize =1
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // 生成 splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    //遍历文件列表
    for (FileStatus file: files) {
      //获取一个文件路径
      Path path = file.getPath();
      //获取文件大小
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        //判断file是否包含file的location,也就是,是否包含BlockLocation等信息,
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          //去构造BlockLocation信息
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        //判断文件是否可以切分
        if (isSplitable(fs, path)) {
          //获取文件的BlockSize大小
          long blockSize = file.getBlockSize();
          //splitSize最终由 goalSize(设置的每个split大小的目标值),minSize(设置的每个split大小的最小值),blockSize(file的block数量)三个值所决定,逻辑关系如下:
          // Math.max(minSize, Math.min(goalSize, blockSize))
          // Math.max(minSize, Math.min((totalSize / (numSplits == 0 ? 1 : numSplits)), blockSize))
          // numSplits这个设置,只有在totalSize/numSplits < blockSize才会生效
          // minSize 只有在大于blockSize的时候才会生效
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
          //文件为读取长度
          long bytesRemaining = length;
          //如果剩余的大小/split的大小大雨1.1,那么就商城生成一个split
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts[0], splitHosts[1]));
            bytesRemaining -= splitSize;
          }
          //剩余的一点点数据也要生成一个split,
          if (bytesRemaining != 0) {
            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts[0], splitHosts[1]));
          }
        } else {
          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits.toArray(new FileSplit[splits.size()]);
  }

原文地址:https://www.cnblogs.com/fantiantian/p/9346782.html

时间: 2024-08-04 10:25:19

hadoop InputFormat getSplits的相关文章

Hadoop InputFormat浅析

本文转载:http://hi.baidu.com/_kouu/item/dc8d727b530f40346dc37cd1 在执行一个Job的时候,Hadoop会将输入数据划分成N个Split,然后启动相应的N个Map程序来分别处理它们. 数据如何划分?Split如何调度(如何决定处理Split的Map程序应该运行在哪台TaskTracker机器上)?划分后的数据又如何读取?这就是本文所要讨论的问题. 先从一张经典的MapReduce工作流程图出发: 1.运行mapred程序: 2.本次运行将生成

Hadoop InputFormat

Hadoop可以处理不同数据格式(数据源)的数据,从文本文件到(非)关系型数据库,这很大程度上得益于Hadoop InputFormat的可扩展性设计,InputFormat层次结构图如下: InputFormat(org.apache.hadoop.mapreduce.InputFormat)被设计为一个抽象类,代码如下: public abstract class InputFormat<K, V> { public abstract List<InputSplit> getS

hadoop InputFormat详解

1. 概述 我们在设置MapReduce输入格式的时候,会调用这样一条语句: job.setInputFormatClass(KeyValueTextInputFormat.class); 这条语句保证了输入文件会按照我们预设的格式被读取.KeyValueTextInputFormat即为我们设定的数据读取格式. 所有的输入格式类都继承自InputFormat,这是一个抽象类.其子类有例如专门用于读取普通文件的FileInputFormat,还有用来读取数据库的DBInputFormat等等.相

Hadoop InputFormat源码分析

平时我们写MapReduce程序的时候,在设置输入格式的时候,总会调用形如job.setInputFormatClass(KeyValueTextInputFormat.class)来保证输入文件按照我们想要的格式被读取.所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等. 不同的InputFormat都会按自己的实现来读取输入数据并产生输入分片,一个输入分片会被单独的MapT

Hadoop实现全排序

1.1TB(或1分钟)排序的冠军 作为分布式数据处理的框架,集群的数据处理能力究竟有多快?或许1TB排序可以作为衡量的标准之一. 1TB排序,就是对1TB(1024GB,大约100亿行数据)的数据进行排序.2008年,Hadoop赢得1TB排序基准评估第一名,排序1TB数据耗时209秒.后来,1TB排序被1分钟排序所取代,1分钟排序指的是在一分钟内尽可能多的排序.2009年,在一个1406个节点组成的hadoop集群,在59秒里对500GB完成了排序:而在1460个节点的集群,排序1TB数据只花

Hadoop开发相关问题

总结自己在Hadoop开发中遇到的问题,主要在mapreduce代码执行方面.大部分来自日常代码执行错误的解决方法,还有一些是对Java.Hadoop剖析.对于问题,通过查询stackoverflow.csdn找到了解决方法.汇总出来以后查询方便.内容将不定期更新. 1.jar包执行出错,提示"class wordcount.WordCountMapper not found" 错误原因:在run()代码中没有定义setJarByClass解决方法:在wordcountJob.java

InputFormat&amp;OutputFormat

本文的主要目的是从源码级别讲解Hadoop中InputFormat和OutputFormat部分,首先简介InputFormat和OutputFormat,然后介绍两个重要的组件,RecordWriter和RecordReader,再以FileInputFormat和FileOutputFormat为例,介绍一组InputFormat和OutputFormat的实现细节,最后以SqoopInputFormat和SqoopOutputFormat为例,体会一下InputFormat和OutputF

关于hadoop中的DBInputFormat试验

1.注意,需要声明为静态内部类,否则会报java.lang.NoSuchMethodException...<init>的错误public static class MySqlWritable implements Writable, DBWritable { 2.如果输出目录存在,需要先删除 3.由于需要从mysql数据取值,则需要有mysql数据库驱动包,hadoop classpath查看hadoop类加载路径,将驱动包拷贝到其中一个目录下即可: 4.解决mysql"Acces

Hadoop OutputFormat浅析

问题:reduce输出时,如果不是推测任务写结果时会先写临时目录最后移动到输出目录吗? 下面部分转自Hadoop官网说明 OutputFormat 描述Map/Reduce作业的输出样式. Map/Reduce框架根据作业的OutputFormat来: 检验作业的输出,例如检查输出路径是否已经存在. 提供一个RecordWriter的实现,用来输出作业结果. 输出文件保存在FileSystem上. TextOutputFormat是默认的 OutputFormat. 任务的Side-Effect