Hadoop 学习笔记一 ---MapReduce 的输入和输出

Hadoop 中的MapReduce库支持几种不同格式的输入数据。例如,文本模式的输入数据的每一行被视为一个key/value pair,其中key为文件的偏移量,value为那一行的内容。每一种输入类型的实现都必须能够把输入数据分割成数据片段,并能够由单独的Map任务来对数据片段进行后续处理。

一.  输入格式InputFormat

当运行一个M-R 作业的时候,我们需要为作业制定它的输入格式。InputFormat为Hadoop作业的所有输入格式的抽象基类,它描述了作业输入需要满足的规范细节。

1.InputFormat 抽象类

该抽象类只有两个抽象方法:

Modifier and Type Method and Description
RecordReader<K,V>
getRecordReader(InputSplit split, TaskAttemptContext context)

//Get the RecordReader for the given InputSplit.

List<InputSplit>
getSplits(JobContext context)

//Logically split the set of input files for the job.

函数getSplits(JobContext context ) 主要作用就是将所有的输入文件分割成逻辑上的多个分片InputSplit。这只是逻辑上的分片,并不是真正将文件分割成多个数据块。每个InputSplit分片通过输入文件路径、开始位置和偏移量三个信息来唯一标识。

函数getRecordReader(InputSplit split, TaskAttemptContext context) 主要作用就是为指定的InputSplit创建记录读取器,通过创建的记录读取器从输入分片中读取键值对,然后将键值对交给Map来处理。

当Hadoop在运行MapReduce程序时,InputFormat主要承担一下三个功能:

a. 检查输入路径是否正确;

b. 将输入文件分割成逻辑上的分片InputSplit,并将每个InputSplit 分别传给单独的一个Map,也就是说,有多少InputSplit,就得生成多少个Map任务;

c. InputSplit 是由一条条记录构成的,所以InputSplit 需要提供一个RecordReader 的实现,然后通过RecordReader 的实现来读取InputSplit 中的每条记录,并将之传给Map任务。

2.InputSplit 的子类

InputFormat 类 有多个子类继承,其类与类之间的关系如下:

由上图可知,InputFormat 直接子类有三个:DBInputFormat、DelegatingInputFormat和FileInputFormat,分别表示输入文件的来源为从数据库、用于多个输入以及基于文件的输入。对于FileInputFormat,即从文件输入的输入方式,又有五个继承子类:CombineFileInputFormat,KeyValueTextInput,NLineInoutFormat,

SequenceFileInputFormat,TextInputFormat。

3.getSplits 函数

FileInputFormat 类,实现了getSplits函数:

//获取输入文件的输入分片的方法,生成的输入分片是FileSplit格式的
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    //首先得获取输入分片的上界和下界
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

// generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();//初始化用于保存生成的输入分片的列表对象
    List<FileStatus>files = listStatus(job);//获取所有的输入文件列表
    for (FileStatus file: files) {//对文件列表中的每个文件进行相应的分割处理,然后生成文件的输入分片列表
      Path path = file.getPath(); //获取文件的path对象
      FileSystem fs = path.getFileSystem(job.getConfiguration());//获取文件所在的文件系统
      long length = file.getLen();//获取文件的长度
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);//获取文件的所有块信息
      if ((length != 0) && isSplitable(job, path)) { //如果文件的大小不是0 && 文件是可分割的,则执行分割操作
        long blockSize = file.getBlockSize();//获取文件的快大小
        /*将文件系统数据块的大小,输入分片的上下界作为参数传给computeSplitSize方法来计算出真正的输入分片的大小。输入分片
         * 大小的计算策略为:首先取出块大小和设置的分片大小的上界中的较小值,然后取出上一步计算出的较小值和设置的分片大小的下界
         * 的较大值,最终将第二步取出的较大值作为实际分片大小
         * */
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
       
        long bytesRemaining = length;//剩余文件大小的初始值作为整个文件的大小
        //如果文件未分割的部分的大小比分片的1.1倍大,那么就创建一个FileSplit分片
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                                   blkLocations[blkIndex].getHosts()));
          bytesRemaining -= splitSize;
        }
        //当剩余文件的大小比分片的1.1倍小,则将剩余部分作为整个FileSplit分片处理
        if (bytesRemaining != 0) {
          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkLocations.length-1].getHosts()));
        }
      } else if (length != 0) { //如果文件是不可分割的,则将整个文件作为一个FileSplit处理
        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
      } else {
        //Create empty hosts array for zero length files
        splits.add(new FileSplit(path, 0, length, new String[0]));
      }
    }

4.TestInputFormat 类

TextInputFormat 类是FileInputFormat 的默认实现,该输入格式主要针对的是文本类型的文件,文件被分割成多行,而且每一行都使用换行符(LF=10) 或者【Enter】键作为每一行的结束标识。该类主要重写了FileInputFormat中的createRecordReader ,其返回了LineRecordReader行记录读取器,该读取器用于从文件中读取一行,将这行文本在文件中的偏移量作为key,以这行文本的内容作为value,组成键值对。

二. 输入分片 InputSplit

输入分片InputSplit 是一个单独的Map需要处理的数据单元。输入分片的类型一般都是字节类型的,经过相应的RecordReader  处理后,转化成记录视图的形式,然后交给Map处理。InputSplit存储的并非数据本身,而是一个分片长度和一个记录数据位置的数组,生成InputSplit的方法可通过InputFormat来设置。InputFormat的getSplits方法可以生成InputSplit相关信息,包括两部分:InputSplit元数据信息和原始InputSplit信息。InputSplit元数据信息将被JobTracker使用,用以生成Task本地性相关数据结构;原始InputSplit信息将被Map Task初始化时使用,用以获取自己要处理的数据。

输入分片InputSplit 类有三个子类继承:FileSplit (文件输入分片),CombineFileSplit(多文件输入分片)以及DBInputSplit(数据块输入分片)。

三. 记录读取器 RecordReader

抽象基类RecordReader 的实现来读取InputSplit 中的每条记录,并将之传给Map任务。

1.RecordReader 抽象类中主要有一下几个虚函数:

public  abstract void initialize(InputSplit split,TaskAttemptContext context) ;//初始化方法

public abstract boolean nextKeyValue();//获取输入分片的下一个键值对

public abstract KEYIN getCurrentKey();//获取当前读取到的键值对中的键对象

public abstract VALUEIN getCurrentValue();//获取当前读取到的键值对中的值对象

public abstract float getProgress();//获取当前数据的读取进度

public abstract void close();//关闭RecordReader ,清理其占用的资源

2.RecordReader 的几个继承子类:

LineRecordReader 行记录读取器:该类是针对文本文件的默认读取器,一次处理输入分片中的一行,并将偏移量最为键,行的内容作为值,得到键值对。

KeyValueLineRecordReader 键值对读取器:该类将输入文件的整行作为一个由指定分隔符分开的键值对,默认的分隔符为:”\t”,我们可以通过mapreduce.input.keyvaluelinerecordreader.key.value.separator配置项来修改默认的分隔符。

SequenceFileRecordReader 序列文件读取器:该类用于读取SequenceFile 格式的输入分片。

DBRecordReader 数据库记录读取器:该类用于处理DBInputSplit 输入分片,该类会从关系型数据库中读取若干条记录, 将读取到的记录的条数作为键,将内容作为值。

四. 输出格式 OutputFormat

1.OutputFormat 抽象类:

OutputFormat 抽象类描述了M-R作业的输出规范,它决定了将MapReduce 的作业的输出结果保持到哪里,以及如何对输出结果进行持久化操作。其主要完成以下几个工作:

a. 坚持作业的输出是否有效;

b. 提供一个具体的RecordWriter 实现类。

OutputFormat 抽象类主要有三个抽象函数:

public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context )//返回RecordWriter,将K-V 对写入存储结构

public abstract void checkOutputSpecs(JobContext context )//检查输出目录是否存在

public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context )//Hadoop作业使用该类来完成输出结果的提交即将作业的输出结果保存到正确的输出目录中

2.OutputFormat 类有四个继承子类:

FileOutputFormat 类:将键值对写入文件系统

DBOutputFormat 类:将键值对写入数据库中

FitterOutputFormat 类:将OutputFormat 的结果再次封装,类似Java的流的Fitter

NullOutputFormat 类:将键值对写入/dev/null,相当于舍弃这些值

五. 记录写入器 RecordWriter

InputFormat 提供了RecordReader 来从输入文件中读取键值对,OutputFormat 也提供了RecordWriter 用于MapReduce 作业的键值对写入指定的输出中。

RecordWriter 有两个抽象函数:

public abstract void write(K key,V value)//用于将产生的键值对以指定的格式写入到输出目录中

public abstract void close(TaskAttempContext context)//关闭输出,并释放资源

时间: 2024-10-23 05:34:57

Hadoop 学习笔记一 ---MapReduce 的输入和输出的相关文章

hadoop 学习笔记:mapreduce框架详解

hadoop 学习笔记:mapreduce框架详解 开始聊mapreduce,mapreduce是hadoop的计算框架,我 学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的 思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习 hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不

[转]Spring MVC 学习笔记 json格式的输入和输出

Spring mvc处理json需要使用jackson的类库,因此为支持json格式的输入输出需要先修改pom.xml增加jackson包的引用 <!-- json --> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-lgpl</artifactId> <version>1.8.1</version>

【Big Data - Hadoop - MapReduce】hadoop 学习笔记:MapReduce框架详解

开始聊MapReduce,MapReduce是Hadoop的计算框架,我学Hadoop是从Hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不可分,所以当我写分布式文件系统时候,总是感觉自己的理解肤浅

Hadoop学习笔记:MapReduce框架详解

原文出处: 夏天的森林 开始聊mapreduce,mapreduce是hadoop的计算框架,我学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不可分,所以当我写分布式文件系统时候

hadoop 学习笔记:mapreduce框架详解(转 http://www.cnblogs.com/sharpxiajun/p/3151395.html)

开始聊mapreduce,mapreduce是hadoop的计算框架,我学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不可分,所以当我写分布式文件系统时候,总是感觉自己的理解肤浅

Java学习笔记—第十章 数据输入与输出

第十章  数据输入与输出 输入流与输出流: (1)输入流:以程序为基准,向程序中输入数据的流定义为输入流.从输入流向程序中输入数据称为读数据(read). (2)输出流:以程序为基准,从程序输出数据的流称为输出流.从程序中将数据输出到输出流称为写数据(write). 字节流和字符流 (1)字节流:按照字节的形式读/写数据.Java中抽象类InputStream和OutputStream及其派生子类用来处理字节流的输入和输出. (2)字符流:按照字符的形式读/写数据.Java中抽象类Reader和

Hadoop学习笔记—11.MapReduce中的排序和分组

一.写在之前的 1.1 回顾Map阶段四大步凑 首先,我们回顾一下在MapReduce中,排序和分组在哪里被执行: 从上图中可以清楚地看出,在Step1.4也就是第四步中,需要对不同分区中的数据进行排序和分组,默认情况下,是按照key进行排序和分组. 1.2 实验场景数据文件 在一些特定的数据文件中,不一定都是类似于WordCount单次统计这种规范的数据,比如下面这类数据,它虽然只有两列,但是却有一定的实践意义. 3 3 3 2 3 1 2 2 2 1 1 1 (1)如果按照第一列升序排列,当

Hadoop学习笔记,MapReduce任务Namenode DataNode Jobtracker Tasktracker之间的关系

一.基本概念 在MapReduce中,一个准备提交执行的应用程序称为“作业(job)”,而从一个作业划分出的运行于各个计算节点的工作单元称为“任务(task)”.此外,Hadoop提供的分布式文件系统(HDFS)主要负责各个节点的数据存储,并实现了高吞吐率的数据读写. 在分布式存储和分布式计算方面,Hadoop都是用主/从(Master/Slave)架构.在一个配置完整的集群上,想让Hadoop这头大象奔跑起来,需要在集群中运行一系列后台程序.不同的后台程序扮演不用的角色,这些角色由NameNo

Hadoop学习笔记—12.MapReduce中的常见算法

一.MapReduce中有哪些常见算法 (1)经典之王:单词计数 这个是MapReduce的经典案例,经典的不能再经典了! (2)数据去重 "数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选.统计大数据集上的数据种类个数.从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重. (3)排序:按某个Key进行升序或降序排列 (4)TopK:对源数据中所有数据进行排序,取出前K个数据,就是TopK. 通常可以借助堆(Heap)来实现TopK问题. (5)选择:关系代数基