链式ChainMapper/ChainReducer

类似于Linux管道重定向机制,前一个Map的输出直接作为下一个Map的输入,形成一个流水线。设想这样一个场景:在Map阶段,数据经过mapper1和mapper2处理;在Reduce阶段,数据经过sort和shuffle后,交给对应的reducer处理。reducer处理后并没有直接写入到Hdfs, 而是交给了另一个mapper3处理,它产生的结果最终写到hdfs的输出目录中。

注意:对任意MR作业,Map和Reduce阶段可以有无限个Mapper,但reduer只能有一个。

package chain;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Chain {

    /**
     *     手机 5000        * 需求:
        电脑 2000        * 在第一个Mapper1里面过滤大于10000的数据
        衣服 300        * 第二个Mapper2里面过滤掉大于100-10000的数据
        鞋子 1200        * Reduce里面进行分类汇总并输出
        裙子 434        * Reduce后的Mapper3里过滤掉商品名长度大于3的数据
        手套 12        *
        图书 12510    *
        小商品 5        * 预计处理完的结果是:
        小商品 3        * 手套 12
        订餐 2        * 订餐 2
     * @throws Exception
     */

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(Chain.class);

        /**
         * 配置mapper1
         * 注意此处带参数的构造函数:new Configuration(false)
         */
        Configuration map1Conf = new Configuration(false);
        ChainMapper.addMapper(job,                 //主作业
                Mapper1.class,                     //待加入的map class
                LongWritable.class,             //待加入map class的输入key类型
                Text.class,                        //待加入map class的输入value类型
                Text.class,                     //待加入map class的输出key类型
                VLongWritable.class,             //待加入map class的输出value类型
                map1Conf);                        //待加入map class的配置信息

        //配置mapper2
        ChainMapper.addMapper(job, Mapper2.class, Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false));

        /**
         * 配置Reducer
         * 注意此处使用的是setReducer()方法
         */
        ChainReducer.setReducer(job, Reducer_Only.class, Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false));

        //配置mapper3
        ChainReducer.addMapper(job, Mapper3.class, Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false));

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    //Mapper1
    public static class Mapper1 extends Mapper<LongWritable, Text, Text, VLongWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            /**
             * Hadoop中默认的输入格式 TextOutputFormat 只支持UTF-8格式
             * 所以解决GBK中文输出乱码问题,有两个方法:
             * 1. 先将输入的Text类型的value转换为字节数组
             * 2. 然后使用String的构造器String(byte[] bytes, int offset, int length, Charset charset)
             * 3. 通过使用指定的charset解码指定的byte子数组,构造一个新的String
             */
            String line=new String(value.getBytes(),0,value.getLength(),"GBK");
            String[] splited = line.split(" ");

            //过滤大于10000的数据
            if(Integer.parseInt(splited[1])<10000L){
                context.write(new Text(splited[0]), new VLongWritable(Long.parseLong(splited[1])));
            }
        }
    }

    //Mapper2
    public static class Mapper2 extends Mapper<Text, VLongWritable, Text, VLongWritable>{
        @Override
        protected void map(Text key, VLongWritable value, Context context)
                throws IOException, InterruptedException {

            //过滤100-10000间的数据
            if(value.get()<100L){
                context.write(key, value);
            }
        }
    }

    //Reducer
    public static class Reducer_Only extends Reducer<Text, VLongWritable, Text, VLongWritable>{
        @Override
        protected void reduce(Text key, Iterable<VLongWritable> v2s, Context context)
                throws IOException, InterruptedException {

            long sumLong=0L;

            for(VLongWritable vLongWritable : v2s){
                sumLong += vLongWritable.get();

                context.write(key, new VLongWritable(sumLong));
            }
        }
    }

    //Mapper3
    public static class Mapper3 extends Mapper<Text, VLongWritable, Text, VLongWritable>{
        @Override
        protected void map(Text key, VLongWritable value, Context context)
                throws IOException, InterruptedException {

            String line=new String(key.getBytes(),0,key.getLength(),"GBK");

            //过滤商品名称长度大于3
            if(line.length()<3){
                context.write(key, value);
            }
        }
    }
}

时间: 2024-07-28 21:12:58

链式ChainMapper/ChainReducer的相关文章

Hadoop MapReduce链式实践--ChainReducer

版本:CDH5.0.0,HDFS:2.3.0,Mapreduce:2.3.0,Yarn:2.3.0. 场景描述:求一组数据中按照不同类别的最大值,比如,如下的数据: data1: A,10 A,11 A,12 A,13 B,21 B,31 B,41 B,51 data2: A,20 A,21 A,22 A,23 B,201 B,301 B,401 B,501 最后输出为: A,23 B,501 假如这样的逻辑的mapreduce数据流如下: 假设C组数据比较多,同时假设集群有2个节点,那么这个任

(转)Hadoop MapReduce链式实践--ChainReducer

版本:CDH5.0.0,HDFS:2.3.0,Mapreduce:2.3.0,Yarn:2.3.0. 场景描述:求一组数据中按照不同类别的最大值,比如,如下的数据: data1: [plain] view plaincopy A,10 A,11 A,12 A,13 B,21 B,31 B,41 B,51 data2: [plain] view plaincopy A,20 A,21 A,22 A,23 B,201 B,301 B,401 B,501 最后输出为: [plain] view pla

Mapreduce 工作机制图,MapReduce组合式,迭代式,链式

Mapreduce 工作机制图: 图中1:表示待处理数据,比如日志,比如单词计数图中2:表示map阶段,对他们split,然后送到不同分区图中3:表示reduce阶段,对这些数据整合处理.图中4:表示二次mapreduce,这个是mapreduce的链式 MapReduce组合式,迭代式,链式 问题导读: 1.比如我们输出的mapreduce结果,需要进入下一个mapreduce,该怎么解决?可以使用迭代式2.那么什么是迭代式?3.什么是依赖式?4.什么是链式?5.三种模式各自的应用场景是什么?

MapReduce的组合式,迭代式,链式

1.迭代式mapreduce        一些复杂的任务难以用一次MapReduce处理完成,需要多次 MapReduce 才能完成任务,例如Pagrank,K-means算法都需要多次的迭代,关于 MapReduce 迭代在Mahout中运用较多.有兴趣的可以参考一下Mahout的源码.             在MapReduce的迭代思想,类似for循环,前一个 MapReduce的输出结果,作为下一个 MapReduce的输入,任务完成后中间结果都可以删除.        代码示例:

Hadoop的ChainMapper和ChainReducer使用案例(链式处理)

不多说,直接上干货!      Hadoop的MR作业支持链式处理,类似在一个生产牛奶的流水线上,每一个阶段都有特定的任务要处理,比如提供牛奶盒,装入牛奶,封盒,打印出厂日期,等等,通过这样进一步的分工,从而提高了生产效率,那么在我们的Hadoop的MapReduce中也是如此,支持链式的处理方式,这些Mapper像Linux管道一样,前一个Mapper的输出结果直接重定向到下一个Mapper的输入,形成一个流水线,而这一点与Lucene和Solr中的Filter机制是非常类似的,Hadoop项

MapReduce ChainMapper/ChainReducer

The ChainMapper class allows to use multiple Mapper classes within a single Map task. The ChainReducer class allows to chain multiple Mapper classes after a Reducer within the Reducer task. 没有Chain之前,只能通用多个Job迭代来实现数据递进处理,这样做的缺点是: a. 每次迭代,如果所有 Job 对象重

ChainMapper/ChainReducer 的实现原理

ChainMapper/ChainReducer 主要为了解决线性链式Mapper 而提出的.也就是说,在Map 或者Reduce 阶段存在多个Mapper,这些Mapper 像Linux 管道一样,前一个Mapper的输出结果直接重定向到下一个Mapper 的输入,形成一个流水线,形式类似于[MAP+REDUCE MAP*].图1展示了一个典型的ChainMapper/ChainReducer 的应用场景:在Map 阶段,数据依次经过Mapper1 和Mapper2 处理:在Reduce 阶段

UESTC30-最短路-Floyd最短路、spfa+链式前向星建图

最短路 Time Limit: 3000/1000MS (Java/Others) Memory Limit: 65535/65535KB (Java/Others) 在每年的校赛里,所有进入决赛的同学都会获得一件很漂亮的T-shirt.但是每当我们的工作人员把上百件的衣服从商店运回到赛场的时候,却是非常累的!所以现在他们想要寻找最短的从商店到赛场的路线,你可以帮助他们吗? Input 输入包括多组数据. 每组数据第一行是两个整数NN ,MM (N≤100N≤100 ,M≤10000M≤1000

二叉树的链式存储结构----二叉链表

头文件:head.h #include<string.h> #include<ctype.h> #include<malloc.h> /* malloc()等 */ #include<limits.h> /* INT_MAX等 */ #include<stdio.h> /* EOF(=^Z或F6),NULL */ #include<stdlib.h> /* atoi() */ #include<io.h> /* eof()