重新认识mapreduce

写这篇文章,是因为最近遇到了mapreduce的二次排序问题。以前的理解不完全正确。首先看一下mapreduce的过程

相信这张图熟悉MR的人都应该见过,再来一张图

wordcount也不细说了,hadoop里面的hello,world

之前我的理解是map过来的<k,v>会形成(k,<v1,v2,v3...>)的格式,并且按照这种思路写出来不少的mapreduce程序,而且没有错。

后来自定义Writable对象,封装一组值作为key,也没有什么问题,而且一直认为key只要在compareTo中重写 了方法就万事大吉,而且compareTo返回0的会作为相同的key。误区就在这里,之前一直认为key相同的value会合并到一个"list"中-。这句话就有错,key是key,value是value,根本不会将key对应的value合并在一起,真实情况是默认将key相同(compareTo返回0的)的合并成了一组,在组相同的里面去foreach里面的value,如果是自定义key的话你可以将key打印一下,或发现key并不相同。

上代码:

public class Entry implements WritableComparable<Entry> {
    private String yearMonth;
    private int count;

    public Entry() {
    }

    @Override
    public int compareTo(Entry entry) {
        int result = this.yearMonth.compareTo(entry.getYearMonth());
        if (result == 0) {
            result = Integer.compare(count, entry.getCount());
        }
        return result;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(yearMonth);
        dataOutput.writeInt(count);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.yearMonth = dataInput.readUTF();
        this.count = dataInput.readInt();
    }

    public String getYearMonth() {
        return yearMonth;
    }

    public void setYearMonth(String yearMonth) {
        this.yearMonth = yearMonth;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return yearMonth;
    }
}

自定义分区 EntryPartitioner.java

public class EntryPartitioner extends Partitioner<Entry, Text> {

    @Override
    public int getPartition(Entry entry, Text paramVALUE, int numberPartitions) {
        return Math.abs((entry.getYearMonth().hashCode() % numberPartitions));
    }
}

自定义分组

public class EntryGroupingComparator extends WritableComparator {
    public EntryGroupingComparator() {
        super(Entry.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Entry a1 = (Entry) a;
        Entry b1 = (Entry) b;
        return a1.getYearMonth().compareTo(b1.getYearMonth());
    }
}

mapper类

public class SecondarySortMapper extends
        Mapper<LongWritable, Text, Entry, Text> {

    private Entry entry = new Entry();
    private Text value = new Text();

    @Override
    protected void map(LongWritable key, Text lines, Context context)
            throws IOException, InterruptedException {
        String line = lines.toString();
        String[] tokens = line.split(",");
        String yearMonth = tokens[0] + "-" + tokens[1];
        int count = Integer.parseInt(tokens[2]);

        entry.setYearMonth(yearMonth);
        entry.setCount(count);
        value.set(tokens[2]);
        context.write(entry, value);

    }
}

reducer类

public class SecondarySortReducer extends Reducer<Entry, Text, Entry, Text> {
    @Override
    protected void reduce(Entry key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("-----------------华丽的分割线-----------------");
        StringBuilder builder = new StringBuilder();
        for (Text value : values) {
            System.out.println(key+"==>"+value);
            builder.append(value.toString());
            builder.append(",");
        }
        context.write(key, new Text(builder.toString()));
    }
}

reducer中打印出来的跟原来想的不一样,一组的值除了自定义分组的属性相同外,其他的属性有不同的。看来以前是自己理解不够深入啊,特此写出,以示警戒

时间: 2024-07-29 11:39:17

重新认识mapreduce的相关文章

MapReduce实现手机上网流量分析

一.问题背景 现在的移动刚一通话就可以在网站上看自己的通话记录,以前是本月只能看上一个月.不过流量仍然是只能看上一月的. 目的就是找到用户在一段时间内的上网流量. 本文并没有对时间分组. 二.数据集分析 可以看出实际数据集并不是每个字段都有值,但是还好,完整地以tab隔开了,数据格式还是不错的,我们需要的上行下行数据都有,没有缺失值.其实这个需要在程序中处理,如果不在的话 该怎么办. 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196

mapreduce和spark的原理及区别

Mapreduce和spark是数据处理层两大核心,了解和学习大数据必须要重点掌握的环节,根据自己的经验和大家做一下知识的分享. 首先了解一下Mapreduce,它最本质的两个过程就是Map和Reduce,Map的应用在于我们需要数据一对一的元素的映射转换,比如说进行截取,进行过滤,或者任何的转换操作,这些一对一的元素转换就称作是Map:Reduce主要就是元素的聚合,就是多个元素对一个元素的聚合,比如求Sum等,这就是Reduce. Mapreduce是Hadoop1.0的核心,Spark出现

基于 Eclipse 的 MapReduce 开发环境搭建

文 / vincentzh 原文连接:http://www.cnblogs.com/vincentzh/p/6055850.html 上周末本来要写这篇的,结果没想到上周末自己环境都没有搭起来,运行起来有问题的呢,拖到周一才将问题解决掉.刚好这周也将之前看的内容复习了下,边复习边码代码理解,印象倒是很深刻,对看过的东西理解也更深入了. 目录 1.概述 2.环境准备 3.插件配置 4.配置文件系统连接 5.测试连接 6.代码编写与执行 7.问题梳理 7.1 console 无日志输出问题 7.2

mongodb aggregate and mapReduce

Aggregate MongoDB中聚合(aggregate)主要用于处理数据(诸如统计平均值,求和等),并返回计算后的数据结果.有点类似sql语句中的 count(*) 语法如下: db.collection.aggregate() db.collection.aggregate(pipeline,options) db.runCommand({ aggregate: "<collection>", pipeline: [ <stage>, <...&g

MapReduce源码分析之Task中关于对应TaskAttempt存储Map方案的一些思考

我们知道,MapReduce有三层调度模型,即Job-->Task-->TaskAttempt,并且: 1.通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业.JobSetup Task等复杂的情况这里不做考虑): 2.每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task才执行多次. 而TaskImpl中存在一个成员变

初步掌握MapReduce的架构及原理

目录 1.MapReduce定义 2.MapReduce来源 3.MapReduce特点 4.MapReduce实例 5.MapReduce编程模型 6.MapReduce 内部逻辑 7.MapReduce架构 8.MapReduce框架的容错性 9.MapReduce资源组织方式 1.MapReduce 定义 Hadoop 中的 MapReduce是一个使用简单的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错式并行处理TB级别的数据集 2.MapR

MapReduce/Hbase进阶提升(原理剖析、实战演练)

什么是MapReduce? MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算.概念"Map(映射)"和"Reduce(归约)",和他们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性.他极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上. 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一

关于mongodb的mapReduce

由于nodejs本身的限制,在程序中使用js进行大批量计算效率不高.而V8引擎自身对内存大小的限制(64位系统下1.4G),同样限制了数据规模. 因此,相对于从mongodb中抽出数据进行计算,在mongodb中利用聚合函数或者其他方法完成计算,避开nodejs自身限制的方案在可靠性和扩展性上都相对较为令人满意. mongodb支持类似SQL中的聚合函数,虽然语法不通,不过基本原理类似. mongodb自带的接口中,aggregate被用来实现聚合查询: rec = db.LIBRARY.agg

mapreduce工作原理

转自:http://www.cnblogs.com/z1987/p/5055565.html MapReduce模型主要包含Mapper类和Reducer类两个抽象类.Mapper类主要负责对数据的分析处理,最终转化为key-value数据对:Reducer类主要获取key-value数据对,然后处理统计,得到结果.MapReduce实现了存储的均衡,但没有实现计算的均衡. 一. MapReduce框架组成 MapReduce主要包括JobClient.JobTracker.TaskTracke

Hadoop hdfs&mapreduce核心概念

1.HDFS(分布式文件系统体系) 1.1.NameNode:(名称节点) Hdfs的守护程序 记录文件是如何分割成数据块的,以及这些数据块被存储到了哪些节点上 对内存和I/O进行集中管理 是个单点,发生故障将使集群崩溃 1.2.SecondaryNamenode(辅助名称节点):发生故障进行人工的设置才能实现集群崩溃的问题 监控HDFS状态的辅助后台程序 每个集群都有一个 与NameNode进行通讯,定期保存HDFS元数据快照 与NameNode故障可以作为备用NameNode使用 1.3.D