MapReduce排序之 二次排序

一:背景

Hadoop中虽然有自动排序和分组,由于自带的排序是按照Key进行排序的,有些时候,我们希望同时对Key和Value进行排序。自带的排序功能就无法满足我们了,还好Hadoop提供了一些组件可以让开发人员进行二次排序。

二:技术实现

我们先来看案例需求

#需求1: 首先按照第一列数字升序排列,当第一列数字相同时,第二列数字也升序排列(列之间用制表符\t隔开)

[java] view plain copy

  1. 3   3
  2. 3   2
  3. 3   1
  4. 2   2
  5. 2   1
  6. 1   1

MapReduce计算之后的结果应该是:

[java] view plain copy

  1. 1   1
  2. 2   1
  3. 2   2
  4. 3   1
  5. 3   2
  6. 3   3

#需求2:第一列不相等时,第一列按降序排列,当第一列相等时,第二列按升序排列

[java] view plain copy

  1. 3   3
  2. 3   2
  3. 3   1
  4. 2   2
  5. 2   1
  6. 1   1

MapReduce计算之后的结果应该是:

[java] view plain copy

  1. 3   1
  2. 3   2
  3. 3   3
  4. 2   1
  5. 2   2
  6. 1   1

下面是实现代码,实现两种需求的关键是compareTo()方法的实现不同:

[java] view plain copy

    1. public class SecondSortTest {
    2. // 定义输入路径
    3. private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";
    4. // 定义输出路径
    5. private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
    6. public static void main(String[] args) {
    7. try {
    8. // 创建配置信息
    9. Configuration conf = new Configuration();
    10. /**********************************************/
    11. //对Map端输出进行压缩
    12. //conf.setBoolean("mapred.compress.map.output", true);
    13. //设置map端输出使用的压缩类
    14. //conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
    15. //对reduce端输出进行压缩
    16. //conf.setBoolean("mapred.output.compress", true);
    17. //设置reduce端输出使用的压缩类
    18. //conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
    19. // 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)
    20. /*
    21. * conf.addResource("classpath://hadoop/core-site.xml");
    22. * conf.addResource("classpath://hadoop/hdfs-site.xml");
    23. * conf.addResource("classpath://hadoop/hdfs-site.xml");
    24. */
    25. // 创建文件系统
    26. FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
    27. // 如果输出目录存在,我们就删除
    28. if (fileSystem.exists(new Path(OUT_PATH))) {
    29. fileSystem.delete(new Path(OUT_PATH), true);
    30. }
    31. // 创建任务
    32. Job job = new Job(conf, SecondSortTest.class.getName());
    33. //1.1   设置输入目录和设置输入数据格式化的类
    34. FileInputFormat.setInputPaths(job, INPUT_PATH);
    35. job.setInputFormatClass(TextInputFormat.class);
    36. //1.2   设置自定义Mapper类和设置map函数输出数据的key和value的类型
    37. job.setMapperClass(MySecondSortMapper.class);
    38. job.setMapOutputKeyClass(CombineKey.class);
    39. job.setMapOutputValueClass(LongWritable.class);
    40. //1.3   设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
    41. job.setPartitionerClass(HashPartitioner.class);
    42. job.setNumReduceTasks(1);
    43. //1.4   排序、分组
    44. //1.5   归约
    45. //2.1   Shuffle把数据从Map端拷贝到Reduce端。
    46. //2.2   指定Reducer类和输出key和value的类型
    47. job.setReducerClass(MySecondSortReducer.class);
    48. job.setOutputKeyClass(LongWritable.class);
    49. job.setOutputValueClass(LongWritable.class);
    50. //2.3   指定输出的路径和设置输出的格式化类
    51. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
    52. job.setOutputFormatClass(TextOutputFormat.class);
    53. // 提交作业 退出
    54. System.exit(job.waitForCompletion(true) ? 0 : 1);
    55. } catch (Exception e) {
    56. e.printStackTrace();
    57. }
    58. }
    59. public static class MySecondSortMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable>{
    60. //定义联合的key
    61. private CombineKey combineKey = new CombineKey();
    62. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,
    63. InterruptedException {
    64. //对输入的value进行切分
    65. String[] splits = value.toString().split("\t");
    66. //设置联合的key
    67. combineKey.setComKey(Long.parseLong(splits[0]));
    68. combineKey.setComVal(Long.parseLong(splits[1]));
    69. //通过context写出去
    70. context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));
    71. }
    72. }
    73. public static class MySecondSortReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable>{
    74. @Override
    75. protected void reduce(CombineKey combineKey, Iterable<LongWritable> values, Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context)
    76. throws IOException, InterruptedException {
    77. //因为输入的CombineKey已经排好序了,所有我们只要获取其中的两个成员变量写出去就可以了。values在这个例子中没有什么作用
    78. context.write(new LongWritable(combineKey.getComKey()), new LongWritable(combineKey.getComVal()));
    79. }
    80. }
    81. }
    82. /**
    83. * 重新组合成一个key,实现二次排序
    84. * @author 廖*民
    85. * time : 2015年1月18日下午7:27:52
    86. * @version
    87. */
    88. class CombineKey implements WritableComparable<CombineKey>{
    89. public long comKey;
    90. public long comVal;
    91. //必须提供无参构造函数,否则hadoop反射机制会出错
    92. public CombineKey() {
    93. }
    94. //有参构造函数
    95. public CombineKey(long comKey, long comVal) {
    96. this.comKey = comKey;
    97. this.comVal = comVal;
    98. }
    99. public long getComKey() {
    100. return comKey;
    101. }
    102. public void setComKey(long comKey) {
    103. this.comKey = comKey;
    104. }
    105. public long getComVal() {
    106. return comVal;
    107. }
    108. public void setComVal(long comVal) {
    109. this.comVal = comVal;
    110. }
    111. public void write(DataOutput out) throws IOException {
    112. out.writeLong(comKey);
    113. out.writeLong(comVal);
    114. }
    115. public void readFields(DataInput in) throws IOException {
    116. this.comKey = in.readLong();
    117. this.comVal = in.readLong();
    118. }
    119. /**
    120. * 这个方法一定要实现
    121. * java里面排序默认是小的放在前面,即返回负数的放在前面,这样就是所谓的升序排列
    122. * 我们在下面的方法中直接返回一个差值,也就相当于会升序排列。
    123. * 如果我们要实现降序排列,那么我们就可以返回一个正数
    124. */
    125. /*public int compareTo(CombineKey o) {
    126. //第一列不相同时按升序排列,当第一列相同时第二列按升序排列
    127. long minus = this.comKey - o.comKey;
    128. //如果第一个值不相等时,我们就先对第一列进行排序
    129. if (minus != 0){
    130. return (int) minus;
    131. }
    132. //如果第一列相等时,我们就对第二列进行排序
    133. return (int) (this.comVal - o.comVal);
    134. }*/
    135. /**
    136. * 为了实现第一列不同时按降序排序,第一列相同时第二列按升序排列
    137. * 第一列:降序,当第一列相同时,第二列:升序
    138. * 为了实现降序,
    139. */
    140. public int compareTo(CombineKey o) {
    141. //如果a-b<0即,a小于b,按这样 的思路应该是升序排列,我们可以返回一个相反数使其降序
    142. long tmp = this.comKey - o.comKey;
    143. //如果第一个值不相等时,我们就先对第一列进行排序
    144. if (tmp != 0){
    145. return (int) (-tmp);
    146. }
    147. //如果第一列相等时,我们就对第二列进行升序排列
    148. return (int) (this.comVal - o.comVal);
    149. }
    150. @Override
    151. public int hashCode() {
    152. final int prime = 31;
    153. int result = 1;
    154. result = prime * result + (int) (comKey ^ (comKey >>> 32));
    155. return result;
    156. }
    157. @Override
    158. public boolean equals(Object obj) {
    159. if (this == obj)
    160. return true;
    161. if (obj == null)
    162. return false;
    163. if (getClass() != obj.getClass())
    164. return false;
    165. CombineKey other = (CombineKey) obj;
    166. if (comKey != other.comKey)
    167. return false;
    168. return true;
    169. }
    170. }
时间: 2024-12-04 21:53:22

MapReduce排序之 二次排序的相关文章

mapreduce排序【二次排序】

mr自带的例子中的源码SecondarySort,我重新写了一下,基本没变. 这个例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程) public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable,

MapReduce程序之二次排序与多次排序

[toc] MapReduce程序之二次排序与多次排序 需求 有下面的数据: cookieId time url 2 12:12:34 2_hao123 3 09:10:34 3_baidu 1 15:02:41 1_google 3 22:11:34 3_sougou 1 19:10:34 1_baidu 2 15:02:41 2_google 1 12:12:34 1_hao123 3 23:10:34 3_soso 2 05:02:41 2_google 假如我们现在的需求是先按 cook

Hadoop---mapreduce排序和二次排序以及全排序

自己学习排序和二次排序的知识整理如下. 1.Hadoop的序列化格式介绍:Writable 2.Hadoop的key排序逻辑 3.全排序 4.如何自定义自己的Writable类型 5.如何实现二次排序 1.Hadoop的序列化格式介绍:Writable 要了解和编写MR实现排序必须要知道的第一个知识点就是Writable相关的接口和类,这些是HADOOP自己的序列化格式.更多的可能是要关注他的Subinterfaces:WritableComparable<T>.他是继承Writable和Co

实验六 MapReduce实验:二次排序

实验指导: 6.1 实验目的基于MapReduce思想,编写SecondarySort程序. 6.2 实验要求要能理解MapReduce编程思想,会编写MapReduce版本二次排序程序,然后将其执行并分析执行过程. 6.3 实验原理MR默认会对键进行排序,然而有的时候我们也有对值进行排序的需求.满足这种需求一是可以在reduce阶段排序收集过来的values,但是,如果有数量巨大的values可能就会导致内存溢出等问题,这就是二次排序应用的场景——将对值的排序也安排到MR计算过程之中,而不是单

排序(二) 外部排序

一 定义 外部排序指的是大文件的排序,即待排序的记录存储在外存储器上,待排序的文件无法一次装入内存,需要在内存和外部存储器之间进行多次数据交换,以达到排序整个文件的目的. 二 处理过程 (1)按可用内存的大小,把外存上含有n个记录的文件分成若干个长度为L的子文件,把这些子文件依次读入内存,并利用有效的内部排序方法对它们进行排序,再将排序后得到的有序子文件(又称归并段)重新写入外存: (2)对这些有序子文件逐趟归并,使其逐渐由小到大,直至得到整个有序文件为止. 先从一个例子来看外排序中的归并是如何

[排序算法二]选择排序

选择排序(Selection sort)是一种简单直观的排序算法.它的工作原理是:第一次从待排序的数据元素中选出最小(或最大)的一个元素,存放在序列的起始位置,然后再从剩余的未排序元素中寻找到最小(大)元素,然后放到已排序的序列的末尾.以此类推,直到全部待排序的数据元素的个数为零.选择排序是不稳定的排序方法. 算法性能 时间复杂度:O(n^2),总循环次数 n(n-1)/2.数据交换次数 O(n),这点上来说比冒泡排序要好,因为冒泡是把数据一位一位的移上来,而选择排序只需要在子循环结束后移动一次

排序算法(二)选择排序---堆排序

概念:利用树结构进行排序. 分类:1.大顶堆: 每个小树的根节点都大于子节点   升序排序使用大顶堆 2.小顶堆:每个小树的子节点都大于根节点 降序排序使用小顶堆 1 public class HeapSort { 2 3 public static void main(String[] args){ 4 int[] arr=new int[]{9,6,7,0,1,10,4,2}; 5 System.out.println(Arrays.toString(arr)); 6 heapSort(ar

Hadoop二次排序及MapReduce处理流程实例详解

一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的,在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求.对于二次排序的实现,网络上已经有很多人分享过了,但是对二次排序的实现原理及整个MapReduce框架的处理流程的分析还是有非常大的出入,而且部分分析是没有经过验证的.本文将通过一个实际的MapReduce二次排序的例子,讲述二次排序的实现和其MapReduce的整个处理流程,并且通过结果和Map.

hadoop编程小技巧(9)---二次排序(值排序)

代码测试环境:Hadoop2.4 应用场景:在Reducer端一般是key排序,而没有value排序,如果想对value进行排序,则可以使用此技巧. 应用实例描述: 比如针对下面的数据: a,5 b,7 c,2 c,9 a,3 a,1 b,10 b,3 c,1 如果使用一般的MR的话,其输出可能是这样的: a 1 a 3 a 5 b 3 b 10 b 7 c 1 c 9 c 2 从数据中可以看到其键是排序的,但是其值不是.通过此篇介绍的技巧可以做到下面的输出: a 1 a 3 a 5 b 3 b