hadoop编程小技巧(9)---二次排序(值排序)

代码测试环境:Hadoop2.4

应用场景:在Reducer端一般是key排序,而没有value排序,如果想对value进行排序,则可以使用此技巧。

应用实例描述:

比如针对下面的数据:

a,5
b,7
c,2
c,9
a,3
a,1
b,10
b,3
c,1

如果使用一般的MR的话,其输出可能是这样的:

a	1
a	3
a	5
b	3
b	10
b	7
c	1
c	9
c	2

从数据中可以看到其键是排序的,但是其值不是。通过此篇介绍的技巧可以做到下面的输出:

a	1
a	3
a	5
b	3
b	7
b	10
c	1
c	2
c	9

这个数据就是键和值都是排序的了。

二次排序原理:
1)自定义键类型,把值放入键中;

2)利用键的排序特性,可以顺便把值也排序了;

3)这时会有两个问题:

a. 数据传输到不同的Reducer会有异常;

b. 数据在Reducer中的分组不同;

针对这两个问题,需要使用自定义Partitioner、使用自定义GroupComparator来定义相应的逻辑;

实例:

driver类:

package fz.secondarysort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortDriver extends Configured implements Tool{

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		ToolRunner.run(new Configuration(), new SortDriver(),args);
	}

	@Override
	public int run(String[] args) throws Exception {
		Job job1 = Job.getInstance(getConf(), "secondary sort ");
		job1.setJarByClass(getClass());
		if(args.length!=5){
			System.err.println("Usage: <input> <output> <numReducers> <useSecondarySort> <useGroupComparator>");
			System.exit(-1);
		}
		Path out = new Path(args[1]);
		out.getFileSystem(getConf()).delete(out, true);
		FileInputFormat.setInputPaths(job1, new Path(args[0]));
		FileOutputFormat.setOutputPath(job1, out);
		if("true".equals(args[3])||"false".equals(args[3])){
			if("true".equals(args[3])){ // 使用二次排序
				job1.setMapperClass(Mapper1.class);
				job1.setReducerClass(Reducer1.class);
				job1.setMapOutputKeyClass(CustomKey.class);
				job1.setMapOutputValueClass(NullWritable.class);
				job1.setOutputKeyClass(CustomKey.class);
				job1.setOutputValueClass(NullWritable.class);
				if("true".equals(args[4])){
					job1.setGroupingComparatorClass(CustomGroupComparator.class);
				}else if("false".equals(args[4])){
					// do nothing
				}else{
					System.err.println("Wrong Group Comparator argument!");
					System.exit(-1);
				}
				job1.setPartitionerClass(CustomPartitioner.class);
			}else{  // 不使用二次排序
				job1.setMapperClass(Mapper2.class);
				job1.setReducerClass(Reducer2.class);
				job1.setMapOutputKeyClass(Text.class);
				job1.setMapOutputValueClass(IntWritable.class);
				job1.setOutputKeyClass(Text.class);
				job1.setOutputValueClass(IntWritable.class);
			}
		}else{
			System.err.println("The fourth argument should be ‘true‘ or ‘false‘");
			System.exit(-1);
		}
		job1.setInputFormatClass(TextInputFormat.class);
		job1.setOutputFormatClass(TextOutputFormat.class);

		job1.setNumReduceTasks(Integer.parseInt(args[2]));

		boolean job1success = job1.waitForCompletion(true);
		if(!job1success) {
			System.out.println("The CreateBloomFilter job failed!");
			return -1;
		}
		return 0;
	}

}

mapper1(二次排序mapper):

package fz.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 有二次排序mapper
 * @author fansy
 *
 */
public class Mapper1 extends
		Mapper<LongWritable, Text, CustomKey, NullWritable> {
	private String COMMA =",";
	private CustomKey newKey = new CustomKey();
	public void map(LongWritable key,Text value, Context cxt ) throws IOException,InterruptedException{
		String [] values = value.toString().split(COMMA);
		newKey.setSymbol(values[0]);
		newKey.setValue(Integer.parseInt(values[1]));
		cxt.write(newKey, NullWritable.get());
	}
}

Reducer1(二次排序Reducer)

package fz.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 二次排序Reducer
 * @author fansy
 *
 */
public class Reducer1 extends
		Reducer<CustomKey, NullWritable, CustomKey, NullWritable> {
	private Logger log = LoggerFactory.getLogger(Reducer1.class);
	public void setup(Context cxt){
		log.info("reducer1*********************in setup()");
	}
	public void reduce(CustomKey key ,Iterable<NullWritable> values,Context cxt)throws IOException,InterruptedException{
		log.info("reducer1******* in reduce()");
		for(NullWritable v:values){
			log.info("key:"+key+"-->value:"+v);
			cxt.write(key, v);
		}
		log.info("reducer1****** in reduce() *******end");
	}
}

无排序mapper2

package fz.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * 不是二次排序
 * @author fansy
 *
 */
public class Mapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
	private String COMMA =",";
	public void map(LongWritable key,Text value, Context cxt ) throws IOException,InterruptedException{
		String [] values = value.toString().split(COMMA);
		Text newKey =  new Text(values[0]);
		cxt.write(newKey, new IntWritable(Integer.parseInt(values[1])));
	}
}

无排序Reducer2

package fz.secondarysort;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 无二次排序
 * @author fansy
 *
 */
public class Reducer2 extends Reducer<Text, IntWritable, Text, IntWritable> {
	private Logger log = LoggerFactory.getLogger(Reducer2.class);
	public void setup(Context cxt){
		log.info("reducer2*********************in setup()");
	}
	public void reduce(Text key ,Iterable<IntWritable> values,Context cxt)throws IOException,InterruptedException{
		log.info("reducer2******* in reduce()");
		for(IntWritable v:values){
			log.info("key:"+key+"-->value:"+v);
			cxt.write(key, v);
		}
		log.info("reducer2****** in reduce() *******end");
	}
}

自定义key

package fz.secondarysort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
/**
 * symbol 是原始的key
 * value是原始的值
 * @author fansy
 *
 */
public class CustomKey implements WritableComparable<CustomKey> {

	private int value;
	private String symbol;

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(this.value);
		out.writeUTF(this.symbol);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.value=in.readInt();
		this.symbol= in.readUTF();
	}

	@Override
	public int compareTo(CustomKey o) {
		int result = this.symbol.compareTo(o.symbol);
		return result!=0 ? result :this.value-o.value;
	}

	@Override
	public String toString(){
		return this.symbol+"\t"+this.value;
	}
	public int getValue() {
		return value;
	}

	public void setValue(int value) {
		this.value = value;
	}

	public String getSymbol() {
		return symbol;
	}

	public void setSymbol(String symbol) {
		this.symbol = symbol;
	}

}

自定义GroupComparator

package fz.secondarysort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * 只对比symbol,即原始的键
 * @author fansy
 *
 */
public class CustomGroupComparator extends WritableComparator {

	protected CustomGroupComparator(){
		super(CustomKey.class,true);
	}
	@SuppressWarnings("rawtypes")
	@Override
	public int compare(WritableComparable a,WritableComparable b){
		CustomKey ak = (CustomKey) a;
		CustomKey bk = (CustomKey) b;
		return ak.getSymbol().compareTo(bk.getSymbol());
	}
}

自定义Partitioner

package fz.secondarysort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * 保持原始分组条件
 * @author fansy
 *
 * @param <K1>
 * @param <V1>
 */
public class CustomPartitioner<K1, V1> extends Partitioner<K1, V1> {

	@Override
	public int getPartition(K1 key, V1 value, int numPartitions) {
		CustomKey keyK= (CustomKey) key;
		Text tmpValue =new  Text(keyK.getSymbol());
		return (tmpValue.hashCode() & Integer.MAX_VALUE)%numPartitions;
	}

}

结果查看:

不使用二次排序

使用二次排序,同时使用自定义组分类器:


可以看到不管二次排序和非二次排序,在Reducer端都只有三个分组;同时二次排序的其值也是排序的;

如果在二次排序中不使用组分类器,那么会得到下面的结果:

从这个结果可以看到有大于3个分组,这样的结果可能是有问题的(对于键是a的分组 整合不了数据);

同时上面使用了自定义的Partitioner,这里看不出区别是因为只有一个Reducer,如果有多个就可以看出差别。

总结:使用二次排序可以对不单单是键排序,同时可以对值进行排序,即在Reducer每个组中接收的value值是排序的,这样在某些操作中是可以增加效率的。

分享,成长,快乐

转载请注明blog地址:http://blog.csdn.net/fansy1990

hadoop编程小技巧(9)---二次排序(值排序)

时间: 2024-10-07 16:43:24

hadoop编程小技巧(9)---二次排序(值排序)的相关文章

hadoop编程小技巧(4)---全局key排序类TotalOrderPartitioner

Hadoop代码测试版本:Hadoop2.4 原理:在进行MR程序之前对输入数据进行随机提取样本,把样本排序,然后在MR的中间过程Partition的时候使用这个样本排序的值进行分组数据,这样就可以达到全局排序的目的了. 难点:如果使用Hadoop提供的方法来实现全局排序,那么要求Mapper的输入.输出的key不变才可以,因为在源码InputSampler中提供的随机抽取的数据是输入数据最原始的key,如下代码(line:225): for (int i = 0; i < splitsToSa

hadoop编程小技巧(2)---计数器Counter

Hadoop代码测试版本:2.4 应用场景:在Hadoop编程的时候,有时我们在进行我们算法逻辑的时候想附带了解下数据的一些特性,比如全部数据的记录数有多少,map的输出有多少等等信息(这些是在算法运行完毕后,直接有的),就可以使用计数器Counter. 如果是针对很特定的数据的一些统计,比如统计以1开头的所有记录数等等信息,这时就需要自定义Counter.自定义Counter有两种方式,第一种,定义枚举类型,类似: public enum MyCounters{ ALL_RECORDS,ONE

hadoop编程小技巧(1)---map端聚合

测试hadoop版本:2.4 Map端聚合的应用场景:当我们只关心所有数据中的部分数据时,并且数据可以放入内存中. 使用的好处:可以大大减小网络数据的传输量,提高效率: 一般编程思路:在Mapper的map函数中读入所有数据,然后添加到一个List(队列)中,然后在cleanup函数中对list进行处理,输出我们关系的少量数据. 实例: 在map函数中使用空格分隔每行数据,然后把每个单词添加到一个堆栈中,在cleanup函数中输出堆栈中单词次数比较多的单词以及次数: package fz.inm

hadoop编程小技巧(3)---自定义分区类Partitioner

Hadoop代码测试环境:Hadoop2.4 原理:在Hadoop的MapReduce过程中,Mapper读取处理完成数据后,会把数据发送到Partitioner,由Partitioner来决定每条记录应该送往哪个reducer节点,默认使用的是HashPartitioner,其核心代码如下: /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numRe

hadoop编程小技巧(6)---处理大量小数据文件CombineFileInputFormat应用

代码测试环境:Hadoop2.4 应用场景:当需要处理很多小数据文件的时候,可以应用此技巧来达到高效处理数据的目的. 原理:应用CombineFileInputFormat,可以把多个小数据文件在进行分片的时候合并.由于每个分片会产生一个Mapper,当一个Mapper处理的数据比较小的时候,其效率较低.而一般使用Hadoop处理数据时,即默认方式,会把一个输入数据文件当做一个分片,这样当输入文件较小时就会出现效率低下的情况. 实例: 参考前篇blog:hadoop编程小技巧(5)---自定义输

hadoop编程小技巧(7)---自定义输出文件格式以及输出到不同目录

代码测试环境:Hadoop2.4 应用场景:当需要定制输出数据格式时可以采用此技巧,包括定制输出数据的展现形式,输出路径,输出文件名称等. Hadoop内置的输出文件格式有: 1)FileOutputFormat<K,V>  常用的父类: 2)TextOutputFormat<K,V> 默认输出字符串输出格式: 3)SequenceFileOutputFormat<K,V> 序列化文件输出: 4)MultipleOutputs<K,V> 可以把输出数据输送到

hadoop编程小技巧(5)---自定义输入文件格式类InputFormat

Hadoop代码测试环境:Hadoop2.4 应用:在对数据需要进行一定条件的过滤和简单处理的时候可以使用自定义输入文件格式类. Hadoop内置的输入文件格式类有: 1)FileInputFormat<K,V>这个是基本的父类,我们自定义就直接使用它作为父类: 2)TextInputFormat<LongWritable,Text>这个是默认的数据格式类,我们一般编程,如果没有特别指定的话,一般都使用的是这个:key代表当前行数据距离文件开始的距离,value代码当前行字符串:

hadoop编程小技巧(8)---Unit Testing (单元测试)

所需环境: Hadoop相关jar包(下载官网发行版即可): 下载junit包(最新为好): 下载mockito包: 下载mrunit包: 下载powermock-mockito包: 相关包截图如下(相关下载参考:http://download.csdn.net/detail/fansy1990/7690977): 应用场景: 在进行Hadoop的一般MR编程时,需要验证我们的业务逻辑,或者说是验证数据流的时候可以使用此环境,这个环境不要求真实的云平台,只是针对算法或者代码逻辑进行验证,方便调试

hadoop编程小技巧(7)---自己定义输出文件格式以及输出到不同文件夹

代码測试环境:Hadoop2.4 应用场景:当须要定制输出数据格式时能够採用此技巧,包含定制输出数据的展现形式.输出路径.输出文件名称称等. Hadoop内置的输出文件格式有: 1)FileOutputFormat<K,V>  经常使用的父类. 2)TextOutputFormat<K,V> 默认输出字符串输出格式. 3)SequenceFileOutputFormat<K,V> 序列化文件输出: 4)MultipleOutputs<K,V> 能够把输出数据