Hadoop---mapreduce排序和二次排序以及全排序

自己学习排序和二次排序的知识整理如下。

1.Hadoop的序列化格式介绍:Writable

2.Hadoop的key排序逻辑

3.全排序

4.如何自定义自己的Writable类型

5.如何实现二次排序


1.Hadoop的序列化格式介绍:Writable

要了解和编写MR实现排序必须要知道的第一个知识点就是Writable相关的接口和类,这些是HADOOP自己的序列化格式。更多的可能是要关注他的Subinterfaces:WritableComparable<T>。他是继承Writable和Comparable<T>接口,继而WritableComparable<T>的实现除了具有序列化特性,更重要的是具有了比较的特性,而比较的特性在MapReduce里是很重要的,因为MR中有个基于键的排序过程,所以可以作为键的类型必须具有Comparable<T>的特性。

除了WritableComparable接口外,还有一个接口RawComparaotor。

WritableComparable和RawComparator两个接口的区别是:

WritableComparable是需要把数据流反序列化为对象后,然后做对象之间的比较,而RawComparator是直接比较数据流的数据,不需要数据流反序列化成对象,省去了新建对象的开销。



2.Hadoop的key排序逻辑

Hadoop本身Key的数据类型的排序逻辑其实就是依赖于Hadoop本身的继承与WritableComparable<T>的基本数据类型和其他类型(相关类型可参考《Hadoop权威指南》第二版的90页)的compareTo方法的定义。

Key排序的规则:

1.如果调用jobconf的setOutputKeyComparatorClass()设置mapred.output.key.comparator.class

2.否则,使用key已经登记的comparator

3.否则,实现接口WritableComparable的compareTo()函数来操作

例如IntWritable的比较算法如下:

Java代码  

  1. public int compareTo(Object o) {
  2. int thisValue = this.value;
  3. int thatValue = ((IntWritable)o).value;
  4. return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
  5. }

可以修改compareTo来实现自己所需的比较算法。

虽然我们知道是compareTo这个方法实现Key的排序,但其实我们在使用Hadoop的基本数据类型时不需要关注这个排序如何实现,因为Hadoop的框架会自动调用compareTo这个方法实现key的排序。但是这个排序只是局限在map或者reduce内部。针对于map与map,reduce与reduce之间的排序compareTo就管不着了,虽然这种情况不常出现,但是确实存在这种问题的,而且确实有适用场景,比如说全排序。


3.全排序

这里就需要关注Partition这个阶段,Partition阶段是针对每个Reduce,需要创建一个分区,然后把Map的输出结果映射到特定的分区中。这个分区中可能会有N个Key对应的数据,但是一个Key的所有数据只能在一个分区中。在实现全排序的过程中,如果只有一个reduce,也就是只有一个Partition,那么所有Map的输出都会经过一个Partition到一个reduce里,在一个reduce里可以根据compareTo(也可以采用其他比较算法)来排序,实现全排序。但是这种情况就让MapReduce失去了分布式计算的光环。

所以全排序的大概思路为:确保Partition之间是有序的就OK了,即保证Partition1的最大值小于Partition2的最小值就OK了,即便这样做也还是有个问题:Partition的分布不均,可能导致某些Partition处理的数据量远大于其他Partition处理的数据量。而实现全排序的核心步骤为:取样和Partition。

先“取样”,保证Partition得更均匀:

1) 对Math.min(10, splits.length)个split(输入分片)进行随机取样,对每个split取10000个样,总共10万个样
2) 10万个样排序,根据reducer的数量(n),取出间隔平均的n-1个样
3) 将这个n-1个样写入partitionFile(_partition.lst,是一个SequenceFile),key是取的样,值是nullValue
4) 将partitionFile写入DistributedCache

整个全排序的详细介绍可参照:http://www.iteye.com/topic/709986



4.如何自定义自己的Writable类型

自定义自己的Writable类型的场景应该很简单:Hadoop自带的数据类型要么在功能上不能满足需求,要么在性能上满足需求,毕竟Hadoop还在发展,不是所有情况都考虑的,但是他提供了自主的框架实现我们想要的功能。

定义自己的Writable类型需要实现:

a.重载构造函数

b.实现set和get方法

c.实现接口的方法:write()、readFields()、compareTo()

d.(可选)相当于JAVA构造的对象,重写java.lang.Object的hashCode()、equals()、toString()。Partition阶段默认的hashpartitioner会根据hashCode()来选择分区,如果不要对自定义类型做key进行分区,hashCode()可不实现

具体例子可参考hadoop的基本类型IntWritable的实现

Java代码  

  1. public class IntWritable implements WritableComparable {
  2. private int value;
  3. public IntWritable() {}
  4. public IntWritable(int value) { set(value); }
  5. /** Set the value of this IntWritable. */
  6. public void set(int value) { this.value = value; }
  7. /** Return the value of this IntWritable. */
  8. public int get() { return value; }
  9. public void readFields(DataInput in) throws IOException {
  10. value = in.readInt();
  11. }
  12. public void write(DataOutput out) throws IOException {
  13. out.writeInt(value);
  14. }
  15. /** Returns true iff <code>o</code> is a IntWritable with the same value. */
  16. public boolean equals(Object o) {
  17. if (!(o instanceof IntWritable))
  18. return false;
  19. IntWritable other = (IntWritable)o;
  20. return this.value == other.value;
  21. }
  22. public int hashCode() {
  23. return value;
  24. }
  25. /** Compares two IntWritables. */
  26. public int compareTo(Object o) {
  27. int thisValue = this.value;
  28. int thatValue = ((IntWritable)o).value;
  29. return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
  30. }
  31. public String toString() {
  32. return Integer.toString(value);
  33. }
  34. }

5.如何实现二次排序

二次排序的工作原理涉及到如下几方面:

a.创建key的数据类型,key要包括两次排序的元素

b.setPartitionerClass(Class<? extends Partitioner> theClass)

hadoop0.20.0以后的函数为setPartitionerClass

c.setOutputKeyComparatorClass(Class<? extends RawComparator> theClass)

hadoop0.20.0以后的函数为setSortComparatorClass

d.setOutputValueGroupingComparator(Class<? extends RawComparator> theClass)

hadoop0.20.0以后的函数为setGroupingComparatorClass

根据hadoop自己提供的example:org.apache.hadoop.examplesSecondarySort来说明二次排序具体是如何实现的.

SecondarySort实现IntPair、FirstPartitioner、FirstGroupingComparator、MapClass、Reduce这几个内部类,然后在main函数中调用。先说明下main函数中有哪些地方和普通的MR代码不同。

不同点是多了这两个set:

job.setPartitionerClass(FirstPartitioner.class);

设置自定义的Partition操作,在此是调用我们自定义的内部类FirstPartitioner

job.setGroupingComparatorClass(FirstGroupingComparator.class);

设置哪些value进入哪些key的迭代器中,在此是调用自定义的内部类FirstGroupingComparator

具体的操作逻辑为:

a.定义一个作为key的类型IntPair,在IntPair中有两个变量first、second,SecondarySort就是在对first排序后再对second再排序处理

b.定义分区函数类FirstPartitioner,Key的第一次排序。在FirstPartitioner实现如何处理key的first,把key对应的数据划分到不同的分区中。这样key中first相同的value会被放在同一个reduce中,在reduce中再做第二次排序

c(代码没有实现,其实内部是有处理).key比较函数类,key的第二次排序,是继承WritableComparator的一个比较器。setSortComparatorClass可以实现。

为什么没有使用setSortComparatorClass()是因为hadoop对key排序的规则(参看2.Hadoop的key排序逻辑)决定的。由于我们在IntPair中已经定义了compareTo()函数。

d.定义分组函数类FirstGroupingComparator,保证只要key的的第一部分相同,value就进入key的value迭代器中

原文地址:https://www.cnblogs.com/stone-learning/p/9250509.html

时间: 2024-11-08 14:04:46

Hadoop---mapreduce排序和二次排序以及全排序的相关文章

Hadoop实现全排序

1.1TB(或1分钟)排序的冠军 作为分布式数据处理的框架,集群的数据处理能力究竟有多快?或许1TB排序可以作为衡量的标准之一. 1TB排序,就是对1TB(1024GB,大约100亿行数据)的数据进行排序.2008年,Hadoop赢得1TB排序基准评估第一名,排序1TB数据耗时209秒.后来,1TB排序被1分钟排序所取代,1分钟排序指的是在一分钟内尽可能多的排序.2009年,在一个1406个节点组成的hadoop集群,在59秒里对500GB完成了排序:而在1460个节点的集群,排序1TB数据只花

Hadoop Mapreduce 中的Partitioner

Partitioner的作用的对Mapper产生的中间结果进行分片,以便将同一分组的数据交给同一个Reduce处理,Partitioner直接影响Reduce阶段的负载均衡. MapReduce提供了两个Partitioner实现:HashPartitioner和TotalOederPartitioner. HashPartitioner是默认实现,实现了一种基于哈希值的分片方法,代码如下: public int getPartition(K2 key, V2 value, int numRed

五 数据组织模式 4)全排序、混排。

前面讲的 分区.分箱模式 都是不关心数据的顺序. 接下来 全排序.混排序模式 关心的是数据按照指定键进行并行排序. 全排序解释: 排序在顺序结构程序中容易实现, 但是在MapReduce 中,或者说在并行编程中不易实现.这是典型的 "分治法". 每个 reduce 将按照键对他的数据排序,但这种排序并不是全局意义上的排序. 这里想做的是全排序,记录是整个数据集按照顺序排列好的. 作用: 排序号的数据有很多有用的特性,比如时间排序可以提供一个基于时间轴的视图.在一个已排序好的数据集中查找

Hadoop MapReduce编程 API入门系列之二次排序

不多说,直接上代码. 2016-12-12 17:04:32,012 INFO [org.apache.hadoop.metrics.jvm.JvmMetrics] - Initializing JVM Metrics with processName=JobTracker, sessionId=2016-12-12 17:04:33,056 WARN [org.apache.hadoop.mapreduce.JobSubmitter] - Hadoop command-line option

MapReduce排序之 二次排序

一:背景 Hadoop中虽然有自动排序和分组,由于自带的排序是按照Key进行排序的,有些时候,我们希望同时对Key和Value进行排序.自带的排序功能就无法满足我们了,还好Hadoop提供了一些组件可以让开发人员进行二次排序. 二:技术实现 我们先来看案例需求 #需求1: 首先按照第一列数字升序排列,当第一列数字相同时,第二列数字也升序排列(列之间用制表符\t隔开) [java] view plain copy 3   3 3   2 3   1 2   2 2   1 1   1 MapRed

python 实现Hadoop的partitioner和二次排序

我们知道,一个典型的Map-Reduce过程包 括:Input->Map->Patition->Reduce->Output.Pation负责把Map任务输出的中间结果 按key分发给不同的Reduce任务进行处理.Hadoop 提供了一个非常实用的partitioner类KeyFieldBasedPartitioner,通过配置相应的参数就可以使用.通过 KeyFieldBasedPartitioner可以方便地实现二次排序. 使用方法:       -partitioner o

mapreduce排序【二次排序】

mr自带的例子中的源码SecondarySort,我重新写了一下,基本没变. 这个例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程) public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable,

Hadoop学习之自定义二次排序

一.概述    MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往 往有要对reduce输出结果进行二次排序的需求.对于二次排序的实现,本文将通过一个实际的MapReduce二次排序例子讲述 二次排序的实现和其MapReduce的整个处理流程,并且通过结果和map.reduce端的日志来验证所描述的处理流程的正确性. 二.需求描述 1.输入数据: sort1    1 sort2    3 sort2 

MapReduce程序之二次排序与多次排序

[toc] MapReduce程序之二次排序与多次排序 需求 有下面的数据: cookieId time url 2 12:12:34 2_hao123 3 09:10:34 3_baidu 1 15:02:41 1_google 3 22:11:34 3_sougou 1 19:10:34 1_baidu 2 15:02:41 2_google 1 12:12:34 1_hao123 3 23:10:34 3_soso 2 05:02:41 2_google 假如我们现在的需求是先按 cook

实验六 MapReduce实验:二次排序

实验指导: 6.1 实验目的基于MapReduce思想,编写SecondarySort程序. 6.2 实验要求要能理解MapReduce编程思想,会编写MapReduce版本二次排序程序,然后将其执行并分析执行过程. 6.3 实验原理MR默认会对键进行排序,然而有的时候我们也有对值进行排序的需求.满足这种需求一是可以在reduce阶段排序收集过来的values,但是,如果有数量巨大的values可能就会导致内存溢出等问题,这就是二次排序应用的场景——将对值的排序也安排到MR计算过程之中,而不是单