Hadoop的ChainMapper和ChainReducer实战

Hadoop的MR作业支持链式处理,类似在一个生产牛奶的流水线上,每一个阶段都有特定的任务要处理,比如提供牛奶盒,装入牛奶,封盒,打印出厂日期,等等,通过这样进一步的分工,从而提高了生产效率,那么在我们的Hadoop的MapReduce中也是如此,支持链式的处理方式,这些Mapper像Linux管道一样,前一个Mapper的输出结果直接重定向到下一个Mapper的输入,形成一个流水线,而这一点与Lucene和Solr中的Filter机制是非常类似的,Hadoop项目源自Lucene,自然也借鉴了一些Lucene中的处理方式。

举个例子,比如处理文本中的一些禁用词,或者敏感词,等等,Hadoop里的链式操作,支持的形式类似正则Map+ Rrduce Map*,代表的意思是全局只能有一个唯一的Reduce,但是在Reduce的前后是可以存在无限多个Mapper来进行一些预处理或者善后工作的。

下面来看下的散仙今天的测试例子,先看下我们的数据,以及需求。

数据如下:

手机 5000
电脑 2000
衣服 300
鞋子 1200
裙子 434
手套 12
图书 12510
小商品 5
小商品 3
订餐 2

需求是:
/**
* 需求:
* 在第一个Mapper里面过滤大于10000万的数据
* 第二个Mapper里面过滤掉大于100-10000的数据
* Reduce里面进行分类汇总并输出
* Reduce后的Mapper里过滤掉商品名长度大于3的数据
*/

预计处理完的结果是:
手套 12
订餐 2

散仙的hadoop版本是1.2的,在1.2的版本里,hadoop支持新的API,但是链式的ChainMapper类和ChainReduce类却不支持新 的,新的在hadoop2.x里面可以使用,差别不大,散仙今天给出的是旧的API的,需要注意一下。
代码如下:

package com.qin.test.hadoop.chain;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.qin.reducejoin.NewReduceJoin2;

/**
 *
 * 测试Hadoop里面的
 * ChainMapper和ReduceMapper的使用
 *
 * @author qindongliang
 * @date 2014年5月7日
 *
 * 大数据交流群:  376932160
 *
 *
 *
 *
 * ***/
public class HaoopChain {

/**
 * 需求:
 * 在第一个Mapper里面过滤大于10000万的数据
 * 第二个Mapper里面过滤掉大于100-10000的数据
 * Reduce里面进行分类汇总并输出
 * Reduce后的Mapper里过滤掉商品名长度大于3的数据
 */

    /**
     *
     * 过滤掉大于10000万的数据
     *
     * */
    private static class AMapper01 extends MapReduceBase implements  Mapper<LongWritable, Text, Text, Text>{

     @Override
    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
            String text=value.toString();
            String texts[]=text.split(" ");

        System.out.println("AMapper01里面的数据: "+text);
        if(texts[1]!=null&&texts[1].length()>0){
        int count=Integer.parseInt(texts[1]);
        if(count>10000){
            System.out.println("AMapper01过滤掉大于10000数据:  "+value.toString());
            return;
        }else{
            output.collect(new Text(texts[0]), new Text(texts[1]));

        }

        }
    }
    }

    /**
     *
     * 过滤掉大于100-10000的数据
     *
     * */
    private static class AMapper02 extends MapReduceBase implements  Mapper<Text, Text, Text, Text>{

     @Override
    public void map(Text key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

         int count=Integer.parseInt(value.toString());
            if(count>=100&&count<=10000){
                System.out.println("AMapper02过滤掉的小于10000大于100的数据: "+key+"    "+value);
                return;
            } else{

                output.collect(key, value);
            }

    }
    } 

    /**
     * Reuduce里面对同种商品的
     * 数量相加数据即可
     *
     * **/
    private static class AReducer03 extends MapReduceBase implements Reducer<Text, Text, Text, Text>{

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            int sum=0;
             System.out.println("进到Reduce里了");

            while(values.hasNext()){

                Text t=values.next();
                sum+=Integer.parseInt(t.toString());

            }

            //旧API的集合,不支持foreach迭代
//            for(Text t:values){
//                sum+=Integer.parseInt(t.toString());
//            }

            output.collect(key, new Text(sum+""));

        }

    }

    /***
     *
     * Reduce之后的Mapper过滤
     * 过滤掉长度大于3的商品名
     *
     * **/

    private static class AMapper04 extends MapReduceBase implements Mapper<Text, Text, Text, Text>{

        @Override
        public void map(Text key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {

            int len=key.toString().trim().length();

            if(len>=3){
                System.out.println("Reduce后的Mapper过滤掉长度大于3的商品名: "+ key.toString()+"   "+value.toString());
                return ;
            }else{
                output.collect(key, value);
            }

        }

    }

     /***
      * 驱动主类
      * **/
    public static void main(String[] args) throws Exception{
         //Job job=new Job(conf,"myjoin");
         JobConf conf=new JobConf(HaoopChain.class);
           conf.set("mapred.job.tracker","192.168.75.130:9001");
           conf.setJobName("t7");
            conf.setJar("tt.jar");
          conf.setJarByClass(HaoopChain.class);

        //  Job job=new Job(conf, "2222222");
        // job.setJarByClass(HaoopChain.class);
         System.out.println("模式:  "+conf.get("mapred.job.tracker"));;

        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(Text.class);

          //Map1的过滤
         JobConf mapA01=new JobConf(false);
         ChainMapper.addMapper(conf, AMapper01.class, LongWritable.class, Text.class, Text.class, Text.class, false, mapA01);

         //Map2的过滤
         JobConf mapA02=new JobConf(false);
         ChainMapper.addMapper(conf, AMapper02.class, Text.class, Text.class, Text.class, Text.class, false, mapA02);

         //设置Reduce
         JobConf recduceFinallyConf=new JobConf(false);
         ChainReducer.setReducer(conf, AReducer03.class, Text.class, Text.class, Text.class, Text.class, false, recduceFinallyConf);

        //Reduce过后的Mapper过滤
         JobConf  reduceA01=new  JobConf(false);
         ChainReducer.addMapper(conf, AMapper04.class, Text.class, Text.class, Text.class, Text.class, true, reduceA01);

         conf.setOutputKeyClass(Text.class);
         conf.setOutputValueClass(Text.class);

         conf.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
         conf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);

         FileSystem fs=FileSystem.get(conf);
//
         Path op=new Path("hdfs://192.168.75.130:9000/root/outputchain");
         if(fs.exists(op)){
             fs.delete(op, true);
             System.out.println("存在此输出路径,已删除!!!");
         }
//
//         

         org.apache.hadoop.mapred.FileInputFormat.setInputPaths(conf, new Path("hdfs://192.168.75.130:9000/root/inputchain"));
         org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(conf, op);
//
      //System.exit(conf.waitForCompletion(true)?0:1);
        JobClient.runJob(conf);

    }

}

运行日志如下:

模式:  192.168.75.130:9001
存在此输出路径,已删除!!!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - FileInputFormat.listStatus(199) | Total input paths to process : 1
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201405072054_0009
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 50% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201405072054_0009
INFO - Counters.log(585) | Counters: 30
INFO - Counters.log(587) |   Job Counters
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=11357
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=2
INFO - Counters.log(589) |     Data-local map tasks=2
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9972
INFO - Counters.log(587) |   File Input Format Counters
INFO - Counters.log(589) |     Bytes Read=183
INFO - Counters.log(587) |   File Output Format Counters
INFO - Counters.log(589) |     Bytes Written=19
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=57
INFO - Counters.log(589) |     HDFS_BYTES_READ=391
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=174859
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=19
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=63
INFO - Counters.log(589) |     Map input records=10
INFO - Counters.log(589) |     Reduce shuffle bytes=63
INFO - Counters.log(589) |     Spilled Records=8
INFO - Counters.log(589) |     Map output bytes=43
INFO - Counters.log(589) |     Total committed heap usage (bytes)=336338944
INFO - Counters.log(589) |     CPU time spent (ms)=1940
INFO - Counters.log(589) |     Map input bytes=122
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=208
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     Reduce input records=4
INFO - Counters.log(589) |     Reduce input groups=3
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=460980224
INFO - Counters.log(589) |     Reduce output records=2
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2184105984
INFO - Counters.log(589) |     Map output records=4

总结,测试过程中,发现如果Reduce后面,还有Mapper执行,那么注意一定要,在ChainReducer里面先set一个全局唯一的Reducer,然后再add一个Mapper,否则,在运行的时候,会报空指针异常,这一点需要特别注意! 

Hadoop的ChainMapper和ChainReducer实战

时间: 2024-10-10 17:55:18

Hadoop的ChainMapper和ChainReducer实战的相关文章

Hadoop的ChainMapper和ChainReducer使用案例(链式处理)

不多说,直接上干货!      Hadoop的MR作业支持链式处理,类似在一个生产牛奶的流水线上,每一个阶段都有特定的任务要处理,比如提供牛奶盒,装入牛奶,封盒,打印出厂日期,等等,通过这样进一步的分工,从而提高了生产效率,那么在我们的Hadoop的MapReduce中也是如此,支持链式的处理方式,这些Mapper像Linux管道一样,前一个Mapper的输出结果直接重定向到下一个Mapper的输入,形成一个流水线,而这一点与Lucene和Solr中的Filter机制是非常类似的,Hadoop项

【Hadoop大数据分析与挖掘实战】(一)----------P19~22

这是一本书的名字,叫做[Hadoop大数据分析与挖掘实战],我从2017.1开始学习 软件版本为Centos6.4 64bit,VMware,Hadoop2.6.0,JDK1.7. 但是这本书的出版时间为2016.1,待到我2017.1使用时,一部分内容已经发生了翻天覆地的变化. 于是我开始写这么一个博客,把这些记录下来. 我使用的软件版本为: 软件 版本 操作系统 CentOS 7 64bit-1611 虚拟机 VMware 12.5.2 Hadoop 2.7.3 JDK 1.8.0 本人大二

【Hadoop大数据分析与挖掘实战】(三)----------P23~25

6.安装Hadoop 1)在Hadoop网站下,下载稳定版的并且已经编译好的二进制包,并解压缩. [[email protected] ~]$ wget http://mirrors.hust.edu.cn/apache/hadoop/common/hadoop-2.7.3/hadoop-2.7.3.tar.gz [[email protected] ~]$ tar -zxvf hadoop-2.7.3.tar.gz ~/opt [[email protected] ~]$ ~/opt/hado

Hadoop应用开发案例(实战)视频教程(项目实战)视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

Hadoop小兵笔记【二】Hadoop分布式集群扩展实战经验

一.集群原先配置: 主机名sparkMaster,Ubuntu12.04-32 ,用户名Root , 内存4g    (只用于任务调度和分配,不做计算节点) 从机名sparkSlave1,Ubuntu12.04-32 ,用户名Root , 内存4g    (计算节点) 从机名sparkSlave2,Ubuntu12.04-32 ,用户名Root , 内存1.7g (计算节点) 二.扩展原因:计算数据量增大,原先的两个工作节点已不不能满足实时性的需求,由于实验室计算资源有限,故将原先的调度节点也增

高可用Hadoop平台-Flume NG实战图解篇

1.概述 今天补充一篇关于Flume的博客,前面在讲解高可用的Hadoop平台的时候遗漏了这篇,本篇博客为大家讲述以下内容: Flume NG简述 单点Flume NG搭建.运行 高可用Flume NG搭建 Failover测试 截图预览 下面开始今天的博客介绍. 2.Flume NG简述 Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中.轻量,配置简单,适用于各种日志收集,并支持Failover和负载均衡.并且它拥有非常丰富的组件.Fl

《Hadoop大数据技术开发实战》新书上线

当今互联网已进入大数据时代,大数据技术已广泛应用于金融.医疗.教育.电信.政府等领域.各行各业每天都在产生大量的数据,数据计量单位已从B.KB.MB.GB.TB发展到PB.EB.ZB.YB甚至BB.NB.DB.预计未来几年,全球数据将呈爆炸式增长.谷歌.阿里巴巴.百度.京东等互联网公司都急需掌握大数据技术的人才,而大数据相关人才却出现了供不应求的状况. Hadoop作为大数据生态系统中的核心框架,专为离线和大规模数据处理而设计.Hadoop的核心组成HDFS为海量数据提供了分布式存储:MapRe

hadoop大数据分析与挖掘实战(读书笔记1)

第一章节是从一个餐厅的角度出发,引出来许许多多的相关概念. 第一个概念就是什么是数据挖掘,这个简单,望文生义就好了.它的名字本身就诠释了它的内涵. 基本任务还是得记一下: 1分类与预测.(有点像量化,股票交易) 2聚类分析() 3关联规则() 4时序模式() 5偏差检测() 关于定义挖掘目标,就是什么菜品推荐,门店开在哪,这些问题.可以不多说. 关于数据取样,没啥好说的. 然后重点在后面, 数据质量分析:有缺失值怎么办?(就是统计一下缺失率什么的,然后删掉或者补值呗) 有异常值怎么办?(识别出来

hadoop学习;datajoin;chain签名;combine()

hadoop有种简化机制来管理job和control的非线性作业之间的依赖,job对象时mapreduce的表现形式.job对象的实例化可通过传递一个jobconf对象到作业的构造函数中来实现. x.addDeopendingJob(y)意味着x在y完成之前不会启动. 鉴于job对象存储着配置和依赖信息,jobcontrol对象会负责监管作业的执行,通过addjob(),你可以为jobcontrol添加作业,当所有作业和依赖关系添加完成后,调用jobcontrol的run()方法,生成一个线程提