MapReduce程序之二次排序与多次排序

[toc]


MapReduce程序之二次排序与多次排序

需求

有下面的数据:

cookieId    time    url
2   12:12:34    2_hao123
3   09:10:34    3_baidu
1   15:02:41    1_google
3   22:11:34    3_sougou
1   19:10:34    1_baidu
2   15:02:41    2_google
1   12:12:34    1_hao123
3   23:10:34    3_soso
2   05:02:41    2_google

假如我们现在的需求是先按 cookieId 排序,然后按 time 排序,以便按 session 切分日志,排序后的结果如下:

---------------------------------
1      12:12:34        1_hao123
1      15:02:41        1_google
1      19:10:34        1_baidu
---------------------------------
2      05:02:41        2_google
2      12:12:34        2_hao123
2      15:02:41        2_google
---------------------------------
3      09:10:34        3_baidu
3      22:11:34        3_sougou
3      23:10:34        3_soso

要求使用MapReduce程序实现。

程序思路分析

Map函数:
/**
 * Map函数,解析每一行记录为AccessLogWritable,这样Map输出的时候就可以根据
 * AccessLogWritable对象中的两个字段进行排序,从而实现前面要求的二次排序需求
 * 也就是说,排序依旧是依赖Map输出时的排序,但是规则是我们在AccessLogWritable中定义的
 */

 Reduce函数:
/**
 * 经过shuffle后到达Reducer的数据已经是有序的,所以直接写出即可
 */

所以为了进行多个数据的比较,我们需要自定义key来作为Map输出的key。

MapReduce程序

关于如何进行数据的排序,思路已经在代码注释中有说明,不过需要注意的是,这里使用了前面开发的Job工具类来开发驱动程序。

SecondSortJob.java

package com.uplooking.bigdata.mr.secondsort;

import com.uplooking.bigdata.common.utils.MapReduceJobUtil;
import com.uplooking.bigdata.mr.sort.SortJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 * MapReduce排序之二次排序
 */
public class SecondSortJob {

    /**
     * 驱动程序,使用工具类使用Job
     * @param args
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>");
            System.exit(-1);
        }

        Job job = MapReduceJobUtil.buildJob(new Configuration(),
                SecondSortJob.class,
                args[0],
                TextInputFormat.class,
                SecondSortMapper.class,
                AccessLogWritable.class,
                NullWritable.class,
                new Path(args[1]),
                TextOutputFormat.class,
                SecondSortReducer.class,
                AccessLogWritable.class,
                NullWritable.class);

        // ReduceTask必须设置为1
        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }

    /**
     * Map函数,解析每一行记录为AccessLogWritable,这样Map输出的时候就可以根据
     * AccessLogWritable对象中的两个字段进行排序,从而实现前面要求的二次排序需求
     * 也就是说,排序依旧是依赖Map输出时的排序,但是规则是我们在AccessLogWritable中定义的
     */
    public static class SecondSortMapper extends Mapper<LongWritable, Text, AccessLogWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 解析每一行
            String[] fields = value.toString().split("\t");
            if(fields == null || fields.length < 3) {
                return;
            }
            String cookieId = fields[0];
            String time = fields[1];
            String url = fields[2];
            // 构建AccessLogWritable对象
            AccessLogWritable logLine = new AccessLogWritable(cookieId, time, url);
            // 写出到context
            context.write(logLine, NullWritable.get());
        }
    }

    /**
     * 经过shuffle后到达Reducer的数据已经是有序的,所以直接写出即可
     */
    public static class SecondSortReducer extends Reducer<AccessLogWritable, NullWritable, AccessLogWritable, NullWritable> {
        @Override
        protected void reduce(AccessLogWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}

AccessLogWritable.java

package com.uplooking.bigdata.mr.secondsort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 自定义Hadoop数据类型,作为key,需要实现WritableComparable接口
 * map中排序需要比较的对象为AccessLogWritable,所以泛型填写为AccessLogWritable
 */
public class AccessLogWritable implements WritableComparable<AccessLogWritable> {

    private String cookieId;
    private String time;
    private String url;

    /**
     * 空参构造方法,必须要有,否则会有下面的异常:
     Caused by: java.lang.NoSuchMethodException: com.uplooking.bigdata.mr.secondsort.AccessLogWritable.<init>()
     at java.lang.Class.getConstructor0(Class.java:3082)
     at java.lang.Class.getDeclaredConstructor(Class.java:2178)
     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:125)
     ... 16 more
     */
    public AccessLogWritable() {

    }

    public AccessLogWritable(String cookieId, String time, String url) {
        this.cookieId = cookieId;
        this.time = time;
        this.url = url;
    }

    /**
     * 比较的方法,定义的规则为:
     * 先按 cookieId 排序,然后按 time 排序
     * @param o
     * @return
     */
    public int compareTo(AccessLogWritable o) {
        int ret = this.cookieId.compareTo(o.cookieId);
        // 如果cookieId比较结果相同,再比较time
        if(ret == 0) {
            ret = this.time.compareTo(o.time);
        }
        return ret;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(cookieId);
        out.writeUTF(time);
        out.writeUTF(url);
    }

    public void readFields(DataInput in) throws IOException {
        this.cookieId = in.readUTF();
        this.time = in.readUTF();
        this.url = in.readUTF();
    }

    @Override
    public String toString() {
        return cookieId + "\t" + time + "\t" + url;
    }
}

测试

这里使用本地环境来运行MapReduce程序,输入的参数如下:

/Users/yeyonghao/data/input/secondsort /Users/yeyonghao/data/output/mr/secondsort

也可以将其打包成jar包,然后上传到Hadoop环境中运行。

运行程序后,查看输出结果如下:

[email protected]:~/data/output/mr/secondsort$ cat part-r-00000
1   12:12:34    1_hao123
1   15:02:41    1_google
1   19:10:34    1_baidu
2   05:02:41    2_google
2   12:12:34    2_hao123
2   15:02:41    2_google
3   09:10:34    3_baidu
3   22:11:34    3_sougou
3   23:10:34    3_soso

可以看到,通过使用自定义的key,我们的MapReduce程序已经完成了二次排序的功能。

扩展:如何实现多次排序

其实如果上面的程序能够理解清楚的话,多次排序的思路应该也是很自然就可以想到的,因为比较的规则其实是在key中定义的,而对于Map来说,是依据key来进行排序的,所以如果需要进行多次排序,我们就可以在自定义的key的compareTo方法中来实现多次排序的规则,有兴趣的朋友可以自行写出这样的程序,这里就不再说明。

原文地址:http://blog.51cto.com/xpleaf/2084323

时间: 2024-08-01 23:42:09

MapReduce程序之二次排序与多次排序的相关文章

HADOOP之MAPREDUCE程序应用二

摘要:MapReduce程序进行单词计数. 关键词:MapReduce程序  单词计数 数据源:人工构造英文文档file1.txt,file2.txt. file1.txt 内容 Hello   Hadoop I   am  studying   the   Hadoop  technology file2.txt内容 Hello  world The  world  is  very  beautiful I   love    the   Hadoop    and    world 问题描

Hadoop之MapReduce程序应用三

摘要:MapReduce程序进行数据去重. 关键词:MapReduce   数据去重 数据源:人工构造日志数据集log-file1.txt和log-file2.txt. log-file1.txt内容 2014-1-1    wangluqing 2014-1-2    root 2014-1-3   root 2014-1-4  wangluqing 2014-1-5  root 2014-1-6  wangluqing log-file2.txt内容 2014-1-1  root 2014-

Hadoop(二):MapReduce程序(Java)

Java版本程序开发过程主要包含三个步骤,一是map.reduce程序开发:第二是将程序编译成JAR包:第三使用Hadoop jar命令进行任务提交. 下面拿一个具体的例子进行说明,一个简单的词频统计,输入数据是一个单词文本,输出每个单词的出现个数. 一.MapReduce程序 标准的MapReduce程序包含一个Mapper函数.一个Reducer函数和一个main函数 1.主程序 1 package hadoop; 2 import org.apache.hadoop.conf.Config

在Hadoop上运行基于RMM中文分词算法的MapReduce程序

原文:http://xiaoxia.org/2011/12/18/map-reduce-program-of-rmm-word-count-on-hadoop/ 在Hadoop上运行基于RMM中文分词算法的MapReduce程序 23条回复 我知道这个文章标题很“学术”化,很俗,让人看起来是一篇很牛B或者很装逼的论文!其实不然,只是一份普通的实验报告,同时本文也不对RMM中文分词算法进行研究.这个实验报告是我做高性能计算课程的实验里提交的.所以,下面的内容是从我的实验报告里摘录出来的,当作是我学

6.命令行编译打包运行五个MapReduce程序

对于如何编译WordCount.java,对于0.20 等旧版本版本的做法很常见,具体如下: javac -classpath /usr/local/hadoop/hadoop-1.0.1/hadoop-core-1.0.1.jar WordCount.java 但较新的 2.X 版本中,已经没有 hadoop-core*.jar 这个文件,因此编辑和打包自己的MapReduce程序与旧版本有所不同. Hadoop 2.x 版本中的依赖 jar Hadoop 2.x 版本中jar不再集中在一个

mapreduce程序编写(WordCount)

折腾了半天.终于编写成功了第一个自己的mapreduce程序,并通过打jar包的方式运行起来了. 运行环境: windows 64bit eclipse 64bit jdk6.0 64bit 一.工程准备 1.新建java project 2.导入jar包 新建一个user library 把hadoop文件夹里的hadoop-core和lib包里的所有包都导入进来,以免出错. 二.编码 1.主要是计算单词的小程序,测试用 package com.hirra; import java.io.IO

Hadoop学习---第三篇Hadoop的第一个Mapreduce程序

Mapreducer程序写了好几个了,但是之前一直都没有仔细的测试过本地运行和集群上运行的区别,今天写了一个Mapreduce程序,在此记录下来. 本地运行注意事项有以下几点: 1.本地必须配置好Hadoop的开发环境 2.在src里不加入配置文件运行,或者如果本地的src里有mapred-site.xml和yarn-site.xml配置文件,那么mapreduce.framework.name=local以及yarn.resourcemanager.hostname=local 测试说明:sr

MapReduce数据流(二)

输入块(InputSplit):一个输入块描述了构成MapReduce程序中单个map任务的一个单元.把一个MapReduce程序应用到一个数据集上,即是指一个作业,会由几个(也可能几百个)任务组成.Map任务可能会读取整个文件,但一般是读取文件的一部分.默认情况下,FileInputFormat及其子类会以64MB(与HDFS的Block默认大小相同,译注:Hadoop建议Split大小与此相同)为基数来拆分文件.你可以在hadoop-site.xml(译注:0.20.*以后是在mapred-

用python + hadoop streaming 编写分布式程序(二) -- 在集群上运行与监控

写在前面 前文:用python + hadoop streaming 编写分布式程序(一) -- 原理介绍,样例程序与本地调试 为了方便,这篇文章里的例子均为伪分布式运行,一般来说只要集群配置得当,在伪分布式下能够运行的程序,在真实集群上也不会有什么问题. 为了更好地模拟集群环境,我们可以在mapred-site.xml中增设reducer和mapper的最大数目(默认为2,实际可用数目大约是CPU核数-1). 假设你为Hadoop安装路径添加的环境变量叫$HADOOP_HOME(如果是$HAD