Hadoop mapreduce自定义分组RawComparator

本文发表于本人博客

今天接着上次【Hadoop mapreduce自定义排序WritableComparable】文章写,按照顺序那么这次应该是讲解自定义分组如何实现,关于操作顺序在这里不多说了,需要了解的可以看看我在博客园的评论,现在开始。

首先我们查看下Job这个类,发现有setGroupingComparatorClass()这个方法,具体源码如下:

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *                       org.apache.hadoop.mapreduce.Reducer.Context)}
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

从方法的源码可以看出这个方法是定义自定义键分组功能。设置这个自定义分组类必须满足extends RawComparator,那我们可以看下这个类的源码:

/**
 * <p>
 * A {@link Comparator} that operates directly on byte representations of
 * objects.
 * </p>
 * @param <T>
 * @see DeserializerComparator
 */
public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

然而这个RawComparator是泛型继承Comparator接口的,简单看了下那我们来自定义一个类继承RawComparator,代码如下:

public class MyGrouper implements RawComparator<SortAPI> {
    @Override
    public int compare(SortAPI o1, SortAPI o2) {
        return (int)(o1.first - o2.first);
    }
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int compareBytes = WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
        return compareBytes;
    }

}

源码中SortAPI是上节自定义排序中的定义对象,第一个方法从注释可以看出是比较2个参数的大小,返回的是自然整数;第二个方法是在反序列化时比较,所以需要是用字节比较。接下来我们继续看看自定义MyMapper类:

public class MyMapper extends Mapper<LongWritable, Text, SortAPI, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
        String[] splied = value.toString().split("\t");
        try {
            long first = Long.parseLong(splied[0]);
            long second = Long.parseLong(splied[1]);
            context.write(new SortAPI(first,second), new LongWritable(1));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}

自定义MyReduce类:

public class MyReduce extends Reducer<SortAPI, LongWritable, LongWritable, LongWritable> {
    @Override
    protected void reduce(SortAPI key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(key.first), new LongWritable(key.second));
    }

}

自定义SortAPI类:

public class SortAPI implements WritableComparable<SortAPI> {
    public Long first;
    public Long second;
    public SortAPI(){

    }
    public SortAPI(long first,long second){
        this.first = first;
        this.second = second;
    }

    @Override
    public int compareTo(SortAPI o) {
        return (int) (this.first - o.first);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readLong();
        this.second = in.readLong();

    }

    @Override
    public int hashCode() {
        return this.first.hashCode() + this.second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if(obj instanceof SortAPI){
            SortAPI o = (SortAPI)obj;
            return this.first == o.first && this.second == o.second;
        }
        return false;
    }

    @Override
    public String toString() {
        return "输出:" + this.first + ";" + this.second;
    }

}

接下来准备数据,数据如下:

1       2
1       1
3       0
3       2
2       2
1       2

上传至hdfs://hadoop-master:9000/grouper/input/test.txt,main代码如下:

public class Test {
    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/grouper/output/";
    static final String INPUT_DIR = "hdfs://hadoop-master:9000/grouper/input/test.txt";
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, Test.class.getSimpleName());
        job.setJarByClass(Test.class);
        deleteOutputFile(OUTPUT_DIR);
        //1设置输入目录
        FileInputFormat.setInputPaths(job, INPUT_DIR);
        //2设置输入格式化类
        job.setInputFormatClass(TextInputFormat.class);
        //3设置自定义Mapper以及键值类型
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SortAPI.class);
        job.setMapOutputValueClass(LongWritable.class);
        //4分区
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        //5排序分组
        job.setGroupingComparatorClass(MyGrouper.class);
        //6设置在一定Reduce以及键值类型
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        //7设置输出目录
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));
        //8提交job
        job.waitForCompletion(true);
    }

    static void deleteOutputFile(String path) throws Exception{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI(INPUT_DIR),conf);
        if(fs.exists(new Path(path))){
            fs.delete(new Path(path));
        }
    }
}

执行代码,然后在节点上用终端输入:hadoop fs -text /grouper/output/part-r-00000查看结果:

1       2
2       2
3       0

接下来我们修改下SortAPI类的compareTo()方法:

    @Override
    public int compareTo(SortAPI o) {
        long mis = (this.first - o.first) * -1;
        if(mis != 0 ){
            return (int)mis;
        }
        else{
            return (int)(this.second - o.second);
        }
    }

再次执行并查看/grouper/output/part-r-00000文件:

3       0
2       2
1       1

这样我们就得出了同样的数据分组结果会受到排序算法的影响,比如排序是倒序那么分组也是先按照倒序数据源进行分组输出。我们还可以在map函数以及reduce函数中打印记录(过程省略)这样经过对比也得出分组阶段:键值对中key相同(即compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法返回0)的则为一组,当前组再按照顺序选择第一个往缓冲区输出(也许会存储到硬盘)。其它的相同key的键值对就不会再往缓冲区输出了。在百度上检索到这边文章,其中它的分组是把map函数输出的value全部迭代到同一个key中,就相当于上面{key,value}:{1,{2,1,2}},这个结果跟最开始没有自定义分组时是一样的,我们可以在reduce函数输出Iterable<LongWritable> values进行查看,其实我觉得这样的才算是分组吧就像数据查询一样。

在这里我们应该要弄懂分组与分区的区别。分区是对输出结果文件进行分类拆分文件以便更好查看,比如一个输出文件包含所有状态的http请求,那么为了方便查看通过分区把请求状态分成几个结果文件。分组就是把一些相同键的键值对进行计算减少输出;分区之后数据全部还是照样输出到reduce端,而分组的话就有所减少了;当然这2个步骤也是不同的阶段执行。

这次先到这里。坚持记录点点滴滴!

时间: 2024-10-06 06:48:03

Hadoop mapreduce自定义分组RawComparator的相关文章

Hadoop之--&gt;自定义分组 RawComparator

data: 3 33 23 22 22 11 1 --------------------- 需求: 1 12 23 3 当第一列相同时候要第二列的最小值 package group; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org

Hadoop之——自定义分组比较器实现分组功能

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46287985 不多说,直接上代码,大家都懂得 1.Mapper类的实现 /** * Mapper类的实现 * @author liuyazhuang * */ static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{ protected void map(LongWritable

MapReduce自定义分组Group

一:背景 在上一篇文章中我们可以对两列数据进行排序,即完成了当第一列相同时第二列数据升序排列的功能,现在我们需要进一步完善一个功能,那就是当第一列相同时求出第二列的最小值或最大值,Hadoop提供了自定义分组的功能,可以满足我们的需求. 二:技术实现 我们先来看看需求 #当第一列不相等时,第一列按升序排列,当第一列相等时,求出对应第二列的最小值 [java] view plain copy 3   3 3   2 3   1 2   2 2   1 1   1 输出结果应该是: [java] v

[Hadoop] - Mapreduce自定义Counter

在Hadoop的MR程序开发中,经常需要统计一些map/reduce的运行状态信息,这个时候我们可以通过自定义Counter来实现,这个实现的方式是不是通过配置信息完成的,而是通过代码运行时检查完成的. 1.创建一个自己的Counter枚举类. enum PROCESS_COUNTER { BAD_RECORDS, BAD_GROUPS; } 2.在需要统计的地方,比如map或者reduce阶段进行下列操作. context.getCounter(PROCESS_COUNTER.BAD_RECO

hadoop mapreduce 自定义InputFormat

很久以前为了满足公司的需求写过一些自定义InputFormat,今天有时间拿出来记一下     需求是这样的,如果如果使用FileInputFormat作为输入,是按照行来读取日志的,也就是按照\n来区分每一条日志的,而由于一条日志中记录着一些错误信息,比如java的异常信息,这些信息本身就带有换行符,如果还是按照\n进行区分每一条日志的话明显是错误的,由于我们日志的特殊性,将以"]@\n"作为区分日志的标识.     接下来就来看看如何自定义InputFormat,还是不画类图了,我

【Hadoop】Hadoop MR 自定义分组 Partition机制

1.概念 2.Hadoop默认分组机制--所有的Key分到一个组,一个Reduce任务处理 3.代码示例 FlowBean package com.ares.hadoop.mr.flowgroup; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class FlowBean

hadoop MapReduce自定义分区Partition输出各运营商的手机号码

MapReduce和自定义Partition MobileDriver主类 package Partition; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; public class MobileDriver { public static void main(String[] args) { String[] paths = {"F:\\mobile.txt", "F

Hadoop mapreduce自定义排序WritableComparable

本文发表于本人博客. 今天继续写练习题,上次对分区稍微理解了一下,那根据那个步骤分区.排序.分组.规约来的话,今天应该是要写个排序有关的例子了,那好现在就开始! 说到排序我们可以查看下hadoop源码里面的WordCount例子中对LongWritable类型定义,它实现抽象接口WritableComparable,代码如下: public interface WritableComparable<T> extends Writable, Comparable<T> { } pub

hadoop的自定义分组实现 (Partition机制)

hadoop开发中我们会遇到类似这样的问题,比如 如何将不同省份的手机号分别输出到不同的文件中,本片文章将对hadoop内置的Partition类进行重写以解决这个问题. MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R).用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程.Hadoop中自带了一个默认的分区类HashPartitioner,它继承了Partitioner类,提供了一个getPartition的方法,它的定义如下所