使用MultipleInputs类为不同的输入文件路径分别指定输入格式（InputFormat）和Mapper
现有两份数据
phone
123,good number
124,common number
125,bad number
user
zhangsan,123
lisi,124
wangwu,125
现在需要把user和phone按照phone number连接起来。得到下面的结果
zhangsan,123,good number
lisi,124,common number
wangwu,125,bad number
分析思路
这仍然相当于两张表的一对一join操作：map输出的key为外键值（电话号码），value为一个自定义的JavaBean（实现WritableComparable接口），其中带有标识来源表的flag。
join的优化详见http://blog.csdn.net/u010366796/article/details/44649933：设置KeyBean（包含外键和标识flag属性），对key进行排序。
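本例中尝试通过value进行排序，即在value的JavaBean中实现compareTo()方法来完成排序，使phone表位于首位。注意：MapReduce在shuffle阶段只按key排序，不会对value排序，value的compareTo()并不会影响reduce端收到value的顺序；因此reducer应根据flag区分两张表，若要真正保证phone表在前，需要使用上面提到的组合key加分组比较器（二次排序）。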
1.为value实现JavaBean（实现WritableComparable接口）
package test.mr.multiinputs; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /* * 自定义的JavaBean */ public class FlagString implements WritableComparable<FlagString> { private String value; private int flag; // 标记 0:表示phone表 1:表示user表 public FlagString() { super(); // TODO Auto-generated constructor stub } public FlagString(String value, int flag) { super(); this.value = value; this.flag = flag; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } public int getFlag() { return flag; } public void setFlag(int flag) { this.flag = flag; } @Override public void write(DataOutput out) throws IOException { out.writeInt(flag); out.writeUTF(value); } @Override public void readFields(DataInput in) throws IOException { this.flag = in.readInt(); this.value = in.readUTF(); } @Override public int compareTo(FlagString o) { if (this.flag >= o.getFlag()) { if (this.flag > o.getFlag()) { return 1; } } else { return -1; } return this.value.compareTo(o.getValue()); } }
2.多map类,map1(实现对phone表文件操作)
package test.mr.multiinputs; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MultiMap1 extends Mapper<LongWritable, Text, Text, FlagString> { private String delimiter; // 定义分隔符,由job端设置 @Override protected void setup( Mapper<LongWritable, Text, Text, FlagString>.Context context) throws IOException, InterruptedException { delimiter = context.getConfiguration().get("delimiter", ","); } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlagString>.Context context) throws IOException, InterruptedException { String line = value.toString().trim(); if (line.length() > 0) { String[] str = line.split(delimiter); if (str.length == 2) { context.write(new Text(str[0].trim()), new FlagString(str[1].trim(), 0)); // flag=0,表示phone表 } } } }
2.map2(实现对user表文件操作)
package test.mr.multiinputs; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MultiMap2 extends Mapper<LongWritable, Text, Text, FlagString> { private String delimiter; // 设置分隔符 @Override protected void setup( Mapper<LongWritable, Text, Text, FlagString>.Context context) throws IOException, InterruptedException { delimiter = context.getConfiguration().get("delimiter", ","); } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlagString>.Context context) throws IOException, InterruptedException { String line = value.toString().trim(); if (line.length() > 0) { String[] str = line.split(delimiter); if (str.length == 2) { context.write(new Text(str[1].trim()), new FlagString(str[0].trim(), 1)); // flag=1为user表 } } } }
3.reduce类
package test.mr.multiinputs; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MultiRedu extends Reducer<Text, FlagString, NullWritable, Text> { private String delimiter; // 设置分隔符 @Override protected void setup( Reducer<Text, FlagString, NullWritable, Text>.Context context) throws IOException, InterruptedException { delimiter = context.getConfiguration().get("delimiter", ","); } @Override protected void reduce(Text key, Iterable<FlagString> values, Reducer<Text, FlagString, NullWritable, Text>.Context context) throws IOException, InterruptedException { // 最后输出的格式为: uservalue,key,phonevalue String phoneValue = ""; String userValue = ""; int num = 0; for (FlagString value : values) { // 第一个即为phone表 if (num == 0) { phoneValue = value.getValue(); num++; } else { userValue = value.getValue(); context.write(NullWritable.get(), new Text(userValue + key.toString() + phoneValue)); } } } }
4.job类(关键!!实现多文件的输入格式等)
package test.mr.multiinputs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /* * MultipleInputs类指定不同的输入文件路径以及输入文化格式 现有两份数据 phone 123,good number 124,common number 123,bad number user zhangsan,123 lisi,124 wangwu,125 现在需要把user和phone按照phone number连接起来。得到下面的结果 zhangsan,123,good number lisi,123,common number wangwu,125,bad number */ public class MultiMapMain extends Configuration implements Tool { private String input1 = null; // 定义的多个输入文件 private String input2 = null; private String output = null; private String delimiter = null; @Override public void setConf(Configuration conf) { } @Override public Configuration getConf() { return new Configuration(); } @Override public int run(String[] args) throws Exception { setArgs(args); checkParam();// 对参数进行检测 Configuration conf = new Configuration(); Job job = new Job(conf); job.setJarByClass(MultiMapMain.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlagString.class); job.setReducerClass(MultiRedu.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); // MultipleInputs类添加文件路径 MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, MultiMap1.class); MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, MultiMap2.class); FileOutputFormat.setOutputPath(job, new Path(output)); job.waitForCompletion(true); return 0; } private void checkParam() { if (input1 == null || "".equals(input1.trim())) { System.out.println("no input phone-data path"); userMaunel(); System.exit(-1); } if (input2 == null || "".equals(input2.trim())) { 
System.out.println("no input user-data path"); userMaunel(); System.exit(-1); } if (output == null || "".equals(output.trim())) { System.out.println("no output path"); userMaunel(); System.exit(-1); } if (delimiter == null || "".equals(delimiter.trim())) { System.out.println("no delimiter"); userMaunel(); System.exit(-1); } } // 用户手册 private void userMaunel() { System.err.println("Usage:"); System.err.println("-i1 input \t phone data path."); System.err.println("-i2 input \t user data path."); System.err.println("-o output \t output data path."); System.err.println("-delimiter data delimiter \t default comma."); } // 对属性进行赋值 // 设置输入的格式:-i1 xxx(输入目录) -i2 xxx(输入目录) -o xxx(输出目录) -delimiter x(分隔符) private void setArgs(String[] args) { for (int i = 0; i < args.length; i++) { if ("-i1".equals(args[i])) { input1 = args[++i]; // 将input1赋值为第一个文件的输入路径 } else if ("-i2".equals(args[i])) { input2 = args[++i]; } else if ("-o".equals(args[i])) { output = args[++i]; } else if ("-delimiter".equals(args[i])) { delimiter = args[++i]; } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); ToolRunner.run(conf, new MultiMapMain(), args); // 调用run方法 } }
时间: 2024-09-29 23:58:26