【Hadoop】Hadoop MR Custom Partitioning (the Partition Mechanism)

1. Concept
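
In MapReduce, every key/value pair emitted by a Mapper is routed to exactly one reduce task, and the component that decides this routing is the Partitioner. By subclassing org.apache.hadoop.mapreduce.Partitioner and registering it with job.setPartitionerClass(), you control which reducer (and therefore which output file) each key ends up in. The example below partitions mobile-phone flow records by phone-number prefix.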

2. Hadoop's Default Partitioning: All Keys Go to One Partition, Handled by a Single Reduce Task
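
Strictly speaking, the default partitioner is HashPartitioner, which spreads keys across reducers by hash code; it is only because the default number of reduce tasks is 1 that every key lands in the same partition and is handled by one Reduce task. Its core logic is essentially the following sketch:

package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of Hadoop's built-in HashPartitioner: partition by key hash.
// Masking with Integer.MAX_VALUE clears the sign bit, so the modulo result
// is never negative. With numReduceTasks == 1 this always returns 0.
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}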

3. Code Example

FlowBean

package com.ares.hadoop.mr.flowgroup;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean>{
    private String phoneNum;
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
        // public no-arg constructor: required because Hadoop instantiates
        // the bean reflectively during deserialization
    }
//    public FlowBean(String phoneNum, long upFlow, long downFlow, long sumFlow) {
//        super();
//        this.phoneNum = phoneNum;
//        this.upFlow = upFlow;
//        this.downFlow = downFlow;
//        this.sumFlow = sumFlow;
//    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // deserialize fields in exactly the same order write() emits them
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // serialize fields; readFields() must read them back in this order
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public String toString() {
        return "" + phoneNum + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        // sort descending by total flow; return 0 on ties so the
        // compareTo contract holds
        return Long.compare(o.getSumFlow(), sumFlow);
    }

}
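
Two details worth noting about FlowBean: write() and readFields() must (de)serialize the fields in exactly the same order, and the public no-arg constructor is required because Hadoop creates the bean reflectively. In this particular job the bean serves as the map output value and the reduce output key, so its compareTo() is never actually invoked; it would only matter if FlowBean were used as a map output key to be sorted.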

FlowGroup

package com.ares.hadoop.mr.flowgroup;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

import com.ares.hadoop.mr.exception.LineException;

public class FlowGroup extends Configured implements Tool {
    private static final Logger LOGGER = Logger.getLogger(FlowGroup.class);
    enum Counter {
        LINESKIP
    }

    public static class FlowGroupMapper extends Mapper<LongWritable, Text,
        Text, FlowBean> {
        private String line;
        private int length;
        private final static char separator = '\t';

        private String phoneNum;
        private long upFlow;
        private long downFlow;
        //private long sumFlow;

        private Text text = new Text();
        private FlowBean flowBean = new FlowBean();

        @Override
        protected void map(
                LongWritable key,
                Text value,
                Mapper<LongWritable, Text, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            // parse one line of the flow log; malformed lines are logged,
            // counted via the LINESKIP counter, and skipped
            String errMsg;
            try {
                line = value.toString();
                String[] fields = StringUtils.split(line, separator);
                length = fields.length;
                if (length != 11) {
                    throw new LineException(key.get() + ", " + line + " LENGTH INVALID, IGNORE...");
                }

                phoneNum = fields[1];
                upFlow = Long.parseLong(fields[length-3]);
                downFlow = Long.parseLong(fields[length-2]);
                //sumFlow = upFlow + downFlow;

                text.set(phoneNum);
                flowBean.setPhoneNum(phoneNum);
                flowBean.setUpFlow(upFlow);
                flowBean.setDownFlow(downFlow);
                //flowBean.setSumFlow(sumFlow);

                context.write(text, flowBean);
            } catch (LineException e) {
                // malformed line: log it, count it, skip it
                LOGGER.error(e);
                System.out.println(e);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            } catch (NumberFormatException e) {
                // non-numeric flow fields: log, count, skip
                errMsg = key.get() + ", " + line + " FLOW DATA INVALID, IGNORE...";
                LOGGER.error(errMsg);
                System.out.println(errMsg);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            } catch (Exception e) {
                // any other error: log, count, skip
                LOGGER.error(e);
                System.out.println(e);
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            }
        }
    }
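
    // NOTE: the mapper reuses one Text and one FlowBean instance across map()
    // calls; this is safe because context.write() serializes the pair into the
    // output buffer immediately, and it avoids two allocations per input line.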

    public static class FlowGroupReducer extends Reducer<Text, FlowBean,
        FlowBean, NullWritable> {

        private FlowBean flowBean = new FlowBean();

        @Override
        protected void reduce(
                Text key,
                Iterable<FlowBean> values,
                Reducer<Text, FlowBean, FlowBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // accumulate up and down flow over all records of this phone number
            long upFlowCounter = 0;
            long downFlowCounter = 0;

            for (FlowBean flowBean : values) {
                upFlowCounter += flowBean.getUpFlow();
                downFlowCounter += flowBean.getDownFlow();
            }
            flowBean.setPhoneNum(key.toString());
            flowBean.setUpFlow(upFlowCounter);
            flowBean.setDownFlow(downFlowCounter);
            flowBean.setSumFlow(upFlowCounter + downFlowCounter);

            context.write(flowBean, NullWritable.get());
        }
    }
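
    // NOTE: Hadoop reuses a single FlowBean object behind the values iterator,
    // so a bean must be copied if it is needed after the loop; here only two
    // long accumulators are kept, so the reuse is harmless.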

    @Override
    public int run(String[] args) throws Exception {
        // assemble, configure, and submit the job
        String errMsg = "FlowGroup: TEST STARTED...";
        LOGGER.debug(errMsg);
        System.out.println(errMsg);

        Configuration conf = new Configuration();
        //FOR Eclipse JVM Debug
        //conf.set("mapreduce.job.jar", "flowsum.jar");
        Job job = Job.getInstance(conf);

        // JOB NAME
        job.setJobName("FlowGroup");

        // JOB MAPPER & REDUCER
        job.setJarByClass(FlowGroup.class);
        job.setMapperClass(FlowGroupMapper.class);
        job.setReducerClass(FlowGroupReducer.class);

        // JOB PARTITION
        job.setPartitionerClass(FlowGroupPartition.class);

        // JOB REDUCE TASK NUMBER: must cover the partitions produced by
        // FlowGroupPartition (4 known prefixes + 1 catch-all = 5)
        job.setNumReduceTasks(5);

        // MAP & REDUCE
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);
        // MAP
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // JOB INPUT & OUTPUT PATH
        //FileInputFormat.addInputPath(job, new Path(args[0]));
        FileInputFormat.setInputPaths(job, args[1]);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        // VERBOSE OUTPUT
        if (job.waitForCompletion(true)) {
            errMsg = "FlowGroup: TEST SUCCESSFULLY...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);
            return 0;
        } else {
            errMsg = "FlowGroup: TEST FAILED...";
            LOGGER.debug(errMsg);
            System.out.println(errMsg);
            return 1;
        }            

    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            String errMsg = "FlowGroup: ARGUMENTS ERROR";
            LOGGER.error(errMsg);
            System.out.println(errMsg);
            System.exit(-1);
        }

        int result = ToolRunner.run(new Configuration(), new FlowGroup(), args);
        System.exit(result);
    }
}
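
Judging from the checks in main() and run() (exactly three arguments, with args[1] as the input path and args[2] as the output path, while args[0] goes unused), a launch command would look roughly like this; the jar name and the first placeholder argument are assumptions:

hadoop jar flowgroup.jar com.ares.hadoop.mr.flowgroup.FlowGroup placeholder /flow/input /flow/output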

FlowGroupPartition

package com.ares.hadoop.mr.flowgroup;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class FlowGroupPartition<KEY, VALUE> extends Partitioner<KEY, VALUE>{
    private static HashMap<String, Integer> groupMap = new HashMap<String, Integer>();
    static {
        groupMap.put("135", 0);
        groupMap.put("136", 1);
        groupMap.put("137", 2);
        groupMap.put("138", 3);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // route by the first three digits of the phone number; unknown
        // prefixes fall through to the catch-all partition 4
        Integer partition = groupMap.get(key.toString().substring(0, 3));
        return (partition == null) ? 4 : partition;
    }

}
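
With the five reduce tasks configured in run(), this partitioner yields five output files: part-r-00000 through part-r-00003 collect phone numbers starting with 135, 136, 137 and 138 respectively, and part-r-00004 collects everything else. The reduce task count must be at least as large as the biggest partition number returned: extra reducers merely produce empty files, but if getPartition() returns a number greater than or equal to numReduceTasks, the job fails at runtime with an "Illegal partition" error.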