MapReduce教程(二)MapReduce框架Partitioner分区<转>

1 Partitioner分区

1.1 Partitioner分区描述

在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,按照手机号码段划分的话,需要把同一手机号码段的数据放到一个文件中;按照省份划分的话,需要把同一省份的数据放到一个文件中;按照性别划分的话,需要把同一性别的数据放到一个文件中。我们知道最终的输出数据是来自于Reducer任务。那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行。Reducer任务的数据来自于Mapper任务,也就说Mapper任务要划分数据,对于不同的数据分配给不同的Reducer任务运行。Mapper任务划分数据的过程就称作Partition。负责实现划分数据的类称作Partitioner。

1.2 MapReduce运行原理

 

MapReduce流程图 - 1.1

 

1.3 数据需求

将文件input_data.txt中的用户数据,根据用户的手机号,按照手机号进行分区。

附件地址链接:http://download.csdn.net/detail/yuan_xw/9459721

1.4 实现步骤

1、    编写UserMapper类,分析用户数据信息。

2、    编写UserReducer类,计算用户的年收数据信息。

3、    编写ProviderPartitioner类,Partitioner组件可以让Map对Key进行分区,从而可以根据不同的key来分发到不同的reduce中去处理。

1.5 UserMapper代码编写

UserMapper类,读取和分析用户数据。

[java] view plain copy

  1. package com.hadoop.mapreduce;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import entity.UserEntity;
  8. /*
  9. * 继承Mapper类需要定义四个输出、输出类型泛型:
  10. * 四个泛型类型分别代表:
  11. * KeyIn        Mapper的输入数据的Key,这里是每行文字的起始位置(0,11,...)
  12. * ValueIn      Mapper的输入数据的Value,这里是每行文字
  13. * KeyOut       Mapper的输出数据的Key,这里是序列化对象UserEntity
  14. * ValueOut     Mapper的输出数据的Value,不返回任何值
  15. *
  16. * Writable接口是一个实现了序列化协议的序列化对象。
  17. * 在Hadoop中定义一个结构化对象都要实现Writable接口,使得该结构化对象可以序列化为字节流,字节流也可以反序列化为结构化对象。
  18. * LongWritable类型:Hadoop.io对Long类型的封装类型
  19. */
  20. public class UserMapper extends Mapper<LongWritable, Text, UserEntity, NullWritable> {
  21. private UserEntity userEntity = new UserEntity();
  22. @Override
  23. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, UserEntity, NullWritable>.Context context)
  24. throws IOException, InterruptedException {
  25. //将每行的数据以空格切分数据,获得每个字段数据 1 135****9365 林*彬 2484 北京市昌平区北七家东三旗村
  26. String[] fields = value.toString().split("\t");
  27. // 赋值userEntity
  28. userEntity.set(Integer.parseInt(fields[0]), fields[1], fields[2],Double.parseDouble(fields[3]), fields[4],0.00);
  29. // 将对象序列化
  30. context.write(userEntity,NullWritable.get());
  31. }
  32. }

1.6 UserReducer代码编写

UserReducer类,计算用户的年收数据信息。

[java] view plain copy

  1. package com.hadoop.mapreduce;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.NullWritable;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import entity.UserEntity;
  6. /*
  7. * Reducer需要定义四个输出、输出类型泛型:
  8. * 四个泛型类型分别代表:
  9. * KeyIn        Reducer的输入数据的Key,这里是序列化对象UserEntity
  10. * ValueIn      Reducer的输入数据的Value,这里是NullWritable
  11. * KeyOut       Reducer的输出数据的Key,这里是序列化对象UserEntity
  12. * ValueOut     Reducer的输出数据的Value,NullWritable
  13. */
  14. public class UserReducer extends Reducer<UserEntity, NullWritable, UserEntity, NullWritable>{
  15. @Override
  16. protected void reduce(UserEntity userEntity, Iterable<NullWritable> values,
  17. Reducer<UserEntity, NullWritable, UserEntity, NullWritable>.Context context)
  18. throws IOException, InterruptedException {
  19. // 年收入 = 月收入 * 12  四舍五入
  20. String yearIncome = String.format("%.2f", userEntity.getMonthIncome() * 12);
  21. userEntity.setYearIncome(Double.parseDouble(yearIncome));
  22. context.write(userEntity, NullWritable.get());
  23. }
  24. }

1.7 ProviderPartitioner代码编写

Partitioner用于划分键值空间(key space)。

Partitioner组件可以让Map对Key进行分区,从而可以根据不同的key来分发到不同的reduce中去处理。分区的数量与一个作业的reduce任务的数量是一样的。它控制将中间过程的key(也就是这条记录)应该发送给m个reduce任务中的哪一个来进行reduce操作。HashPartitioner是默认的Partitioner。

[java] view plain copy

  1. package com.hadoop.mapreduce;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import org.apache.hadoop.io.NullWritable;
  5. import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
  6. import entity.UserEntity;
  7. /*
  8. * Partitioner用于划分键值空间(key space)。
  9. * Partitioner组件可以让Map对Key进行分区,从而可以根据不同的key来分发到不同的reduce中去处理。
  10. * 分区的数量与一个作业的reduce任务的数量是一样的。
  11. * 它控制将中间过程的key(也就是这条记录)应该发送给m个reduce任务中的哪一个来进行reduce操作。
  12. * HashPartitioner是默认的 Partitioner。
  13. */
  14. /**
  15. * 继承抽象类Partitioner,实现自定义的getPartition()方法
  16. * 通过job.setPartitionerClass()来设置自定义的Partitioner;
  17. */
  18. public class ProviderPartitioner extends HashPartitioner<UserEntity, NullWritable> {
  19. // 声明providerMap,并且在static静态块中初始化
  20. private static Map<String, Integer> providerMap = new HashMap<String, Integer>();
  21. static {
  22. providerMap.put("130", 0);
  23. providerMap.put("133", 0);
  24. providerMap.put("134", 0);
  25. providerMap.put("135", 0);
  26. providerMap.put("136", 0);
  27. providerMap.put("137", 0);
  28. providerMap.put("138", 0);
  29. providerMap.put("139", 0);
  30. providerMap.put("150", 1);
  31. providerMap.put("151", 1);
  32. providerMap.put("153", 1);
  33. providerMap.put("158", 1);
  34. providerMap.put("159", 1);
  35. providerMap.put("170", 2);
  36. providerMap.put("180", 3);
  37. providerMap.put("181", 3);
  38. providerMap.put("183", 3);
  39. providerMap.put("185", 3);
  40. providerMap.put("186", 3);
  41. providerMap.put("187", 3);
  42. providerMap.put("188", 3);
  43. providerMap.put("189", 3);
  44. }
  45. /**
  46. * 实现自定义的getPartition()方法,自定义分区规则
  47. */
  48. @Override
  49. public int getPartition(UserEntity key, NullWritable value, int numPartitions) {
  50. String prefix = key.getMobile().substring(0, 3);
  51. return providerMap.get(prefix);
  52. }
  53. }

1.8 UserAnalysis代码编写

UserAnalysis类,程序执行入口类。

[java] view plain copy

  1. package com.hadoop.mapreduce;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.NullWritable;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import entity.UserEntity;
  10. public class UserAnalysis {
  11. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  12. // 创建job对象
  13. Job job = Job.getInstance(new Configuration());
  14. // 指定程序的入口
  15. job.setJarByClass(UserAnalysis.class);
  16. // 指定自定义的Mapper阶段的任务处理类
  17. job.setMapperClass(UserMapper.class);
  18. job.setMapOutputKeyClass(UserEntity.class);
  19. job.setMapOutputValueClass(NullWritable.class);
  20. // 数据HDFS文件服务器读取数据路径
  21. FileInputFormat.setInputPaths(job, new Path("/mapreduce/partitioner/input_data.txt"));
  22. // 指定自定义的Reducer阶段的任务处理类
  23. job.setReducerClass(UserReducer.class);
  24. // 设置最后输出结果的Key和Value的类型
  25. job.setOutputKeyClass(UserEntity.class);
  26. job.setOutputValueClass(NullWritable.class);
  27. // 设置定义分区的处理类
  28. job.setPartitionerClass(ProviderPartitioner.class);
  29. // 默认ReduceTasks数是1
  30. // 我们对手机号分成4类,所以应该设置为4
  31. job.setNumReduceTasks(4);
  32. // 将计算的结果上传到HDFS服务
  33. FileOutputFormat.setOutputPath(job, new Path("/mapreduce/partitioner/output_data"));
  34. // 执行提交job方法,直到完成,参数true打印进度和详情
  35. job.waitForCompletion(true);
  36. System.out.println("Finished");
  37. }
  38. }

2 安装部署

2.1 生成JAR包

1、    选择hdfs项目->右击菜单->Export…,在弹出的提示框中选择Java下的JAR file:

2、    选择hdfs项目->右击菜单->Export…,在弹出的提示框中选择Java下的JAR file:

3、    设置程序的入口,设置完成后,点击Finish:

4、    成生UserAnalysis.jar如下文件,如下图:

2.2 执行JAR运行结果

1、    打开Xft软件,将E:盘的UserAnalysis.jar、input_data.txt文件上传到Linux/home路径下:

2、    执行命令:

创建两个文件夹:

hadoop fs -mkdir /mapreduce/

hadoop fs -mkdir /mapreduce/partitioner

上传input_data.txt文件:

hadoop fs -put /home/input_data.txt /mapreduce/partitioner

3、    执行JAR包:

切换目录命令:cd /home/

运行JAR包:hadoop jar UserAnalysis.jar

4、    查看执行结果:

查看目录命令:hadoop fs -ls R/mapreduce/partitioner/output_data

5、    查看文件

查看part-r-00000文件命令:hadoop fs -cat/mapreduce/partitioner/output_data/part-r-00000

查看part-r-00001文件命令:hadoop fs -cat/mapreduce/partitioner/output_data/part-r-00001

查看part-r-00002文件命令:hadoop fs -cat/mapreduce/partitioner/output_data/part-r-00002

查看part-r-00003文件命令:hadoop fs -cat/mapreduce/partitioner/output_data/part-r-00003

2.3 相关下载

1、    UserAnalysis.jar包下载地址:http://download.csdn.net/detail/yuan_xw/9459711

2、    input_data.txt文件下载地址:http://download.csdn.net/detail/yuan_xw/9459721

3、    源代码下载地址:http://download.csdn.net/detail/yuan_xw/9459707

--以上为《MapReduce教程(二)MapReduce框架Partitioner分区》,如有不当之处请指出,我后续逐步完善更正,大家共同提高。谢谢大家对我的关注。

——厚积薄发(yuanxw)

转自http://blog.csdn.net/yuan_xw/article/details/50867819

时间: 2024-08-02 03:34:24

MapReduce教程(二)MapReduce框架Partitioner分区<转>的相关文章

MapReduce教程(一)基于MapReduce框架开发&lt;转&gt;

1 MapReduce编程 1.1 MapReduce简介 MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算,用于解决海量数据的计算问题. MapReduce分成了两个部分: 1.映射(Mapping)对集合里的每个目标应用同一个操作.即,如果你想把表单里每个单元格乘以二,那么把这个函数单独地应用在每个单元格上的操作就属于mapping. 2.化简(Reducing)遍历集合中的元素来返回一个综合的结果.即,输出表单里一列数字的和这个任务属于reducing. 你向Ma

MapReduce框架Partitioner分区方法

前言:对于二次排序相信大家也是似懂非懂,我也是一样,对其中的很多方法都不理解诶,所有只有暂时放在一边,当你接触到其他的函数,你知道的越多时你对二次排序的理解也就更深入了,同时建议大家对wordcount的流程好好分析一下,要真正的知道每一步都是干什么的. 1.Partitioner分区类的作用是什么? 2.getPartition()三个参数分别是什么? 3.numReduceTasks指的是设置的Reducer任务数量,默认值是是多少? 扩展: 如果不同类型的数据被分配到了同一个分区,输出的数

MapReduce处理二次排序(分区-排序-分组)

MapReduce二次排序原理 在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReader的实现. 本例子中使用的时TextInputFormat,他提供的RecordReader会将文本的字节偏移量作为key,这一行的文本作为value. 这就是自定义Map的输入是<LongWritable,Text>的原因,然后调用自定义的Map的map方法,将一个个&l

hadoop(二MapReduce)

hadoop(二MapReduce) 介绍 MapReduce:其实就是把数据分开处理后再将数据合在一起. Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理.可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系. Reduce负责“合”,即对map阶段的结果进行全局汇总. MapReduce运行在yarn集群 MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现.Map和Reduce, MapReduce处理的数据类型是键值对

DataVeryLite入门教程(二) Entity篇

DataVeryLite 是基于.net 4.0的数据库持久化ORM框架. 目前支持的数据库有Sqlserver,Mysql,Oracle,Db2,PostgreSql,Sqlite和Access. 最好先阅读DataVeryLite入门教程(一) 配置篇,然后再阅读本篇.如果你觉得麻烦也可以跳过. Entity是ORM中的核心对象之一,一个继承Entity的对象对应于数据库中的一个表. Entity提供丰富的API对表中的单条数据进行操作. 比如根据id或其他条件,加载,删除,插入,更新和部分

Spring Cloud 入门教程(二): 配置管理

使用Config Server,您可以在所有环境中管理应用程序的外部属性.客户端和服务器上的概念映射与Spring Environment和PropertySource抽象相同,因此它们与Spring应用程序非常契合,但可以与任何以任何语言运行的应用程序一起使用.随着应用程序通过从开发人员到测试和生产的部署流程,您可以管理这些环境之间的配置,并确定应用程序具有迁移时需要运行的一切.服务器存储后端的默认实现使用git,因此它轻松支持标签版本的配置环境,以及可以访问用于管理内容的各种工具.很容易添加

redis学习教程五《管道、分区》

redis学习教程五<管道.分区> 一:管道 Redis是一个TCP服务器,支持请求/响应协议. 在Redis中,请求通过以下步骤完成: 客户端向服务器发送查询,并从套接字读取,通常以阻塞的方式,用于服务器响应. 服务器处理命令并将响应发送回客户端. 管道的意义 管道的基本含义是,客户端可以向服务器发送多个请求,而不必等待回复,并最终在一个步骤中读取回复. 示例 要检查Redis管道,只需启动Redis实例,并在终端中键入以下命令. (echo -en "PING\r\n SET t

无废话ExtJs 入门教程二[Hello World]

无废话ExtJs 入门教程二[Hello World] extjs技术交流,欢迎加群(201926085) 我们在学校里学习任何一门语言都是从"Hello World"开始,这里我们也不例外. 1.代码如下: 1 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd&

无废话ExtJs 入门教程二十一[继承:Extend]

无废话ExtJs 入门教程二十一[继承:Extend] extjs技术交流,欢迎加群(201926085) 在开发中,我们在使用视图组件时,经常要设置宽度,高度,标题等属性.而这些属性可以通过“继承”定义在我们定义的新组件中,从而达到重用的目地. 1.代码如下: 1 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-