1, tf-idf
计算每个人的词条中的重要度
需要3个mapreduce 的 job执行, 第一个计算 TF 和 n, 第二个计算 DF, 第三个代入公式计算结果值
1, 第一个job
package com.wenbronk.weibo; import java.io.IOException; import java.io.StringReader; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.wltea.analyzer.core.IKSegmenter; import org.wltea.analyzer.core.Lexeme; /** * 第一个map, 计算 TF 和 N * * @author root * */ public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * TF 在一个文章中出现的词频 N 总共多少文章 * 按行传入 */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String[] values = value.toString().trim().split("\t"); if (values.length >= 2) { String id = values[0].trim(); String content = values[1].trim(); // 分词 StringReader stringReader = new StringReader(content); IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true); Lexeme word = null; while ((word = ikSegmenter.next()) != null ) { String w = word.getLexemeText(); context.write(new Text(w + "_" + id), new IntWritable(1)); } context.write(new Text("count"), new IntWritable(1)); }else { System.out.println(values.toString() + "---"); } } }
reduce
package com.wenbronk.weibo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 统计tf, n * @author root * */ public class FirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text arg0, Iterable<IntWritable> arg1, Reducer<Text, IntWritable, Text, IntWritable>.Context arg2) throws IOException, InterruptedException { int sum = 0; for (IntWritable intWritable : arg1) { sum += intWritable.get(); } if (arg0.equals(new Text("count"))) { System.err.println(arg0.toString() + "---"); } arg2.write(arg0, new IntWritable(sum)); } }
partition
package com.wenbronk.weibo; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; /** * 决定分区, 计划分4个, n一个, tf三个 * @author root * */ public class FirstPartition extends HashPartitioner<Text, IntWritable>{ @Override public int getPartition(Text key, IntWritable value, int numReduceTasks) { if (key.equals(new Text("count"))) { return 3; }else { return super.getPartition(key, value, numReduceTasks - 1); } } }
mainJob
package com.wenbronk.weibo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FirstJob { public static void main(String[] args) { Configuration config = new Configuration(); config.set("fs.defaults", "hdfs://192.168.208.106:8020"); config.set("yarn.resourcemanager.hostname", "192.168.208.106"); // config.set("maper.jar", "E:\\sxt\\target\\weibo1.jar"); try { Job job = Job.getInstance(config); job.setJarByClass(FirstJob.class); job.setJobName("first"); job.setPartitionerClass(FirstPartition.class); job.setMapperClass(FirstMapper.class); job.setNumReduceTasks(4); job.setCombinerClass(FirstReducer.class); job.setReducerClass(FirstReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weibo2.txt")); FileSystem fileSystem = FileSystem.get(config); Path outPath = new Path("E:\\sxt\\1-MapReduce\\data\\weibo1"); if (fileSystem.exists(outPath)) { fileSystem.delete(outPath); } FileOutputFormat.setOutputPath(job, outPath); boolean waitForCompletion = job.waitForCompletion(true); if (waitForCompletion) { System.out.println("first success"); } }catch (Exception e) { e.printStackTrace(); } } }
2, 第二个
package com.wenbronk.weibo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; /** * 计算 DFi的值, 在多少个文章中出现过 * */ public class SecondMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { // 获取当前maptask的数据片段 FileSplit inputSplit = (FileSplit) context.getInputSplit(); // count不被统计 if (!inputSplit.getPath().getName().contains("part-r-00003")) { String[] values = value.toString().trim().split("\t"); if (values.length >= 2) { String[] split = values[0].trim().split("_"); if (split.length >= 2) { String id = split[0]; context.write(new Text(id), new IntWritable(1)); } } }else { System.out.println(value.toString() + "----"); } } }
reduce
package com.wenbronk.weibo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * * @author root * */ public class SecondReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text arg0, Iterable<IntWritable> arg1, Reducer<Text, IntWritable, Text, IntWritable>.Context arg2) throws IOException, InterruptedException { int sum = 0; for (IntWritable intWritable : arg1) { sum += intWritable.get(); } arg2.write(new Text(arg0), new IntWritable(sum)); } }
mainjob
package com.wenbronk.weibo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SecondJob { public static void main(String[] args) { Configuration config = new Configuration(); config.set("fs.default", "hdfs://192.168.208.106:8020"); config.set("yarn.resourcemanager.hostname", "192.168.208.106"); try { Job job = Job.getInstance(config); job.setJarByClass(SecondJob.class); job.setJobName("second"); job.setMapperClass(SecondMapper.class); job.setCombinerClass(SecondReducer.class); job.setReducerClass(SecondReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weibo1")); FileSystem fileSystem = FileSystem.get(config); Path outPath = new Path("E:\\sxt\\1-MapReduce\\data\\weibo2"); if (fileSystem.exists(outPath)) { fileSystem.delete(outPath); } FileOutputFormat.setOutputPath(job, outPath); boolean f = job.waitForCompletion(true); if (f) { System.out.println("job2 success"); } }catch(Exception e) { e.printStackTrace(); } } }
3, 第三个Job
package com.wenbronk.weibo; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.text.NumberFormat; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class ThirdMapper extends Mapper<LongWritable, Text, Text, Text>{ //存放微博总数, 将小数据缓存进内存, 预加载 public static Map<String, Integer> cmap = null; //存放df public static Map<String, Integer> df = null; // 在初始化类时执行, 将数据预加载进map protected void setup(Context context) throws IOException, InterruptedException { System.out.println("*****"); if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) { URI[] cacheFiles = context.getCacheFiles(); if (cacheFiles != null) { for (URI uri : cacheFiles) { if (uri.getPath().endsWith("part-r-00003")) { Path path = new Path(uri.getPath()); // 获取文件 Configuration configuration = context.getConfiguration(); FileSystem fs = FileSystem.get(configuration); FSDataInputStream open = fs.open(path); BufferedReader reader = new BufferedReader(new InputStreamReader(open)); // BufferedReader reader = new BufferedReader(new FileReader(path.getName())); String line = reader.readLine(); if (line.startsWith("count")) { String[] split = line.split("\t"); cmap = new HashMap<>(); cmap.put(split[0], Integer.parseInt(split[1].trim())); } reader.close(); }else if (uri.getPath().endsWith("part-r-00000")) { df = new HashMap<>(); Path path = new Path(uri.getPath()); // 获取文件 Configuration configuration = context.getConfiguration(); FileSystem fs = FileSystem.get(configuration); FSDataInputStream open = fs.open(path); BufferedReader reader = new BufferedReader(new InputStreamReader(open)); // BufferedReader reader = new BufferedReader(new FileReader(path.getName())); String line = null; while ((line = reader.readLine()) != null) { String[] ls = line.split("\t"); df.put(ls[0], Integer.parseInt(ls[1].trim())); } reader.close(); } } } } } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { // 获取分片 FileSplit inputSplit = (FileSplit) context.getInputSplit(); if (!inputSplit.getPath().getName().contains("part-r-00003")) { String[] values = value.toString().trim().split("\t"); if (values.length >= 2) { int tf = Integer.parseInt(values[1].trim()); String[] ss = values[0].split("_"); if (ss.length >= 2) { String word = ss[0]; String id = ss[1]; // 公式 Double s = tf * Math.log(cmap.get("count")) / df.get(word); NumberFormat format = NumberFormat.getInstance(); // 取小数点后5位 format.setMaximumFractionDigits(5); context.write(new Text(id), new Text(word + ": " + format.format(s))); }else { System.out.println(value.toString() + "------"); } } } } }
reduce
package com.wenbronk.weibo; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class ThirdReducer extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text arg0, Iterable<Text> arg1, Reducer<Text, Text, Text, Text>.Context arg2) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer(); for (Text text : arg1) { sb.append(text.toString() + "\t"); } arg2.write(arg0, new Text(sb.toString())); } }
mainJob
package com.wenbronk.weibo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class ThirdJob { public static void main(String[] args) { Configuration config = new Configuration(); config.set("fs.defaults", "hdfs://192.168.208.106:8020"); config.set("yarn.resourcemanager.hostname", "192.168.208.106"); try { Job job = Job.getInstance(config); job.setJarByClass(ThirdJob.class); job.setJobName("third"); // job.setInputFormatClass(KeyValueTextInputFormat.class); //把微博总数加载到内存 job.addCacheFile(new Path("E:\\sxt\\1-MapReduce\\data\\weibo1\\part-r-00003").toUri()); //把df加载到内存 job.addCacheFile(new Path("E:\\sxt\\1-MapReduce\\data\\weibo2\\part-r-00000").toUri()); job.setMapperClass(ThirdMapper.class); job.setReducerClass(ThirdReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); FileSystem fs = FileSystem.get(config); FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weibo1")); Path path = new Path("E:\\sxt\\1-MapReduce\\data\\weibo3"); if (fs.exists(path)) { fs.delete(path); } FileOutputFormat.setOutputPath(job, path); boolean waitForCompletion = job.waitForCompletion(true); if(waitForCompletion) { System.out.println("执行job成功"); } }catch (Exception e) { e.printStackTrace(); } } }
系列来自尚学堂视频
时间: 2024-10-09 10:28:06