九(三):手机上网流量汇总求和(排序方式二)

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean>{

	//电话号码
	private String phoneNb;
	//上传流量
	private long flow_up;
	//下载流量
	private long flow_down;
	//总流量
	private long flow_sum;

	public String getPhoneNb() {
		return phoneNb;
	}
	public void setPhoneNb(String phoneNb) {
		this.phoneNb = phoneNb;
	}
	public long getFlow_up() {
		return flow_up;
	}
	public void setFlow_up(long flow_up) {
		this.flow_up = flow_up;
	}
	public long getFlow_down() {
		return flow_down;
	}
	public void setFlow_down(long flow_down) {
		this.flow_down = flow_down;
	}
	public long getFlow_sum() {
		return flow_sum;
	}
	public void setFlow_sum(long flow_sum) {
		this.flow_sum = flow_sum;
	}
	//注意:在hadoop使用当中,如果你要使用有参的构造方法,无参的构造方法必须写出来。
	public FlowBean() {

	}
	//有参的构造方法
	public FlowBean(String phoneNb, long flow_up, long flow_down) {
		this.phoneNb = phoneNb;
		this.flow_up = flow_up;
		this.flow_down = flow_down;
		this.flow_sum = flow_up+flow_down;
	}

	/**
	 * 序列化对象:把结构化对象转化为字节流
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phoneNb);
		out.writeLong(flow_up);
		out.writeLong(flow_down);
		out.writeLong(flow_sum);
	}
	/**
	 * 反序列化对象:把字节流化为结构化对象
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.phoneNb = in.readUTF();
		this.flow_up = in.readLong();
		this.flow_down = in.readLong();
		this.flow_sum = in.readLong();
	}
	@Override
	public String toString() {
		return "" + flow_up + "\t" + flow_down+ "\t" + flow_sum + "";
	}
	@Override
	public int compareTo(FlowBean bean) {
		return this.flow_sum > bean.getFlow_sum() ? -1 : 1;
	}

}
import java.io.IOException;
import java.util.UUID;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.com.hadoop.mr.flowsum.FlowBean;

public class FlowSumSortRunner {

	public static void main(String[] args) throws Exception {
		Job job = Job.getInstance(new Configuration());

		job.setJarByClass(FlowSumSortRunner.class);
		job.setMapperClass(FlowSumSortMapper.class);
		job.setReducerClass(FlowSumSortReducer.class);

		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(NullWritable.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		//输入第一次未排序的结果,然后在执行mapreduce程序就进行排序了
		FileInputFormat.setInputPaths(job, new Path("hdfs://XXX:9000/flowsum/part-r-00000"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://xxx:9000/"+UUID.randomUUID()));

		job.waitForCompletion(true);
	}

	public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{

		@Override
		protected void map(LongWritable key,Text value,
				Mapper<LongWritable, Text, FlowBean, NullWritable>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] split = StringUtils.split(line, "\t");
			context.write(new FlowBean(split[0], Long.parseLong(split[1]), Long.parseLong(split[2])), NullWritable.get());
		}

	}

	public static class FlowSumSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean>{

		@Override
		protected void reduce(FlowBean key, Iterable<NullWritable> values,
				Reducer<FlowBean, NullWritable, Text, FlowBean>.Context context)
				throws IOException, InterruptedException {
			context.write(new Text(key.getPhoneNb()), key);

		}

	}
}
时间: 2024-10-19 23:46:32

九(三):手机上网流量汇总求和(排序方式二)的相关文章

九(一):手机上网流量汇总求和

对文件中的手机号流量进行汇总: 1363157985066  13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157995052  13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076  13926435656 20-10-7A-28-CC-0A:CMCC 120.

结合手机上网流量业务来说明Hadoop中的二次排序机制,分区机制

本篇博客将结合手机上网流量业务来详细介绍Hadoop的二次排序机制.分区机制,先介绍一下业务场景: 先介绍一下业务场景:统计每个用户的上行流量和,下行流量和,以及总流量和. 本次描述所用数据: 日志格式描述: 日志flowdata.txt中的具体数据: 首先我们先通过mapreduce程序实现上面的业务逻辑: 代码实现: package FlowSum; import java.io.DataInput; import java.io.DataOutput; import java.io.IOE

结合手机上网流量业务来说明Hadoop中的自定义数据类型(序列化、反序列化机制)

大家都知道,Hadoop中为Key的数据类型必须实现WritableComparable接口,而Value的数据类型只需要实现Writable接口即可:能做Key的一定可以做Value,能做Value的未必能做Key.但是具体应该怎么应用呢?--本篇文章将结合手机上网流量业务进行分析. 先介绍一下业务场景:统计每个用户的上行流量和,下行流量和,以及总流量和. 本次描述所用数据: 日志格式描述: 日志flowdata.txt中的具体数据: 接下来贴出详细代码,代码中含有详细注释,从代码中可以看出,

第2节 mapreduce深入学习:8、手机流量汇总求和

例子:MapReduce综合练习之上网流量统计. 数据格式参见资料夹 需求一:统计求和 统计每个手机号的上行流量总和,下行流量总和,上行总流量之和,下行总流量之和 分析:以手机号码作为key值,上行流量,下行流量,上行总流量,下行总流量四个字段作为value值,然后以这个key,和value作为map阶段的输出,reduce阶段的输入. data_flow.dat内容类似下面的: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.1

MapReduce实现手机上网流量分析

一.问题背景 现在的移动刚一通话就可以在网站上看自己的通话记录,以前是本月只能看上一个月.不过流量仍然是只能看上一月的. 目的就是找到用户在一段时间内的上网流量. 本文并没有对时间分组. 二.数据集分析 可以看出实际数据集并不是每个字段都有值,但是还好,完整地以tab隔开了,数据格式还是不错的,我们需要的上行下行数据都有,没有缺失值.其实这个需要在程序中处理,如果不在的话 该怎么办. 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196

MapReduce实现手机上网日志分析(排序)

一.背景 1.1 流程 实现排序,分组拍上一篇通过Partitioner实现了. 实现接口,自动产生接口方法,写属性,产生getter和setter,序列化和反序列化属性,写比较方法,重写toString,为了方便复制写够着方法,不过重写够着方法map里需要不停地new,发现LongWritable有set方法,text也有,可以用,产生默认够着方法. public void set(String account,double income,double expense,double surpl

[转帖]从 2G 到 5G,手机上网话语权的三次改变

从 2G 到 5G,手机上网话语权的三次改变 美国第一大电信运营商 Verizon 公司的 CEO Hans Vestberg 手持一部 iPad,屏幕上显示俯瞰地面的飞行地图.400 多公里外的洛杉矶,一架无人机在他的远程控制下起飞.屏幕传来无人机拍摄的实时画面,左下角状态栏显示,5G 网络的连接的速度达到 900 Mbps. 作者:Qdaily深度报道来源:今日头条|2019-01-22 15:48 收藏 分享 美国第一大电信运营商 Verizon 公司的 CEO Hans Vestberg

Hadoop日记Day13---使用hadoop自定义类型处理手机上网日志

测试数据的下载地址为:http://pan.baidu.com/s/1gdgSn6r 一.文件分析 首先可以用文本编辑器打开一个HTTP_20130313143750.dat的二进制文件,这个文件的内容是我们的手机日志,文件的内容已经经过了优化,格式比较规整,便于学习研究,感兴趣的读者可以尝试一下. 我从中截取文件中的一行记录内容进行分析: 1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i

Hadoop学习笔记—5.自定义类型处理手机上网日志

一.测试数据:手机上网日志 1.1 关于这个日志 假设我们如下一个日志文件,这个文件的内容是来自某个电信运营商的手机上网日志,文件的内容已经经过了优化,格式比较规整,便于学习研究. 该文件的内容如下(这里我只截取了三行): 1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200 1363157995033 15920133257 5C-