package cn.sjq.mr.sort.number;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
/**
 * MapReduce string-sorting example.
 *
 * <p>Every input line is split on spaces and each token is emitted as a map output key, so the
 * framework's sort phase orders the tokens. Two JUnit-driven jobs are provided: one that relies
 * on the default {@link Text} ordering (ascending) and one that plugs in
 * {@link StringSortMyComparator} to obtain descending order.
 *
 * <p>The mapper and the comparator are implemented as static nested classes.
 *
 * @author songjq
 */
public class StringSort {

    /** Input file read by both jobs. */
    private static final String INPUT_PATH = "D:\\test\\tmp\\sort\\Strings.data";

    /** Output directory of the ascending (default comparator) job. */
    private static final String ASCENDING_OUTPUT_PATH = "D:\\test\\tmp\\sort\\out5";

    /** Output directory of the descending (custom comparator) job. */
    private static final String DESCENDING_OUTPUT_PATH = "D:\\test\\tmp\\sort\\out6";

    /**
     * Mapper that tokenizes each line and emits every token as a key with a
     * {@link NullWritable} value.
     *
     * @author songjq
     */
    static class StringSortMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        /** Output key instance, reused for every emitted token. */
        private final Text outKey = new Text();

        /**
         * Splits one input line on the space character and writes each token to the context.
         *
         * @param k1 input key supplied by the input format (not used)
         * @param v1 one line of input, e.g. {@code Wait Events Statistics}
         * @param context sink for the (token, null) pairs
         * @throws IOException if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context)
                throws IOException, InterruptedException {
            // commons-lang split treats adjacent separators as one, so no empty tokens are emitted.
            String[] tokens = StringUtils.split(v1.toString(), " ");
            for (String token : tokens) {
                outKey.set(token);
                context.write(outKey, NullWritable.get());
            }
        }
    }

    /**
     * Comparator that reverses the ordering of {@link Text.Comparator}, yielding a descending
     * sort of the map output keys.
     *
     * @author songjq
     */
    static class StringSortMyComparator extends Text.Comparator {

        /**
         * Compares two serialized keys in reverse order.
         *
         * <p>The operands are swapped instead of negating the parent's result; both give the
         * reversed ordering, but swapping cannot overflow for any return value.
         *
         * @param b1 buffer holding the first serialized key
         * @param s1 start offset of the first key in {@code b1}
         * @param l1 length of the first key
         * @param b2 buffer holding the second serialized key
         * @param s2 start offset of the second key in {@code b2}
         * @param l2 length of the second key
         * @return a negative, zero, or positive value when the first key sorts after, equal to,
         *     or before the second key under the parent comparator
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return super.compare(b2, s2, l2, b1, s1, l1);
        }
    }

    /**
     * Runs the sort job with the default comparator, producing the tokens in ascending order.
     *
     * <p>Excerpt of the result recorded by the original author:
     * <pre>
     * Activity
     * Advisory
     * Buffer
     * Cache
     * Cache
     * Dictionary
     * Events
     * IO
     * Instance
     * ...
     * </pre>
     *
     * @throws Exception if the job cannot be configured or submitted, or if it fails
     */
    @Test
    public void StringSortJob() throws Exception {
        // No sort comparator is set here: the default ordering for Text keys applies.
        runToCompletion(newSortJob(ASCENDING_OUTPUT_PATH));
    }

    /**
     * Runs the sort job with {@link StringSortMyComparator}, producing the tokens in descending
     * order.
     *
     * <p>Excerpt of the result recorded by the original author:
     * <pre>
     * Latch
     * Instance
     * IO
     * Events
     * Dictionary
     * Cache
     * Cache
     * Buffer
     * Advisory
     * Activity
     * ...
     * </pre>
     *
     * @throws Exception if the job cannot be configured or submitted, or if it fails
     */
    @Test
    public void StringSortUseMyComparatorJob() throws Exception {
        Job job = newSortJob(DESCENDING_OUTPUT_PATH);
        job.setSortComparatorClass(StringSortMyComparator.class);
        runToCompletion(job);
    }

    /**
     * Builds the job configuration shared by both tests: mapper, key/value classes and paths.
     *
     * @param outputPath directory the job writes its result to
     * @return a configured, not yet submitted job
     * @throws IOException if the job or its input paths cannot be set up
     */
    private static Job newSortJob(String outputPath) throws IOException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(StringSort.class);

        job.setMapperClass(StringSortMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        return job;
    }

    /**
     * Submits the job, waits for it to finish and fails the calling test if the job did not
     * succeed. Without this check a failed job would still let the test pass, because
     * {@code waitForCompletion} reports failure through its return value.
     *
     * @param job the configured job to run
     * @throws IOException if communication with the cluster or file system fails
     * @throws InterruptedException if the wait is interrupted
     * @throws ClassNotFoundException if a class required by the job cannot be found
     */
    private static void runToCompletion(Job job)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (!job.waitForCompletion(true)) {
            throw new AssertionError("MapReduce job did not complete successfully: " + job.getJobID());
        }
    }
}
// Original article: http://blog.51cto.com/2951890/2151971