多种自定义文件格式的文件输入处理
MultipleInputs可以让MR支持多种输入格式
比如我们有两种文件格式,那么我们就要有两套Record Class,RecordReader和InputFormat
InputFormat(extends FileInputFormat)--->RecordReader(extends RecordReader)--->RecordClass(implements Writable)
MultipleInputs需要不同的InputFormat:每种InputFormat使用一种RecordReader来读取文件,并返回一种Record类型的值
这就是这三者的典型关系,它们也是map过程中涉及的三个步骤所用的工具及其产物
数据准备
a文件
1\t80
2\t90
3\t100
4\t50
5\t73
b文件
1\tlili\t3
2\txiaoming\t3
3\tfeifei\t3
4\tzhangsan\t3
5\tlisi\t3
其中 \t 表示分隔符(制表符 Tab)
设计思路
将 \t 前面的文本作为map输入的key
将 \t 后面的内容作为map输入的value
要求自定义实现InputFormat,输出key,value格式数据。以产生Map的输入的数据(key,value)
!!!三个文件步骤!!!
InputFormat(extends FileInputFormat)--->RecordReader(extends RecordReader)--->RecordClass(implements Writable)
本例对两个文件进行操作
1.两个RecordClass类(实现Writable接口)
package test.mr.multiinputs2; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; /* * 对map输入的value的预处理 * 对原始数据的预加工 */ /* * 第一张表数据 */ public class FirstClass implements Writable { private String value; public String getValue() { return value; } public void setValue(String value) { this.value = value; } public FirstClass() { super(); // TODO Auto-generated constructor stub } public FirstClass(String value) { super(); this.value = value; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.value); } @Override public void readFields(DataInput in) throws IOException { this.value = in.readUTF(); } @Override public String toString() { return "FirstClasst" + value; } }
package test.mr.multiinputs2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Value type for records of the second table: a user name and a class number.
 *
 * <p>SecondRecordReader builds one of these from the second and third tab-separated fields of each
 * input line, so the raw map input value is pre-processed before it reaches the mapper.
 */
public class SecondClass implements Writable {
    private String username;
    private int classNo;

    /** Creates an empty instance; fields keep their defaults until set or deserialized. */
    public SecondClass() {
        super();
    }

    /** Creates an instance with the given user name and class number. */
    public SecondClass(String username, int classNo) {
        super();
        this.username = username;
        this.classNo = classNo;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public int getClassNo() {
        return classNo;
    }

    public void setClassNo(int classNo) {
        this.classNo = classNo;
    }

    /**
     * Serializes the user name (modified UTF-8) followed by the class number.
     *
     * <p>NOTE: {@code DataOutput.writeUTF} throws NullPointerException for a null user name.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(username);
        out.writeInt(classNo);
    }

    /** Reads the fields back in the same order {@link #write(DataOutput)} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.username = in.readUTF();
        this.classNo = in.readInt();
    }

    /** Renders as {@code SecondClass<TAB>username<TAB>classNo}, keeping output tab-delimited. */
    @Override
    public String toString() {
        return "SecondClass\t" + username + "\t" + classNo;
    }
}
2.两个自定义RecordReader类(继承RecordReader类)
package test.mr.multiinputs2; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; public class FirstRecordReader extends RecordReader<Text, FirstClass> { // 定义一个真正读取split中文件的读取器 private LineRecordReader lineRecordReader = null; private Text key = null; private FirstClass value = null; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { close(); lineRecordReader = new LineRecordReader(); lineRecordReader.initialize(split, context); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { // 没有读取到东西 if (!lineRecordReader.nextKeyValue()) { key = null; value = null; return false; } Text val = lineRecordReader.getCurrentValue(); String line = val.toString(); String[] str = line.split("t"); key = new Text(str[0]); value = new FirstClass(str[1].trim()); // 实现对原始数据的预分割 return true; } // 读取key的当前值 @Override public Text getCurrentKey() throws IOException, InterruptedException { return key; } // 读取value的当前值 @Override public FirstClass getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return lineRecordReader.getProgress(); } @Override public void close() throws IOException { if (null != lineRecordReader) { lineRecordReader.close(); lineRecordReader = null; } key = null; value = null; } }
package test.mr.multiinputs2; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; public class SecondRecordReader extends RecordReader<Text, SecondClass> { // 定义一个真正读取split中文件的读取器 private LineRecordReader lineRecordReader = null; private Text key = null; private SecondClass value = null; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { close(); lineRecordReader = new LineRecordReader(); lineRecordReader.initialize(split, context); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!lineRecordReader.nextKeyValue()) { key = null; value = null; return false; } Text val = lineRecordReader.getCurrentValue(); String line = val.toString(); String str[] = line.split("t"); key = new Text(str[0]); value = new SecondClass(str[1], Integer.parseInt(str[2])); return true; } @Override public Text getCurrentKey() throws IOException, InterruptedException { return key; } @Override public SecondClass getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return lineRecordReader.getProgress(); } @Override public void close() throws IOException { if (null != lineRecordReader) { lineRecordReader.close(); lineRecordReader = null; } key = null; value = null; } }
3.自定义两个FileInputFormat类(继承FileInputFormat类)
package test.mr.multiinputs2; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class FirstInputFormat extends FileInputFormat<Text, FirstClass> { @Override public RecordReader<Text, FirstClass> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new FirstRecordReader(); } }
package test.mr.multiinputs2; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class SecondInputFormat extends FileInputFormat<Text, SecondClass> { @Override public RecordReader<Text, SecondClass> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new SecondRecordReader(); } }
4.两个Map类
package test.mr.multiinputs2; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class FirstMap extends Mapper<Text, FirstClass, Text, Text> { @Override protected void map(Text key, FirstClass value, Mapper<Text, FirstClass, Text, Text>.Context context) throws IOException, InterruptedException { context.write(key, new Text(value.toString())); } }
package test.mr.multiinputs2; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class SecondMap extends Mapper<Text, SecondClass, Text, Text> { @Override protected void map(Text key, SecondClass value, Mapper<Text, SecondClass, Text, Text>.Context context) throws IOException, InterruptedException { context.write(key, new Text(value.toString())); } }
5.reduce类
package test.mr.multiinputs2; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MultiInputsRedu extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { for (Text val : values) { context.write(key, val); } } }
6.Job类
/* * 要求自定义实现InputFormat,输出key,value格式数据 */ public class MultiInputsMain extends Configuration implements Tool { private String input1 = null; // 定义的多个输入文件 private String input2 = null; private String output = null; @Override public void setConf(Configuration conf) { } @Override public Configuration getConf() { return new Configuration(); } @Override public int run(String[] args) throws Exception { setArgs(args); checkParam();// 对参数进行检测 Configuration conf = new Configuration(); Job job = new Job(conf); job.setJarByClass(MultiInputsMain.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(MultiInputsRedu.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // MultipleInputs类添加文件路径 // 添加上自定义的fileInputFormat(分别是FirstInputFormat和SecondInputFormat)格式 MultipleInputs.addInputPath(job, new Path(input1), FirstInputFormat.class, FirstMap.class); MultipleInputs.addInputPath(job, new Path(input2), SecondInputFormat.class, SecondMap.class); FileOutputFormat.setOutputPath(job, new Path(output)); job.waitForCompletion(true); return 0; } private void checkParam() { if (input1 == null || "".equals(input1.trim())) { System.out.println("no input phone-data path"); userMaunel(); System.exit(-1); } if (input2 == null || "".equals(input2.trim())) { System.out.println("no input user-data path"); userMaunel(); System.exit(-1); } if (output == null || "".equals(output.trim())) { System.out.println("no output path"); userMaunel(); System.exit(-1); } } // 用户手册 private void userMaunel() { System.err.println("Usage:"); System.err.println("-i1 input \t phone data path."); System.err.println("-i2 input \t user data path."); System.err.println("-o output \t output data path."); } // 对属性进行赋值 // 设置输入的格式:-i1 xxx(输入目录) -i2 xxx(输入目录) -o xxx(输出目录) private void setArgs(String[] args) { for (int i = 0; i < args.length; i++) { if ("-i1".equals(args[i])) { input1 = args[++i]; // 将input1赋值为第一个文件的输入路径 } else if 
("-i2".equals(args[i])) { input2 = args[++i]; } else if ("-o".equals(args[i])) { output = args[++i]; } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); ToolRunner.run(conf, new MultiInputsMain(), args); // 调用run方法 } }