package com.asin.hdp.inverted;
import java.io.IOException;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * MapReduce job that builds an inverted index: for every word it lists the
 * input files containing it together with the number of occurrences per file.
 *
 * <p>Record flow:
 * <ul>
 *   <li>mapper: {@code (word, "file:1")} for every space-separated token;</li>
 *   <li>combiner: {@code (word, "file:n")}, partial per-file sums;</li>
 *   <li>reducer: {@code (word, "fileA:n\tfileB:m\t")}, final per-file sums.</li>
 * </ul>
 *
 * <p>The key is the word at every stage and the value format is the same on
 * both sides of the combiner. This matters because the framework may apply the
 * combiner zero, one or several times and partitions map output on the key the
 * mapper emitted; a combiner that rewrites the key or the value format only
 * works by accident.
 */
public class InvertedIndexCombine {

    /** Separates the file name from its count inside a posting, e.g. {@code a.txt:3}. */
    private static final char POSTING_SEPARATOR = ':';

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args optional paths: when at least two are given, every argument
     *             but the last is an input path and the last is the output
     *             directory; otherwise the built-in default paths are used
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(InvertedIndexCombine.class);
        job.setMapperClass(invertedMapper.class);
        job.setCombinerClass(invertedCombine.class);
        job.setReducerClass(invertedReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        if (args.length >= 2) {
            int last = args.length - 1;
            for (int i = 0; i < last; i++) {
                FileInputFormat.addInputPath(job, new Path(args[i]));
            }
            FileOutputFormat.setOutputPath(job, new Path(args[last]));
        } else {
            // No usable arguments: fall back to the original fixed locations.
            FileInputFormat.addInputPath(job, new Path("e:/a.txt"));
            FileInputFormat.addInputPath(job, new Path("e:/b.txt"));
            FileInputFormat.addInputPath(job, new Path("e:/c.txt"));
            FileOutputFormat.setOutputPath(job, new Path("e:/outputCombine"));
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * Adds up {@code file:count} postings per file.
     *
     * @param values postings in {@code file:count} form
     * @return total count per file name, ordered by file name so that the
     *         emitted output is deterministic
     * @throws IOException if a posting is not in {@code file:count} form
     */
    private static Map<String, Long> sumPerFile(Iterable<Text> values) throws IOException {
        Map<String, Long> totals = new TreeMap<String, Long>();
        for (Text value : values) {
            String posting = value.toString();
            // The count never contains ':', so split on the last one; this
            // keeps file names that themselves contain ':' intact.
            int sep = posting.lastIndexOf(POSTING_SEPARATOR);
            if (sep <= 0 || sep == posting.length() - 1) {
                throw new IOException("Malformed posting, expected file:count but got '" + posting + "'");
            }
            String file = posting.substring(0, sep);
            long count;
            try {
                count = Long.parseLong(posting.substring(sep + 1));
            } catch (NumberFormatException e) {
                throw new IOException("Malformed count in posting '" + posting + "'", e);
            }
            Long previous = totals.get(file);
            totals.put(file, Long.valueOf(previous == null ? count : previous.longValue() + count));
        }
        return totals;
    }

    /**
     * Emits {@code (word, "file:1")} for every space-separated token of a line,
     * where {@code file} is the name of the file the current split belongs to.
     */
    public static class invertedMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Reused across calls to avoid allocating two Text objects per token.
        private final Text word = new Text();
        private final Text posting = new Text();

        /**
         * Resolves the file name of the current split once per task and
         * prepares the constant {@code file:1} posting value.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            // getName() is already the last path component; no prefix to strip.
            String fileName = split.getPath().getName();
            posting.set(fileName + POSTING_SEPARATOR + "1");
        }

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString(), " ");
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, posting);
            }
        }
    }

    /**
     * Pre-aggregates postings on the map side: for one word, collapses all
     * {@code file:count} values of the same file into a single posting.
     * Input and output share key and value format, so the job result does not
     * depend on how many times this combiner is applied.
     */
    public static class invertedCombine extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            Text posting = new Text();
            for (Map.Entry<String, Long> entry : sumPerFile(values).entrySet()) {
                posting.set(entry.getKey() + POSTING_SEPARATOR + entry.getValue());
                context.write(key, posting);
            }
        }
    }

    /**
     * Produces the final index line for a word: the per-file totals, each
     * written as {@code file:count} followed by a tab, in file-name order.
     * Sums again rather than just concatenating, so the result is correct even
     * when the combiner did not run or left several postings per file.
     */
    public static class invertedReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            StringBuilder line = new StringBuilder();
            for (Map.Entry<String, Long> entry : sumPerFile(values).entrySet()) {
                // Trailing tab after every posting is kept for output compatibility.
                line.append(entry.getKey()).append(POSTING_SEPARATOR).append(entry.getValue()).append('\t');
            }
            context.write(key, new Text(line.toString()));
        }
    }
}