import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    // phone number
    private String phoneNb;
    // upload traffic
    private long flow_up;
    // download traffic
    private long flow_down;
    // total traffic
    private long flow_sum;

    public String getPhoneNb() {
        return phoneNb;
    }

    public void setPhoneNb(String phoneNb) {
        this.phoneNb = phoneNb;
    }

    public long getFlow_up() {
        return flow_up;
    }

    public void setFlow_up(long flow_up) {
        this.flow_up = flow_up;
    }

    public long getFlow_down() {
        return flow_down;
    }

    public void setFlow_down(long flow_down) {
        this.flow_down = flow_down;
    }

    public long getFlow_sum() {
        return flow_sum;
    }

    public void setFlow_sum(long flow_sum) {
        this.flow_sum = flow_sum;
    }

    // Note: in Hadoop, if you define a parameterized constructor you must also
    // keep the no-arg constructor, because the framework instantiates the bean
    // reflectively during deserialization.
    public FlowBean() {
    }

    // parameterized constructor
    public FlowBean(String phoneNb, long flow_up, long flow_down) {
        this.phoneNb = phoneNb;
        this.flow_up = flow_up;
        this.flow_down = flow_down;
        this.flow_sum = flow_up + flow_down;
    }

    /**
     * Serialization: turn the structured object into a byte stream.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNb);
        out.writeLong(flow_up);
        out.writeLong(flow_down);
        out.writeLong(flow_sum);
    }

    /**
     * Deserialization: rebuild the structured object from a byte stream.
     * Fields must be read in exactly the order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phoneNb = in.readUTF();
        this.flow_up = in.readLong();
        this.flow_down = in.readLong();
        this.flow_sum = in.readLong();
    }

    @Override
    public String toString() {
        return flow_up + "\t" + flow_down + "\t" + flow_sum;
    }

    // Sort by total traffic in descending order. Long.compare returns 0 for
    // equal sums; the original ternary ("? -1 : 1") never did, which violates
    // the compareTo contract.
    @Override
    public int compareTo(FlowBean bean) {
        return Long.compare(bean.getFlow_sum(), this.flow_sum);
    }
}
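To see the write/readFields symmetry in action, here is a minimal round-trip sketch (not part of the original post; the phone number and traffic values are made up for illustration) that serializes a FlowBean into an in-memory byte buffer and reads it back:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTripDemo {
    public static void main(String[] args) throws IOException {
        // hypothetical sample record
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        // serialize: structured object -> bytes (what Hadoop does when it spills map output)
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // deserialize: bytes -> structured object, via the no-arg constructor
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // Both lines print "2481 <tab> 24681 <tab> 27162"; this only holds
        // because readFields reads the fields in the same order write wrote them.
        System.out.println(original);
        System.out.println(copy);
    }
}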
import java.io.IOException;
import java.util.UUID;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.com.hadoop.mr.flowsum.FlowBean;

public class FlowSumSortRunner {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(FlowSumSortRunner.class);

        job.setMapperClass(FlowSumSortMapper.class);
        job.setReducerClass(FlowSumSortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // The input is the unsorted output of the first (summing) job; running
        // this second MapReduce job sorts it, because the shuffle phase orders
        // map output keys with FlowBean.compareTo().
        FileInputFormat.setInputPaths(job, new Path("hdfs://XXX:9000/flowsum/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://xxx:9000/" + UUID.randomUUID()));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, FlowBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // each input line looks like: phone \t flow_up \t flow_down \t flow_sum
            String line = value.toString();
            String[] split = StringUtils.split(line, "\t");
            // the FlowBean itself is the map output key, so the framework sorts by it
            context.write(new FlowBean(split[0], Long.parseLong(split[1]), Long.parseLong(split[2])),
                    NullWritable.get());
        }
    }

    public static class FlowSumSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean> {

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values,
                Reducer<FlowBean, NullWritable, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            // keys arrive already sorted; emit the phone number plus the bean
            context.write(new Text(key.getPhoneNb()), key);
        }
    }
}
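The job does no explicit sorting anywhere: the descending order comes entirely from the shuffle phase comparing FlowBean keys. A quick local sketch (hypothetical phone numbers and traffic values, not from the post) mirrors that ordering outside Hadoop, since WritableComparable extends Comparable:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FlowBeanSortDemo {
    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<>();
        beans.add(new FlowBean("13726230501", 100, 200));   // sum = 300
        beans.add(new FlowBean("13726230502", 5000, 9000)); // sum = 14000
        beans.add(new FlowBean("13726230503", 30, 40));     // sum = 70

        // Collections.sort uses FlowBean.compareTo, so this is the order in
        // which the reducer receives keys: largest flow_sum first.
        Collections.sort(beans);
        for (FlowBean b : beans) {
            System.out.println(b.getPhoneNb() + "\t" + b);
        }
        // expected output:
        // 13726230502  5000  9000  14000
        // 13726230501  100   200   300
        // 13726230503  30    40    70
    }
}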