Hadoop2.4.1 MapReduce通过Map端shuffle(Combiner)完成数据去重

package com.bank.service;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 将清洗后的数据通过Map端Shuffle(Job.setCombinerClass)去除重复值
 * @author mengyao
 *
 */
public class CnyDataFormatReplition extends Configured implements Tool {

/**
     * Map端将行内容通过key输出到Reduce,这样会按照字典顺序对key进行排序,输出的value则为空,空值使用Hadoop提供的NullWritable类,该类是Hadoop的序列化后的类型
     * @author mengyao
     *
     */
    static class CnyDataFormatReplitionMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

/**
     * 在Map端Combiner后作为Reduce接收的key,Reduce端将key写入到HDFS,value则无需输出,使用NullWritable表示不输出
     * @author mengyao
     *
     */
    static class CnyDataFormatReplitionReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> value, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());        
        }
    }
    
    @Override
    public int run(String[] arg0) throws Exception {
        Job job = Job.getInstance(getConf(), CnyDataFormatReplition.class.getSimpleName());
        //指定运行作业类的主函数入口
        job.setJarByClass(CnyDataFormatReplition.class);
        
        FileInputFormat.setInputPaths(job, new Path(arg0[0]));
        job.setMapperClass(CnyDataFormatReplitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        
        //在Map端进行shuffle,先写入缓冲区预排序(达到缓冲区默认100m后系统起后台线程spill到本地磁盘,写入磁盘前会进行二次快速排序),减少到Reduce的网络开销
        job.setCombinerClass(CnyDataFormatReplitionReduce.class);
        
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        job.setReducerClass(CnyDataFormatReplitionReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        
        //提交作业并打印作业的进度详情,true打印,false为不打印
        return job.waitForCompletion(true) ? 0 : 1;
    }
    
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println(" ERROR: <inputDir> <outputDir>");
            System.exit(2);
        }
        int status = ToolRunner.run(new CnyDataFormatReplition(), otherArgs);
        System.exit(status);
    }

}

时间: 2024-10-20 18:04:34

Hadoop2.4.1 MapReduce通过Map端shuffle(Combiner)完成数据去重的相关文章

MapReduce核心map reduce shuffle (spill sort partition merge)详解

Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方.要想理解MapReduce, Shuffle是必须要了解的.Shuffle的正常意思是洗牌或弄乱,可能大家更熟悉的是Java API里Collections.shuffle(List)方法,它会随机地打乱参数list里的元素顺序.如果你不知道MapReduce里 Shuffle是什么,那么请看这张图: 这张是官方对Shuffle过程的描述.但我可以肯定的 是,单从这张图你基本不可能明白Shuffle的过程,因为它与事实相差挺多

hadoop核心逻辑shuffle代码分析-map端

首先要推荐一下:http://www.alidata.org/archives/1470 阿里的大牛在上面的文章中比较详细的介绍了shuffle过程中mapper和reduce的每个过程,强烈推荐先读一下. 不过,上文没有写明一些实现的细节,比如:spill的过程,mapper生成文件的 partition是怎么做的等等,相信有很多人跟我一样在看了上面的文章后还是有很多疑问,我也是带着疑问花了很久的看了cdh4.1.0版本 shuffle的逻辑,整理成本文,为以后回顾所用. 首先用一张图展示下m

Hadoop on Mac with IntelliJ IDEA - 10 陆喜恒. Hadoop实战(第2版)6.4.1(Shuffle和排序)Map端 内容整理

下午对着源码看陆喜恒. Hadoop实战(第2版)6.4.1  (Shuffle和排序)Map端,发现与Hadoop 1.2.1的源码有些出入.下面作个简单的记录,方便起见,引用自书本的语句都用斜体表示. 依书本,从MapTask.java开始.这个类有多个内部类: 从书的描述可知,collect()并不在MapTask类,而在MapOutputBuffer类,其函数功能是 1.定义输出内存缓冲区为环形结构2.定义输出内存缓冲区内容到磁盘的操作 在collect函数中将缓冲区的内容写出时会调用s

MapReduce实现Reduce端Join操作实例

使用案例: 联接两张表 Table EMP:(新建文件EMP,第一行属性名不要) Name Sex Age DepNo zhang male 20 1 li female 25 2 wang female 30 3 zhou male 35 2 Table Dep:(新建文件DEP,第一行属性名不要) DepNo DepName 1 Sales 2 Dev 3 Mgt Inner join: select Name,Sex,Age,DepName from EMP inner join DEP

【转】MapReduce:详解Shuffle过程

——转自:{http://langyu.iteye.com/blog/992916} Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方.要想理解MapReduce, Shuffle是必须要了解的.我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越混.前段时间在做MapReduce job 性能调优的工作,需要深入代码研究MapReduce的运行机制,这才对Shuffle探了个究竟.考虑到之前我在看相关资料而看不懂时很恼火,所以在这里我尽最大的可能

map-reduce 、map、reduce

map-reduce 过程 中间绿线区域就是shuffle("洗牌")过程:map之后,reduce之前的过程. 可以这样理解:一个map 产生的数据,结果通过hash过程分区却分配给了不同的reduce任务,就相当于对数据洗牌的过程. map端过程 reduce 过程

hadoop的压缩解压缩,reduce端join,map端join

hadoop的压缩解压缩 hadoop对于常见的几种压缩算法对于我们的mapreduce都是内置支持,不需要我们关心.经过map之后,数据会产生输出经过shuffle,这个时候的shuffle过程特别需要消耗网络资源,它传输的数据量越少,对作业的运行时间越有意义,在这种情况下,我们可以对输出进行一个压缩.输出压缩之后,reducer就要接收,然后再解压,reducer处理完之后也需要做输出,也可以做压缩.对于我们程序而言,输入的压缩是我们原来的,不是程序决定的,因为输入源就是这样子,reduce

MapReduce:详解Shuffle过程

Shuffle过程,也称Copy阶段.reduce task从各个map task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定的阀值,则写到磁盘上,否则直接放到内存中. 官方的Shuffle过程如上图所示,不过细节有错乱,官方图并没有说明partition.sort和combiner具体作用于哪个阶段. 注意:Shuffle过程是贯穿于map和reduce两个过程的! Hadoop的集群环境,大部分的map task和reduce task是执行在不同的节点上的,那么reduce就要

Hadoop-2.4.1学习之Map任务源码分析(上)

众所周知,Mapper是MapReduce编程模式中最重要的环节之一(另一个当然是Reducer了).在Hadoop-2.x版本中虽然不再有JobTracker和TaskTracker,但Mapper任务的功能却没有变化,本篇文章将结合源代码深入分析Mapper任务时如何执行的,包括处理InputSplit,mapper的输出.对输出分类等.在进行分析之前先明确几个概念:作业.任务.任务的阶段和任务的状态,可以将作业理解为要最终实现的功能或目的,比如统计单词的数量,而任务就是对该作业的拆分,只负