定制Partitioner

用户可以继承Partitioner基类,也可以继承默认的HashPartitioner类,覆写其中的getPartition()方法实现自己的分区。

需求:本例是对上一个实例的改写,需求不变

package country;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Multiples {

    public static void main(String[] args) throws Exception {
        /**
         * 【严重注意】
         * 有分区的例子,必须达成java包在集群上运行
         * 这是因为,eclipse其实是在本地模式运行。所以只能有一个reduce
         */
        //本地模式,使用eclipse测试用的环境变量配置!
        //System.setProperty("hadoop.home.dir", "F:\\JAVA\\hadoop-2.2.0");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Multiples.class);        

        /**
         * 使用KeyValueTextInputFormat作为输入类型
         */
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        /**
         * 指定key和Value的分隔符【默认也是\t】
         */
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t"); 

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //指定自定义的分区类
        job.setPartitionerClass(MyPartitioner.class);

        job.setReducerClass(MyReducer.class);
        job.setNumReduceTasks(3);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);        

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    /**
     * map阶段
     */
    public static class MyMapper extends Mapper<Text, Text, Text, Text>{
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {

            context.write(key, value);
        }
    }
    /**
     * 分区函数
     */
    public static class MyPartitioner extends Partitioner<Text, Text>{
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {

            //生成以utf-8方式编码的汉字
            String line = null;
            try {
                line = new String(key.getBytes(),0,key.getLength(),"utf-8");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }

            if(line.equals("中国")){
                return 0;
            }else if (line.equals("美国")) {
                return 1;
            }else
                return 2;
        }
    }

    /**
     * reduce阶段
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {
            for(Text text : v2s){
                context.write(key, text);
            }
        }
    }
}

时间: 2024-11-09 06:12:26

定制Partitioner的相关文章

Hadoop学习笔记—9.Partitioner与自定义Partitioner

一.初步探索Partitioner 1.1 再次回顾Map阶段五大步凑 在第四篇博文<初始MapReduce>中,我们认识了MapReduce的八大步凑,其中在Map阶段总共五个步凑,如下图所示: 其中,step1.3就是一个分区操作.通过前面的学习我们知道Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer节点中进行归并.哪个key到哪个Reducer的分配过程,是由Partition

MapReducer Counter计数器的使用,Combiner ,Partitioner,Sort,Grop的使用,

一:Counter计数器的使用 hadoop计数器:可以让开发人员以全局的视角来审查程序的运行情况以及各项指标,及时做出错误诊断并进行相应处理. 内置计数器(MapReduce相关.文件系统相关和作业调度相关) 也可以通过http://master:50030/jobdetails.jsp查看 /** * 度量,在运行job任务的时候产生了那些j输出.通过计数器可以观察整个计算的过程,运行时关键的指标到底是那些.可以表征程序运行时一些关键的指标. * 计数器 counter 统计敏感单词出现次数

Hadoop Partitioner编程

1.Partitioner是partitioner的基类,如果需要定制Partitioner也需要继承该类. 2. HashPartitioner是mapreduce的默认partitioner.计算方法是 which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer. 3.说明,Partitioner是在Mapper执行完成,Reducer执行前.它有两个参数,就是Mapper的输出参数,在

hadoop(五) - 分布式计算利器MapReduce加强

一. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类. public class DataCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(DataCount.class);

Hadoop之——MapReduce实战(二)

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/45957715 MapReduce的老api写法 import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.Jo

Mapreduce-Partition分析(转)

http://blog.oddfoo.net/2011/04/17/mapreduce-partition%E5%88%86%E6%9E%90-2/ Partition所处的位置 Partition位置 Partition主要作用就是将map的结果发送到相应的reduce.这就对partition有两个要求: 1)均衡负载,尽量的将工作均匀的分配给不同的reduce. 2)效率,分配速度一定要快. Mapreduce提供的Partitioner Mapreduce默认的partitioner是H

Hadoop日记Day17---计数器、map规约、分区学习

一.Hadoop计数器 1.1 什么是Hadoop计数器 Haoop是处理大数据的,不适合处理小数据,有些大数据问题是小数据程序是处理不了的,他是一个高延迟的任务,有时处理一个大数据需要花费好几个小时这都是正常的.下面我们说一下Hadoop计数器,Hadoop计数器就相当于我们的日志,而日志可以让我们查看程序运行时的很多状态,而计数器也有这方面的作用.那么就研究一下Hadoop自身的计数器.计数器的程序如代码1.1所示,下面代码还是以内容为“hello you:hell0 me”的单词统计为例.

三 概要模式 2) MR倒排索引、性能分析、搜索干扰词。

二  倒排索引 倒排索引(英语:Inverted index),也常被称为反向索引.置入档案或反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射.它是文档检索系统中最常用的数据结构. 有两种不同的反向索引形式: 一条记录的水平反向索引(或者反向档案索引)包含每个引用单词的文档的列表. 一个单词的水平反向索引(或者完全反向索引)又包含每个单词在一个文档中的位置.[1] 后者的形式提供了更多的兼容性(比如短语搜索),但是需要更多的时间和空间来创建. 使用

MapReduce的分区

第一部分 分区简述(比如国家由省市来划分) 分区:map的输出经过partitioner分区进行下一步的reducer.一个分区对应一个reducer,就会使得reducer并行化处理任务.默认为1 1. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类. 2. HashPartitioner是mapreduce的默认partitioner.计算方法是 which reducer=(key.hashCode() & Integer.MAX_V