Hadoop: Custom Grouping with RawComparator

Input data:

3  3
3  2
3  2
2  2
2  1
1  1

---------------------

Required output:

1 1
2 1
3 2

When the first column is the same, keep only the minimum value of the second column.
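The trick is a composite map-output key, NewK2, that carries both columns and sorts by the first column and then the second, combined with a grouping comparator that compares only the first column. After the shuffle the keys arrive sorted as (1,1), (2,1), (2,2), (3,2), (3,2), (3,3); grouping on the first column alone merges them into three reduce() calls, so the minimum of each group can be read off directly. The full program: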

package group;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Custom grouping on the intermediate key (k2).
 */
public class GroupApp {

    private static final String INPUT_PATH = "hdfs://hadoop:9000/data";
    private static final String OUT_PATH = "hdfs://hadoop:9000/out";

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // Delete any previous output directory so the job can be rerun.
        FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
        fileSystem.delete(new Path(OUT_PATH), true);

        Job job = Job.getInstance(conf, GroupApp.class.getSimpleName());
        job.setJarByClass(GroupApp.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMapper.class);
        // The mapper emits the composite key, not a plain LongWritable.
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Register the custom grouping comparator.
        job.setGroupingComparatorClass(MyGroupComparator.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each line holds two tab-separated numeric columns.
            String[] split = value.toString().split("\t");
            long first = Long.parseLong(split[0]);
            long second = Long.parseLong(split[1]);
            // Emit the composite key; the value is the second column.
            context.write(new NewK2(first, second), new LongWritable(second));
        }
    }

    /**
     * Without the custom grouping comparator, each distinct NewK2 would form
     * its own group; grouping on the first column alone means reduce() is
     * called once per distinct first-column value.
     */
    public static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(NewK2 key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Find the smallest second-column value in this group.
            long min = Long.MAX_VALUE;
            for (LongWritable longWritable : values) {
                if (longWritable.get() < min) {
                    min = longWritable.get();
                }
            }
            context.write(new LongWritable(key.first), new LongWritable(min));
        }
    }

    /**
     * Composite key with a custom sort order.
     */
    public static class NewK2 implements WritableComparable<NewK2> {
        long first;
        long second;

        public NewK2() {}

        public NewK2(long first, long second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.first);
            out.writeLong(this.second);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        /**
         * Sort by the first column; when the first columns are equal,
         * fall back to the second column.
         */
        @Override
        public int compareTo(NewK2 o) {
            int cmp = Long.compare(this.first, o.first);
            if (cmp != 0) {
                return cmp;
            }
            // First columns are equal; order by the second column.
            return Long.compare(this.second, o.second);
        }
    }

    /**
     * Custom grouping comparator: two keys belong to the same group when
     * their first columns are equal.
     */

    public static class MyGroupComparator implements RawComparator<NewK2> {

        /**
         * Object-level comparison: group solely by the first column.
         */
        @Override
        public int compare(NewK2 o1, NewK2 o2) {
            return Long.compare(o1.first, o2.first);
        }

        /**
         * Byte-level comparison over the serialized keys. b1/s1/l1 and
         * b2/s2/l2 give each key's buffer, start offset, and length.
         * Each NewK2 is serialized as two longs (16 bytes); comparing
         * only the first 8 bytes compares just the first column.
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
        }
    }

}
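In practice the same grouping rule is often written by extending WritableComparator instead of implementing RawComparator by hand: the superclass deserializes both byte buffers into key instances and delegates to an object-level compare(). A minimal sketch of that approach, assuming the NewK2 key above (NewK2GroupComparator is a hypothetical name, not part of the original program):

package group;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class NewK2GroupComparator extends WritableComparator {

    protected NewK2GroupComparator() {
        // true => let WritableComparator create NewK2 instances so that
        // the inherited byte-level compare() can deserialize both keys.
        super(GroupApp.NewK2.class, true);
    }

    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        GroupApp.NewK2 k1 = (GroupApp.NewK2) a;
        GroupApp.NewK2 k2 = (GroupApp.NewK2) b;
        // Group solely by the first column.
        return Long.compare(k1.first, k2.first);
    }
}

Registering it is the same: job.setGroupingComparatorClass(NewK2GroupComparator.class);. The hand-written RawComparator above avoids the per-record deserialization cost, which is exactly why the byte-level compare over the first 8 bytes is worth writing.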
