1 package com.liuhuan; 2 3 import org.apache.hadoop.io.WritableComparable; 4 5 import java.io.DataInput; 6 import java.io.DataOutput; 7 import java.io.IOException; 8 9 public class Student implements WritableComparable<Student> { 10 11 private String userId; 12 private String userName; 13 private String km; 14 private int score; 15 private String fileName; 16 17 @Override 18 public String toString() { 19 return userName + "\t" + km + "=" + score; 20 } 21 22 public Student() { 23 } 24 25 public String getUserId() { 26 return userId; 27 } 28 29 public void setUserId(String userId) { 30 this.userId = userId; 31 } 32 33 public String getUserName() { 34 return userName; 35 } 36 37 public void setUserName(String userName) { 38 this.userName = userName; 39 } 40 41 public String getKm() { 42 return km; 43 } 44 45 public void setKm(String km) { 46 this.km = km; 47 } 48 49 public int getScore() { 50 return score; 51 } 52 53 public void setScore(int score) { 54 this.score = score; 55 } 56 57 public String getFileName() { 58 return fileName; 59 } 60 61 public void setFileName(String fileName) { 62 this.fileName = fileName; 63 } 64 65 @Override 66 public int compareTo(Student o) { 67 int i = o.userId.compareTo(this.userId); 68 if (i == 0) { 69 return o.userName.compareTo(this.userName); 70 } else { 71 return i; 72 } 73 } 74 75 @Override 76 public void write(DataOutput dataOutput) throws IOException { 77 dataOutput.writeUTF(use
1 package com.liuhuan; 2 3 import org.apache.hadoop.io.WritableComparable; 4 import org.apache.hadoop.io.WritableComparator; 5 6 public class StudentCompartor extends WritableComparator { 7 public StudentCompartor() { 8 super(Student.class,true); 9 } 10 11 @Override 12 public int compare(WritableComparable a, WritableComparable b) { 13 Student a1 = (Student) a; 14 Student b1 = (Student) b; 15 return b1.getUserId().compareTo(a1.getUserId()); 16 } 17 }
1 package com.liuhuan; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.fs.Path; 5 import org.apache.hadoop.io.NullWritable; 6 import org.apache.hadoop.mapreduce.Job; 7 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 8 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 9 10 import java.io.IOException; 11 12 public class StudentDriver { 13 14 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 15 Configuration cfg = new Configuration(); 16 Job job = Job.getInstance(cfg); 17
1 package com.liuhuan; 2 3 import org.apache.hadoop.io.LongWritable; 4 import org.apache.hadoop.io.NullWritable; 5 import org.apache.hadoop.io.Text; 6 import org.apache.hadoop.mapreduce.Mapper; 7 import org.apache.hadoop.mapreduce.lib.input.FileSplit; 8 9 import java.io.IOException; 10 11 public class StudentMapper extends Mapper<LongWritable, Text, Student, NullWritable> { 12 String fileName; 13 Student stu = new Student(); 14 @Override 15 protected void setup(Context context) throws IOException, InterruptedException { 16 FileSplit fs = (FileSplit) context.getInputSplit(); 17 fileName = fs.getPath().getName(); 18 } 19 20 @Override 21 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 22 String[] split = value.toString().split("\t"); 23 24 if (fileName.startsWith("student")) { 25 stu.setUserId(split[0]);
1 package com.liuhuan; 2 3 import org.apache.hadoop.io.NullWritable; 4 import org.apache.hadoop.mapreduce.Reducer; 5 6 import java.io.IOException; 7 import java.util.Iterator; 8 9 public class StudentReduce extends Reducer<Student, NullWritable,Student, NullWritable> { 10 11 @Override 12 protected void reduce(Student key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { 13 Iterator<NullWritable> it = values.iterator(); 14 it.next(); 15 String userName = key.getUserName(); 16 17 System.err.println(key+"----"); 18 while (it.hasNext()) { 19 it.next(); 20 key.setUserName(userName); 21 context.write(key,NullWritable.get()); 22 } 23 } 24 }
26 stu.setUserName(split[1]); 27 stu.setScore(0); 28 stu.setKm(""); 29 stu.setFileName("student"); 30 } else { 31 stu.setUserId(split[0]); 32 stu.setKm(split[1]); 33 stu.setScore(Integer.parseInt(split[2])); 34 stu.setFileName("score"); 35 stu.setUserName(""); 36 } 37 System.err.println(stu); 38 context.write(stu,NullWritable.get()); 39 } 40 }
18 job.setJarByClass(StudentDriver.class); 19 job.setMapperClass(StudentMapper.class); 20 job.setReducerClass(StudentReduce.class); 21 22 job.setGroupingComparatorClass(StudentCompartor.class); 23 24 job.setMapOutputKeyClass(Student.class); 25 job.setMapOutputValueClass(NullWritable.class); 26 job.setOutputKeyClass(Student.class); 27 job.setOutputValueClass(NullWritable.class); 28 29 FileInputFormat.setInputPaths(job, new Path("C:\\Users\\83795\\Desktop\\test\\10A\\2\\input")); 30 FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\83795\\Desktop\\test\\10A\\2\\output")); 31 32 boolean b = job.waitForCompletion(true); 33 System.out.println(b); 34 } 35 }
rId); 78 dataOutput.writeUTF(userName); 79 dataOutput.writeUTF(km); 80 dataOutput.writeInt(score); 81 dataOutput.writeUTF(fileName); 82 } 83 84 @Override 85 public void readFields(DataInput dataInput) throws IOException { 86 this.userId = dataInput.readUTF(); 87 this.userName = dataInput.readUTF(); 88 this.km = dataInput.readUTF(); 89 this.score = dataInput.readInt(); 90 this.fileName = dataInput.readUTF(); 91 } 92 }
原文地址:https://www.cnblogs.com/xjqi/p/12654912.html
时间: 2024-11-05 14:52:56