Hadoop MapReduce链式实践--ChainReducer

版本:CDH5.0.0,HDFS:2.3.0,Mapreduce:2.3.0,Yarn:2.3.0。

场景描述:求一组数据中按照不同类别的最大值,比如,如下的数据:

data1:

A,10
A,11
A,12
A,13
B,21
B,31
B,41
B,51

data2:

A,20
A,21
A,22
A,23
B,201
B,301
B,401
B,501

最后输出为:

A,23
B,501

假如这样的逻辑的mapreduce数据流如下:

假设C组数据比较多,同时假设集群有2个节点,那么这个任务分配2个reducer,且C组数据平均分布到两个reducer中,(这样做是为了效率考虑,如果只有一个reducer,那么当一个节点在运行reducer的时候另外一个节点会处于空闲状态)那么如果在reducer之后,还可以再次做一个reducer,那么不就可以整合数据到一个文件了么,同时还可以再次比较C组数据中,以得到真正比较大的数据。

首先说下,不用上面假设的方式进行操作,那么一般的操作方法。一般有两种方法:其一,直接读出HDFS数据,然后进行整合;其二,新建另外一个Job来进行整合。这两种方法,如果就效率来说的话,可能第一种效率会高点。

考虑到前面提出的mapreduce数据流,以前曾对ChainReducer有点印象,好像可以做这个,所以就拿ChainReducer来试,同时为了学多点知识,也是用了多个Mapper(即使用ChainMapper)。

主程序代码如下:

package chain;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainDriver2 extends Configured implements Tool{

	/**
	 * ChainReducer 实战
	 * 验证多个reducer的整合
	 * 逻辑:寻找最大值
	 * @param args
	 */

	private String input=null;
	private String output=null;
	private String delimiter=null;
	private int reducer=1;
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new ChainDriver2(),args);
	}

	@Override
	public int run(String[] arg0) throws Exception {
		configureArgs(arg0);
		checkArgs();
		Configuration conf = getConf();
		conf.set("delimiter", delimiter);
		JobConf  job= new JobConf(conf,ChainDriver2.class);

		ChainMapper.addMapper(job, MaxMapper.class, LongWritable.class,
				Text.class, Text.class, IntWritable.class, true, new JobConf(false)) ;

		ChainMapper.addMapper(job, MergeMaxMapper.class, Text.class,
				IntWritable.class, Text.class, IntWritable.class, true, new JobConf(false));

		ChainReducer.setReducer(job, MaxReducer.class, Text.class, IntWritable.class,
				Text.class, IntWritable.class, true, new JobConf(false));
		ChainReducer.addMapper(job, MergeMaxMapper.class, Text.class,
				IntWritable.class, Text.class, IntWritable.class, false, new JobConf(false));
		job.setJarByClass(ChainDriver2.class);
		job.setJobName("ChainReducer test job");

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

       /* job.setMapperClass(MaxMapper.class);
        job.setReducerClass(MaxReducer.class);*/
        job.setInputFormat(TextInputFormat.class);;
        job.setOutputFormat(TextOutputFormat.class);
        job.setNumReduceTasks(reducer);

        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        JobClient.runJob(job);
		return 0;
	}

	/**
	 * check the args
	 */
	private void checkArgs() {
		if(input==null||"".equals(input)){
			System.out.println("no input...");
			printUsage();
			System.exit(-1);
		}
		if(output==null||"".equals(output)){
			System.out.println("no output...");
			printUsage();
			System.exit(-1);
		}
		if(delimiter==null||"".equals(delimiter)){
			System.out.println("no delimiter...");
			printUsage();
			System.exit(-1);
		}
		if(reducer==0){
			System.out.println("no reducer...");
			printUsage();
			System.exit(-1);
		}
	}

	/**
	 * configuration the args
	 * @param args
	 */
	private void configureArgs(String[] args) {
    	for(int i=0;i<args.length;i++){
    		if("-i".equals(args[i])){
    			input=args[++i];
    		}
    		if("-o".equals(args[i])){
    			output=args[++i];
    		}

    		if("-delimiter".equals(args[i])){
    			delimiter=args[++i];
    		}
    		if("-reducer".equals(args[i])){
    			try {
    				reducer=Integer.parseInt(args[++i]);
				} catch (Exception e) {
					reducer=0;
				}
    		}
    	}
	}
	public static void printUsage(){
    	System.err.println("Usage:");
    	System.err.println("-i input \t cell data path.");
    	System.err.println("-o output \t output data path.");
    	System.err.println("-delimiter  data delimiter , default is blanket  .");
    	System.err.println("-reducer  reducer number , default is 1  .");
    }

}

MaxMapper:

package chain;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MaxMapper extends MapReduceBase implements Mapper<LongWritable ,Text,Text,IntWritable>{
	private Logger log = LoggerFactory.getLogger(MaxMapper.class);
	private String delimiter=null;
	@Override
	public void configure(JobConf conf){
		delimiter=conf.get("delimiter");
		log.info("delimiter:"+delimiter);
		log.info("This is the begin of MaxMapper");
	}

	@Override
	public void map(LongWritable key, Text value,
			OutputCollector<Text, IntWritable> out, Reporter reporter)
			throws IOException {
		// TODO Auto-generated method stub
		String[] values= value.toString().split(delimiter);
		log.info(values[0]+"-->"+values[1]);
		out.collect(new Text(values[0]), new IntWritable(Integer.parseInt(values[1])));

	}
	public void close(){
		log.info("This is the end of MaxMapper");
	}
}

MaxReducer:

package chain;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public   class MaxReducer extends MapReduceBase implements Reducer<Text,IntWritable,Text,IntWritable>{
	private Logger log = LoggerFactory.getLogger(MaxReducer.class);
	@Override
	public void configure(JobConf conf){
		log.info("This is the begin of the MaxReducer");
	}
	@Override
	public void reduce(Text key, Iterator<IntWritable> values,
			OutputCollector<Text, IntWritable> out, Reporter reporter)
			throws IOException {
		// TODO Auto-generated method stub
		int max=-1;
		while(values.hasNext()){
			int value=values.next().get();
			if(value>max){
				max=value;
			}
		}
		log.info(key+"-->"+max);
		out.collect(key, new IntWritable(max));

	}

	@Override
	public void close(){
		log.info("This is the end of the MaxReducer");
	}
}

MergeMaxMapper:

package chain;

import java.io.IOException;
//import java.util.ArrayList;
//import java.util.HashMap;
//import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MergeMaxMapper extends MapReduceBase implements Mapper<Text ,IntWritable,Text,IntWritable>{
	private Logger log = LoggerFactory.getLogger(MergeMaxMapper.class);
//	private Map<Text,ArrayList<IntWritable>> outMap= new HashMap<Text,ArrayList<IntWritable>>();
	@Override
	public void configure(JobConf conf){
		log.info("This is the begin of MergeMaxMapper");
	}

	@Override
	public void map(Text key, IntWritable value,
			OutputCollector<Text, IntWritable> out, Reporter reporter)
			throws IOException {
		log.info(key.toString()+"_MergeMaxMapper"+"-->"+value.get());
		out.collect(new Text(key.toString()+"_MergeMaxMapper"), value);

	}

	@Override
	public void close(){
		log.info("this is the end of MergeMaxMapper");
	}
}

编程思路如下:原始测试数据data1、data2首先经过MaxMapper(由于两个文件,所以生成了2个map),然后经过MergeMaxMapper,到MaxReducer,最后再次经过MergeMaxMapper。

在程序中添加了输出数据的log,可以通过log来查看各个map和reduce的数据流程。

mapper端的log(其中的一个mapper):

2014-05-14 17:23:51,307 INFO [main] chain.MaxMapper: delimiter:,
2014-05-14 17:23:51,307 INFO [main] chain.MaxMapper: This is the begin of MaxMapper
2014-05-14 17:23:51,454 INFO [main] chain.MergeMaxMapper: This is the begin of MergeMaxMapper
2014-05-14 17:23:51,471 INFO [main] chain.MaxMapper: A-->20
2014-05-14 17:23:51,476 INFO [main] chain.MergeMaxMapper: A_MergeMaxMapper-->20
2014-05-14 17:23:51,476 INFO [main] chain.MaxMapper: A-->21
2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: A_MergeMaxMapper-->21
2014-05-14 17:23:51,477 INFO [main] chain.MaxMapper: A-->22
2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: A_MergeMaxMapper-->22
2014-05-14 17:23:51,477 INFO [main] chain.MaxMapper: A-->23
2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: A_MergeMaxMapper-->23
2014-05-14 17:23:51,477 INFO [main] chain.MaxMapper: B-->201
2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper-->201
2014-05-14 17:23:51,477 INFO [main] chain.MaxMapper: B-->301
2014-05-14 17:23:51,477 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper-->301
2014-05-14 17:23:51,478 INFO [main] chain.MaxMapper: B-->401
2014-05-14 17:23:51,478 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper-->401
2014-05-14 17:23:51,478 INFO [main] chain.MaxMapper: B-->501
2014-05-14 17:23:51,478 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper-->501
2014-05-14 17:23:51,481 INFO [main] chain.MaxMapper: This is the end of MaxMapper
2014-05-14 17:23:51,481 INFO [main] chain.MergeMaxMapper: this is the end of MergeMaxMapper

通过上面log,可以看出,通过ChainMapper添加mapper的方式的mapper的处理顺序为:首先初始化第一个mapper(即调用configure方法);接着初始第二个mapper(调用configure方法);然后开始map函数,map函数针对一条记录,首先采用mapper1进行处理,然后使用mapper2进行处理;最后是关闭阶段,关闭的顺序同样是首先关闭mapper1(调用close方法),然后关闭mapper2。

reducer端的log(其中一个reducer)

2014-05-14 17:24:10,171 INFO [main] chain.MergeMaxMapper: This is the begin of MergeMaxMapper
2014-05-14 17:24:10,311 INFO [main] chain.MaxReducer: This is the begin of the MaxReducer
2014-05-14 17:24:10,671 INFO [main] chain.MaxReducer: B_MergeMaxMapper-->501
2014-05-14 17:24:10,672 INFO [main] chain.MergeMaxMapper: B_MergeMaxMapper_MergeMaxMapper-->501
2014-05-14 17:24:10,673 INFO [main] chain.MergeMaxMapper: this is the end of MergeMaxMapper
2014-05-14 17:24:10,673 INFO [main] chain.MaxReducer: This is the end of the MaxReducer

通过上面的log可以看出,通过ChainReducer添加mapper的方式,其数据处理顺序为:首先初始化Reducer之后的Mapper,接着初始化Reducer(看configure函数即可知道);然后处理reducer,reducer的输出接着交给mapper处理;最后先关闭Mapper,接着关闭reducer。

同时,注意到,reducer后面的mapper也是两个的,即有多少个reducer,就有多少个mapper。

通过实验得到上面的ChainReducer的数据处理流程,且ChainReducer没有addReducer的方法,也即是不能添加reducer了,那么最开始提出的mapreduce数据流程就不能采用这种方式实现了。

最后,前面提出的mapreduce数据流程应该是错的,在reducer out里面C组数据不会被拆分为两个reducer,相同的key只会向同一个reducer传输。这里同样做了个试验,通过对接近90M的数据(只有一个分组A)执行上面的程序,可以看到有2个mapper,2个reducer(此数值为设置值),但是在其中一个reducer中并没有A分组的任何数据,在另外一个reducer中才有数据。其实,不用试验也是可以的,以前看的书上一般都会说相同的key进入同一个reducer中。不过,如果是这样的话,那么这样的数据效率应该不高。

返回最开始提出的场景,最开始提出的问题,如果相同的key只会进入一个reducer中,那么最后的2个数据文件(2个reducer生成2个数据文件)其实里面不会有key冲突的数据,所以在进行后面的操作的时候可以直接读多个文件即可,就像是读一个文件一样。

会产生这样的认知错误,应该是对mapreduce 原理不清楚导致。

分享,成长,快乐

转载请注明blog地址:http://blog.csdn.net/fansy1990

Hadoop MapReduce链式实践--ChainReducer

时间: 2024-07-28 21:12:54

Hadoop MapReduce链式实践--ChainReducer的相关文章

(转)Hadoop MapReduce链式实践--ChainReducer

版本:CDH5.0.0,HDFS:2.3.0,Mapreduce:2.3.0,Yarn:2.3.0. 场景描述:求一组数据中按照不同类别的最大值,比如,如下的数据: data1: [plain] view plaincopy A,10 A,11 A,12 A,13 B,21 B,31 B,41 B,51 data2: [plain] view plaincopy A,20 A,21 A,22 A,23 B,201 B,301 B,401 B,501 最后输出为: [plain] view pla

Hadoop MapReduce开发最佳实践(上篇)

body{ font-family: "Microsoft YaHei UI","Microsoft YaHei",SimSun,"Segoe UI",Tahoma,Helvetica,Sans-Serif,"Microsoft YaHei", Georgia,Helvetica,Arial,sans-serif,宋体, PMingLiU,serif; font-size: 10.5pt; line-height: 1.5;}

链式ChainMapper/ChainReducer

类似于Linux管道重定向机制,前一个Map的输出直接作为下一个Map的输入,形成一个流水线.设想这样一个场景:在Map阶段,数据经过mapper1和mapper2处理:在Reduce阶段,数据经过sort和shuffle后,交给对应的reducer处理.reducer处理后并没有直接写入到Hdfs, 而是交给了另一个mapper3处理,它产生的结果最终写到hdfs的输出目录中. 注意:对任意MR作业,Map和Reduce阶段可以有无限个Mapper,但reduer只能有一个. package

Hadoop的ChainMapper和ChainReducer使用案例(链式处理)

不多说,直接上干货!      Hadoop的MR作业支持链式处理,类似在一个生产牛奶的流水线上,每一个阶段都有特定的任务要处理,比如提供牛奶盒,装入牛奶,封盒,打印出厂日期,等等,通过这样进一步的分工,从而提高了生产效率,那么在我们的Hadoop的MapReduce中也是如此,支持链式的处理方式,这些Mapper像Linux管道一样,前一个Mapper的输出结果直接重定向到下一个Mapper的输入,形成一个流水线,而这一点与Lucene和Solr中的Filter机制是非常类似的,Hadoop项

Mapreduce 工作机制图,MapReduce组合式,迭代式,链式

Mapreduce 工作机制图: 图中1:表示待处理数据,比如日志,比如单词计数图中2:表示map阶段,对他们split,然后送到不同分区图中3:表示reduce阶段,对这些数据整合处理.图中4:表示二次mapreduce,这个是mapreduce的链式 MapReduce组合式,迭代式,链式 问题导读: 1.比如我们输出的mapreduce结果,需要进入下一个mapreduce,该怎么解决?可以使用迭代式2.那么什么是迭代式?3.什么是依赖式?4.什么是链式?5.三种模式各自的应用场景是什么?

MapReduce的组合式,迭代式,链式

1.迭代式mapreduce        一些复杂的任务难以用一次MapReduce处理完成,需要多次 MapReduce 才能完成任务,例如Pagrank,K-means算法都需要多次的迭代,关于 MapReduce 迭代在Mahout中运用较多.有兴趣的可以参考一下Mahout的源码.             在MapReduce的迭代思想,类似for循环,前一个 MapReduce的输出结果,作为下一个 MapReduce的输入,任务完成后中间结果都可以删除.        代码示例:

用php实现一个简单的链式操作

最近在读<php核心技术与最佳实践>这本书,书中第一章提到用__call()方法可以实现一个简单的字符串链式操作,比如,下面这个过滤字符串然后再求长度的操作,一般要这么写: strlen(trim($str)); 那么能否实现下面这种写法呢? $str->trim()->strlen(); 下面就来试下. 链式操作,说白了其实就是链式的调用对象的方法.既然要实现字符串的链式操作,那么就要实现一个字符串类,然后对这个类的对象进行调用操作.我对字符串类的期望如下:(1)当我创建对象时,

王家林的云计算分布式大数据Hadoop企业级开发动手实践

一:课程简介: Hadoop是云计算分布式大数据的事实标准软件框架,Hadoop中的架构实现是整个云计算产业技术的基础,作为与Google三大核心技术DFS.MapReduce.BigTable相对的HDFS.MapReduce.和HBase也是整个Hadoop生态系统的核心的技术,本课程致力于帮您掌握这三大技术的同时掌握云计算的数据仓库挖掘技术Hive,助您在云计算技术时代自由翱翔. 二:课程特色 1,      深入浅出中动手实作: 2,      掌握Hadoop三大核心:HDFS.Map

关于JavaScript中的setTimeout()链式调用和setInterval()探索

http://www.cnblogs.com/Wenwang/archive/2012/01/06/2314283.html http://www.cnblogs.com/yangjunhua/archive/2012/04/12/2444106.html 下面的参考:http://evantre.iteye.com/blog/1718777 1.选题缘起 在知乎上瞎逛的时候看到一个自问自答的问题: 知乎上,前端开发领域有哪些值得推荐的问答?,然后在有哪些经典的 Web 前端或者 JavaScr