本文主要介绍下二次排序的实现方式
我们知道mapreduce是按照key来进行排序的,那么如果有有个需求就是先按照第一个字段排序,在第一个字段相等的情况下,按照第二个字段排序,这个其实就是二次排序。
下面就具体说一下二次排序的实现方式
1. 自定义一个key
为什么要自定义一个key,我们知道mapreduce中排序就是按照key来排序的,我们既然想要实现按照两个字段进行排序,默认的方式肯定不行,所以需要定义一个新的key,key里面有两个属性,也就是我们要排序的两个字段
首先,实现WritableComparable接口,因为key是可序列化并且可以比较的
其次,重载相关的方法,例如序列化、反序列化相关的方法write、readFields。重载在分区的时候要用到的hashcode方法,注意后面会说道一个partitioner类,也是用来分区的,用hashcode方法和partitioner类进行分区都是可以的,使用其中的一个即可。重载排序用的compareTo方法,这个就是真正对排序起作用的方法。
2. 分区函数类
上面定义了一个新的key,那么我现在做分发,到底按照什么样的规则进行分发是在分区函数类中定义的,这个类要继承Partitioner类,重载其中的分区方法getPartition,在main函数里给job添加上即可,例如:job.setPartitionerClass(partitioner.class)
这个类的作用跟key的hashcode方法的作用一样,所以如果在hashcode方法中写了分区的方法,这个分区类是可以省掉的
3. 比较函数类
这个类决定着key的排序规则,是一个比较器,需要继承WritableComparator类,并且重载其中的compare方法。在main函数里给job添加上即可,例如:job.setSortComparatorClass(KeyComparator.class)
这个类的作用跟自定义key的compareTo方法一样,如果在自定义的key中重载的compareTo方法,则这个类可省略。
4. 分组函数类
通过分区类,我们重新定义了key的分区规则,但是多个key不同的也可以进入到一个reducer中,所以我们需要分组函数类来定义什么样的key做为一组来执行,因为也涉及到比较,所以这个类也需要继承WritableComparator,并且重载其中的compare方法,在main函数中加入即可,例如:job.setPartitionerClass(partitioner.class);
下面是具体实现的代码
public class SecondSortTest { private static String input = "/dsap/rawdata/secondSortTest/result3"; private static String output = "/dsap/rawdata/secondSortTest/result6"; public static class Mapper1 extends Mapper<Object, Text, Pair, Text> { private Pair pair = new Pair(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] segs = value.toString().split("\\s+"); pair.set(Float.parseFloat(segs[0]), Float.parseFloat(segs[1])); context.write(pair, new Text(segs[1])); } } public static class Reducer2 extends Reducer<Pair, Text, Text, Text> { public void reduce(Pair key, Iterable<Text> values, Context context) throws IOException, InterruptedException { context.write(new Text(key.toString()), new Text("===========")); for (Text text : values) { context.write(new Text(key.toString()), text); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); /** 判断输出路径是否存在,如果存在,则删除 */ FileSystem hdfs = FileSystem.get(conf); Job job = new Job(conf, "secondSortTest"); job.setJarByClass(SecondSortTest.class); FileInputFormat.addInputPath(job, new Path(input)); if (hdfs.exists(new Path(output))) hdfs.delete(new Path(output)); FileOutputFormat.setOutputPath(job, new Path(output)); job.setGroupingComparatorClass(GroupingComparator.class); job.setNumReduceTasks(19); job.setMapperClass(Mapper1.class); job.setReducerClass(Reducer2.class); job.setMapOutputKeyClass(Pair.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.waitForCompletion(true); } public static class partitioner extends Partitioner<Pair, Text> { @Override public int getPartition(Pair key, Text value, int numPartitions) { return Math.abs((int) (key.getFirst() * 127)) % numPartitions; } } static class Pair implements WritableComparable<Pair> { private float first; private float second = 0; @Override public void readFields(DataInput in) throws IOException { first = in.readFloat(); second = in.readFloat(); } @Override public void write(DataOutput out) throws IOException { out.writeFloat(first); out.writeFloat(second); } @Override public int hashCode() { return (int) (first * 127); } // 这里的代码是关键,因为对key排序时,调用的就是这个compareTo方法 @Override public int compareTo(Pair o) { if (first != o.first) { return first - o.first > 0 ? 1 : -1; } else if (second != o.second) { return second - o.second > 0 ? 1 : -1; } return 0; } public void set(float left, float right) { first = left; second = right; } public float getFirst() { return first; } public float getSecond() { return second; } @Override public String toString() { return "Pair [first=" + first + ", second=" + second + "]"; } } static class GroupingComparator implements RawComparator<Pair> { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8); } @Override public int compare(Pair o1, Pair o2) { float first1 = o1.getFirst(); float first2 = o2.getFirst(); return first1 - first2 > 0 ? 1 : -1; } }
}
上面的代码中注意一点,就是reduce中的key到底是什么,如果我把key直接tostring打印出来,那么这个值是排序排在最前面的那个key,如果我遍历value迭代器,并且在里面将key也打印出来,可以看到,迭代器的value里对应的key也被迭代出来了
参考:http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html