Mapreduce的分区—Partitioner

1. 需求
将流量汇总统计结果按照手机归属地不同省份输出到不同文件中。
2. 分析
Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。
默认的分发规则为:根据key的hashcode%reducetask数来分发
所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner,自定义一个CustomPartitioner继承抽象类:Partitioner,然后在job对象中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class
3. 实现

public class ProvincePartitioner extends Partitioner<Text, FlowBean>

    public static HashMap<String, Integer>  provinceMap = new HashMap<String, Integer>();

    static{

        provinceMap.put("134", 0);

        provinceMap.put("135", 1);

        provinceMap.put("136", 2);

        provinceMap.put("137", 3);

        provinceMap.put("138", 4);

    }

    @Override

    public int getPartition(Text key, FlowBean value, int numPartitions) {

        Integer code = provinceMap.get(key.toString().substring(0, 3));

        if (code != null) {

            return code;

        }

        return 5;

    }

}
public class FlowSumProvince {

 public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

     Text k = new Text();

     FlowBean  v = new FlowBean();

     @Override

     protected void map(LongWritable key, Text value,Context context)

             throws IOException, InterruptedException {

             //拿取一行文本转为String

             String line = value.toString();

             //按照分隔符\t进行分割

             String[] fileds = line.split("\t");

             //获取用户手机号

             String phoneNum = fileds[1];

             long upFlow = Long.parseLong(fileds[fileds.length-3]);

             long downFlow = Long.parseLong(fileds[fileds.length-2]);

             k.set(phoneNum);

             v.set(upFlow, downFlow);

             context.write(k,v);

        }

    }

    public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

        FlowBean  v  = new FlowBean();

        @Override

        protected void reduce(Text key, Iterable<FlowBean> flowBeans,Context context) throws IOException, InterruptedException {

            long upFlowCount = 0;

            long downFlowCount = 0;

            for (FlowBean flowBean : flowBeans) {

                upFlowCount += flowBean.getUpFlow();

                downFlowCount += flowBean.getDownFlow();

            }

            v.set(upFlowCount, downFlowCount);

            context.write(key, v);

    }

    public static void main(String[] args) throws Exception{

        Configuration conf = new Configuration();

        conf.set("mapreduce.framework.name", "local");

        Job job = Job.getInstance(conf);

        //指定我这个 job 所在的 jar包位置

        job.setJarByClass(FlowSumProvince.class);

        //指定我们使用的Mapper是那个类  reducer是哪个类

        job.setMapperClass(FlowSumProvinceMapper.class);

        job.setReducerClass(FlowSumProvinceReducer.class);

//        job.setCombinerClass(FlowSumProvinceReducer.class);

        // 设置我们的业务逻辑 Mapper 类的输出 key 和 value 的数据类型

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(FlowBean.class);

        // 设置我们的业务逻辑 Reducer 类的输出 key 和 value 的数据类型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(FlowBean.class);

        //这里设置运行reduceTask的个数

        //getPartition 返回的分区个数 = NumReduceTasks   正常执行

        //getPartition 返回的分区个数 > NumReduceTasks   报错:Illegal partition

        //getPartition 返回的分区个数 < NumReduceTasks   可以执行 ,多出空白文件

        job.setNumReduceTasks(10);

        //这里指定使用我们自定义的分区组件

        job.setPartitionerClass(ProvincePartitioner.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\flowsum\\input"));

        // 指定处理完成之后的结果所保存的位置

        FileOutputFormat.setOutputPath(job, new Path("D:\\flowsum\\outputProvince"));

        boolean res = job.waitForCompletion(true);

        System.exit(res ? 0 : 1);

    }

 }

}

原文地址:http://blog.51cto.com/13587708/2174193

时间: 2024-11-08 20:57:57

Mapreduce的分区—Partitioner的相关文章

MapReduce的分区

第一部分 分区简述(比如国家由省市来划分) 分区:map的输出经过partitioner分区进行下一步的reducer.一个分区对应一个reducer,就会使得reducer并行化处理任务.默认为1 1. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类. 2. HashPartitioner是mapreduce的默认partitioner.计算方法是 which reducer=(key.hashCode() & Integer.MAX_V

Hadoop自定义分区Partitioner

一:背景 为了使得MapReduce计算后的结果显示更加人性化,Hadoop提供了分区的功能,可以使得MapReduce计算结果输出到不同的分区中,方便查看.Hadoop提供的Partitioner组件可以让Map对Key进行分区,从而可以根据不同key来分发到不同的reduce中去处理,我们可以自定义key的分发规则,如数据文件包含不同的省份,而输出的要求是每个省份对应一个文件. 二:技术实现 自定义分区很简单,我们只需要继承抽象类Partitioner,实现自定义的getPartitione

hadoop MapReduce自定义分区Partition输出各运营商的手机号码

MapReduce和自定义Partition MobileDriver主类 package Partition; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; public class MobileDriver { public static void main(String[] args) { String[] paths = {"F:\\mobile.txt", "F

Hadoop学习之路(6)MapReduce自定义分区实现

MapReduce自带的分区器是HashPartitioner原理:先对map输出的key求hash值,再模上reduce task个数,根据结果,决定此输出kv对,被匹配的reduce任务取走.自定义分分区需要继承Partitioner,复写getpariton()方法自定义分区类:注意:map的输出是<K,V>键值对其中int partitionIndex = dict.get(text.toString()),partitionIndex是获取K的值 附:被计算的的文本 Dear Dea

Spark自定义分区(Partitioner)

我们都知道Spark内部提供了HashPartitioner和RangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景.但是有些情况下,Spark内部不能符合咱们的需求,这时候我们就可以自定义分区策略.为此,Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法: 01 package org.apache.spark 02 03 /** 04 * An object that defines how the element

Hadoop Mapreduce 中的Partitioner

Partitioner的作用的对Mapper产生的中间结果进行分片,以便将同一分组的数据交给同一个Reduce处理,Partitioner直接影响Reduce阶段的负载均衡. MapReduce提供了两个Partitioner实现:HashPartitioner和TotalOederPartitioner. HashPartitioner是默认实现,实现了一种基于哈希值的分片方法,代码如下: public int getPartition(K2 key, V2 value, int numRed

MapReduce序列化及分区的java代码示例

概述 序列化(Serialization)是指把结构化对象转化为字节流. 反序列化(Deserialization)是序列化的逆过程.把字节流转为结构化对象. 当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化. Java 的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系-),不便于在网络中高效传输:所以,hadoop 自己开发

2018-07-30期 MapReduce分区(Partitioner)编程案例

1.EmpSalaryBean 对象 package cn.sjq.mr.part; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; /** * 定义一个员工薪水的JavaBean,并实现MapReduce的Writable序列化接口 * @author songjq * */ public class Em

Hadoop之——MapReduce实战(二)

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/45957715 MapReduce的老api写法 import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.Jo