mapreduce排序【二次排序】

mr自带的例子中的源码SecondarySort,我重新写了一下,基本没变。

这个例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程)

public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable, IntWritable>

1 首先说一下工作原理:

1)MAP输入。在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。

2)MAP输出。然后调用自定义Map的map方法,将一个个<LongWritable, Text>对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。

3)分区。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。默认情况下,相同的KEY会分到相同的一个分区,且只能分到一个分区中。

4)分区中KEY排序。每个分区中KEY一定是有序的。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在第一个例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。

5)分组。分区后开始构造一个key对应的value迭代器,这时就要用到分组。默认分组,相同的KEY会分到一个组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。

2 二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果 。例如

输入文件
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8
输出:(注意需要分割线)
------------------------------------------------
1 2
------------------------------------------------
3 4
------------------------------------------------
5 6
------------------------------------------------
7 8
7 82
------------------------------------------------
12 211
------------------------------------------------
20 21
20 53
20 522
------------------------------------------------
31 42
------------------------------------------------
40 511
------------------------------------------------
50 51
50 52
50 53
50 53
50 54
50 62
50 512
50 522
------------------------------------------------
60 51
60 52
60 53
60 56
60 56
60 57
60 57
60 61
------------------------------------------------
63 61
------------------------------------------------
70 54
70 55
70 56
70 57
70 58
70 58
------------------------------------------------
71 55
71 56
------------------------------------------------
73 57
------------------------------------------------
74 58
------------------------------------------------
203 21
------------------------------------------------
530 54
------------------------------------------------
730 54
------------------------------------------------
740 58

3 具体步骤:
(1)自定义key

在mr中,所有的key是需要被比较和排序的,并且是二次,先根据partitione,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,他有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。
所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的。并重载方法:

//反序列化,从流中的二进制转换成IntPair
public void readFields(DataInput in) throws IOException
//序列化,将IntPair转化成使用流传送的二进制
public void write(DataOutput out)
//key的比较
public int compareTo(IntPair o)
//另外新定义的类应该重写的两个方法
//The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
public int hashCode()
public boolean equals(Object right)

(2)由于key是自定义的,所以还需要自定义一下类:
(2.1)分区函数类。这是key的第一次比较。

 public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>

在job中使用setPartitionerClasss设置Partitioner。
(2.2)key比较函数类。这是key的第二次比较。这是一个比较器,需要继承WritableComparator。

public static class KeyComparator extends WritableComparator

必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)

另一种方法是 实现接口RawComparator。
在job中使用setSortComparatorClass设置key比较函数类。
(2.3)分组函数类。在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。这是一个比较器,需要继承WritableComparator。

public static class GroupingComparator extends WritableComparator

分组函数类也必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)
分组函数类的另一种方法是实现接口RawComparator。
在job中使用setGroupingComparatorClass设置分组函数类。

另外注意的是,如果reduce的输入与输出不是同一种类型,则不要定义Combiner也使用reduce,因为Combiner的输出是reduce的输入。除非重新定义一个Combiner。

4 代码。

这个例子中没有使用key比较函数类,而是使用key的实现的compareTo方法。

package secondarySort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SecondarySort {
    //自己定义的key类应该实现WritableComparable接口
    public static class IntPair implements WritableComparable<IntPair> {
        int first;
        int second;
        /**
         * Set the left and right values.
         */
        public void set(int left, int right) {
            first = left;
            second = right;
        }
        public int getFirst() {
            return first;
        }
        public int getSecond() {
            return second;
        }
        @Override
        //反序列化,从流中的二进制转换成IntPair
        public void readFields(DataInput in) throws IOException {
            // TODO Auto-generated method stub
            first = in.readInt();
            second = in.readInt();
        }
        @Override
        //序列化,将IntPair转化成使用流传送的二进制
        public void write(DataOutput out) throws IOException {
            // TODO Auto-generated method stub
            out.writeInt(first);
            out.writeInt(second);
        }
        @Override
        //key的比较
        public int compareTo(IntPair o) {
            // TODO Auto-generated method stub
            if (first != o.first) {
                return first < o.first ? -1 : 1;
            } else if (second != o.second) {
                return second < o.second ? -1 : 1;
            } else {
                return 0;
            }
        }

        //新定义类应该重写的两个方法
        @Override
        //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
        public int hashCode() {
            return first * 157 + second;
        }
        @Override
        public boolean equals(Object right) {
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair) {
                IntPair r = (IntPair) right;
                return r.first == first && r.second == second;
            } else {
                return false;
            }
        }
    }
     /**
       * 分区函数类。根据first确定Partition。
       */
      public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
        @Override
        public int getPartition(IntPair key, IntWritable value,
                                int numPartitions) {
          return Math.abs(key.getFirst() * 127) % numPartitions;
        }
      }

      /**
       * 分组函数类。只要first相同就属于同一个组。
       */
    /*//第一种方法,实现接口RawComparator
    public static class GroupingComparator implements RawComparator<IntPair> {
        @Override
        public int compare(IntPair o1, IntPair o2) {
            int l = o1.getFirst();
            int r = o2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
        @Override
        //一个字节一个字节的比,直到找到一个不相同的字节,然后比这个字节的大小作为两个字节流的大小比较结果。
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
            // TODO Auto-generated method stub
             return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,
                     b2, s2, Integer.SIZE/8);
        }
    }*/
    //第二种方法,继承WritableComparator
    public static class GroupingComparator extends WritableComparator {
          protected GroupingComparator() {
            super(IntPair.class, true);
          }
          @Override
          //Compare two WritableComparables.
          public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int l = ip1.getFirst();
            int r = ip2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
          }
        }

    // 自定义map
    public static class Map extends
            Mapper<LongWritable, Text, IntPair, IntWritable> {
        private final IntPair intkey = new IntPair();
        private final IntWritable intvalue = new IntWritable();
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            int left = 0;
            int right = 0;
            if (tokenizer.hasMoreTokens()) {
                left = Integer.parseInt(tokenizer.nextToken());
                if (tokenizer.hasMoreTokens())
                    right = Integer.parseInt(tokenizer.nextToken());
                intkey.set(left, right);
                intvalue.set(right);
                context.write(intkey, intvalue);
            }
        }
    }
    // 自定义reduce
    //
    public static class Reduce extends
            Reducer<IntPair, IntWritable, Text, IntWritable> {
        private final Text left = new Text();
        private static final Text SEPARATOR =
              new Text("------------------------------------------------");
        public void reduce(IntPair key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            context.write(SEPARATOR, null);
            left.set(Integer.toString(key.getFirst()));
            for (IntWritable val : values) {
                context.write(left, val);
            }
        }
    }
    /**
     * @param args
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // TODO Auto-generated method stub
        // 读取hadoop配置
        Configuration conf = new Configuration();
        // 实例化一道作业
        Job job = new Job(conf, "secondarysort");
        job.setJarByClass(SecondarySort.class);
        // Mapper类型
        job.setMapperClass(Map.class);
        // 不再需要Combiner类型,因为Combiner的输出类型<Text, IntWritable>对Reduce的输入类型<IntPair, IntWritable>不适用
        //job.setCombinerClass(Reduce.class);
        // Reducer类型
        job.setReducerClass(Reduce.class);
        // 分区函数
        job.setPartitionerClass(FirstPartitioner.class);
        // 分组函数
        job.setGroupingComparatorClass(GroupingComparator.class);

        // map 输出Key的类型
        job.setMapOutputKeyClass(IntPair.class);
        // map输出Value的类型
        job.setMapOutputValueClass(IntWritable.class);
        // rduce输出Key的类型,是Text,因为使用的OutputFormatClass是TextOutputFormat
        job.setOutputKeyClass(Text.class);
        // rduce输出Value的类型
        job.setOutputValueClass(IntWritable.class);

        // 将输入的数据集分割成小数据块splites,同时提供一个RecordReder的实现。
        job.setInputFormatClass(TextInputFormat.class);
        // 提供一个RecordWriter的实现,负责数据输出。
        job.setOutputFormatClass(TextOutputFormat.class);

        // 输入hdfs路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 输出hdfs路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 提交job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

mapreduce排序【二次排序】

时间: 2024-08-01 22:42:56

mapreduce排序【二次排序】的相关文章

mapreduce的二次排序实现方式

本文主要介绍下二次排序的实现方式 我们知道mapreduce是按照key来进行排序的,那么如果有有个需求就是先按照第一个字段排序,在第一个字段相等的情况下,按照第二个字段排序,这个其实就是二次排序. 下面就具体说一下二次排序的实现方式 1. 自定义一个key 为什么要自定义一个key,我们知道mapreduce中排序就是按照key来排序的,我们既然想要实现按照两个字段进行排序,默认的方式肯定不行,所以需要定义一个新的key,key里面有两个属性,也就是我们要排序的两个字段 首先,实现Writab

MapReduce处理二次排序(分区-排序-分组)

MapReduce二次排序原理 在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReader的实现. 本例子中使用的时TextInputFormat,他提供的RecordReader会将文本的字节偏移量作为key,这一行的文本作为value. 这就是自定义Map的输入是<LongWritable,Text>的原因,然后调用自定义的Map的map方法,将一个个&l

mapreduce 的二次排序

一: 理解二次排序的功能, 使用自己理解的方式表达(包括自定义数据类型,分区,分组,排序) 二: 编写实现二次排序功能, 提供源码文件. 三:理解mapreduce join 的几种 方式,编码实现reduce join,提供源代码,说出思路. 一: 二次排序 使用自己理解的方式表达(包括自定义数据类型,分区,分组,排序) 1.1 二次排序的功能 1. 当客户端提交一个作业的时候,hadoop 会开启yarn 接受进行数据拷贝处理,之后交友有yarn 框架上的启动服务resourcemanage

二维数组里,根据数组字段为条件,进行总体排序(二维排序)

1 <?php 2 /** 3 * 二维数组根据某个字段排序 4 * 功能:按照用户的年龄倒序排序 5 * @author ruxing.li 6 */ 7 header('Content-Type:text/html;Charset=utf-8'); 8 $arrUsers = array( 9 array( 10 'id' => 1, 11 'name' => '张三', 12 'age' => 25, 13 ), 14 array( 15 'id' => 2, 16 '

算法 排序(二) 选择排序

选择排序(Selection sort)是一种简单直观的排序算法.它的工作原理是每一次从待排序的数据元素中选出最小(或最大)的一个元素,存放在序列的起始位置,直到全部待排序的数据元素排完. 思想 n个记录的文件的直接选择排序可经过n-1趟直接选择排序得到有序结果: ①初始状态:无序区为R[1..n],有序区为空. ②第1趟排序 在无序区R[1..n]中选出关键字最小的记录R[k],将它与无序区的第1个记录R[1]交换,使R[1..1]和R[2..n]分别变为记录个数增加1个的新有序区和记录个数减

Hadoop二次排序及MapReduce处理流程实例详解

一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的,在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求.对于二次排序的实现,网络上已经有很多人分享过了,但是对二次排序的实现原理及整个MapReduce框架的处理流程的分析还是有非常大的出入,而且部分分析是没有经过验证的.本文将通过一个实际的MapReduce二次排序的例子,讲述二次排序的实现和其MapReduce的整个处理流程,并且通过结果和Map.

Haoop MapReduce 的Partition和reduce端的二次排序

先贴一张原理图(摘自hadoop权威指南第三版) 实际中看了半天还是不太理解其中的Partition,和reduce端的二次排序,最终根据实验来结果来验证自己的理解 1eg 数据如下 2014010114 标识20140101日的温度为14度,需求为统计每年温度的最最高值 2014010114 2014010216 2014010317 2014010410... Partition 实际是根据map 任务的key,以及reduce任务的数量来决定最终来由那个reduce来处理,默认指定redu

结合手机上网流量业务来说明Hadoop中的二次排序机制,分区机制

本篇博客将结合手机上网流量业务来详细介绍Hadoop的二次排序机制.分区机制,先介绍一下业务场景: 先介绍一下业务场景:统计每个用户的上行流量和,下行流量和,以及总流量和. 本次描述所用数据: 日志格式描述: 日志flowdata.txt中的具体数据: 首先我们先通过mapreduce程序实现上面的业务逻辑: 代码实现: package FlowSum; import java.io.DataInput; import java.io.DataOutput; import java.io.IOE

Hadoop---mapreduce排序和二次排序以及全排序

自己学习排序和二次排序的知识整理如下. 1.Hadoop的序列化格式介绍:Writable 2.Hadoop的key排序逻辑 3.全排序 4.如何自定义自己的Writable类型 5.如何实现二次排序 1.Hadoop的序列化格式介绍:Writable 要了解和编写MR实现排序必须要知道的第一个知识点就是Writable相关的接口和类,这些是HADOOP自己的序列化格式.更多的可能是要关注他的Subinterfaces:WritableComparable<T>.他是继承Writable和Co

MapReduce排序之 二次排序

一:背景 Hadoop中虽然有自动排序和分组,由于自带的排序是按照Key进行排序的,有些时候,我们希望同时对Key和Value进行排序.自带的排序功能就无法满足我们了,还好Hadoop提供了一些组件可以让开发人员进行二次排序. 二:技术实现 我们先来看案例需求 #需求1: 首先按照第一列数字升序排列,当第一列数字相同时,第二列数字也升序排列(列之间用制表符\t隔开) [java] view plain copy 3   3 3   2 3   1 2   2 2   1 1   1 MapRed