MapReduce之Partitioner组件

简述

Partitioner组件可以让Map对Key进行分区,从而可以根据不同的key来分发到不同的reduce中去处理;

你可以自定义key的一个分发规则,如数据文件包含不同的大学,而输出的要求是每个大学输出一个文件;

Partitioner组件提供了一个默认的HashPartitioner

package org.apache.hadoop.mapreduce.lib.partition;
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

自定义Partitioner

1、继承抽象类Partitioner,实现自定义的getPartition()方法;

2、通过job.setPartitionerClass(...)来设置自定义的Partitioner;

Partitioner类

package org.apache.hadoop.mapreduce;
public abstract class Partitioner<KEY, VALUE> {

  /**
   * Get the partition number for a given key (hence record) given the total
   * number of partitions i.e. number of reduce-tasks for the job.
   *
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be partioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);

}

Partitioner应用场景及实例

需求:分别统计每种商品的周销售情况

address1的周销售清单(input1):

shoes 20

hat 10

stockings 30

clothes 40

address2的周销售清单(input2):

shoes 15

hat 1

stockings 90

clothes 80

汇总结果(output):

shoes 35

hat 11

stockings 120

clothes 120

package MyPartitioner;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyPartitioner {
    private final static String INPUT_PATH = "hdfs://liguodong:8020/input";
    private final static String OUTPUT_PATH = "hdfs://liguodong:8020/output";

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private Text word = new Text();
    private IntWritable one = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

            String[] str = value.toString().split("\\s+");

            word.set(str[0]);
            one.set(Integer.parseInt(str[1]));
            context.write(word, one);
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable,Text, IntWritable>{
        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context)
                throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable val : values) {
                sum+=val.get();
            }
            result.set(sum);
            context.write(key,result);
        }
    }

    public static class DefPartitioner extends Partitioner<Text,IntWritable>{

        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            if(key.toString().equals("shoes")){
                return 0;

            }else if(key.toString().equals("hat")){
                return 1;

            }else if(key.toString().equals("stockings")){
                return 2;

            }else{
                return 3;
            }
        }

    }

    public static void main(String[] args) throws Exception {
        //1、配置
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH),conf);
        if(fileSystem.exists(new Path(OUTPUT_PATH)))
        {
            fileSystem.delete(new Path(OUTPUT_PATH),true);
        }
        Job job = Job.getInstance(conf, "define partitioner"); 

        //2、打包运行必须执行的方法
        job.setJarByClass(MyPartitioner.class);

        //3、输入路径
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        //4、Map
        job.setMapperClass(MyMapper.class);

        //5、Combiner
        //job.setCombinerClass(MyReducer.class);
        job.setPartitionerClass(DefPartitioner.class);

        //6、Reducer
        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(4);//reduce个数默认是1

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //7、 输出路径
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        //8、提交作业
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[[email protected] file]# hdfs dfs -mkdir /input
上传文件
[[email protected] file]# hdfs dfs -put input1 /input/
[[email protected] file]# hdfs dfs -put input2 /input/
[[email protected] file]# hdfs dfs -ls  /input/
Found 2 items
-rw-r--r--   1 root supergroup         52 2015-06-14 10:22 /input/input1
-rw-r--r--   1 root supergroup         50 2015-06-14 10:22 /input/input2

打成jar包,然后执行。
[[email protected] file]# jar tf partitioner.jar
META-INF/MANIFEST.MF
MyPartitioner/MyPartitioner$DefPartitioner.class
MyPartitioner/MyPartitioner$MyMapper.class
MyPartitioner/MyPartitioner$MyReducer.class
MyPartitioner/MyPartitioner.class

[[email protected] file]# yarn jar partitioner.jar

输出结果
[[email protected] file]# hdfs dfs -ls /output/
Found 5 items
-rw-r--r--   1 root supergroup          0 2015-06-14 11:08 /output/_SUCCESS
-rw-r--r--   1 root supergroup          9 2015-06-14 11:08 /output/part-r-00000
-rw-r--r--   1 root supergroup          7 2015-06-14 11:08 /output/part-r-00001
-rw-r--r--   1 root supergroup          0 2015-06-14 11:08 /output/part-r-00002
-rw-r--r--   1 root supergroup         26 2015-06-14 11:08 /output/part-r-00003
[[email protected] file]# hdfs dfs -cat /output/part-r-00000
shoes   35
[[email protected] file]# hdfs dfs -cat /output/part-r-00001
hat     11
[[email protected] file]# hdfs dfs -cat /output/part-r-00002
stockings       120
[[email protected] file]# hdfs dfs -cat /output/part-r-00003
clothes 120
时间: 2024-08-28 18:31:03

MapReduce之Partitioner组件的相关文章

MapReduce教程(二)MapReduce框架Partitioner分区&lt;转&gt;

1 Partitioner分区 1.1 Partitioner分区描述 在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,按照手机号码段划分的话,需要把同一手机号码段的数据放到一个文件中:按照省份划分的话,需要把同一省份的数据放到一个文件中:按照性别划分的话,需要把同一性别的数据放到一个文件中.我们知道最终的输出数据是来自于Reducer任务.那么,如果要得到多个文件,意味着有同样数量的Reducer任务在运行.Reducer任务的数据来自于Mapper任务,也就说Ma

MapReduce框架Partitioner分区方法

前言:对于二次排序相信大家也是似懂非懂,我也是一样,对其中的很多方法都不理解诶,所有只有暂时放在一边,当你接触到其他的函数,你知道的越多时你对二次排序的理解也就更深入了,同时建议大家对wordcount的流程好好分析一下,要真正的知道每一步都是干什么的. 1.Partitioner分区类的作用是什么? 2.getPartition()三个参数分别是什么? 3.numReduceTasks指的是设置的Reducer任务数量,默认值是是多少? 扩展: 如果不同类型的数据被分配到了同一个分区,输出的数

MapReduce之RecordReader组件源码解析及实例

简述 无论我们以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类: 系统默认的RecordReader是LineRecordReader,TextInputFormat: LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value: 而SequenceFileInputFormat的RecordReader是SequenceFileRecordReader: 应用场景:自定义读取每一条记录的方式:自定义读入key的类型,如

MapReduce分区方法Partitioner方法

前言:对于二次排序相信大家也是似懂非懂,我也是一样,对其中的很多方法都不理解诶,所有只有暂时放在一边,当你接触到其他的函数,你知道的越多时你对二次排序的理解也就更深入了,同时建议大家对wordcount的流程好好分析一下,要真正的知道每一步都是干什么的. 1.Partitioner分区类的作用是什么? 2.getPartition()三个参数分别是什么? 3.numReduceTasks指的是设置的Reducer任务数量,默认值是是多少? 扩展: 如果不同类型的数据被分配到了同一个分区,输出的数

MapReduce使用Partitioner分区案例

Mapper: import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; pu

MapReduce的Partitioner案例

项目简介 这里给出一个经典的词频统计的案例:统计如下样本数据中每个单词出现的次数. SparkHBase HiveFlinkStormHadoopHBaseSpark Flink HBaseStorm HBaseHadoopHiveFlink HBaseFlinkHiveStorm HiveFlinkHadoop HBaseHive HadoopSparkHBaseStorm HBaseHadoopHiveFlink HBaseFlinkHiveStorm HiveFlinkHadoop HBa

MapReduce之Combiner组件

简述 Combiner的作用是把一个map产生的多个<KEY,VALUE>合并成一个新的<KEY,VALUE>,然后再将新<KEY,VALUE>的作为reduce的输入: 在map函数与reduce函数之间多了一个combine函数,目的是为了减少map输出的中间结果,这样减少了reduce复制map输出的数据,减少网络传输负载: 并不是所有情况下都能使用Combiner,Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了

MapReduce(3): Partitioner, Combiner and Shuffling

Partitioner: Partitioning and Combining take place between Map and Reduce phases. It is to club the data which should go to the same reducer based on keys. The number of partitioners is equal to the number of reducers. That means a partitioner will d

Hadoop自定义分区Partitioner

一:背景 为了使得MapReduce计算后的结果显示更加人性化,Hadoop提供了分区的功能,可以使得MapReduce计算结果输出到不同的分区中,方便查看.Hadoop提供的Partitioner组件可以让Map对Key进行分区,从而可以根据不同key来分发到不同的reduce中去处理,我们可以自定义key的分发规则,如数据文件包含不同的省份,而输出的要求是每个省份对应一个文件. 二:技术实现 自定义分区很简单,我们只需要继承抽象类Partitioner,实现自定义的getPartitione