MapReduce的分区

第一部分 分区简述(比如国家由省市来划分)

分区:map的输出经过partitioner分区进行下一步的reducer。一个分区对应一个reducer,就会使得reducer并行化处理任务。默认为1

1. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。

2. HashPartitioner是mapreduce的默认partitioner。计算方法是 which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。

/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,    //这里的key是指的是key2
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;  //numReduceTasks为reduce任务数量    //这里返回值int指的是位置,并非实际意义的数字,如果numReduceTasks为1,则整个结果恒等于0    //也就是说自定义分区返回的是索引或标记
} }

第二部分 分区编程

    项目:1.观察数据,如下

      

2.不自定义分区的情况

import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/如果不自定义分区,则默认使用的代码为
job.setPartitionerClass(HashPartitioner.class);

3.自定义分区情况

//自定义分区
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(2);//根据业务需要将手机和非手机用户区分需要做两个分区,对应两个reduce

MyPartition类

    //自定义分区代码
    private static class MyPartitioner extends Partitioner<Text, TrafficWritable>{
        //手机号根据位数判断
        @Override
        public int getPartition(Text key, TrafficWritable value,int numPartitions) {
            return key.toString().length()==11?0:1;
        }
    }

    实例代码:    

  1 package Mapreduce;
  2
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.LongWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.io.Writable;
 12 import org.apache.hadoop.mapreduce.Job;
 13 import org.apache.hadoop.mapreduce.Mapper;
 14 import org.apache.hadoop.mapreduce.Partitioner;
 15 import org.apache.hadoop.mapreduce.Reducer;
 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 19
 20 public class MyPartitionerTest {
 21     public static void main(String[] args) throws Exception {
 22         Job job = Job.getInstance(new Configuration(), MyPartitionerTest.class.getSimpleName());
 23         job.setJarByClass(MyPartitionerTest.class);
 24         //1.自定义输入路径
 25         FileInputFormat.setInputPaths(job, new Path(args[0]));
 26         //2.自定义mapper
 27         //job.setInputFormatClass(TextInputFormat.class);
 28         job.setMapperClass(MyMapper.class);
 29         //job.setMapOutputKeyClass(Text.class);
 30         //job.setMapOutputValueClass(TrafficWritable.class);
 31
 32         //如果不自定义分区,则默认使用的代码为
 33         //job.setPartitionerClass(HashPartitioner.class);
 34         //自定义分区
 35         job.setPartitionerClass(MyPartitioner.class);
 36         job.setNumReduceTasks(2);//根据业务需要将手机和非手机用户区分需要做两个分区,对应两个reduce
 37
 38         //3.自定义reduce
 39         job.setReducerClass(MyReducer.class);
 40         job.setOutputKeyClass(Text.class);
 41         job.setOutputValueClass(TrafficWritable.class);
 42         //4.自定义输出路径
 43         FileOutputFormat.setOutputPath(job, new Path(args[1]));
 44         //job.setOutputFormatClass(TextOutputFormat.class);//对输出的数据格式化并写入磁盘
 45
 46         job.waitForCompletion(true);
 47
 48     }
 49     //自定义分区代码
 50     private static class MyPartitioner extends Partitioner<Text, TrafficWritable>{
 51         //手机号根据位数判断
 52         @Override
 53         public int getPartition(Text key, TrafficWritable value,int numPartitions) {
 54             return key.toString().length()==11?0:1;
 55         }
 56     }
 57
 58     private static class MyMapper extends Mapper<LongWritable, Text, Text, TrafficWritable>{
 59         Text k2 =new Text(); //k2为第二个字段,手机号码
 60         TrafficWritable v2 = new TrafficWritable();
 61         @Override
 62         protected void map(
 63                 LongWritable key,
 64                 Text value,
 65                 Mapper<LongWritable, Text, Text, TrafficWritable>.Context context)
 66                 throws IOException, InterruptedException {
 67             // TODO Auto-generated method stub
 68             String line = value.toString();
 69             String[] splited = line.split("\t");
 70             //手机号码,第二个字段为手机号
 71             k2.set(splited[1]);
 72             //流量,注:写代码的时候先写方法名在写方法的实现(测试驱动开发s)
 73             v2.set(splited[6],splited[7],splited[8],splited[9]);
 74             context.write(k2, v2);
 75         }
 76     }
 77     private static class MyReducer extends Reducer<Text, TrafficWritable, Text, TrafficWritable>{
 78         TrafficWritable v3 = new TrafficWritable();
 79         @Override
 80         protected void reduce(
 81                 Text k2, //表示手机号码
 82                 Iterable<TrafficWritable> v2s,  //相同手机号码流量之和
 83                 Reducer<Text, TrafficWritable, Text, TrafficWritable>.Context context)
 84                 throws IOException, InterruptedException {
 85             //迭代v2s,将里面的植相加即可
 86             long t1 =0L;
 87             long t2 =0L;
 88             long t3 =0L;
 89             long t4 =0L;
 90             for (TrafficWritable v2 : v2s) {
 91                 t1+=v2.t1;
 92                 t2+=v2.t2;
 93                 t3+=v2.t3;
 94                 t4+=v2.t4;
 95             }
 96             v3.set(t1, t2, t3, t4);
 97             context.write(k2, v3);//如果执行没有输出的话,可能reduce没有往外写,或mapper没有写,或源文件没有数据
 98         }
 99     }
100     //自定义类型
101     private static class TrafficWritable implements Writable{
102         public long t1;
103         public long t2;
104         public long t3;
105         public long t4;
106         public void write(DataOutput out) throws IOException {
107             out.writeLong(t1);
108             out.writeLong(t2);
109             out.writeLong(t3);
110             out.writeLong(t4);
111         }
112         //t1-4原来是TrafficWritable类型,在set中进行转换
113         public void set(long t1, long t2, long t3, long t4) {
114             // TODO Auto-generated method stub
115             this.t1=t1;
116             this.t2=t2;
117             this.t3=t3;
118             this.t4=t4;
119         }
120
121         public void set(String t1, String t2, String t3,String t4) {
122             // v2的set方法
123             this.t1=Long.parseLong(t1);
124             this.t2=Long.parseLong(t2);
125             this.t3=Long.parseLong(t3);
126             this.t4=Long.parseLong(t4);
127         }
128
129         public void readFields(DataInput in) throws IOException {
130             //顺序不可颠倒,和写出去的顺序需要一致
131             this.t1=in.readLong();
132             this.t2=in.readLong();
133             this.t3=in.readLong();
134             this.t4=in.readLong();
135         }
136         @Override
137         public String toString() {
138             return Long.toString(t1)+"\t"+Long.toString(t2)+"\t"+Long.toString(t3)+"\t"+Long.toString(t4);
139         }
140     }
141 }

MyPartitionerTest

打包并运行:

[[email protected] filecontent]# hadoop jar MyPartitionerTest.jar /data/HTTP_20130313143750.dat  /out4

[[email protected] filecontent]hadoop dfs -ls /out4

[[email protected] filecontent]# hadoop dfs -text /out4/part-r-00000

[[email protected] filecontent]# hadoop dfs -text /out4/part-r-00001

问题:如果分区数量大于reduce数量,如果分区数量小于educe数量?

实验:更改代码如下,让reduce数量作为参数传入程序中

job.setNumReduceTasks(Integer.parseInt(args[2]));

 (1)  一个reduce,两个分区的情况: 

[[email protected] filecontent]# hadoop jar MyPartitionerTest2.jar /data/HTTP_20130313143750.dat  /out5 1

[[email protected] filecontent]# hadoop dfs -ls /out5

衹有一個輸出:

[[email protected] filecontent]# hadoop dfs -text /out5/part-r-00000 ,从结果分析可得,这种情况没有区分两种数据,手机和非手机

总结:在hadoop2中reduce数量少于partitioner分区数量的时候,程序依然可以执行,但是结果有误。在hadoop1中会报错。

 (2)  3个reduce,两个分区的情况: 

[[email protected] filecontent]# hadoop jar MyPartitionerTest2.jar /data/HTTP_20130313143750.dat  /out6 3

[[email protected] filecontent]# hadoop dfs -ls /out6

多余的reduce是没有数据的,前面两个是正确的。

总结:对下图进行分析每一个mapper task都有三个分支,也就是三个任务,如果对每一个任务标号的话,编号为0 的将会分到一个区,编号

为1的分到同一个区,编号为2的分到同一个区。也就是说相同分区的都会给到同一个reduce任务进行处理。

时间: 2024-10-19 06:52:37

MapReduce的分区的相关文章

Mapreduce的分区—Partitioner

1. 需求将流量汇总统计结果按照手机归属地不同省份输出到不同文件中.2. 分析Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask.默认的分发规则为:根据key的hashcode%reducetask数来分发所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner,自定义一个CustomPartitioner继承抽象类:Partitioner,然后在job对象中,设置自定义partitioner: job.set

hadoop MapReduce自定义分区Partition输出各运营商的手机号码

MapReduce和自定义Partition MobileDriver主类 package Partition; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; public class MobileDriver { public static void main(String[] args) { String[] paths = {"F:\\mobile.txt", "F

Hadoop学习之路(6)MapReduce自定义分区实现

MapReduce自带的分区器是HashPartitioner原理:先对map输出的key求hash值,再模上reduce task个数,根据结果,决定此输出kv对,被匹配的reduce任务取走.自定义分分区需要继承Partitioner,复写getpariton()方法自定义分区类:注意:map的输出是<K,V>键值对其中int partitionIndex = dict.get(text.toString()),partitionIndex是获取K的值 附:被计算的的文本 Dear Dea

MapReduce序列化及分区的java代码示例

概述 序列化(Serialization)是指把结构化对象转化为字节流. 反序列化(Deserialization)是序列化的逆过程.把字节流转为结构化对象. 当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化. Java 的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系-),不便于在网络中高效传输:所以,hadoop 自己开发

大数据笔记(九)——Mapreduce的高级特性(B)

二.排序 对象排序 员工数据 Employee.java  ----> 作为key2输出 需求:按照部门和薪水升序排列 Employee.java package mr.object; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableCo

Hive中的分桶

对于每一个表(table)或者分区, Hive可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分.Hive也是针对某一列进行桶的组织.Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中. 把表(或者分区)组织成桶(Bucket)有两个理由: (1)获得更高的查询处理效率.桶为表加上了额外的结构,Hive 在处理有些查询时能利用这个结构.具体而言,连接两个在(包含连接列的)相同列上划分了桶的表,可以使用 Map 端连接 (Map-side join)高效的实现.比

Hive_进阶

回顾: hive 优点 1. 类sql语句靠近关系型数据库,可自定义函数,增加了扩展性,易于开发,减少mapreduce学习成本 2. hive转换sql语句为mapreduce程序以mapreduce为底层实现 3. hive基于hadoop的hdfs,在hdfs上存储,因为hdfs的扩展性,hive的存储扩展性相应增加 hive 安装部署 1. 解压安装包 2. 进入conf目录,拷贝(备份)相应配置文件,修改 hive-env.sh --> HADOOP_HOME=/opt/cdh-5.6

零基础学习hadoop到上手工作线路指导(编程篇)

问题导读:1.hadoop编程需要哪些基础?2.hadoop编程需要注意哪些问题?3.如何创建mapreduce程序及其包含几部分?4.如何远程连接eclipse,可能会遇到什么问题?5.如何编译hadoop源码? 阅读此篇文章,需要些基础下面两篇文章零基础学习hadoop到上手工作线路指导(初级篇) 零基础学习hadoop到上手工作线路指导(中级篇)如果看过的话,看这篇不成问题,此篇讲hadoop编程篇. hadoop编程,hadoop是一个Java框架,同时也是编程的一次革命,使得传统开发运

零基础学习hadoop到上手工作线路指导

问题导读: 1.hadoop编程需要哪些基础? 2.hadoop编程需要注意哪些问题? 3.如何创建mapreduce程序及其包含几部分? 4.如何远程连接eclipse,可能会遇到什么问题? 5.如何编译hadoop源码? 阅读此篇文章,需要些基础下面两篇文章 零基础学习hadoop到上手工作线路指导(初级篇) http://www.aboutyun.com/thread-6780-1-1.html 零基础学习hadoop到上手工作线路指导(中级篇) http://www.aboutyun.c