Hadoop MapReduce(FlowCount) Java编程

编写PhoneFlow程序,计算手机上行流量、下行流量以及总流量,数据如下:

13685295623 122  201

13985295600 102  11

13885295622 22   101

13785295633 120  20

1、FlowMapper:

package com.hadoop.flow;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.commons.lang.StringUtils;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

/**

* 数据格式:

* 13685295623 122  201

* 13985295600 102  11

* 13885295622 22   101

* 13785295633 120  20

*/

@Override

protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

String line=value.toString();

String [] fields=StringUtils.split(line,"\t");

String phoneNB=fields[0];

long up_flow=Long.valueOf(fields[1]);

long d_flow=Long.valueOf(fields[2]);

context.write(new Text(phoneNB), new FlowBean(phoneNB,up_flow,d_flow));

}

}

2、FlowReducer:

package com.hadoop.flow;

import java.io.IOException;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class FlowReducer extends Reducer<Text, FlowBean,Text, FlowBean> {

@Override

protected void reduce(Text key, Iterable<FlowBean> values,Context context) throws IOException, InterruptedException {

long upflowC=0;

long dflowD=0;

for(FlowBean bean:values){

upflowC+=bean.getUp_flow();

dflowD+=bean.getD_flow();

}

context.write(key,new FlowBean(key.toString(),upflowC,dflowD));

}

}


3、FlowRunner 

package com.hadoop.flow;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.io.Text;

public class FlowRunner extends Configured implements Tool{

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

Job job=Job.getInstance(conf);

job.setJarByClass(FlowRunner.class);

job.setMapperClass(FlowMapper.class);

job.setReducerClass(FlowReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(FlowBean.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(FlowBean.class);

FileInputFormat.setInputPaths(job,new Path(args[0]));

FileOutputFormat.setOutputPath(job,new Path(args[1]));

return job.waitForCompletion(true)?0:1;

}

public static void main(String[] args) throws Exception {

ToolRunner.run(new Configuration(), new FlowRunner(), args);

}

}


4、FlowBean :

package com.hadoop.flow;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable{

private String phoneNB;

private long up_flow;

private long d_flow;

private long s_flow;

public FlowBean(){

}

public FlowBean (String phoneNB,long up_flow,long d_flow){

this.phoneNB=phoneNB;

this.up_flow=up_flow;

this.d_flow=d_flow;

this.s_flow=up_flow+d_flow;

}

public String getPhoneNB() {

return phoneNB;

}

public void setPhoneNB(String phoneNB) {

this.phoneNB = phoneNB;

}

public long getUp_flow() {

return up_flow;

}

public void setUp_flow(long up_flow) {

this.up_flow = up_flow;

}

public long getD_flow() {

return d_flow;

}

public void setD_flow(long d_flow) {

this.d_flow = d_flow;

}

public long getS_flow() {

return s_flow;

}

public void setS_flow(long s_flow) {

this.s_flow = s_flow;

}

//

public void write(DataOutput out) throws IOException {

out.writeUTF(phoneNB);

out.writeLong(up_flow);

out.writeLong(d_flow);

out.writeLong(s_flow);

}

public void readFields(DataInput in) throws IOException {

phoneNB= in.readUTF();

up_flow=in.readLong();

d_flow=in.readLong();

s_flow=in.readLong();

}

@Override

public String toString() {

return up_flow+"   "+d_flow+"   "+"   "+s_flow;

}

}

时间: 2024-10-27 10:58:00

Hadoop MapReduce(FlowCount) Java编程的相关文章

Hadoop MapReduce(WordCount) Java编程

编写WordCount程序数据如下: hello beijing hello shanghai hello chongqing hello tianjin hello guangzhou hello shenzhen ... 1.WCMapper: package com.hadoop.testHadoop; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop

Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter; import java.net.URI; import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Co

Hadoop MapReduce编程学习

一直在搞spark,也没时间弄hadoop,不过Hadoop基本的编程我觉得我还是要会吧,看到一篇不错的文章,不过应该应用于hadoop2.0以前,因为代码中有  conf.set("mapred.job.tracker", "192.168.1.2:9001");新框架中已改为 Yarn-site.xml 中的 resouceManager 及 nodeManager 具体配置项,新框架中历史 job 的查询已从 Job tracker 剥离,归入单独的mapre

Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(九)

下面,是版本1. Hadoop MapReduce编程 API入门系列之挖掘气象数据版本1(一) 这篇博文,包括了,实际生产开发非常重要的,单元测试和调试代码.这里不多赘述,直接送上代码. MRUnit 框架 MRUnit是Cloudera公司专为Hadoop MapReduce写的单元测试框架,API非常简洁实用.MRUnit针对不同测试对象使用不同的Driver: MapDriver:针对单独的Map测试  ReduceDriver:针对单独的Reduce测试    MapReduceDri

Hadoop MapReduce编程 API入门系列之处理Excel通话记录(二十)

不多说,直接上代码. 与家庭成员之间的通话记录一份,存储在Excel文件中,如下面的数据集所示.我们需要基于这份数据,统计每个月每个家庭成员给自己打电话的次数,并按月份输出到不同文件夹. 2016-12-12 20:04:10,203 INFO [zhouls.bigdata.myMapReduce.ExcelContactCount.ExcelContactCount$ExcelMapper] - Map processing finished2016-12-12 20:04:10,203 I

Hadoop MapReduce编程 API入门系列之FOF(Fund of Fund)(二十三)

不多说,直接上代码. 代码 package zhouls.bigdata.myMapReduce.friend; import org.apache.hadoop.io.Text; public class Fof extends Text{//自定义Fof,表示f1和f2关系 public Fof(){//无参构造 super(); } public Fof(String a,String b){//有参构造 super(getFof(a, b)); } public static Strin

Hadoop MapReduce编程 API入门系列之网页流量版本1(二十二)

不多说,直接上代码. 对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件. 代码 package zhouls.bigdata.myMapReduce.flowsum; import java.io.DataInput;import java.io.DataOutput;import java.io.IOException; import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableCompa

Hadoop MapReduce编程 API入门系列之统计学生成绩版本2(十八)

不多说,直接上代码. 统计出每个年龄段的 男.女 学生的最高分 这里,为了空格符的差错,直接,我们有时候,像如下这样的来排数据. 代码 package zhouls.bigdata.myMapReduce.Gender; import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs

Hadoop MapReduce编程 API入门系列之统计学生成绩版本1(十七)

不多说,直接上代码. 代码 package zhouls.bigdata.myMapReduce.ScoreCount; import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;