MapReduce基础入门(二)

1.Combiner合并

2.序列化

3.Sort排序

4.Partitioner分区

5.综合案例

1.Combiner合并

1)什么是Combiner

Combiner 是 MapReduce 程序中 Mapper 和 Reducer 之外的一种组件,它的作用是在 maptask 之后给 maptask 的结果进行局部汇总,以减轻 reducetask 的计算负载,减少网络传输

2)如何使用combiner

Combiner 和 Reducer 一样,编写一个类,然后继承 Reducer, reduce 方法中写具体的 Combiner 逻辑,然后在 job 中设置 Combiner 类: job.setCombinerClass(FlowSumCombine.class)

3)使用combiner注意事项

(1)Combiner 和 Reducer 的区别在于运行的位置:

Combiner 是在每一个 maptask 所在的节点运行
           Reducer 是接收全局所有 Mapper 的输出结果

(2)Combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来

(3)Combiner 的使用要非常谨慎,因为 Combiner 在 MapReduce 过程中可能调用也可能不调 用,可能调一次也可能调多次,所以: Combiner 使用的原则是:有或没有都不能影响业务 逻辑,都不能影响最终结果(求平均值时,combiner和reduce逻辑不一样)

2.序列化

1)概述

Java 的序列化是一个重量级序列化框架( Serializable),一个对象被序列化后,会附带很多额 外的信息(各种校验信息, header,继承体系等),不便于在网络中高效传输;所以, hadoop 自己开发了一套序列化机制( Writable),精简,高效,Hadoop 中的序列化框架已经对基本类型和 null 提供了序列化的实现了,如下

byte


ByteWritable


short


ShortWritable


int


IntWritable


long


LongWritable


float


FloatWritable


double


DoubleWritable


String


Text


null


NullWritable

2)Java序列化

Java序列号只需要实现Serializable接口即可如

  1 public class Student implements Serializable{
  2      public Student(){
  3     }
  4     private int id;
  5 }

3)自定义对象实现mapreduce框架序列化

如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为 mapreduce框中的 shuffle 过程一定会对 key 进行排序,此时,自定义的 bean 实现的接口应该是:

  1 public class FlowBean implements WritableComparable<FlowBean> {
  2
  3 	private String phoneNbr;
  4 	private long up_flow;
  5 	private long d_flow;
  6 	private long sum_flow;
  7
  8 	public void set(String phoneNbr, long up_flow, long d_flow) {
  9 		this.phoneNbr = phoneNbr;
 10 		this.up_flow = up_flow;
 11 		this.d_flow = d_flow;
 12 		this.sum_flow = up_flow + d_flow;
 13 	}
 14
 15 	/**
 16 	 * 序列化,将数据字段以字节流写出去
 17 	 */
 18 	public void write(DataOutput out) throws IOException {
 19 		out.writeUTF(this.phoneNbr);
 20 		out.writeLong(this.up_flow);
 21 		out.writeLong(this.d_flow);
 22 		out.writeLong(this.sum_flow);
 23 	}
 24
 25 	/**
 26 	 * 反序列化,从字节流中读出各个数据字段 读出的顺序应该跟序列化时写入的顺序保持一致
 27 	 */
 28 	public void readFields(DataInput in) throws IOException {
 29 		this.phoneNbr = in.readUTF();
 30 		this.up_flow = in.readLong();
 31 		this.d_flow = in.readLong();
 32 		this.sum_flow = in.readLong();
 33 	}
 34
 35 	public int compareTo(FlowBean o) {
 36 		return this.sum_flow > o.getSum_flow() ? -1 : 1;
 37 	}
 38
 39 	@Override
 40 	public String toString() {
 41 		return phoneNbr+"\t"+up_flow + "\t" + d_flow + "\t" + sum_flow;
 42 	}
 43
 44 	public String getPhoneNbr() {
 45 		return phoneNbr;
 46 	}
 47
 48 	public void setPhoneNbr(String phoneNbr) {
 49 		this.phoneNbr = phoneNbr;
 50 	}
 51
 52 	public long getUp_flow() {
 53 		return up_flow;
 54 	}
 55
 56 	public void setUp_flow(long up_flow) {
 57 		this.up_flow = up_flow;
 58 	}
 59
 60 	public long getD_flow() {
 61 		return d_flow;
 62 	}
 63
 64 	public void setD_flow(long d_flow) {
 65 		this.d_flow = d_flow;
 66 	}
 67
 68 	public long getSum_flow() {
 69 		return sum_flow;
 70 	}
 71
 72 	public void setSum_flow(long sum_flow) {
 73 		this.sum_flow = sum_flow;
 74 	}
 75
 76 }

3.Sort排序

在MapReduce执行Spill溢出过程中,会对结果进行排序,排序的方法是,根据key进行排序,如果我们把自己的FlowBean当作key,并且实现了WritableComparable,只需要重写compareTo方法即可,如,按照sun_flow倒叙排序,重新的compareTo方法如下:

  1 public int compareTo(FlowBean o) {
  2 		return this.sum_flow > o.getSum_flow() ? -1 : 1;
  3 	}

4.Partitioner分区

在MapReduce分区的过程中,Reduce的个数即分区的个数,这个时候只需要设置job.setNumReduceTasks(5),就可以设置Reduce的个数了,但这个分区是系统随便分区的,要想实现自己的分区,则还需要继承extends Partitioner<KEY, VALUE>类,并且重写getPartition即可

  1 public int getPartition(KEY key, VALUE value, int numPartitions)

5.综合案例

测试数据:

部分数据如下:

1363157985066   13726230503     00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com                24      27      2481    24681   200

1363157995052   13826544101     5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4                    4       0       264     0       200

需求分析:

(1)统计每一个用户(手机号)所耗费的总上行流量、下行流量、总流量

(2)得出上体的结果在加一个需求:将统计结果按照总流量倒叙排序

(3)将流量汇总结果按照手机归属地输出到不同的文件中

实现如下:

FLowBean.java

  1 import java.io.DataInput;
  2 import java.io.DataOutput;
  3 import java.io.IOException;
  4
  5 import org.apache.hadoop.io.WritableComparable;
  6
  7 public class FlowBean implements WritableComparable<FlowBean> {
  8
  9 	private String phoneNbr;
 10 	private long up_flow;
 11 	private long d_flow;
 12 	private long sum_flow;
 13
 14 	public void set(String phoneNbr, long up_flow, long d_flow) {
 15 		this.phoneNbr = phoneNbr;
 16 		this.up_flow = up_flow;
 17 		this.d_flow = d_flow;
 18 		this.sum_flow = up_flow + d_flow;
 19 	}
 20
 21 	/**
 22 	 * 序列化,将数据字段以字节流写出去
 23 	 */
 24 	public void write(DataOutput out) throws IOException {
 25 		out.writeUTF(this.phoneNbr);
 26 		out.writeLong(this.up_flow);
 27 		out.writeLong(this.d_flow);
 28 		out.writeLong(this.sum_flow);
 29 	}
 30
 31 	/**
 32 	 * 反序列化,从字节流中读出各个数据字段 读出的顺序应该跟序列化时写入的顺序保持一致
 33 	 */
 34 	public void readFields(DataInput in) throws IOException {
 35 		this.phoneNbr = in.readUTF();
 36 		this.up_flow = in.readLong();
 37 		this.d_flow = in.readLong();
 38 		this.sum_flow = in.readLong();
 39 	}
 40
 41 	public int compareTo(FlowBean o) {
 42 		return this.sum_flow > o.getSum_flow() ? -1 : 1;
 43 	}
 44
 45 	@Override
 46 	public String toString() {
 47 		return phoneNbr+"\t"+up_flow + "\t" + d_flow + "\t" + sum_flow;
 48 	}
 49
 50 	public String getPhoneNbr() {
 51 		return phoneNbr;
 52 	}
 53
 54 	public void setPhoneNbr(String phoneNbr) {
 55 		this.phoneNbr = phoneNbr;
 56 	}
 57
 58 	public long getUp_flow() {
 59 		return up_flow;
 60 	}
 61
 62 	public void setUp_flow(long up_flow) {
 63 		this.up_flow = up_flow;
 64 	}
 65
 66 	public long getD_flow() {
 67 		return d_flow;
 68 	}
 69
 70 	public void setD_flow(long d_flow) {
 71 		this.d_flow = d_flow;
 72 	}
 73
 74 	public long getSum_flow() {
 75 		return sum_flow;
 76 	}
 77
 78 	public void setSum_flow(long sum_flow) {
 79 		this.sum_flow = sum_flow;
 80 	}
 81
 82 }

PhonePartitioner.java

  1 import java.util.HashMap;
  2 import org.apache.hadoop.mapreduce.Partitioner;
  3
  4 public class PhonePartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{
  5
  6 	private static HashMap<String, Integer> areaMap =  new HashMap<>();
  7
  8 	static {
  9 		areaMap.put("136", 0);
 10 		areaMap.put("137", 1);
 11 		areaMap.put("138", 2);
 12 		areaMap.put("139", 3);
 13 	}
 14
 15 	@Override
 16 	public int getPartition(KEY key, VALUE value, int numPartitions) {
 17 		Integer provinceCode = areaMap.get(key.toString().substring(0,3));
 18 		return provinceCode==null?4:provinceCode;
 19 	}
 20 }

FlowCount.java

  1 import java.io.IOException;
  2 import java.net.URI;
  3 import org.apache.commons.lang.StringUtils;
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.fs.FileSystem;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.io.LongWritable;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.Job;
 10 import org.apache.hadoop.mapreduce.Mapper;
 11 import org.apache.hadoop.mapreduce.Reducer;
 12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 13 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 16
 17
 18 //统计手机号总流,分区并且统计流量总数
 19 public class FlowCount {
 20
 21 	public static class FlowCountMapper extends
 22 			Mapper<LongWritable, Text, Text, FlowBean> {
 23 		private FlowBean flowBean = new FlowBean();
 24
 25 		@Override
 26 		protected void map(LongWritable key, Text value, Context context)
 27 				throws IOException, InterruptedException {
 28
 29 			String line = value.toString();
 30 			if (StringUtils.isBlank(line)) {
 31 				return;
 32 			}
 33
 34 			// 切分字段
 35 			String[] fields = line.split("\\s+");
 36
 37 			// 拿到我们需要的若干个字段
 38 			String phoneNbr = fields[1];
 39 			long up_flow = Long.parseLong(fields[fields.length - 3]);
 40 			long d_flow = Long.parseLong(fields[fields.length - 2]);
 41 			// 将数据封装到一个flowbean中
 42 			flowBean.set(phoneNbr, up_flow, d_flow);
 43 			context.write(new Text(phoneNbr), flowBean);
 44 		}
 45 	}
 46
 47 	public static class FlowCountReducer extends
 48 			Reducer<Text, FlowBean, Text, FlowBean> {
 49 		private FlowBean flowBean = new FlowBean();
 50
 51 		@Override
 52 		protected void reduce(Text key, Iterable<FlowBean> values,
 53 				Context context) throws IOException, InterruptedException {
 54 			long up_flow_sum = 0;
 55 			long d_flow_sum = 0;
 56 			for (FlowBean flowBean : values) {
 57 				up_flow_sum += flowBean.getUp_flow();
 58 				d_flow_sum += flowBean.getD_flow();
 59 			}
 60 			flowBean.set(key.toString(), up_flow_sum, d_flow_sum);
 61 			context.write(key, flowBean);
 62 		}
 63 	}
 64
 65 	final static String INPUT_PATH = "hdfs://10.202.62.66:8020/wmapreduce/phoneflow/input/phonedata.txt";
 66 	final static String OUT_PATH = "hdfs://10.202.62.66:8020/wmapreduce/phoneflow/output";
 67
 68 	public static void main(String[] args) throws Exception {
 69 		Configuration conf = new Configuration();
 70
 71 		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 72 		if (fileSystem.exists(new Path(OUT_PATH))) {
 73 			fileSystem.delete(new Path(OUT_PATH), true);
 74 		}
 75
 76 		Job job = Job.getInstance(conf, "flowjob");
 77 		job.setJarByClass(FlowCount.class);
 78
 79 		job.setMapperClass(FlowCountMapper.class);
 80 		job.setReducerClass(FlowCountReducer.class);
 81
 82 		job.setPartitionerClass(PhonePartitioner.class);
 83 		job.setNumReduceTasks(5);
 84
 85 		job.setMapOutputKeyClass(Text.class);
 86 		job.setMapOutputValueClass(FlowBean.class);
 87
 88 		job.setOutputKeyClass(Text.class);
 89 		job.setOutputValueClass(FlowBean.class);
 90
 91 		job.setInputFormatClass(TextInputFormat.class);
 92 		job.setOutputFormatClass(TextOutputFormat.class);
 93
 94 		FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
 95 		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
 96
 97 		job.waitForCompletion(true);
 98 	}
 99 }

FlowCountSort.java

  1 import java.io.IOException;
  2 import java.net.URI;
  3 import org.apache.commons.lang.StringUtils;
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.fs.FileSystem;
  6 import org.apache.hadoop.fs.Path;
  7 import org.apache.hadoop.io.LongWritable;
  8 import org.apache.hadoop.io.NullWritable;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Reducer;
 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 17
 18
 19 //按照总流量数从大到小排序
 20 public class FlowCountSort {
 21
 22 	public static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{
 23 		private FlowBean bean = new FlowBean();
 24 		@Override
 25 		protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
 26 			String line = value.toString();
 27
 28 			if (StringUtils.isBlank(line)) {
 29 				return;
 30 			}
 31
 32 			String[] fields = StringUtils.split(line, "\t");
 33 			String phoneNbr = fields[1];
 34 			long up_flow = Long.parseLong(fields[fields.length - 3]);
 35 			long d_flow = Long.parseLong(fields[fields.length - 2]);
 36 			bean.set(phoneNbr, up_flow, d_flow);
 37
 38 			context.write(bean, NullWritable.get());
 39 		}
 40 	}
 41
 42 	public static class FlowCountSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean>{
 43 		@Override
 44 		protected void reduce(FlowBean bean, Iterable<NullWritable> values,Context context) throws IOException, InterruptedException {
 45 			context.write(new Text(bean.getPhoneNbr()), bean);
 46 		}
 47 	}
 48
 49 	final static String INPUT_PATH = "hdfs://10.202.62.66:8020/wmapreduce/phoneflow/input/phonedata.txt";
 50 	final static String OUT_PATH = "hdfs://10.202.62.66:8020/wmapreduce/phoneflow/output";
 51
 52
 53 	public static void main(String[] args) throws Exception {
 54
 55 		Configuration conf = new Configuration();
 56
 57 		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
 58 		if (fileSystem.exists(new Path(OUT_PATH))) {
 59 			fileSystem.delete(new Path(OUT_PATH), true);
 60 		}
 61
 62 		Job job = Job.getInstance(conf,"flowjob");
 63 		job.setJarByClass(FlowCountSort.class);
 64
 65 		job.setMapperClass(FlowCountSortMapper.class);
 66 		job.setReducerClass(FlowCountSortReducer.class);
 67
 68 		job.setMapOutputKeyClass(FlowBean.class);
 69 		job.setMapOutputValueClass(NullWritable.class);
 70
 71 		job.setOutputKeyClass(Text.class);
 72 		job.setOutputValueClass(FlowBean.class);
 73
 74 		job.setInputFormatClass(TextInputFormat.class);
 75 		job.setOutputFormatClass(TextOutputFormat.class);
 76
 77 		FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
 78 		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
 79
 80 		job.waitForCompletion(true);
 81 	}
 82 }

原文地址:https://www.cnblogs.com/mingyueguli/p/10492137.html

时间: 2024-08-02 06:29:58

MapReduce基础入门(二)的相关文章

C#基础入门 二

C#基础入门 二 循环语句 与C语言中用法相同. continue:结束本次循环(continue)后面的代码不再执行,进入下次循环(通常与if连用). 数组 一维数组定义:int[] intArray; 一维数组初始化:定义后,必须对其初始化才能使用 动态初始化:数据类型[] 数组名=new 数据类型[数组长度]{元素1,元素2...}:动态初始化借助new运算符为数组元素分配内存空间,并赋初值. 不给定初值情况下,各元素取默认值,数值型初始值为0,布尔型为false. 静态初始化:数据类型[

MapReduce(一) mapreduce基础入门

一.mapreduce入门 1.什么是mapreduce 首先让我们来重温一下 hadoop 的四大组件:HDFS:分布式存储系统MapReduce:分布式计算系统YARN: hadoop 的资源调度系统Common: 以上三大组件的底层支撑组件,主要提供基础工具包和 RPC 框架等 Mapreduce 是一个分布式运算程序的编程框架,是用户开发"基于 hadoop 的数据分析 应用"的核心框架Mapreduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的 分布式

Python基础入门 (二)

在上次课程中简要的讲述了Python的基础常识,现在来详细的学习一下吧! 一.类和对象 面向过程和面向对象 面向过程:C 面向对象:Java.Python等 类和对象的含义: 类,是对事物的抽象,比如:车 对象,是类的一个实例,比如:别克 类和对象的关系: 类是对象的定义,而实例是真正的事物,它存放了类中所定义的对象的具体信息: 对象的创建,也就是类的实例化. 对于Python,一切事物都是对象,对象基于类的创建. 具体实例讲解 右边的两个对象是基于左边的字符串类创建的,而两个字符串同时具有了字

前端基础入门二(CSS)

学习大纲 学会使用CSS选择器 熟记CSS样式和外观属性 熟练掌握CSS各种选择器 熟练掌握CSS各种选择器 熟练掌握CSS三种显示模式 熟练掌握CSS背景属性 熟练掌握CSS三大特性 熟练掌握CSS盒子模型 熟练掌握CSS浮动 10.熟练掌握CSS定位11.熟练掌握CSS高级技巧强化CSS 自己是个做了几年开发的老码农,希望本文对你有用! 这里推荐一下我的前端学习交流圈:767273102 ,里面都是学习前端的,从最基础的HTML+CSS+JS[炫酷特效,游戏,插件封装,设计模式]到移动端HT

Spring MVC 基础入门二

单个接收表单提交的参数 在实际开发中通过会在spring MVC的Controller里面接收表单提交过来的参数,这块代码该怎么去编写呢? 示例: 编写一个带有表单的jsp: <%@ page contentType="text/html;charset=UTF-8" language="java" %> <html> <head> <title>Title</title> </head> &l

React.js 基础入门二--组件嵌套

组件:在设计接口的时候,将常用元素(按钮,表单字段,布局组件等)分解成定义好接口的可重用组件.这样下次你在创建同样界面就可以少写很多代码,这意味着更快的开发时间,更少的bug,和更少的内容下载. 将上章节Index.html中Js部分修改为: <script type="text/jsx">   // 注意此处                 // 定义web组件 (<font color="#ff0000">MessageBox 错误组件嵌

C#学习笔记---基础入门(二)

枚举 枚举是被命名的整型常数的集合:枚举类型的变量只有赋值后才能使用:不同枚举中的枚举值可以重名:可以自定义枚举值. enum Playstates {            跑, 跳,下滑,左转,右转}枚举值的使用:Playstates.跑: 结构体 结构体相当于一个复合型的容器,其是由一系列不同类型的数据构成的集合:结构体中的成员不能在定义中赋初始值. struct Person{ //public 类型 变量名 public string name; public int age; }//

Maven基础入门(二)

配置本地仓库 找到apache-maven-3.6.1\conf\setting.xml文件,修改本地仓库的地址. <localRepository>E:\apache-maven-3.6.1\repertories</localRepository> 原文地址:https://www.cnblogs.com/feiqiangsheng/p/12315323.html

[Spring框架]Spring AOP基础入门总结一.

前言:前面已经有两篇文章讲了Spring IOC/DI 以及 使用xml和注解两种方法开发的案例, 下面就来梳理一下Spring的另一核心AOP. 一, 什么是AOP 在软件业,AOP为Aspect Oriented Programming的缩写,意为:面向切面编程,通过预编译方式和运行期动态代理实现程序功能的统一维护的一种技术.AOP是OOP的延续,是软件开发中的一个热点,也是Spring框架中的一个重要内容,是函数式编程的一种衍生范型.利用AOP可以对业务逻辑的各个部分进行隔离,从而使得业务