Hadoop ships with a fairly rich set of data input and output formats that covers most designs, but in some cases a custom input or output format is needed.
An input format describes the input specification of a MapReduce job. The framework relies on it to check that specification (for example, verifying the input directories), to split the input files into InputSplits, and to provide a reader that pulls records out of each split line by line and turns them into the key/value pairs fed to the Map phase. Hadoop provides many input formats, such as TextInputFormat and KeyValueTextInputFormat, and each has a corresponding RecordReader, such as LineRecordReader and KeyValueLineRecordReader. To define a custom input format you mainly implement the createRecordReader() and getSplits() methods of InputFormat (when extending FileInputFormat, getSplits() is already provided, so usually only createRecordReader() needs to be written), and in the RecordReader you implement initialize(), nextKeyValue(), getCurrentKey(), getCurrentValue(), getProgress() and close().
For example:
package com.rpc.nefu;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

// A custom input format extends the FileInputFormat base class.
public class ZInputFormat extends FileInputFormat<IntWritable, IntWritable> {

    @Override
    // Hand the framework our RecordReader implementation.
    public RecordReader<IntWritable, IntWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new ZRecordReader();
    }

    // The RecordReader that parses each split into (IntWritable, IntWritable) pairs.
    public static class ZRecordReader extends RecordReader<IntWritable, IntWritable> {
        private LineReader in;        // reader over the input stream
        private boolean more = true;  // whether more records remain
        private IntWritable key = null;
        private IntWritable value = null;
        // Byte offsets of this split within the file, and the current read position.
        private long start;
        private long end;
        private long pos;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            FileSplit inputsplit = (FileSplit) split;
            start = inputsplit.getStart();         // start offset of this split
            end = start + inputsplit.getLength();  // end offset of this split
            final Path file = inputsplit.getPath();

            // Open the file and seek to the start of the split (a freshly opened
            // stream always points at the beginning of the file).
            FileSystem fs = file.getFileSystem(context.getConfiguration());
            FSDataInputStream fileIn = fs.open(file);
            fileIn.seek(start);
            in = new LineReader(fileIn, context.getConfiguration());

            // If this is not the first split, skip the first (possibly partial) line:
            // it has already been read by the reader of the previous split, and
            // reading it again here would produce a duplicate or broken record.
            if (start != 0) {
                start += in.readLine(new Text(), 0, maxBytesToConsume(start));
            }
            pos = start;
        }

        private int maxBytesToConsume(long pos) {
            return (int) Math.min(Integer.MAX_VALUE, end - pos);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // Read the next key/value pair.
            // Tip: avoid printing inside this method, it is called per record and slows the job down.
            if (null == key) {
                key = new IntWritable();
            }
            if (null == value) {
                value = new IntWritable();
            }

            // Once the position has moved past the end of the split, this split is
            // finished; the line crossing the boundary has already been returned,
            // and the next split skips it in initialize().
            if (pos > end) {
                more = false;
                return false;
            }

            Text nowline = new Text();  // holds the current line
            int readsize = in.readLine(nowline);
            if (0 == readsize) {
                // End of file reached.
                key = null;
                value = null;
                more = false;
                return false;
            }
            pos += readsize;  // advance the current read position

            String[] keyandvalue = nowline.toString().split(",");
            // Skip the header line ("CITING","CITED").
            if (keyandvalue[0].endsWith("\"CITING\"")) {
                readsize = in.readLine(nowline);
                pos += readsize;
                if (0 == readsize) {
                    more = false;
                    return false;
                }
                keyandvalue = nowline.toString().split(",");
            }

            // Parse the key and the value.
            key.set(Integer.parseInt(keyandvalue[0]));
            value.set(Integer.parseInt(keyandvalue[1]));
            return true;
        }

        @Override
        public IntWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public IntWritable getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            // Progress through the current split.
            if (!more || end == start) {
                return 0f;
            } else {
                return Math.min(1.0f, (pos - start) / (float) (end - start));
            }
        }

        @Override
        public void close() throws IOException {
            // Close the underlying reader.
            if (null != in) {
                in.close();
            }
        }
    }
}
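With the format written, it only has to be registered on the job. Below is a minimal driver sketch (not part of the original code), assuming hypothetical input/output paths passed as arguments; the essential line is setInputFormatClass(), and a real job would also set its Mapper and Reducer classes.

package com.rpc.nefu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver showing where the custom input format is plugged in.
public class ZInputDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "custom input format demo");
        job.setJarByClass(ZInputDriver.class);
        job.setInputFormatClass(ZInputFormat.class);   // use the custom input format
        // job.setMapperClass(...) and job.setReducerClass(...) would go here.
        job.setOutputKeyClass(IntWritable.class);      // matches ZRecordReader's key type
        job.setOutputValueClass(IntWritable.class);    // matches ZRecordReader's value type
        FileInputFormat.addInputPath(job, new Path(args[0]));     // hypothetical input path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));   // hypothetical output path
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}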
A second example, from an inverted-index job: the input format below wraps a LineRecordReader so that each key carries the source file name and byte offset, while the value is the line itself.

package reverseIndex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Input format for an inverted index: the key is "(fileName@offset)", the value is the line.
public class FileNameLocInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new FileNameLocRecordReader();
    }

    // Wraps a LineRecordReader and decorates its key with the file name.
    public static class FileNameLocRecordReader extends RecordReader<Text, Text> {
        String FileName;
        LineRecordReader line = new LineRecordReader();

        /**
         * ......
         */
        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            // Key format: (fileName@byteOffset)
            return new Text("(" + FileName + "@" + line.getCurrentKey() + ")");
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return line.getCurrentValue();
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            line.initialize(split, context);
            FileSplit inputsplit = (FileSplit) split;
            FileName = inputsplit.getPath().getName();
        }

        @Override
        public void close() throws IOException {
            line.close();
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return line.getProgress();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return line.nextKeyValue();
        }
    }
}
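To show how such records would be consumed, here is a hypothetical mapper (not part of the original code) that takes the "(fileName@offset)" key and the line value and emits (word, location) pairs for the inverted index; the driver would also need job.setInputFormatClass(FileNameLocInputFormat.class).

package reverseIndex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: input key = "(fileName@offset)", input value = the line;
// emits one (word, location) pair per token for the inverted index.
public class InvertedIndexMapper extends Mapper<Text, Text, Text, Text> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split("\\s+")) {
            if (!word.isEmpty()) {
                context.write(new Text(word), key);   // word -> where it occurred
            }
        }
    }
}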
Hadoop also has many built-in output formats and RecordWriters. An output format checks the job's output specification (for example, that the output directory does not already exist) and writes out the job's result data.
A custom output format:
public static class AlphaOutputFormat extends multiformat<Text, IntWritable> {

    @Override
    protected String generateFileNameForKeyValue(Text key, IntWritable value,
            Configuration conf) {
        // Route each record to a file named after the first letter of its key;
        // keys that do not start with a letter go to other.txt.
        char c = key.toString().toLowerCase().charAt(0);
        if (c >= 'a' && c <= 'z') {
            return c + ".txt";
        } else {
            return "other.txt";
        }
    }
}
// Set the job's output format.
job.setOutputFormatClass(AlphaOutputFormat.class);
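As a sketch of the effect (assuming Text keys, e.g. from a word count), generateFileNameForKeyValue() routes each record into a per-letter file under the task's output directory:

key "apple"  -> a.txt
key "Banana" -> b.txt      (keys are lower-cased before routing)
key "42"     -> other.txt  (keys that do not start with a letter)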
The multiformat base class that AlphaOutputFormat extends is shown below; it keeps one RecordWriter per generated file name and delegates each record to the writer for its file.

package com.rpc.nefu;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

// Output format that writes each record to a file chosen per key/value pair.
public abstract class multiformat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> {

    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job));
        }
        return writer;
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf);
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /** Determine the output file name (including extension) from key, value and conf. */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf);

    public class MultiRecordWriter extends RecordWriter<K, V> {
        /** Cache of RecordWriters, one per output file name. */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** Output directory. */
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) {
            super();
            this.job = job;
            this.workPath = workPath;
            recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            // Determine the output file name for this record and reuse its writer if cached.
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass =
                        getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new lineRecordWrite<K, V>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new lineRecordWrite<K, V>(fileOut, keyValueSeparator);
            }
            return recordWriter;
        }
    }
}
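The code above constructs a lineRecordWrite that is not shown in the original listing. It plays the same role as TextOutputFormat.LineRecordWriter, writing key, separator, value and a newline for every record; the following is a minimal sketch under that assumption (only the constructor signature is fixed by the calls above, everything else is filled in).

// Minimal sketch of the missing lineRecordWrite class, modelled on
// TextOutputFormat.LineRecordWriter: it writes "key<separator>value\n" per record.
public class lineRecordWrite<K, V> extends RecordWriter<K, V> {
    private final DataOutputStream out;
    private final String keyValueSeparator;

    public lineRecordWrite(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        this.keyValueSeparator = keyValueSeparator;
    }

    @Override
    public void write(K key, V value) throws IOException {
        out.write(key.toString().getBytes("UTF-8"));
        out.write(keyValueSeparator.getBytes("UTF-8"));
        out.write(value.toString().getBytes("UTF-8"));
        out.write("\n".getBytes("UTF-8"));
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}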