MapReduce二次排序原理
在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReader的实现。
本例子中使用的时TextInputFormat,他提供的RecordReader会将文本的字节偏移量作为key,这一行的文本作为value。
这就是自定义Map的输入是<LongWritable,Text>的原因,然后调用自定义的Map的map方法,将一个个<LongWritable,Text>对输入的给Map的map方法。
注意输出应该符合自定义Map中定义的输出<IntPair,IntWritable>.最终是生成一个List<IntPair,IntWritable>,在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置Key比较函数类,则使用key的实现的compareTo方法。
在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序,然后开始构造一个key对应的value迭代器,这是就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,他们的value就放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和他的value迭代器)。同样注意输入与输出的类型必须与自定义的reducer中声明的一致。
核心总结
1.map最后阶段进行partition分区。一般使用job.setPartitionerClass设置的类,如果没有自定义的类,用key的hashcode()方法进行排序
2.每个分区内部调用job.setSortComparatorClass设置Key的比较函数类进行排序,如果没有则使用key的实现的compareTo方法。
3.当reduce接收到所有map传输过来的数据之后,调job.setSortComparatorClass设置的key比较函数类对所有数据对排序,如果没有则使用key的实现的compareTo方法
4.紧接着使用job.setGroupingComparatorClass设置的分组函数类,进行分组,同一个key的value放在一个迭代器里面
分区 ---> 排序(二次) ---> 分组
分区默认的是key的hashcode()
排序默认的实key的compareTo()
-----------------------------------------
job.setPartitionerClass(Partitioner p); //设置分区。默认分区时hashcode()
job.setSortComparatorClass(RawComparator c); //比较排序。shuffle阶段map输出之后,reduce之前。默认是key的compareTo()方法
job.setGroupingComparatorClass(RawComparator c); //分组。Reduce阶段
-----------------------------------------
案例
原始数据
2 12:12:34 2_hao123
3 09:10:34 3_baidu
1 15:02:41 1_google
3 22:11:34 3_sougou
1 19:23:23 1_baidu
2 13:56:60 2_soso
分别依据第一列和第二列对数据进行二次排序
1.分区类
package test.mr.seconderysort; import org.apache.hadoop.io.Text; /* * 分区类 */ public class Partitioner extends org.apache.hadoop.mapreduce.Partitioner<StringPart, Text> { @Override public int getPartition(StringPart key, Text value, int numPartitions) { // TODO Auto-generated method stub return Math.abs(key.hashCode()) % numPartitions; } }
2.自定义Map输出的key类,将原始数据要排序的两列作为该JavaBean的属性,实现WritableComparable接口,实现CompareTo()排序方法
Ps:WritableComparatable接口中的CompareTo()方法:在这个方法中,如果返回-1,则当前对象排前面,返回1,就排后面 ,0,就相等。
String类中的CompareTo()方法:
/*
* compareTo()的返回值是整型,它是先比较对应字符的大小(ASCII码顺序),如果第一个字符和参数的第一个字符不等,结束比较,返回他们之间的差值,如果第一个字符和参数的第一个字符相等,则以第二个字符和参数的第二个字符做比较,以此类推,直至比较的字符或被比较的字符有一方全比较完,这时就比较字符的长度.
*
* 例: String s1 = "abc";
* String s2 = "abcd";
* String s3 = "abcdfg";
* String s4 = "1bcdfg";
* String s5 = "cdfg";
* System.out.println( s1.compareTo(s2) );// -1 (前面相等,s1长度小1)
* System.out.println( s1.compareTo(s3) ); //-3 (前面相等,s1长度小3)
* System.out.println( s1.compareTo(s4) ); //48("a"的ASCII码是97,"1"的的ASCII码是49,所以返回48)
* System.out.println( s1.compareTo(s5) ); // -2 ("a"的ASCII码是97,"c"的ASCII码是99,所以返回-2)
*/
package test.mr.seconderysort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /* * 自定义key */ /* *如果想对自己写的类排序,你就把自己写的这个类实现Comparable接口 *然后写一个comparaTo方法来规定这个类的对象排序的顺序。 *在这个方法中,如果返回-1,则当前对象排前面,返回1,就排后面 ,0,就相等 */ public class StringPart implements WritableComparable<StringPart> { /* * 两列排序 */ private String first; private String second; public String getFirst() { return first; } public void setFirst(String first) { this.first = first; } public String getSecond() { return second; } public void setSecond(String second) { this.second = second; } public StringPart() { super(); // TODO Auto-generated constructor stub } public StringPart(String first, String second) { super(); this.first = first; this.second = second; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(first); out.writeUTF(second); } @Override public void readFields(DataInput in) throws IOException { this.first = in.readUTF(); this.second = in.readUTF(); } /* * 排序 */ /* * compareTo()的返回值是整型,它是先比较对应字符的大小(ASCII码顺序),如果第一个字符和参数的第一个字符不等,结束比较,返回他们之间的 * * 差值,如果第一个字符和参数的第一个字符相等,则以第二个字符和参数的第二个字符做比较,以此类推,直至比较的字符或被比较的字符有一方 * * 全比较完,这时就比较字符的长度. * * 例: String s1 = "abc"; * String s2 = "abcd"; * String s3 = "abcdfg"; * String s4 = "1bcdfg"; * String s5 = "cdfg"; * System.out.println( s1.compareTo(s2) );// -1 (前面相等,s1长度小1) * System.out.println( s1.compareTo(s3) ); //-3 (前面相等,s1长度小3) * System.out.println( s1.compareTo(s4) ); //48("a"的ASCII码是97,"1"的的ASCII码是49,所以返回48) * System.out.println( s1.compareTo(s5) ); // -2 ("a"的ASCII码是97,"c"的ASCII码是99,所以返回-2) */ @Override public int compareTo(StringPart o) { if (!this.first.equals(o.getFirst())) { return first.compareTo(o.getFirst()); // 字符串的compareTo()方法 } else { if (!this.second.equals(o.getSecond())) { return second.compareTo(o.getSecond()); } else { return 0; } } } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((first == null) ? 0 : first.hashCode()); result = prime * result + ((second == null) ? 0 : second.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; StringPart other = (StringPart) obj; if (first == null) { if (other.first != null) return false; } else if (!first.equals(other.first)) return false; if (second == null) { if (other.second != null) return false; } else if (!second.equals(other.second)) return false; return true; } }
3.分组类(根据原始数据的第一列进行分组)
package test.mr.seconderysort; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /* * 实现分组 */ public class Grouping extends WritableComparator { protected Grouping() { super(StringPart.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { StringPart a1 = (StringPart) a; StringPart b1 = (StringPart) b; // 只需要比较a1,b1的first字段即认为他们是否属于同组 return a1.getFirst().compareTo(b1.getFirst()); } }
4.Map类
package test.mr.seconderysort; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class SeconderyMap extends Mapper<LongWritable, Text, StringPart, Text> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, StringPart, Text>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] str = line.split("\t"); if (str.length == 3) { StringPart temp = new StringPart(str[0], str[1]); context.write(temp, value); } } }
5.Reduce类
package test.mr.seconderysort; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class SeconderyRedu extends Reducer<StringPart, Text, NullWritable, Text> { private static Text part = new Text("------------"); @Override protected void reduce(StringPart key, Iterable<Text> values, Reducer<StringPart, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException { context.write(NullWritable.get(), part); for (Text t : values) { context.write(NullWritable.get(), t); } } }
6.job类
package test.mr.seconderysort; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SeconderyMain { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf); job.setJarByClass(SeconderyMain.class); job.setGroupingComparatorClass(Grouping.class); job.setPartitionerClass(Partitioner.class); job.setMapperClass(SeconderyMap.class); job.setMapOutputKeyClass(StringPart.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(SeconderyRedu.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } }