Hadoop: Custom Sorting

Input data (two tab-separated columns):

3  3
3  2
3  1
2  2
2  1
1  1

---------------------

Required output (sorted by the first column, then by the second when the first is equal):

1  1
2  1
2  2
3  1
3  2
3  3

package sort;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Baseline job: the map output key (k2) is the first column only,
 * so only the first column is sorted; the value (v2) is not.
 */
public class SortApp {

    private static final String INPUT_PATH = "hdfs://hadoop:9000/data";
    private static final String OUT_PATH = "hdfs://hadoop:9000/out";

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // Delete the output directory if it already exists, otherwise the job fails on startup.
        FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
        fileSystem.delete(new Path(OUT_PATH), true);
        Job job = new Job(conf, SortApp.class.getSimpleName());
        job.setJarByClass(SortApp.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "<first>\t<second>"; the first column becomes k2, the second v2.
            String[] split = value.toString().split("\t");
            context.write(new LongWritable(Long.parseLong(split[0])), new LongWritable(Long.parseLong(split[1])));
        }
    }

    public static class MyReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

        @Override
        protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Write each value back out; only the key was sorted by the framework.
            for (LongWritable times : values) {
                context.write(key, times);
            }
        }
    }
}

Result: k2 (the first column) is sorted, but v2 (the second column) does not participate in sorting.
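For example, with the sample data above the first job could produce something like the following (the keys are in ascending order, but the order of the values within the same key is not guaranteed):

1  1
2  2
2  1
3  3
3  2
3  1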

How can the second column be made to sort as well?

The second column has to become part of k2, which requires a custom serializable key type (a WritableComparable), as shown in the full program below.

package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Custom-sort version: both columns are packed into the composite key NewK2,
 * so the sort also covers the second column.
 */
public class SortApp {

    private static final String INPUT_PATH = "hdfs://hadoop:9000/data";
    private static final String OUT_PATH = "hdfs://hadoop:9000/out";

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // Delete the output directory if it already exists, otherwise the job fails on startup.
        FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
        fileSystem.delete(new Path(OUT_PATH), true);
        Job job = new Job(conf, SortApp.class.getSimpleName());
        job.setJarByClass(SortApp.class);

        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        job.waitForCompletion(true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pack both columns into the key so that both take part in the sort.
            String[] split = value.toString().split("\t");
            long first = Long.parseLong(split[0]);
            long second = Long.parseLong(split[1]);
            context.write(new NewK2(first, second), new LongWritable(second));
        }
    }

    /**
     * No two NewK2 keys are equal, so the reducer receives six groups,
     * one per input row.
     */
    public static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable> {

        @Override
        protected void reduce(NewK2 key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Unpack the composite key back into the original two columns.
            context.write(new LongWritable(key.first), new LongWritable(key.second));
        }
    }

    /**
     * Custom sort key: orders by the first column, and by the second column
     * when the first columns are equal.
     */
    public static class NewK2 implements WritableComparable<NewK2> {
        long first;
        long second;

        public NewK2() {}

        public NewK2(long first, long second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(this.first);
            out.writeLong(this.second);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        /**
         * Compare by the first column; when the first columns are equal,
         * compare by the second column.
         */
        @Override
        public int compareTo(NewK2 o) {
            int cmp = Long.compare(this.first, o.first);
            if (cmp != 0) {
                return cmp;
            }
            // The first columns are equal, so the second column decides.
            return Long.compare(this.second, o.second);
        }
    }
}
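Assuming the input file is already on HDFS at /data and the job runs with the default single reducer, the output file (typically /out/part-r-00000, viewable with hadoop fs -cat /out/part-r-00000) should match the required ordering:

1  1
2  1
2  2
3  1
3  2
3  3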

