// ===== First iteration =====
// Four compilation units follow (each begins with its own package declaration):
// firstMapper, firstReducer, firstRepartition, firstJob.

// ---------- firstMapper ----------
package com.laoxiao.mr.weibo;

import java.io.StringReader;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

/**
 * First MR job: computes TF and N (the total number of weibo posts).
 *
 * <p>Each input line is expected to be {@code id<TAB>text}. For every token produced by the
 * segmenter the mapper emits {@code (token + "_" + id, 1)}, and once per well-formed line it
 * emits {@code ("count", 1)} so the reducer can total the number of posts.
 *
 * @author root
 */
public class firstMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Tokenizes one weibo line and emits per-token and per-post counters.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one input line, {@code id<TAB>text}
     * @param context Hadoop context used to emit the output pairs
     * @throws java.io.IOException  if segmentation or writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        String[] temp = StringUtils.split(value.toString(), "\t");
        if (temp.length >= 2) {
            String id = temp[0].trim();
            String str = temp[1].trim();
            StringReader sr = new StringReader(str);
            // NOTE(review): the `true` flag is presumably IK's smart-segmentation switch —
            // confirm against the IK Analyzer version in use.
            IKSegmenter ikSegmenter = new IKSegmenter(sr, true);
            Lexeme word = null;
            while ((word = ikSegmenter.next()) != null) {
                String w = word.getLexemeText();
                // Key is "<word>_<weiboId>"; summing the 1s yields the term frequency.
                context.write(new Text(w + "_" + id), new IntWritable(1));
            }
            // One "count" record per post; summing them yields N.
            context.write(new Text("count"), new IntWritable(1));
        } else {
            System.out.println("value is error:" + value.toString());
        }
    }
}

// ---------- firstReducer ----------
package com.laoxiao.mr.weibo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// NOTE(review): JDK-internal class that is not referenced anywhere in this class; it looks like
// an accidental IDE auto-import and may not resolve on newer JDKs — remove once confirmed unused.
import sun.management.resources.agent;

/**
 * Sums the integer values of each key: per-(word, post) term frequency, and the total
 * number of posts for the special {@code "count"} key.
 */
public class firstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Adds up all values for one key and writes {@code (key, sum)}.
     *
     * @param arg0 the key being reduced
     * @param arg1 the counts emitted for that key
     * @param arg2 Hadoop context used to emit the total
     * @throws java.io.IOException  if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text arg0, java.lang.Iterable<IntWritable> arg1, Context arg2)
            throws java.io.IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : arg1) {
            sum += i.get();
        }
        arg2.write(arg0, new IntWritable(sum));
    }
}

// ---------- firstRepartition ----------
package com.laoxiao.mr.weibo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * Partitioner that routes the special {@code "count"} key to the last reducer and
 * hash-partitions every other key over the remaining reducers.
 *
 * <p>With the 4 reducers configured by {@code firstJob} the count record therefore lands in
 * partition 3 (file {@code part-r-00003}), which is the file name the later jobs look for.
 */
public class firstRepartition extends HashPartitioner<Text, IntWritable> {

    /**
     * Chooses the partition for a key.
     *
     * @param key            the map output key
     * @param value          the map output value
     * @param numReduceTasks the number of reduce tasks of the job
     * @return {@code numReduceTasks - 1} for the {@code "count"} key, otherwise a hash
     *         partition in {@code [0, numReduceTasks - 2]}; {@code 0} when there is at most
     *         one reducer
     */
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // With a single reducer there is nothing to split; this also avoids the modulo by zero
        // that hashing over (numReduceTasks - 1) == 0 partitions would cause.
        if (numReduceTasks <= 1) {
            return 0;
        }
        // The last partition is reserved for the total count. Deriving it from numReduceTasks
        // (instead of the literal 3) keeps the partition number valid for any reducer count
        // and still yields 3 for the 4 reducers used by firstJob.
        int lastPartition = numReduceTasks - 1;
        if (key.toString().equals("count")) {
            return lastPartition;
        }
        return super.getPartition(key, value, lastPartition);
    }
}

// ---------- firstJob ----------
package com.laoxiao.mr.weibo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver of the first job: reads {@code /root/input/data/weibo.txt} and writes TF records plus
 * the total post count to {@code /usr/output/weibo1}.
 */
public class firstJob {

    /**
     * Configures and runs the job, deleting a pre-existing output directory first.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:8020");
        config.set("yarn.resourcemanager.hostname", "node1");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(firstJob.class);
            job.setJobName("weibo1");

            job.setMapperClass(firstMapper.class);
            job.setReducerClass(firstReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setPartitionerClass(firstRepartition.class);
            //job.setCombinerClass(firstReducer.class);
            // 4 reducers: firstRepartition sends "count" to the last one (part-r-00003).
            job.setNumReduceTasks(4);

            FileInputFormat.addInputPath(job, new Path("/root/input/data/weibo.txt"));

            Path path = new Path("/usr/output/weibo1");
            // The output directory is removed up front so a re-run does not collide with it.
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("first job run finished!!");
            } else {
                // Make an unsuccessful run visible instead of exiting silently.
                System.err.println("first job failed");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
第二次迭代
// Three compilation units follow (each begins with its own package declaration):
// secondMapper, secondReducer, secondJob.

// ---------- secondMapper ----------
package com.laoxiao.mr.weibo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Computes DF: the number of weibo posts each word appears in.
 *
 * <p>Input lines are expected to be {@code word_id<TAB>tf}. Because each
 * {@code word_id} key occurs once per line, emitting {@code (word, 1)} per line and summing
 * gives the number of posts containing the word.
 */
public class secondMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Emits {@code (word, 1)} for every {@code word_id<TAB>tf} line.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one input line
     * @param context Hadoop context used to emit the output pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // The split currently being processed by this mapper task.
        FileSplit fs = (FileSplit) context.getInputSplit();

        // part-r-00003 holds the total-count record rather than word_id records, so skip it.
        if (!fs.getPath().getName().contains("part-r-00003")) {

            String[] v = value.toString().trim().split("\t");
            if (v.length >= 2) {
                String[] ss = v[0].split("_");
                if (ss.length >= 2) {
                    String w = ss[0];
                    context.write(new Text(w), new IntWritable(1));
                }
            } else {
                System.out.println(value.toString() + "-------------");
            }
        }
    }
}

// ---------- secondReducer ----------
package com.laoxiao.mr.weibo;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Totals the per-post markers of each word, producing its document frequency.
 */
public class secondReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Sums the values of one word and writes {@code (word, df)}.
     *
     * @param arg0    the word
     * @param arg1    the counts emitted for the word
     * @param context Hadoop context used to emit the total
     * @throws java.io.IOException  if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text arg0, java.lang.Iterable<IntWritable> arg1, Context context)
            throws java.io.IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : arg1) {
            // Add the value itself rather than a constant 1: the mapper only emits 1s, so the
            // result is the same, but the total stays correct if values are ever pre-aggregated
            // (for example by a combiner).
            sum += i.get();
        }
        context.write(arg0, new IntWritable(sum));
    }
}

// ---------- secondJob ----------
package com.laoxiao.mr.weibo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver of the second job: reads the first job's output in {@code /usr/output/weibo1} and
 * writes the DF table to {@code /usr/output/weibo2}.
 */
public class secondJob {

    /**
     * Configures and runs the job, deleting a pre-existing output directory first.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:8020");
        config.set("yarn.resourcemanager.hostname", "node1");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(secondJob.class);
            job.setJobName("weibo2");

            job.setMapperClass(secondMapper.class);
            job.setReducerClass(secondReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            //job.setPartitionerClass(firstRepartition.class);
            //job.setCombinerClass(firstReducer.class);
            //job.setNumReduceTasks(4);

            FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));

            Path path = new Path("/usr/output/weibo2");
            // The output directory is removed up front so a re-run does not collide with it.
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("second job run finished!!");
            } else {
                // Make an unsuccessful run visible instead of exiting silently.
                System.err.println("second job failed");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
第三次迭代
// Three compilation units follow (each begins with its own package declaration):
// LastMapper, LastReduce, LastJob.

// ---------- LastMapper ----------
package com.laoxiao.mr.weibo;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

/**
 * Final computation: combines TF (job input), DF and the total post count (cache files) into
 * a per-word weight {@code tf * ln(N / df)} and emits it keyed by weibo id.
 *
 * @author root
 */
public class LastMapper extends Mapper<LongWritable, Text, Text, Text> {

    /** Holds the total number of weibo posts under the key {@code "count"}. */
    public static Map<String, Integer> cmap = null;

    /** Holds the document frequency of each word. */
    public static Map<String, Integer> df = null;

    /** Formatter for the emitted weight; created once instead of once per record. */
    private final NumberFormat scoreFormat = newScoreFormat();

    /** Builds the default-locale number format limited to 5 fraction digits. */
    private static NumberFormat newScoreFormat() {
        NumberFormat nf = NumberFormat.getInstance();
        nf.setMaximumFractionDigits(5);
        return nf;
    }

    /**
     * Opens a cache file by its base name as UTF-8 text.
     *
     * <p>NOTE(review): this presumes the framework makes each cache file available in the
     * task's working directory under its base name — confirm for the cluster in use.
     */
    private static BufferedReader openCacheFile(URI uri) throws IOException {
        Path path = new Path(uri.getPath());
        // Read explicitly as UTF-8 so non-ASCII words do not depend on the platform charset.
        return new BufferedReader(new InputStreamReader(
                new FileInputStream(path.getName()), StandardCharsets.UTF_8));
    }

    /**
     * Runs before any call to {@code map}: loads the total count and the DF table from the
     * job's cache files unless both maps are already populated.
     *
     * @param context Hadoop context giving access to the cache file URIs
     * @throws IOException          if a cache file cannot be read
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("******************");
        if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
            URI[] ss = context.getCacheFiles();
            if (ss != null) {
                for (int i = 0; i < ss.length; i++) {
                    URI uri = ss[i];
                    if (uri.getPath().endsWith("part-r-00003")) { // total number of posts
                        try (BufferedReader br = openCacheFile(uri)) {
                            String line = br.readLine();
                            // An empty file yields null; guard instead of dereferencing it.
                            if (line != null && line.startsWith("count")) {
                                String[] ls = line.split("\t");
                                cmap = new HashMap<String, Integer>();
                                cmap.put(ls[0], Integer.parseInt(ls[1].trim()));
                            }
                        }
                    } else if (uri.getPath().endsWith("part-r-00000")) { // DF of each word
                        df = new HashMap<String, Integer>();
                        try (BufferedReader br = openCacheFile(uri)) {
                            String line;
                            while ((line = br.readLine()) != null) {
                                String[] ls = line.split("\t");
                                // Skip lines that do not have the word<TAB>df shape.
                                if (ls.length >= 2) {
                                    df.put(ls[0], Integer.parseInt(ls[1].trim()));
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    /**
     * Converts one {@code word_id<TAB>tf} line into {@code (id, "word:weight")}.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one input line
     * @param context Hadoop context used to emit the output pair
     * @throws IOException          if the total count was not loaded, or writing fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        FileSplit fs = (FileSplit) context.getInputSplit();
        // part-r-00003 holds the total-count record rather than word_id records, so skip it.
        if (fs.getPath().getName().contains("part-r-00003")) {
            return;
        }

        String[] v = value.toString().trim().split("\t");
        if (v.length < 2) {
            System.out.println(value.toString() + "-------------");
            return;
        }

        int tf = Integer.parseInt(v[1].trim()); // term frequency
        String[] ss = v[0].split("_");
        if (ss.length < 2) {
            return;
        }
        String w = ss[0];
        String id = ss[1];

        // Without N no weight can be computed for any record: fail with a clear message
        // instead of a NullPointerException.
        Integer total = (cmap == null) ? null : cmap.get("count");
        if (total == null) {
            throw new IOException("total weibo count was not loaded from the cache file part-r-00003");
        }
        // A word without a usable DF entry cannot be weighted; report and skip the record.
        Integer docFreq = (df == null) ? null : df.get(w);
        if (docFreq == null || docFreq.intValue() == 0) {
            System.out.println("no df for word:" + w);
            return;
        }

        // Divide as double: integer division would truncate N/df before taking the logarithm.
        double s = tf * Math.log((double) total / docFreq);
        context.write(new Text(id), new Text(w + ":" + scoreFormat.format(s)));
    }
}

// ---------- LastReduce ----------
package com.laoxiao.mr.weibo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Concatenates all {@code word:weight} entries of one weibo id into a single tab-separated
 * line.
 */
public class LastReduce extends Reducer<Text, Text, Text, Text> {

    /**
     * Joins the values of one id, each followed by a tab, and writes {@code (id, joined)}.
     *
     * @param key     the weibo id
     * @param arg1    the {@code word:weight} entries of that id
     * @param context Hadoop context used to emit the joined line
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> arg1, Context context)
            throws IOException, InterruptedException {
        // Local, single-threaded buffer: StringBuilder avoids StringBuffer's synchronization.
        StringBuilder sb = new StringBuilder();
        for (Text i : arg1) {
            sb.append(i.toString()).append("\t");
        }
        context.write(key, new Text(sb.toString()));
    }
}

// ---------- LastJob ----------
package com.laoxiao.mr.weibo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver of the third job: reads the TF records in {@code /usr/output/weibo1}, registers the
 * total count and the DF table as cache files, and writes the weights to
 * {@code /usr/output/weibo3}.
 */
public class LastJob {

    /**
     * Configures and runs the job, deleting a pre-existing output directory first.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:8020");
        config.set("yarn.resourcemanager.hostname", "node1");
        //config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");
        try {
            FileSystem fs = FileSystem.get(config);
            //JobConf job =new JobConf(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(LastJob.class);
            job.setJobName("weibo3");
            // DistributedCache.addCacheFile(uri, conf);
            // 2.5
            // Register the total post count as a cache file (loaded in LastMapper.setup).
            job.addCacheFile(new Path("/usr/output/weibo1/part-r-00003").toUri());
            // Register the DF table as a cache file (loaded in LastMapper.setup).
            job.addCacheFile(new Path("/usr/output/weibo2/part-r-00000").toUri());

            // Output key/value types; no separate map output types are set on this job.
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // job.setMapperClass();
            job.setMapperClass(LastMapper.class);
            job.setReducerClass(LastReduce.class);

            // HDFS directory the job reads its input from.
            FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));
            Path outpath = new Path("/usr/output/weibo3");
            // The output directory is removed up front so a re-run does not collide with it.
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);

            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("执行job成功");
            } else {
                // Make an unsuccessful run visible instead of exiting silently.
                System.err.println("weibo3 job failed");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
时间: 2024-10-14 22:40:05