hadoop的自定义分组实现 (Partition机制)

hadoop开发中我们会遇到类似这样的问题,比如 如何将不同省份的手机号分别输出到不同的文件中,本片文章将对hadoop内置的Partition类进行重写以解决这个问题。

  MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程。Hadoop中自带了一个默认的分区类HashPartitioner,它继承了Partitioner类,提供了一个getPartition的方法,它的定义如下所示:

有些人死活不明白 key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 这段代码到底是怎么运算的,不要紧 ,我们main方法运行一下不久完了吗。

如果你想了解大数据的学习路线,想学习大数据知识以及需要免费的学习资料可以加群:784789432.欢迎你的加入。每天下午三点开直播分享基础知识,晚上20:00都会开直播给大家分享大数据项目实战。

[html] view plain copy

  1. public class Txt {
  2. /*
  3. * 将key均匀分布在ReduceTasks上,举例如果Key为Text的话,Text的hashcode方法跟String的基本一致,
  4. * 都是采用的Horner公式计算,得到一个int,string太大的话这个int值可能会溢出变成负数,
  5. * 所以与上Integer.MAX_VALUE(即0111111111111111),然后再对reduce个数取余,这样就可以让key均匀分布在reduce上。
  6. */
  7. public static void main(String[] args) {
  8. String key = "a,b,c,d,e,f,sdf,hth,iu,44,efwfqegergegew,h,ww,b,mm,lwefwefwfwefwefkj";
  9. String[] fields = key.split(",");
  10. int numReduceTasks = 4 ;
  11. for (int i = 0; i < 16; i++) {
  12. int j = ( fields[i].hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  13. System.out.println("与结果:"+(fields[i].hashCode() & Integer.MAX_VALUE)+" --> key值: "+fields[i]+" 所在区间数 :"+j);
  14. }
  15. }
  16. }

结果 :

从结果我们简单明了的看出,通过这个算法的key具体是分布到那个区间,有几个区间,就是靠你的 reducetasks值决定的,如上图代码我们写死reducetasks数量为4,也就是 4个reduce ,  那么输出结果为 0 , 1, 2, 3 个数值, 既为 4个区间。各个key值也较为均匀的分布再来 0,1,2,3 这四个区间之间的任意一个。

1,自定义 partitioner , 这是一坨数据,我们将根据相同省份的手机号放到不同文件中,省份根据手机号前三位判断。

2, 继承重写Partitioner中的getPartition()方法,根据key不同值返回不同 int 值, 共4组。

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{
private static HashMap<String,Integer> map = new HashMap<>();
static{
map.put("135", 0);
map.put("136", 1);
map.put("137", 2);
map.put("150", 3);  //其余情况既返回4
}
//map数据分组机制 hash(key)%1 == 0 ,既只有一组,所有手机号都放到一个分组里面
//现在可 返回  0 1 2 3  ,既 总共4组
//main方法中控制 reduce 任务数
@Override
public int getPartition(KEY key, VALUE value, int numPartitions) {
//从key中拿到手机号,不同的省份返回不同的组号
int a  = map.get(key.toString().substring(0, 3))==null?4:map.get(key.toString().substring(0, 3));
return a;
}
    }

3,  在main方法中定义reduce的任务数量, 改数量大于等于 你的分组数 4 。

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowSumArea.class);
job.setMapperClass(FlowSumAreaMapper.class);
job.setReducerClass(FlowSumAreaReducer.class);
//设置我们自定义的分组逻辑定义
job.setPartitionerClass(AreaPartitioner.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//设置reduce的任务并发数,应该跟分组的数量保持一致
job.setNumReduceTasks(4);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);

}

4  , 重新打jar包, 在虚拟机上运行下,如果看到结果产生了多个文件,既为成功。

5, 这里的流程原理就是 haoop中 mapreduce中间过程 , 叫做shuffle , 下图是我总结的流程图,可以参考看下 如果写的不对请指出,谢谢 。

原文地址:https://www.cnblogs.com/xuexiqun784789432/p/9263458.html

时间: 2024-08-03 10:14:08

hadoop的自定义分组实现 (Partition机制)的相关文章

Hadoop之——自定义分组比较器实现分组功能

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46287985 不多说,直接上代码,大家都懂得 1.Mapper类的实现 /** * Mapper类的实现 * @author liuyazhuang * */ static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{ protected void map(LongWritable

Hadoop mapreduce自定义分组RawComparator

本文发表于本人博客. 今天接着上次[Hadoop mapreduce自定义排序WritableComparable]文章写,按照顺序那么这次应该是讲解自定义分组如何实现,关于操作顺序在这里不多说了,需要了解的可以看看我在博客园的评论,现在开始. 首先我们查看下Job这个类,发现有setGroupingComparatorClass()这个方法,具体源码如下: /** * Define the comparator that controls which keys are grouped toge

Hadoop之--&gt;自定义分组 RawComparator

data: 3 33 23 22 22 11 1 --------------------- 需求: 1 12 23 3 当第一列相同时候要第二列的最小值 package group; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org

【Hadoop】Hadoop MR 自定义分组 Partition机制

1.概念 2.Hadoop默认分组机制--所有的Key分到一个组,一个Reduce任务处理 3.代码示例 FlowBean package com.ares.hadoop.mr.flowgroup; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class FlowBean

一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现

1:首先搞好实体类对象: write 是把每个对象序列化到输出流,readFields是把输入流字节反序列化,实现WritableComparable,Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法 1 package com.areapartition; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 7 im

hadoop MapReduce自定义分区Partition输出各运营商的手机号码

MapReduce和自定义Partition MobileDriver主类 package Partition; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; public class MobileDriver { public static void main(String[] args) { String[] paths = {"F:\\mobile.txt", "F

MapReduce自定义分组Group

一:背景 在上一篇文章中我们可以对两列数据进行排序,即完成了当第一列相同时第二列数据升序排列的功能,现在我们需要进一步完善一个功能,那就是当第一列相同时求出第二列的最小值或最大值,Hadoop提供了自定义分组的功能,可以满足我们的需求. 二:技术实现 我们先来看看需求 #当第一列不相等时,第一列按升序排列,当第一列相等时,求出对应第二列的最小值 [java] view plain copy 3   3 3   2 3   1 2   2 2   1 1   1 输出结果应该是: [java] v

结合手机上网流量业务来说明Hadoop中的二次排序机制,分区机制

本篇博客将结合手机上网流量业务来详细介绍Hadoop的二次排序机制.分区机制,先介绍一下业务场景: 先介绍一下业务场景:统计每个用户的上行流量和,下行流量和,以及总流量和. 本次描述所用数据: 日志格式描述: 日志flowdata.txt中的具体数据: 首先我们先通过mapreduce程序实现上面的业务逻辑: 代码实现: package FlowSum; import java.io.DataInput; import java.io.DataOutput; import java.io.IOE

自定义分组

job.setGroupingComparatorClass(MyGroupingComparator.class); //按照第一列进行分组,然后找出每个分组中的第二列中的最小值 为什么要自定义分组? 业务要求分组是按照第一列分组,但是NewK2的比较规则决定了不能按照第一列分.只能自定义分组比较器. 1 package group; 2 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOEx