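Data stored in different files is sometimes related, and the files then need to be joined into a new file for analysis. For example, take two input files, a.txt and b.txt, whose contents look like this: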
a.txt:
1 a
2 b
3 c
4 d

b.txt:
1 good
2 bad
3 ok
4 hello
They need to be joined into a new file like the following:
a good
b bad
c ok
d hello
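The processing can be divided into two steps:
1. Map phase: break up the records of both input files into key/value pairs, using the first field as the key. Once grouped by key, the map output looks like this: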
1 a
1 good
2 b
2 bad
3 c
3 ok
4 d
4 hello
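2. Reduce phase: perform the actual join. Because the data here is simple, it is enough to check whether a value from the map output has length 1: if it does, it becomes the new key, otherwise it becomes the new value.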
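The two steps above can be tried out without a cluster. Below is a minimal plain-Java sketch, assuming the same space-separated input lines; the class `JoinSketch` and its methods are hypothetical names used only for illustration and are not part of Hadoop or of the program that follows. A `TreeMap` stands in for the grouping that the framework does between map and reduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the two steps; no Hadoop classes are involved.
class JoinSketch {

    // Step 1 ("map" plus grouping): split every line into (id, field)
    // and collect the fields that share the same id.
    static Map<String, List<String>> mapAndGroup(List<String> lines) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(" ");
            grouped.computeIfAbsent(parts[0], k -> new ArrayList<>()).add(parts[1]);
        }
        return grouped;
    }

    // Step 2 ("reduce"): within one group, a value of length 1 becomes
    // the new key and any other value becomes the new value.
    static List<String> reduce(Map<String, List<String>> grouped) {
        List<String> joined = new ArrayList<>();
        for (List<String> values : grouped.values()) {
            String newKey = "";
            String newValue = "";
            for (String v : values) {
                if (v.length() == 1) {
                    newKey = v;
                } else {
                    newValue = v;
                }
            }
            joined.add(newKey + " " + newValue);
        }
        return joined;
    }

    // Feed the lines of both files through the two steps.
    static List<String> join(List<String> a, List<String> b) {
        List<String> all = new ArrayList<>(a);
        all.addAll(b);
        return reduce(mapAndGroup(all));
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("1 a", "2 b", "3 c", "4 d");
        List<String> b = Arrays.asList("1 good", "2 bad", "3 ok", "4 hello");
        for (String line : join(a, b)) {
            System.out.println(line);
        }
    }
}
```

Run with the sample data, `main` prints the four joined lines `a good`, `b bad`, `c ok` and `d hello`.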
package cn.zhf.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SingleJoin extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Tool tool = new SingleJoin();
        int exitCode = ToolRunner.run(tool, args);
        // Print the result only if the job succeeded.
        if (exitCode == 0) {
            print(tool);
        }
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Pass the configuration to the job so that ToolRunner options take effect.
        Job job = Job.getInstance(conf);
        job.setJarByClass(getClass());

        // Delete any previous output directory; the job fails if it already exists.
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path("out"), true);

        FileInputFormat.addInputPath(job, new Path("a.txt"));
        FileInputFormat.addInputPath(job, new Path("b.txt"));
        FileOutputFormat.setOutputPath(job, new Path("out"));

        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Report failure to the caller instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line is "<id> <field>": emit the id as key and the field as value.
            String[] str = value.toString().split(" ");
            if (str.length < 2) {
                return; // skip blank or malformed lines
            }
            context.write(new Text(str[0]), new Text(str[1]));
        }
    }

    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text outKey = new Text();
            Text outValue = new Text();
            // A value of length 1 comes from a.txt and becomes the output key;
            // any other value comes from b.txt and becomes the output value.
            // Iterating one value at a time avoids calling next() on an exhausted iterator.
            for (Text value : values) {
                if (value.toString().length() == 1) {
                    outKey.set(value);
                } else {
                    outValue.set(value);
                }
            }
            context.write(outKey, outValue);
        }
    }

    public static void print(Tool tool) throws IOException {
        FileSystem fs = FileSystem.get(tool.getConf());
        Path path = new Path("out/part-r-00000");
        byte[] buff = new byte[128];
        int length;
        // Copy the raw bytes to stdout so that no extra line breaks are inserted
        // between chunks; try-with-resources closes the stream.
        try (FSDataInputStream fsin = fs.open(path)) {
            while ((length = fsin.read(buff, 0, buff.length)) != -1) {
                System.out.write(buff, 0, length);
            }
        }
        System.out.flush();
    }
}
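The length check in the reducer works only because every value in a.txt is a single character and every value in b.txt is longer. For less simple data, one common alternative is to tag each value with the file it came from before grouping. The following is a minimal plain-Java sketch of that idea, not the method used in the program above: the class `TaggedJoinSketch`, the tags `A:` and `B:`, and the method names are all hypothetical, and in a real job the tag would have to be attached in the mapper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of a tag-based join; no Hadoop classes are involved.
class TaggedJoinSketch {

    // Split each line into (id, field), prefix the field with the source tag,
    // and add it to the group for that id.
    static void tagAndGroup(List<String> lines, String tag,
                            Map<String, List<String>> grouped) {
        for (String line : lines) {
            String[] parts = line.split(" ", 2);
            if (parts.length < 2) {
                continue; // skip blank or malformed lines
            }
            grouped.computeIfAbsent(parts[0], k -> new ArrayList<>()).add(tag + parts[1]);
        }
    }

    // Join the two inputs on their first field. Ids that appear in only one
    // input are dropped (inner-join behaviour, chosen here for simplicity).
    static List<String> join(List<String> a, List<String> b) {
        Map<String, List<String>> grouped = new TreeMap<>();
        tagAndGroup(a, "A:", grouped);
        tagAndGroup(b, "B:", grouped);

        List<String> joined = new ArrayList<>();
        for (List<String> values : grouped.values()) {
            String left = null;
            String right = null;
            for (String v : values) {
                // The tag, not the length, decides which side the value belongs to.
                if (v.startsWith("A:")) {
                    left = v.substring(2);
                } else {
                    right = v.substring(2);
                }
            }
            if (left != null && right != null) {
                joined.add(left + " " + right);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("1 x", "2 long");
        List<String> b = Arrays.asList("1 y", "2 z", "3 orphan");
        for (String line : join(a, b)) {
            System.out.println(line);
        }
    }
}
```

With this approach a one-character value in the second file (such as `y` above) is still treated as a value, which the length check would get wrong.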
Reference: 《MapReduce2.0源代码分析及编程实践》 (MapReduce 2.0 Source Code Analysis and Programming Practice)
Time: 2024-10-14 12:38:02