Hadoop 中的MapReduce库支持几种不同格式的输入数据。例如,文本模式的输入数据的每一行被视为一个key/value pair,其中key为文件的偏移量,value为那一行的内容。每一种输入类型的实现都必须能够把输入数据分割成数据片段,并能够由单独的Map任务来对数据片段进行后续处理。
一. 输入格式InputFormat
当运行一个M-R 作业的时候,我们需要为作业制定它的输入格式。InputFormat为Hadoop作业的所有输入格式的抽象基类,它描述了作业输入需要满足的规范细节。
1.InputFormat 抽象类
该抽象类只有两个抽象方法:
Modifier and Type | Method and Description |
RecordReader<K,V> |
getRecordReader(InputSplit split, TaskAttemptContext context)
|
List<InputSplit> |
getSplits (JobContext context)
|
函数getSplits(JobContext context
) 主要作用就是将所有的输入文件分割成逻辑上的多个分片InputSplit。这只是逻辑上的分片,并不是真正将文件分割成多个数据块。每个InputSplit分片通过输入文件路径、开始位置和偏移量三个信息来唯一标识。
函数getRecordReader(InputSplit split, TaskAttemptContext context) 主要作用就是为指定的InputSplit创建记录读取器,通过创建的记录读取器从输入分片中读取键值对,然后将键值对交给Map来处理。
当Hadoop在运行MapReduce程序时,InputFormat主要承担一下三个功能:
a. 检查输入路径是否正确;
b. 将输入文件分割成逻辑上的分片InputSplit,并将每个InputSplit 分别传给单独的一个Map,也就是说,有多少InputSplit,就得生成多少个Map任务;
c. InputSplit 是由一条条记录构成的,所以InputSplit 需要提供一个RecordReader 的实现,然后通过RecordReader 的实现来读取InputSplit 中的每条记录,并将之传给Map任务。
2.InputSplit 的子类
InputFormat 类 有多个子类继承,其类与类之间的关系如下:
由上图可知,InputFormat 直接子类有三个:DBInputFormat、DelegatingInputFormat和FileInputFormat,分别表示输入文件的来源为从数据库、用于多个输入以及基于文件的输入。对于FileInputFormat,即从文件输入的输入方式,又有五个继承子类:CombineFileInputFormat,KeyValueTextInput,NLineInoutFormat,
SequenceFileInputFormat,TextInputFormat。
3.getSplits 函数
FileInputFormat 类,实现了getSplits函数:
//获取输入文件的输入分片的方法,生成的输入分片是FileSplit格式的
public List<InputSplit> getSplits(JobContext job) throws IOException {
//首先得获取输入分片的上界和下界
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();//初始化用于保存生成的输入分片的列表对象
List<FileStatus>files = listStatus(job);//获取所有的输入文件列表
for (FileStatus file: files) {//对文件列表中的每个文件进行相应的分割处理,然后生成文件的输入分片列表
Path path = file.getPath(); //获取文件的path对象
FileSystem fs = path.getFileSystem(job.getConfiguration());//获取文件所在的文件系统
long length = file.getLen();//获取文件的长度
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);//获取文件的所有块信息
if ((length != 0) && isSplitable(job, path)) { //如果文件的大小不是0 && 文件是可分割的,则执行分割操作
long blockSize = file.getBlockSize();//获取文件的快大小
/*将文件系统数据块的大小,输入分片的上下界作为参数传给computeSplitSize方法来计算出真正的输入分片的大小。输入分片
* 大小的计算策略为:首先取出块大小和设置的分片大小的上界中的较小值,然后取出上一步计算出的较小值和设置的分片大小的下界
* 的较大值,最终将第二步取出的较大值作为实际分片大小
* */
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;//剩余文件大小的初始值作为整个文件的大小
//如果文件未分割的部分的大小比分片的1.1倍大,那么就创建一个FileSplit分片
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
}
//当剩余文件的大小比分片的1.1倍小,则将剩余部分作为整个FileSplit分片处理
if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) { //如果文件是不可分割的,则将整个文件作为一个FileSplit处理
splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
4.TestInputFormat 类
TextInputFormat 类是FileInputFormat 的默认实现,该输入格式主要针对的是文本类型的文件,文件被分割成多行,而且每一行都使用换行符(LF=10) 或者【Enter】键作为每一行的结束标识。该类主要重写了FileInputFormat中的createRecordReader ,其返回了LineRecordReader行记录读取器,该读取器用于从文件中读取一行,将这行文本在文件中的偏移量作为key,以这行文本的内容作为value,组成键值对。
二. 输入分片 InputSplit
输入分片InputSplit 是一个单独的Map需要处理的数据单元。输入分片的类型一般都是字节类型的,经过相应的RecordReader 处理后,转化成记录视图的形式,然后交给Map处理。InputSplit存储的并非数据本身,而是一个分片长度和一个记录数据位置的数组,生成InputSplit的方法可通过InputFormat来设置。InputFormat的getSplits方法可以生成InputSplit相关信息,包括两部分:InputSplit元数据信息和原始InputSplit信息。InputSplit元数据信息将被JobTracker使用,用以生成Task本地性相关数据结构;原始InputSplit信息将被Map Task初始化时使用,用以获取自己要处理的数据。
输入分片InputSplit 类有三个子类继承:FileSplit (文件输入分片),CombineFileSplit(多文件输入分片)以及DBInputSplit(数据块输入分片)。
三. 记录读取器 RecordReader
抽象基类RecordReader 的实现来读取InputSplit 中的每条记录,并将之传给Map任务。
1.RecordReader 抽象类中主要有一下几个虚函数:
public abstract void initialize(InputSplit split,TaskAttemptContext context) ;//初始化方法
public abstract boolean nextKeyValue();//获取输入分片的下一个键值对
public abstract KEYIN getCurrentKey();//获取当前读取到的键值对中的键对象
public abstract VALUEIN getCurrentValue();//获取当前读取到的键值对中的值对象
public abstract float getProgress();//获取当前数据的读取进度
public abstract void close();//关闭RecordReader ,清理其占用的资源
2.RecordReader 的几个继承子类:
LineRecordReader 行记录读取器:该类是针对文本文件的默认读取器,一次处理输入分片中的一行,并将偏移量最为键,行的内容作为值,得到键值对。
KeyValueLineRecordReader 键值对读取器:该类将输入文件的整行作为一个由指定分隔符分开的键值对,默认的分隔符为:”\t”,我们可以通过mapreduce.input.keyvaluelinerecordreader.key.value.separator配置项来修改默认的分隔符。
SequenceFileRecordReader 序列文件读取器:该类用于读取SequenceFile 格式的输入分片。
DBRecordReader 数据库记录读取器:该类用于处理DBInputSplit 输入分片,该类会从关系型数据库中读取若干条记录, 将读取到的记录的条数作为键,将内容作为值。
四. 输出格式 OutputFormat
1.OutputFormat 抽象类:
OutputFormat 抽象类描述了M-R作业的输出规范,它决定了将MapReduce 的作业的输出结果保持到哪里,以及如何对输出结果进行持久化操作。其主要完成以下几个工作:
a. 坚持作业的输出是否有效;
b. 提供一个具体的RecordWriter 实现类。
OutputFormat 抽象类主要有三个抽象函数:
public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context )//返回RecordWriter,将K-V 对写入存储结构
public abstract void checkOutputSpecs(JobContext context )//检查输出目录是否存在
public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context )//Hadoop作业使用该类来完成输出结果的提交即将作业的输出结果保存到正确的输出目录中
2.OutputFormat 类有四个继承子类:
FileOutputFormat 类:将键值对写入文件系统
DBOutputFormat 类:将键值对写入数据库中
FitterOutputFormat 类:将OutputFormat 的结果再次封装,类似Java的流的Fitter
NullOutputFormat 类:将键值对写入/dev/null,相当于舍弃这些值
五. 记录写入器 RecordWriter
InputFormat 提供了RecordReader 来从输入文件中读取键值对,OutputFormat 也提供了RecordWriter 用于MapReduce 作业的键值对写入指定的输出中。
RecordWriter 有两个抽象函数:
public abstract void write(K key,V value)//用于将产生的键值对以指定的格式写入到输出目录中
public abstract void close(TaskAttempContext context)//关闭输出,并释放资源