编写PhoneFlow程序,计算手机上行流量、下行流量以及总流量,数据如下:
13685295623 122 201
13985295600 102 11
13885295622 22 101
13785295633 120 20
1、FlowMapper:
package com.hadoop.flow;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.commons.lang.StringUtils;
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
/**
* 数据格式:
* 13685295623 122 201
* 13985295600 102 11
* 13885295622 22 101
* 13785295633 120 20
*/
@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
String line=value.toString();
String [] fields=StringUtils.split(line,"\t");
String phoneNB=fields[0];
long up_flow=Long.valueOf(fields[1]);
long d_flow=Long.valueOf(fields[2]);
context.write(new Text(phoneNB), new FlowBean(phoneNB,up_flow,d_flow));
}
}
2、FlowReducer:
package com.hadoop.flow;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FlowReducer extends Reducer<Text, FlowBean,Text, FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values,Context context) throws IOException, InterruptedException {
long upflowC=0;
long dflowD=0;
for(FlowBean bean:values){
upflowC+=bean.getUp_flow();
dflowD+=bean.getD_flow();
}
context.write(key,new FlowBean(key.toString(),upflowC,dflowD));
}
}
3、FlowRunner
package com.hadoop.flow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.Text;
public class FlowRunner extends Configured implements Tool{
public int run(String[] args) throws Exception {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setJarByClass(FlowRunner.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new FlowRunner(), args);
}
}
4、FlowBean :
package com.hadoop.flow;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class FlowBean implements Writable{
private String phoneNB;
private long up_flow;
private long d_flow;
private long s_flow;
public FlowBean(){
}
public FlowBean (String phoneNB,long up_flow,long d_flow){
this.phoneNB=phoneNB;
this.up_flow=up_flow;
this.d_flow=d_flow;
this.s_flow=up_flow+d_flow;
}
public String getPhoneNB() {
return phoneNB;
}
public void setPhoneNB(String phoneNB) {
this.phoneNB = phoneNB;
}
public long getUp_flow() {
return up_flow;
}
public void setUp_flow(long up_flow) {
this.up_flow = up_flow;
}
public long getD_flow() {
return d_flow;
}
public void setD_flow(long d_flow) {
this.d_flow = d_flow;
}
public long getS_flow() {
return s_flow;
}
public void setS_flow(long s_flow) {
this.s_flow = s_flow;
}
//
public void write(DataOutput out) throws IOException {
out.writeUTF(phoneNB);
out.writeLong(up_flow);
out.writeLong(d_flow);
out.writeLong(s_flow);
}
public void readFields(DataInput in) throws IOException {
phoneNB= in.readUTF();
up_flow=in.readLong();
d_flow=in.readLong();
s_flow=in.readLong();
}
@Override
public String toString() {
return up_flow+" "+d_flow+" "+" "+s_flow;
}
}