代码测试环境:Hadoop2.4
应用场景:在Reducer端,数据一般只按key排序,value并不排序;如果想对value也进行排序,可以使用此技巧。
应用实例描述:
比如针对下面的数据:
a,5 b,7 c,2 c,9 a,3 a,1 b,10 b,3 c,1
如果使用一般的MR的话,其输出可能是这样的:
a 1 a 3 a 5 b 3 b 10 b 7 c 1 c 9 c 2
从数据中可以看到其键是排序的,但是其值不是。通过此篇介绍的技巧可以做到下面的输出:
a 1 a 3 a 5 b 3 b 7 b 10 c 1 c 2 c 9
这个数据就是键和值都是排序的了。
二次排序原理:
1)自定义键类型,把值放入键中;
2)利用键的排序特性,可以顺便把值也排序了;
3)这时会有两个问题:
a. 默认的Partitioner按整个组合键分区,原始键相同而值不同的记录可能被发送到不同的Reducer;
b. Reducer端默认按整个组合键分组,原始键相同而值不同的记录会被分到不同的组中;
针对这两个问题,需要分别使用自定义Partitioner(只按原始键分区)和自定义GroupComparator(只按原始键分组)来定义相应的逻辑;
实例:
driver类:
package fz.secondarysort; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class SortDriver extends Configured implements Tool{ /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // TODO Auto-generated method stub ToolRunner.run(new Configuration(), new SortDriver(),args); } @Override public int run(String[] args) throws Exception { Job job1 = Job.getInstance(getConf(), "secondary sort "); job1.setJarByClass(getClass()); if(args.length!=5){ System.err.println("Usage: <input> <output> <numReducers> <useSecondarySort> <useGroupComparator>"); System.exit(-1); } Path out = new Path(args[1]); out.getFileSystem(getConf()).delete(out, true); FileInputFormat.setInputPaths(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, out); if("true".equals(args[3])||"false".equals(args[3])){ if("true".equals(args[3])){ // 使用二次排序 job1.setMapperClass(Mapper1.class); job1.setReducerClass(Reducer1.class); job1.setMapOutputKeyClass(CustomKey.class); job1.setMapOutputValueClass(NullWritable.class); job1.setOutputKeyClass(CustomKey.class); job1.setOutputValueClass(NullWritable.class); if("true".equals(args[4])){ job1.setGroupingComparatorClass(CustomGroupComparator.class); }else if("false".equals(args[4])){ // do nothing }else{ System.err.println("Wrong Group Comparator argument!"); System.exit(-1); } job1.setPartitionerClass(CustomPartitioner.class); }else{ // 不使用二次排序 
job1.setMapperClass(Mapper2.class); job1.setReducerClass(Reducer2.class); job1.setMapOutputKeyClass(Text.class); job1.setMapOutputValueClass(IntWritable.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(IntWritable.class); } }else{ System.err.println("The fourth argument should be ‘true‘ or ‘false‘"); System.exit(-1); } job1.setInputFormatClass(TextInputFormat.class); job1.setOutputFormatClass(TextOutputFormat.class); job1.setNumReduceTasks(Integer.parseInt(args[2])); boolean job1success = job1.waitForCompletion(true); if(!job1success) { System.out.println("The CreateBloomFilter job failed!"); return -1; } return 0; } }
mapper1(二次排序mapper):
package fz.secondarysort; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * 有二次排序mapper * @author fansy * */ public class Mapper1 extends Mapper<LongWritable, Text, CustomKey, NullWritable> { private String COMMA =","; private CustomKey newKey = new CustomKey(); public void map(LongWritable key,Text value, Context cxt ) throws IOException,InterruptedException{ String [] values = value.toString().split(COMMA); newKey.setSymbol(values[0]); newKey.setValue(Integer.parseInt(values[1])); cxt.write(newKey, NullWritable.get()); } }
Reducer1(二次排序Reducer)
package fz.secondarysort; import java.io.IOException; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 二次排序Reducer * @author fansy * */ public class Reducer1 extends Reducer<CustomKey, NullWritable, CustomKey, NullWritable> { private Logger log = LoggerFactory.getLogger(Reducer1.class); public void setup(Context cxt){ log.info("reducer1*********************in setup()"); } public void reduce(CustomKey key ,Iterable<NullWritable> values,Context cxt)throws IOException,InterruptedException{ log.info("reducer1******* in reduce()"); for(NullWritable v:values){ log.info("key:"+key+"-->value:"+v); cxt.write(key, v); } log.info("reducer1****** in reduce() *******end"); } }
无排序mapper2
package fz.secondarysort; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * 不是二次排序 * @author fansy * */ public class Mapper2 extends Mapper<LongWritable, Text, Text, IntWritable> { private String COMMA =","; public void map(LongWritable key,Text value, Context cxt ) throws IOException,InterruptedException{ String [] values = value.toString().split(COMMA); Text newKey = new Text(values[0]); cxt.write(newKey, new IntWritable(Integer.parseInt(values[1]))); } }
无排序Reducer2
package fz.secondarysort; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 无二次排序 * @author fansy * */ public class Reducer2 extends Reducer<Text, IntWritable, Text, IntWritable> { private Logger log = LoggerFactory.getLogger(Reducer2.class); public void setup(Context cxt){ log.info("reducer2*********************in setup()"); } public void reduce(Text key ,Iterable<IntWritable> values,Context cxt)throws IOException,InterruptedException{ log.info("reducer2******* in reduce()"); for(IntWritable v:values){ log.info("key:"+key+"-->value:"+v); cxt.write(key, v); } log.info("reducer2****** in reduce() *******end"); } }
自定义key
package fz.secondarysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key for secondary sort.
 *
 * <p>{@code symbol} is the original key and {@code value} is the original value. Keys
 * order by symbol first, then by value, so the shuffle delivers each symbol's values
 * in ascending order.
 *
 * @author fansy
 */
public class CustomKey implements WritableComparable<CustomKey> {
    private int value;
    private String symbol;

    /** Serialization order (value, then symbol) must match {@link #readFields}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.value);
        out.writeUTF(this.symbol);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.value = in.readInt();
        this.symbol = in.readUTF();
    }

    /**
     * Orders by symbol, then by value.
     *
     * <p>Uses {@link Integer#compare} rather than subtraction: {@code this.value - o.value}
     * overflows for large-magnitude values (e.g. {@code Integer.MIN_VALUE} vs {@code 1})
     * and would report the wrong order.
     */
    @Override
    public int compareTo(CustomKey o) {
        int result = this.symbol.compareTo(o.symbol);
        return result != 0 ? result : Integer.compare(this.value, o.value);
    }

    /** Equal iff symbol and value are equal; consistent with {@link #compareTo}. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        CustomKey other = (CustomKey) o;
        return value == other.value && Objects.equals(symbol, other.symbol);
    }

    /**
     * Hash consistent with {@link #equals}. Without this override, the identity hash
     * would be used, and the default HashPartitioner would scatter equal keys randomly.
     */
    @Override
    public int hashCode() {
        return Objects.hash(symbol, value);
    }

    @Override
    public String toString() {
        return this.symbol + "\t" + this.value;
    }

    public int getValue() {
        return value;
    }

    public void setValue(int value) {
        this.value = value;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }
}
自定义GroupComparator
package fz.secondarysort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator that looks only at the symbol (the original key) of two
 * {@link CustomKey}s and ignores their values. Consecutive sorted keys with the same
 * symbol therefore fall into one reduce() group.
 *
 * @author fansy
 */
public class CustomGroupComparator extends WritableComparator {

    protected CustomGroupComparator() {
        // createInstances = true: let WritableComparator instantiate CustomKey objects so
        // the object-level compare() below receives deserialized keys.
        super(CustomKey.class, true);
    }

    /** Compares the two keys by symbol only. */
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String leftSymbol = ((CustomKey) a).getSymbol();
        String rightSymbol = ((CustomKey) b).getSymbol();
        return leftSymbol.compareTo(rightSymbol);
    }
}
自定义Partitioner
package fz.secondarysort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitioner that keeps the original grouping: it routes a {@link CustomKey} by its
 * symbol alone, so every record of one original key lands on the same reducer.
 *
 * @author fansy
 *
 * @param <K1> map output key type; expected to be {@link CustomKey} at runtime
 * @param <V1> map output value type (ignored)
 */
public class CustomPartitioner<K1, V1> extends Partitioner<K1, V1> {

    @Override
    public int getPartition(K1 key, V1 value, int numPartitions) {
        String symbol = ((CustomKey) key).getSymbol();
        // Hash the symbol the same way a Text map key would be hashed.
        int symbolHash = new Text(symbol).hashCode();
        // Clear the sign bit so the modulo result is never negative.
        int nonNegativeHash = symbolHash & Integer.MAX_VALUE;
        return nonNegativeHash % numPartitions;
    }
}
结果查看:
不使用二次排序
使用二次排序,同时使用自定义分组比较器(GroupComparator):
可以看到,无论是否使用二次排序,Reducer端都只有三个分组;同时,使用二次排序时每组内的值也是有序的;
如果在二次排序中不使用分组比较器(GroupComparator),那么会得到下面的结果:
从这个结果可以看到分组数多于3个:原始键相同的记录(例如键为a的记录)被拆分到了多个组中,无法在一次reduce()调用中整合该键的全部数据;
另外,上面使用了自定义的Partitioner,但这里看不出它的作用,因为只有一个Reducer;如果有多个Reducer就能看出差别:不使用自定义Partitioner时,原始键相同的记录可能被发送到不同的Reducer。
总结:使用二次排序不仅可以对键排序,还可以对值排序,即Reducer每个组中接收到的value是有序的,这在某些操作中可以提高效率(例如求每组的最小值或Top-N时,无需在Reducer中再次排序)。
分享,成长,快乐
转载请注明blog地址:http://blog.csdn.net/fansy1990
hadoop编程小技巧(9)---二次排序(值排序)