数据去重
1、原始数据
1)file1:
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
2)file2:
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
数据输出:
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d
2、说明
数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次。我们自然而然会想到将同一个数据的所有记录都交给一台reduce机器,
无论这个数据出现多少次,只要在最终结果中输出一次就可以了。具体就是reduce的输入应该以数据作为key,
而对value-list则没有要求。当reduce接收到一个<key,value-list>时就直接将key复制到输出的key中,并将value设置成空值。
代码测试:
public class DeMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text val = new Text("");
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line = value.toString();
if(line.trim().length()>0){
context.write(new Text(line.trim()),val );
}
}
}
public class DeReducer extends Reducer<Text, Text, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public class JobMain {
/**
* @param args
*/
public static void main(String[] args)throws Exception{
Configuration configuration = new Configuration();
Job job= new Job(configuration, "de-job");
job.setJarByClass(JobMain.class);
job.setMapperClass(DeMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(DeReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputDir = new Path(args[1]);
FileSystem fs = FileSystem.get(configuration);
if(fs.exists(outputDir)){
fs.delete(outputDir, true);
}
FileOutputFormat.setOutputPath(job, outputDir);
job.setNumReduceTasks(1);
System.exit(job.waitForCompletion(true)?0:1);
}
}
运行结果为: