Hadoop2.6.0学习笔记(四)TextInputFormat及RecordReader解析析

鲁春利的工作笔记,谁说程序员不能有文艺范?



一个最简单的MapReduce程序

package com.lucl.hadoop.mapreduce;

public class MiniMRDriver extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new MiniMRDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MiniMRDriver.class);
        
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
   
        return job.waitForCompletion(true) ? 0 : 1;
    }

}

查看MapReduce任务的数据

[[email protected] code]$ hdfs dfs -text /data/HTTP_SITE_FLOW.log
视频网站        15      1527
信息安全        20      3156
站点统计        24      6960
搜索引擎        28      3659
站点统计        3       1938
综合门户        15      1938
搜索引擎        21      9531
搜索引擎        63      11058
[[email protected] code]$

打包运行该MapReduce程序

[[email protected] code]$ hadoop jar MiniMR.jar /data/HTTP_SITE_FLOW.log /201511302119
15/11/30 21:19:46 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/11/30 21:19:48 INFO input.FileInputFormat: Total input paths to process : 1
15/11/30 21:19:48 INFO mapreduce.JobSubmitter: number of splits:1
15/11/30 21:19:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448889273221_0001
15/11/30 21:19:50 INFO impl.YarnClientImpl: Submitted application application_1448889273221_0001
15/11/30 21:19:50 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448889273221_0001/
15/11/30 21:19:50 INFO mapreduce.Job: Running job: job_1448889273221_0001
15/11/30 21:20:26 INFO mapreduce.Job: Job job_1448889273221_0001 running in uber mode : false
15/11/30 21:20:26 INFO mapreduce.Job:  map 0% reduce 0%
15/11/30 21:20:59 INFO mapreduce.Job:  map 100% reduce 0%
15/11/30 21:21:30 INFO mapreduce.Job:  map 100% reduce 100%
15/11/30 21:21:31 INFO mapreduce.Job: Job job_1448889273221_0001 completed successfully
15/11/30 21:21:31 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=254
                FILE: Number of bytes written=213863
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=277
                HDFS: Number of bytes written=194
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=30256
                Total time spent by all reduces in occupied slots (ms)=27787
                Total time spent by all map tasks (ms)=30256
                Total time spent by all reduce tasks (ms)=27787
                Total vcore-seconds taken by all map tasks=30256
                Total vcore-seconds taken by all reduce tasks=27787
                Total megabyte-seconds taken by all map tasks=30982144
                Total megabyte-seconds taken by all reduce tasks=28453888
        Map-Reduce Framework
                Map input records=8
                Map output records=8
                Map output bytes=232
                Map output materialized bytes=254
                Input split bytes=103
                Combine input records=0
                Combine output records=0
                Reduce input groups=8
                Reduce shuffle bytes=254
                Reduce input records=8
                Reduce output records=8
                Spilled Records=16
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=182
                CPU time spent (ms)=2000
                Physical memory (bytes) snapshot=305459200
                Virtual memory (bytes) snapshot=1697824768
                Total committed heap usage (bytes)=136450048
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=174
        File Output Format Counters 
                Bytes Written=194
[[email protected] code]$

查看输出结果

[[email protected] code]$ hdfs dfs -ls /201511302119
Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2015-11-30 21:21 /201511302119/_SUCCESS
-rw-r--r--   2 hadoop hadoop        194 2015-11-30 21:21 /201511302119/part-r-00000
[[email protected] code]$ hdfs dfs -text /201511302119/part-r-00000
0       视频网站        15      1527
22      信息安全        20      3156
44      站点统计        24      6960
66      搜索引擎        28      3659
88      站点统计        3       1938
109     综合门户        15      1938
131     搜索引擎        21      9531
153     搜索引擎        63      11058
[[email protected] code]$

在这里没有指定Mapper类、Reducer类,并通过FileInputFormat和FileOutputFormat指定了输入数据及输出结果存储路径,执行后把行偏移量和行内容保存到了指定的输出路径下。

FileInputFormat的默认实现为TextInputFormat,专门用来处理文本数据,以回车换行符作为一行的分割标记,其中key为该行的行偏移量,value为这一行内容。

类定义如下:

public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, 
                                                  TaskAttemptContext context) {
    // 略
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // 是否可切片
  }
}

在Job任务中可以通过public void setInputFormatClass(Class<? extends InputFormat> cls)方法设定希望使用的InputFormat格式。

public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context) 
                        throws IOException, InterruptedException;
                               
    public abstract RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, InterruptedException;
}

文件在HDFS上是以Block块的形式存储的,而在MapReduce计算中则是以划分的切片(split后称为split分片或chunk)进行读取的,每个split的就对应一个mapper task,split的数量决定了mappertask的数量。

注意:MapReduce是由Mapper和Reducer组成的,MapperTask由split决定,那么Reducer由什么来决定呢?后面会逐渐通过示例代码进行说明

List<InputSplit> getSplits(JobContext context)负责将一个大数据逻辑分成多片。比如数据库表有100条数据,按照主键ID升序存储,假设每20条分成一片,这个List的大小就是5,然后每个InputSplit记录两个参数,第一个为这个分片的起始ID,第二个为这个分片数据的大小(这里是20)。InputSplit并没有真正存储数据,只是提供了一个如何将数据分片的方法。

RecordReader<K, V) createRecordReader(InputSplit split, TaskAttemptContext context)根据InputSplit定义的分片方法,返回一个能够读取分片记录的RecordReader。

InputSplit类定义

public abstract class InputSplit {
    // Split分片的大小,用来实现输入的split的排序
    public abstract long getLength() throws IOException, InterruptedException;
    // 用来获取存储分片的位置列表
    public abstract String[] getLocations() throws IOException, InterruptedException;
}

RecordReader类定义

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    public abstract void initialize(InputSplit split,TaskAttemptContext context
                                  ) throws IOException, InterruptedException;
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
    public abstract float getProgress() throws IOException, InterruptedException;
    public abstract void close() throws IOException;
}

InputSplit描述了数据块的切分方式,RecordReader类则是实际用来加载split分片数据,并把数据转换为适合Mapper类里面map()方法处理的<key, value>形式。

RecordReader实例是由输入格式定义的,默认的输入格式为TextInputFormat,提供了一个LineRecordReader,把每一行的行偏移量作为key,把内容作为value。RecordReader会在输入块上被反复调用,直到整个输入块被处理完毕,每一次调用RecordReader都会调用Mapper类的map()函数。

TextInputFormat并没有getSplits的实现,而是其父类FileInputFormat进行了实现。

public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
    // Generate the list of files and make them into FileSplits
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // 1. 通过JobContext中获取List<FileStatus>;
        // 2. 遍历文件属性数据
        //    2.1. 如果是空文件,则初始化一个无主机信息的FileSplits实例;
        //    2.2. 非空文件,判断是否分片,默认是分片的
        //         如果不分片则每个文件作为一个FileSplit
        //         计算分片大小splitSize
        
        // getFormatMinSplitSize()返回固定值1
        // getMinSplitSize(job)通过Configuration获取,配置参数为(mapred-default.xml):
        // mapreduce.input.fileinputformat.split.minsize默认值为0
        // minSize的值为1
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        // 实际调用context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
        // 通过Configuration获取,配置参数为(mapred-default.xml无该参数):
        // mapreduce.input.fileinputformat.split.maxsize
        // 未配置该参数,取Long.MAX_VALUE,maxSize的值为Long.MAX_VALUE
        long maxSize = getMaxSplitSize(job);
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
          Path path = file.getPath();     // 在HDFS上的绝对路径
          long length = file.getLen();    // 文件的实际大小
          if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(job, path)) {
              // 这里取的是Block块的大小,在2.6里面默认是134217728(即128M)
              long blockSize = file.getBlockSize();
              // 获取切片大小,computeSplitSize(blockSize, minSize, maxSize)实际调用:
              //          1                Long.MAX_VALUE   128M
              // Math.max(minSize, Math.min(maxSize,        blockSize));
              // split的大小刚好等于block块的大小,为128M
              long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    
              long bytesRemaining = length;
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;
              }
    
              if (bytesRemaining != 0) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                           blkLocations[blkIndex].getHosts(),
                           blkLocations[blkIndex].getCachedHosts()));
              }
            } else { // not splitable
              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
            }
          } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        
        return splits;
    }
}

说明:List<FileStatus>中FileStatus可能为LocatedFileStatus(a FileStatus that includes a file‘s block locations)。

时间: 2024-10-07 19:27:48

Hadoop2.6.0学习笔记(四)TextInputFormat及RecordReader解析析的相关文章

Hadoop2.6.0学习笔记(三)HDFS架构

鲁春利的工作笔记,谁说程序员不能有文艺范? HDFS Architecture见: http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html 或下载的tar包解压后的 hadoop-2.6.0/share/doc/hadoop/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html 官方给出的架构为:

Hadoop2.6.0学习笔记(四)HDFS dfsadmin脚本

鲁春利的工作笔记,谁说程序员不能有文艺范? 本文出自 "闷葫芦的世界" 博客,请务必保留此出处http://luchunli.blog.51cto.com/2368057/1717733

Hadoop2.6.0学习笔记(七)MapReduce分区

鲁春利的工作笔记,谁说程序员不能有文艺范? MapReduce中map task任务的数量是由spli分片决定,那么reduce task的数量由什么来确定的呢?就是这里要讨论的MapReduce分区.默认情况下,MapReduce中使用的是HashPartitioner. /** Partition keys by their {@link Object#hashCode()}. */ public class HashPartitioner<K, V> extends Partitione

Hadoop2.6.0学习笔记(七)HDFS读写流程

鲁春利的工作笔记,谁说程序员不能有文艺范? HDFS读文件解析 HDFS写文件解析

Hadoop2.6.0学习笔记(九)SequenceFile和MapFile

鲁春利的工作笔记,谁说程序员不能有文艺范? 本文出自 "闷葫芦的世界" 博客,请务必保留此出处http://luchunli.blog.51cto.com/2368057/1717819

Hadoop2.6.0学习笔记(六)TextOutputFormat及RecordWriter解析

鲁春利的工作笔记,谁说程序员不能有文艺范? MapReduce提供了许多默认的输出格式,如TextOutputFormat.KeyValueOutputFormat等.MapReduce中输出文件的个数与Reduce的个数一致,默认情况下有一个Reduce,输出只有一个文件,文件名为part-r-00000,文件内容的行数与map输出中不同key的个数一致.如果有两个Reduce,输出的结果就有两个文件,第一个为part-r-00000,第二个为part-r-00001,依次类推. MapRed

Linux学习笔记四:Linux的文件搜索命令

1.文件搜索命令  which 语法:which [命令名称] 范例:$which ls  列出ls命令所在目录 [[email protected] ~]$ which ls alias ls='ls --color=auto' /bin/ls 另外一个命令:whereis [名称名称],也可以列出命令所在目录. [[email protected] ~]$ whereis ls ls: /bin/ls /usr/share/man/man1/ls.1.gz /usr/share/man/ma

小猪的数据结构学习笔记(四)

小猪的数据结构学习笔记(四) 线性表之静态链表 --转载请注明出处:coder-pig 本章引言: 在二,三中中我们分别学习了顺序表中的线性表与单链表,线性表有点类似于 我们前面所学的数组,而单链表使用的最多的是指针,这里问个简单的问题, 如果是在以前没有指针的话,前辈先人们怎么实现单链表呢?大家思考下! 没有指针,那么用什么来代替呢?前辈先人们非常机智,想出了使用下标+游标的方式 来实现单链表的效果!也就是今天要讲的--静态链表! 当然你也可以直接跳过本章,因为有了单链表就没有必要用静态链表了

NLTK学习笔记(四):自然语言处理的一些算法研究

自然语言处理中算法设计有两大部分:分而治之 和 转化 思想.一个是将大问题简化为小问题,另一个是将问题抽象化,向向已知转化.前者的例子:归并排序:后者的例子:判断相邻元素是否相同(与排序). 这次总结的自然语言中常用的一些基本算法,算是入个门了. 递归 使用递归速度上会受影响,但是便于理解算法深层嵌套对象.而一些函数式编程语言会将尾递归优化为迭代. 如果要计算n个词有多少种组合方式?按照阶乘定义:n! = n*(n-1)*...*1 def func(wordlist): length = le