Requirement:
Build an inverted index over the contents of several files: for each word, list the files it appears in and how many times it occurs in each file.
a.txt: hello tom hello jerry hello tom
b.txt: hello jerry hello jerry tom jerry
c.txt: hello jerry hello tom
Expected final output:
hello    a.txt-->3 b.txt-->2 c.txt-->2
jerry    a.txt-->1 b.txt-->3 c.txt-->1
tom      a.txt-->2 b.txt-->1 c.txt-->1
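To make the target mapping concrete before turning to Hadoop, here is a minimal plain-Java sketch (not part of the original program) that builds the same word -> file -> count structure from the sample data held in memory. The class name LocalInvertedIndex and the in-memory map of file contents are illustrative assumptions; the file names and contents come from the example above.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class LocalInvertedIndex {
    public static void main(String[] args) {
        // Sample data from the requirement, kept in memory instead of on HDFS.
        Map<String, String> files = new LinkedHashMap<>();
        files.put("a.txt", "hello tom hello jerry hello tom");
        files.put("b.txt", "hello jerry hello jerry tom jerry");
        files.put("c.txt", "hello jerry hello tom");

        // word -> (file name -> occurrence count), sorted by word and file name
        Map<String, Map<String, Integer>> index = new TreeMap<>();
        for (Map.Entry<String, String> e : files.entrySet()) {
            for (String word : e.getValue().split(" ")) {
                index.computeIfAbsent(word, w -> new TreeMap<>())
                     .merge(e.getKey(), 1, Integer::sum);
            }
        }

        // Print in the same "word  file-->count ..." layout as the expected result
        index.forEach((word, perFile) -> {
            StringBuilder sb = new StringBuilder(word);
            perFile.forEach((file, count) ->
                    sb.append(' ').append(file).append("-->").append(count));
            System.out.println(sb);
        });
    }
}

The MapReduce version below computes exactly this structure, but in two jobs so the grouping can be done by the shuffle instead of in memory.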
Code implementation (step one):
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InverseIndexStepOne {

    static class InverseIndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");

            // map() is called by a map task, and the map task knows which input split
            // it is reading; the split carries the file it belongs to, so the file
            // name can be taken from it.
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String fileName = inputSplit.getPath().getName();

            for (String word : words) {
                context.write(new Text(word + "-->" + fileName), new IntWritable(1));
            }
        }
    }

    static class InverseIndexStepOneReducer extends Reducer<Text, IntWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            long count = 0;
            for (IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new LongWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(InverseIndexStepOne.class);
        job.setMapperClass(InverseIndexStepOneMapper.class);
        job.setReducerClass(InverseIndexStepOneReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        /**
         * TextInputFormat and TextOutputFormat are Hadoop's default input/output
         * components, so these two lines can be omitted:
         */
        /*
         * job.setInputFormatClass(TextInputFormat.class);
         * job.setOutputFormatClass(TextOutputFormat.class);
         */

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
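Because the step-one mapper emits an IntWritable 1 per word occurrence, a combiner can pre-aggregate the counts on the map side and reduce shuffle traffic. The sketch below is an optional addition, not part of the original program; note that a combiner must keep the mapper's output types (Text/IntWritable), so the existing reducer, which emits LongWritable, cannot be reused directly. The class name StepOneCombiner is illustrative.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Optional map-side combiner for step one: sums the 1s locally while
// preserving the mapper's <Text, IntWritable> output types.
public class StepOneCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

It would be wired in with job.setCombinerClass(StepOneCombiner.class) in the step-one driver.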
The output of step one is:
hello-->a.txt	3
hello-->b.txt	2
hello-->c.txt	2
jerry-->a.txt	1
jerry-->b.txt	3
jerry-->c.txt	1
tom-->a.txt	2
tom-->b.txt	1
tom-->c.txt	1
This is not yet the result we want, so a second MapReduce program is needed that takes this intermediate output as its input and produces the final format.
Code implementation (step two):
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Processes the output of step one:
 *
 * hello-->a.txt 3
 * hello-->b.txt 2
 * hello-->c.txt 2
 * jerry-->a.txt 1
 * jerry-->b.txt 3
 * jerry-->c.txt 1
 * tom-->a.txt   2
 * tom-->b.txt   1
 * tom-->c.txt   1
 */
public class InverseIndexStepTwo {

    static class InverseIndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Step one's output separates key and count with a tab.
            String[] split = line.split("\t");
            String[] wordAndFile = split[0].split("-->");
            String count = split[1];
            String word = wordAndFile[0];
            String fileName = wordAndFile[1];
            context.write(new Text(word), new Text(fileName + "-->" + count));
        }
    }

    static class InverseIndexStepTwoReducer extends Reducer<Text, Text, Text, Text> {

        // input:  <hello, a.txt-->3> <hello, b.txt-->2> <hello, c.txt-->2>
        // output: hello    a.txt-->3 b.txt-->2 c.txt-->2
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text value : values) {
                sb.append(value.toString()).append(" ");
            }
            context.write(key, new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(InverseIndexStepTwo.class);
        job.setMapperClass(InverseIndexStepTwoMapper.class);
        job.setReducerClass(InverseIndexStepTwoReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        /**
         * TextInputFormat and TextOutputFormat are Hadoop's default input/output
         * components, so these two lines can be omitted:
         */
        /*
         * job.setInputFormatClass(TextInputFormat.class);
         * job.setOutputFormatClass(TextOutputFormat.class);
         */

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
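Since step two consumes step one's output directory, the two jobs can also be chained in a single driver instead of being submitted separately. The following is a minimal sketch of such a driver, not part of the original code; it assumes it lives in the same package as the two step classes (their mapper/reducer inner classes are package-private), and the argument layout (args[0] raw input, args[1] intermediate directory, args[2] final output) is an illustrative choice.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative driver: runs step one and, only if it succeeds, step two.
// args[0] = raw input dir, args[1] = intermediate dir, args[2] = final output dir
public class InverseIndexDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job stepOne = Job.getInstance(conf, "inverse-index-step-one");
        stepOne.setJarByClass(InverseIndexStepOne.class);
        stepOne.setMapperClass(InverseIndexStepOne.InverseIndexStepOneMapper.class);
        stepOne.setReducerClass(InverseIndexStepOne.InverseIndexStepOneReducer.class);
        stepOne.setMapOutputKeyClass(Text.class);
        stepOne.setMapOutputValueClass(IntWritable.class);
        stepOne.setOutputKeyClass(Text.class);
        stepOne.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(stepOne, new Path(args[0]));
        FileOutputFormat.setOutputPath(stepOne, new Path(args[1]));

        if (!stepOne.waitForCompletion(true)) {
            System.exit(1); // stop here if step one failed
        }

        Job stepTwo = Job.getInstance(conf, "inverse-index-step-two");
        stepTwo.setJarByClass(InverseIndexStepTwo.class);
        stepTwo.setMapperClass(InverseIndexStepTwo.InverseIndexStepTwoMapper.class);
        stepTwo.setReducerClass(InverseIndexStepTwo.InverseIndexStepTwoReducer.class);
        stepTwo.setOutputKeyClass(Text.class);
        stepTwo.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(stepTwo, new Path(args[1])); // step one's output
        FileOutputFormat.setOutputPath(stepTwo, new Path(args[2]));

        System.exit(stepTwo.waitForCompletion(true) ? 0 : 1);
    }
}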