MapReduce的Partitioner案例

项目简介

这里给出一个经典的词频统计的案例:统计如下样本数据中每个单词出现的次数。



SparkHBase
HiveFlinkStormHadoopHBaseSpark
Flink
HBaseStorm
HBaseHadoopHiveFlink
HBaseFlinkHiveStorm
HiveFlinkHadoop
HBaseHive
HadoopSparkHBaseStorm
HBaseHadoopHiveFlink
HBaseFlinkHiveStorm
HiveFlinkHadoop
HBaseHive

13

1

SparkHBase

2

HiveFlinkStormHadoopHBaseSpark

3

Flink

4

HBaseStorm

5

HBaseHadoopHiveFlink

6

HBaseFlinkHiveStorm

7

HiveFlinkHadoop

8

HBaseHive

9

HadoopSparkHBaseStorm

10

HBaseHadoopHiveFlink

11

HBaseFlinkHiveStorm

12

HiveFlinkHadoop

13

HBaseHive

项目依赖

想要进行 MapReduce 编程,需要导入 hadoop-client 依赖:



<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>${hadoop.version}</version>
</dependency>

5

1

<dependency>

2

   <groupId>org.apache.hadoop</groupId>

3

   <artifactId>hadoop-client</artifactId>

4

   <version>${hadoop.version}</version>

5

</dependency>

WordCountMapper

将每行数据按照指定分隔符进行拆分。这里需要注意在 MapReduce 中必须使用 Hadoop 定义的类型,因为 Hadoop 预定义的类型都是可序列化,可比较的,所有类型均实现了 WritableComparable接口。



public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException,
                                                                     InterruptedException {
       String[] words = value.toString().split("\t");
       for (String word : words) {
           context.write(new Text(word), new IntWritable(1));
      }
  }
}

11

1

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

2


3

   @Override

4

   protected void map(LongWritable key, Text value, Context context) throws IOException, 

5

                                                                     InterruptedException {

6

       String[] words = value.toString().split("\t");

7

       for (String word : words) {

8

           context.write(new Text(word), new IntWritable(1));

9

      }

10

  }

11

}

WordCountMapper 对应下图的 Mapping 操作:

WordCountMapper继承自 Mappe类,这是一个泛型类,定义如下:



WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

}

5

1

WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>

2


3

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

4

  

5

}

  • KEYIN : mapping输入 key 的类型,即每行的偏移量 (每行第一个字符在整个文本中的位置),Long类型,对应 Hadoop 中的 LongWritable类型;
  • VALUEIN : mapping输入 value 的类型,即每行数据;String类型,对应 Hadoop 中 Text类型;
  • KEYOUTmapping输出的 key 的类型,即每个单词;String类型,对应 Hadoop 中 Text类型;
  • VALUEOUTmapping输出 value 的类型,即每个单词出现的次数;这里用 int 类型,对应 IntWritable类型。

WordCountReducer

在 Reduce 中进行单词出现次数的统计:



public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

   @Override
   protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
                                                                                 InterruptedException {
       int count = 0;
       for (IntWritable value : values) {
           count += value.get();
      }
       context.write(key, new IntWritable(count));
  }
}

12

1

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

2


3

   @Override

4

   protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, 

5

                                                                                 InterruptedException {

6

       int count = 0;

7

       for (IntWritable value : values) {

8

           count += value.get();

9

      }

10

       context.write(key, new IntWritable(count));

11

  }

12

}

如下图,shuffling 的输出是 reduce 的输入。这里的 key 是每个单词,values 是一个可迭代的数据类型,类似 (1,1,1,...)

WordCountApp

组装 MapReduce 作业,并提交到服务器运行,代码如下:



/**
* 组装作业 并提交到集群运行
*/
public class WordCountApp {
   // 这里为了直观显示参数 使用了硬编码,实际开发中可以通过外部传参
   private static final String HDFS_URL = "hdfs://192.168.100.1";
   private static final String HADOOP_USER_NAME = "root";

   public static void main(String[] args) throws Exception {

       // 文件输入路径和输出路径由外部传参指定
       if (args.length < 2) {
           System.out.println("Input and output paths are necessary!");
           return;
      }

       // 需要指明 hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常
       System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

       Configuration configuration = new Configuration();
       // 指明 HDFS 的地址
       configuration.set("fs.defaultFS", HDFS_URL);

       // 创建一个 Job
       Job job = Job.getInstance(configuration);

       // 设置运行的主类
       job.setJarByClass(WordCountApp.class);

       // 设置 Mapper 和 Reducer
       job.setMapperClass(WordCountMapper.class);
       job.setReducerClass(WordCountReducer.class);

       // 设置 Mapper 输出 key 和 value 的类型
       job.setMapOutputKeyClass(Text.class);
       job.setMapOutputValueClass(IntWritable.class);

       // 设置 Reducer 输出 key 和 value 的类型
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(IntWritable.class);

       // 如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常
       FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
       Path outputPath = new Path(args[1]);
       if (fileSystem.exists(outputPath)) {
           fileSystem.delete(outputPath, true);
      }

       // 设置作业输入文件和输出文件的路径
       FileInputFormat.setInputPaths(job, new Path(args[0]));
       FileOutputFormat.setOutputPath(job, outputPath);

       // 将作业提交到群集并等待它完成,参数设置为 true 代表打印显示对应的进度
       boolean result = job.waitForCompletion(true);

       // 关闭之前创建的 fileSystem
       fileSystem.close();

       // 根据作业结果,终止当前运行的 Java 虚拟机,退出程序
       System.exit(result ? 0 : -1);

  }
}

63

1

/**

2

* 组装作业 并提交到集群运行

3

*/

4

public class WordCountApp {

5

   // 这里为了直观显示参数 使用了硬编码,实际开发中可以通过外部传参

6

   private static final String HDFS_URL = "hdfs://192.168.100.1";

7

   private static final String HADOOP_USER_NAME = "root";

8


9

   public static void main(String[] args) throws Exception {

10


11

       // 文件输入路径和输出路径由外部传参指定

12

       if (args.length < 2) {

13

           System.out.println("Input and output paths are necessary!");

14

           return;

15

      }

16


17

       // 需要指明 hadoop 用户名,否则在 HDFS 上创建目录时可能会抛出权限不足的异常

18

       System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

19


20

       Configuration configuration = new Configuration();

21

       // 指明 HDFS 的地址

22

       configuration.set("fs.defaultFS", HDFS_URL);

23


24

       // 创建一个 Job

25

       Job job = Job.getInstance(configuration);

26


27

       // 设置运行的主类

28

       job.setJarByClass(WordCountApp.class);

29


30

       // 设置 Mapper 和 Reducer

31

       job.setMapperClass(WordCountMapper.class);

32

       job.setReducerClass(WordCountReducer.class);

33


34

       // 设置 Mapper 输出 key 和 value 的类型

35

       job.setMapOutputKeyClass(Text.class);

36

       job.setMapOutputValueClass(IntWritable.class);

37


38

       // 设置 Reducer 输出 key 和 value 的类型

39

       job.setOutputKeyClass(Text.class);

40

       job.setOutputValueClass(IntWritable.class);

41


42

       // 如果输出目录已经存在,则必须先删除,否则重复运行程序时会抛出异常

43

       FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);

44

       Path outputPath = new Path(args[1]);

45

       if (fileSystem.exists(outputPath)) {

46

           fileSystem.delete(outputPath, true);

47

      }

48


49

       // 设置作业输入文件和输出文件的路径

50

       FileInputFormat.setInputPaths(job, new Path(args[0]));

51

       FileOutputFormat.setOutputPath(job, outputPath);

52


53

       // 将作业提交到群集并等待它完成,参数设置为 true 代表打印显示对应的进度

54

       boolean result = job.waitForCompletion(true);

55


56

       // 关闭之前创建的 fileSystem

57

       fileSystem.close();

58


59

       // 根据作业结果,终止当前运行的 Java 虚拟机,退出程序

60

       System.exit(result ? 0 : -1);

61


62

  }

63

}

需要注意的是:如果不设置 Mapper 操作的输出类型,则程序默认它和 Reducer操作输出的类型相同。

提交到服务器运行

在实际开发中,可以在本机配置 hadoop 开发环境,直接在 IDE 中启动进行测试。这里主要介绍一下打包提交到服务器运行。由于本项目没有使用除 Hadoop 外的第三方依赖,直接打包即可:

# mvn clean package

1

1

# mvn clean package

使用以下命令提交作业:



hadoop jar /usr/appjar/hadoop-word-count-1.0.jar com.heibaiying.WordCountApp /wordcount/input.txt /wordcount/output/WordCountApp

3

1

hadoop jar /usr/appjar/hadoop-word-count-1.0.jar \

2

com.heibaiying.WordCountApp \

3

/wordcount/input.txt /wordcount/output/WordCountApp

作业完成后查看 HDFS 上生成目录:



# 查看目录
hadoop fs -ls /wordcount/output/WordCountApp

# 查看统计结果
hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000

5

1

# 查看目录

2

hadoop fs -ls /wordcount/output/WordCountApp

3


4

# 查看统计结果

5

hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000

默认的Partitioner

这里假设有个需求:将不同单词的统计结果输出到不同文件。这种需求实际上比较常见,比如统计产品的销量时,需要将结果按照产品种类进行拆分。要实现这个功能,就需要用到自定义 Partitioner

这里先介绍下 MapReduce 默认的分类规则:在构建 job 时候,如果不指定,默认的使用的是 HashPartitioner:对 key 值进行哈希散列并对 numReduceTasks取余。其实现如下:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}

1

public class HashPartitioner<K, V> extends Partitioner<K, V> {

2


3

  public int getPartition(K key, V value,

4

                          int numReduceTasks) {

5

    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

6

}

7

}

自定义Partitioner

这里我们继承 Partitioner自定义分类规则,这里按照单词进行分类:



public class CustomPartitioner extends Partitioner<Text, IntWritable> {

   public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
       return WordCountDataUtils.WORD_LIST.indexOf(text.toString());
  }
}

1

public class CustomPartitioner extends Partitioner<Text, IntWritable> {

2


3

   public int getPartition(Text text, IntWritable intWritable, int numPartitions) {

4

       return WordCountDataUtils.WORD_LIST.indexOf(text.toString());

5

  }

6

}

在构建 job时候指定使用我们自己的分类规则,并设置 reduce的个数:

执行结果

执行结果如下,分别生成 6 个文件,每个文件中为对应单词的统计结果:

原文地址:https://www.cnblogs.com/TiePiHeTao/p/a04e4beefea19a0825097a42eab6878f.html

时间: 2024-08-30 06:25:13

MapReduce的Partitioner案例的相关文章

hadoop笔记之MapReduce的应用案例(利用MapReduce进行排序)

MapReduce的应用案例(利用MapReduce进行排序) MapReduce的应用案例(利用MapReduce进行排序) 思路: Reduce之后直接进行结果合并 具体样例: 程序名:Sort.java import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; impo

MapReduce 单词统计案例编程

MapReduce 单词统计案例编程 一.在Linux环境安装Eclipse软件 1.   解压tar包 下载安装包eclipse-jee-kepler-SR1-linux-gtk-x86_64.tar.gz到/opt/software目录下. 解压到/opt/tools目录下: [[email protected] tools]$ tar -zxf /opt/sofeware/eclipse-jee-kepler-SR1-linux-gtk-x86_64.tar.gz -C /opt/tool

MapReduce教程(二)MapReduce框架Partitioner分区&lt;转&gt;

1 Partitioner分区 1.1 Partitioner分区描述 在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,按照手机号码段划分的话,需要把同一手机号码段的数据放到一个文件中:按照省份划分的话,需要把同一省份的数据放到一个文件中:按照性别划分的话,需要把同一性别的数据放到一个文件中.我们知道最终的输出数据是来自于Reducer任务.那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行.Reducer任务的数据来自于Mapper任务,也就说Ma

MapReduce使用Partitioner分区案例

Mapper: import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; pu

MapReduce框架Partitioner分区方法

前言:对于二次排序相信大家也是似懂非懂,我也是一样,对其中的很多方法都不理解诶,所有只有暂时放在一边,当你接触到其他的函数,你知道的越多时你对二次排序的理解也就更深入了,同时建议大家对wordcount的流程好好分析一下,要真正的知道每一步都是干什么的. 1.Partitioner分区类的作用是什么? 2.getPartition()三个参数分别是什么? 3.numReduceTasks指的是设置的Reducer任务数量,默认值是是多少? 扩展: 如果不同类型的数据被分配到了同一个分区,输出的数

MapReduce分区方法Partitioner方法

前言:对于二次排序相信大家也是似懂非懂,我也是一样,对其中的很多方法都不理解诶,所有只有暂时放在一边,当你接触到其他的函数,你知道的越多时你对二次排序的理解也就更深入了,同时建议大家对wordcount的流程好好分析一下,要真正的知道每一步都是干什么的. 1.Partitioner分区类的作用是什么? 2.getPartition()三个参数分别是什么? 3.numReduceTasks指的是设置的Reducer任务数量,默认值是是多少? 扩展: 如果不同类型的数据被分配到了同一个分区,输出的数

MapReduce的简单案例

字数统计: MapReduce过程: 写一个继承mapper的类,声明输入(基本固定)输出(看需求)类型 重写map(K,V,context),map方法会被调用多次,每次调用map方法读取split传过来的一行数据,需要将这一行数据切割(StringTokeizer类,默认看空格切割) While遍历,通过context输出 要书写一个程序主入口类,将程序打包发给JobTracker(移动计算而不是移动数据) 注意,这里因为是本地的程序,将程序打成xxx.jar包,放入namenode节点的服

Hadoop MapReduce编程入门案例

Hadoop入门例程简析中 (下面的程序下载地址:http://download.csdn.net/detail/zpcandzhj/7810829) 一.一些说明 (1)Hadoop新旧API的区别 新的API倾向于使用虚类(抽象类),而不是接口,因为这更容易扩展. 例如,可以无需修改类的实现而在虚类中添加一个方法(即用默认的实现). 在新的API中,mapper和reducer现在都是虚类. 新的API 放在org.apache.hadoop.mapreduce 包(和子包)中.之前版本的A

MapReduce之Partitioner组件

简述 Partitioner组件可以让Map对Key进行分区,从而可以根据不同的key来分发到不同的reduce中去处理: 你可以自定义key的一个分发规则,如数据文件包含不同的大学,而输出的要求是每个大学输出一个文件: Partitioner组件提供了一个默认的HashPartitioner. package org.apache.hadoop.mapreduce.lib.partition; public class HashPartitioner<K, V> extends Partit