在旧版本的samples中,使用的是旧的api,mapred下面的MultiFileInputFormat,现在已经过时。
现在推荐使用mapreduce下面的CombineInputFormat来处理。
应用场景:
如果文件数量大,而且单个文件又比较小,若是使用FileInputFormat进行分片,则会根据一个文件生成一个分片,
每个分片又丢给一个maptask,这样maptask处理的内容太小,很快就完成了,利用率不高,因为maptask本身启动
处理所占的时间和资源消耗就超过了信息处理本身所占的时间。推荐一个maptask至少运行一分钟左右。
解决方案:
使用combinefileinputformat来重定义了getSplits方法,这样可以根据我们指定的splitsize(一般是给定为blocksize大小,减少数据传输)
,打包多个小文件到一个inputsplit中去。这样减少了框架生成的maptask的数量。
示例:
例如我的englishwords目录下面有四个文件,使用wordcount示例来跑的话,默认生成4个maptask(不考虑失败又生成的maptask)一个reducetask.
使用旧版的api生成了2个maptask,使用新版的multiplefilewordcount示例生成了一个maptask.
CombineFileInputformat 中可以重写的一个重要方法是:
/** * Specify the maximum size (in bytes) of each split. Each split is * approximately equal to the specified size. */ protected void setMaxSplitSize(long maxSplitSize) { this.maxSplitSize = maxSplitSize; }
示例中又自己写了一个数据结构wordoffset, 是因为原来的只考虑一个文件(一个分片一个文件)中的信息,所以key是offset,value是当前行的值。
现在一个分片中会有多个文件,所以新的数据结构wordoffset就表示哪个文件的offset,这样更明晰。
有时候我们在项目中就需要自己定义maptask的参数。这个结构是需要实现writable接口的(可以序列化)。
使用CombineFileInputFormat最重要的就是实现 Reader的方法,Reader中最重要的就是next().
基本思路其实和单个文件的是类似的, 只是在这种情况下需要处理多个文件的情况,需要有一个index来标志是正在处理哪个文件。
一般在combineReader里面会有如下的代码:
public static class CombineFileLineRecordReader extends RecordReader<WordOffset, Text> { private long startOffset; //offset of the chunk; private long end; //end of the chunk; private long pos; // current pos private FileSystem fs; private Path path; private WordOffset key; private Text value; private FSDataInputStream fileIn; private LineReader reader; public CombineFileLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException { this.path = split.getPath(index); fs = this.path.getFileSystem(context.getConfiguration()); this.startOffset = split.getOffset(index); this.end = startOffset + split.getLength(index); boolean skipFirstLine = false; //open the file fileIn = fs.open(path); if (startOffset != 0) { skipFirstLine = true; --startOffset; fileIn.seek(startOffset); } reader = new LineReader(fileIn); if (skipFirstLine) { // skip first line and re-establish "startOffset". startOffset += reader.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - startOffset)); } this.pos = startOffset; }
…………