To control partitioning yourself, you can either extend the Partitioner base class or subclass the default HashPartitioner, and override the getPartition() method with your own logic.
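As a point of reference, a minimal sketch of the second approach (subclassing HashPartitioner) might look like the snippet below. The class name PinnedKeyPartitioner and the "special" key are illustrative only; they are not part of the example that follows, which extends Partitioner directly.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// Hypothetical sketch: keep the default hash-based routing for most keys,
// but pin one special key to partition 0.
public class PinnedKeyPartitioner extends HashPartitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        if ("special".equals(key.toString())) {
            return 0;
        }
        // Fall back to the default (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
        return super.getPartition(key, value, numReduceTasks);
    }
}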
Requirement: this example is a rewrite of the previous one; the requirement itself is unchanged.
package country;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Multiples {

    public static void main(String[] args) throws Exception {
        /**
         * [Important]
         * An example with multiple partitions must be packaged as a jar and run on the cluster.
         * Eclipse runs the job in local mode, where only a single reducer is available.
         */
        // Environment setting used only when testing locally from Eclipse:
        //System.setProperty("hadoop.home.dir", "F:\\JAVA\\hadoop-2.2.0");

        Configuration conf = new Configuration();
        // Specify the key/value separator (the default is also \t).
        // Set it before Job.getInstance(), which copies the configuration.
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");

        Job job = Job.getInstance(conf);
        job.setJarByClass(Multiples.class);

        // Use KeyValueTextInputFormat as the input format.
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Register the custom partitioner and use one reducer per partition.
        job.setPartitionerClass(MyPartitioner.class);

        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(3);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    /**
     * Map phase: pass each key/value pair through unchanged.
     */
    public static class MyMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    /**
     * Partition function: route each country to its own reducer.
     */
    public static class MyPartitioner extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            // Decode the key bytes as UTF-8 so the Chinese strings compare correctly.
            String line = null;
            try {
                line = new String(key.getBytes(), 0, key.getLength(), "utf-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            if ("中国".equals(line)) {
                return 0;
            } else if ("美国".equals(line)) {
                return 1;
            } else {
                return 2;
            }
        }
    }

    /**
     * Reduce phase: write out every value for the key.
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            for (Text text : v2s) {
                context.write(key, text);
            }
        }
    }
}
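To actually see the three partitions, follow the comment in main(): package the class into a jar and submit it to the cluster rather than running it from Eclipse. A typical submission looks like "hadoop jar multiples.jar country.Multiples <input path> <output path>", where the jar name is a placeholder and the two arguments are the HDFS input and output paths expected by args[0] and args[1]. Because the job calls setNumReduceTasks(3), the output directory will contain three files, part-r-00000 through part-r-00002, holding the "中国" records, the "美国" records, and all remaining records respectively.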