Hadoop读书笔记(六)MapReduce自定义数据类型demo

Hadoop读书笔记(一)Hadoop介绍http://blog.csdn.net/caicongyang/article/details/39898629

Hadoop读书笔记(二)HDFS的shell操作http://blog.csdn.net/caicongyang/article/details/41253927

Hadoop读书笔记(三)Java API操作HDFShttp://blog.csdn.net/caicongyang/article/details/41290955

Hadoop读书笔记(四)HDFS体系结构 :http://blog.csdn.net/caicongyang/article/details/41322649

Hadoop读书笔记(五)MapReduce统计单词demohttp://blog.csdn.net/caicongyang/article/details/41453579

1.demo说明

从给定的日志文件中统计手机流量

2.日志文件

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
1363157985079 	13823070001	20-7C-8F-70-68-1F:CMCC	120.196.100.99			6	3	360	180	200
1363157985069 	13600217502	00-1F-64-E2-E8-B1:CMCC	120.196.100.55			18	138	1080	186852	200

3.代码

KpiApp.java

package mapReduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
 *
 * <p>
 * Title: KpiApp.java
 * Package mapReduce
 * </p>
 * <p>
 * Description: 统计流量
 * <p>
 * @author Tom.Cai
 * @created 2014-11-25 下午10:23:33
 * @version V1.0
 *
 */
public class KpiApp {
	private static final String INPUT_PATH = "hdfs://192.168.80.100:9000/wlan";
	private static final String OUT_PATH = "hdfs://192.168.80.100:9000/wlan_out";

	public static void main(String[] args) throws Exception {
		FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), new Configuration());
		Path outPath = new Path(OUT_PATH);
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}

		Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setInputFormatClass(TextInputFormat.class);

		job.setMapperClass(KpiMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(KpiWite.class);

		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);

		job.setReducerClass(KpiReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(KpiWite.class);

		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		job.setOutputFormatClass(TextOutputFormat.class);

		job.waitForCompletion(true);

	}

	static class KpiMapper extends Mapper<LongWritable, Text, Text, KpiWite> {
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String[] splited = value.toString().split("\t");
			String num = splited[1];
			KpiWite kpi = new KpiWite(splited[6], splited[7], splited[8], splited[9]);
			context.write(new Text(num), kpi);
		}
	}

	static class KpiReducer extends Reducer<Text, KpiWite, Text, KpiWite> {
		@Override
		protected void reduce(Text key, Iterable<KpiWite> value, Context context) throws IOException, InterruptedException {
			long upPackNum = 0L;
			long downPackNum = 0L;
			long upPayLoad = 0L;
			long downPayLoad = 0L;
			for (KpiWite kpi : value) {
				upPackNum += kpi.upPackNum;
				downPackNum += kpi.downPackNum;
				upPayLoad += kpi.upPayLoad;
				downPayLoad += kpi.downPayLoad;
			}
			context.write(key, new KpiWite(String.valueOf(upPackNum), String.valueOf(downPackNum), String.valueOf(upPayLoad), String.valueOf(downPayLoad)));
		}
	}

}

class KpiWite implements Writable {
	long upPackNum;
	long downPackNum;
	long upPayLoad;
	long downPayLoad;

	public KpiWite() {
	}

	public KpiWite(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad) {
		this.upPackNum = Long.parseLong(upPackNum);
		this.downPackNum = Long.parseLong(downPackNum);
		this.upPayLoad = Long.parseLong(upPayLoad);
		this.downPayLoad = Long.parseLong(downPayLoad);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.upPackNum = in.readLong();
		this.downPackNum = in.readLong();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upPackNum);
		out.writeLong(downPackNum);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
	}

}

欢迎大家一起讨论学习!

有用的自己收!

记录与分享,让你我共成长!欢迎查看我的其他博客;我的博客地址:http://blog.csdn.net/caicongyang

时间: 2024-12-25 19:29:30

Hadoop读书笔记(六)MapReduce自定义数据类型demo的相关文章

Hadoop读书笔记(八)MapReduce 打成jar包demo

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop读书笔记(七)MapReduce 0.x版本API使用demo

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop读书笔记(五)MapReduce统计单词demo

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 Hadoop读书笔记(三)Java API操作HDFS:http://blog.csdn.net/caicongyang/article/details/41290955

Hadoop读书笔记(十二)MapReduce自定义排序

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 1.说明: 对给出的两列数据首先按照第一列升序排列,当第一列相同时,第二列升序排列 数据格式: 3 3 3 2 3 1 2 2 2 1 1 1 2.代码 SortApp.java package sort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOExc

Hadoop读书笔记(十四)MapReduce中TopK算法(Top100算法)

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 (系列文章会逐步修整完成,添加数据文件格式预计相关注释) 1.说明: 从给定的文件中的找到最大的100个值,给定的数据文件格式如下: 533 16565 17800 2929 11374 9826 6852 20679 18224 21222 8227 5336 912 29525 3382 2100 10673 12284 31634 27405 1

Hadoop读书笔记(十)MapReduce中的从计数器理解combiner归约

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 1.combiner 问:什么是combiner: 答:Combiner发生在Mapper端,对数据进行归约处理,使传到reducer端的数据变小了,传输时间变端,作业时间变短,Combiner不能夸Mapper执行,(只有reduce可以接受多个Mapper的任务). 并不是所有的算法都适合归约处理,例如求平均数 2.代码实现 WordCount.j

Hadoop读书笔记(十一)MapReduce中的partition分组

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 1.partition分组 partition是指定分组算法,以及通过setNumReduceTasks设定Reduce的任务个数 2.代码 KpiApp.ava package cmd; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; i

Hadoop读书笔记(九)MapReduce计数器

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 1.MapReduce 计数器的作用 统计Map.Reduce以及Combiner执行的次数,可以用户简单判断代码的执行流程 2.MapReduce自带的计数器 14/11/26 22:28:51 INFO mapred.JobClient: Counters: 19 14/11/26 22:28:51 INFO mapred.JobClient: F

Hadoop读书笔记(十三)MapReduce中Top算法

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 1.说明: 从给定的文件中的找到最大值 2.代码: TopApp.java package suanfa; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.F