Hadoop之——Combiner编程

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46135857

一、Mapper类的实现

	/**
	 * KEYIN	即k1		表示行的偏移量
	 * VALUEIN	即v1		表示行文本内容
	 * KEYOUT	即k2		表示行中出现的单词
	 * VALUEOUT	即v2		表示行中出现的单词的次数,固定值1
	 */
	static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
		protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
			final String[] splited = v1.toString().split("\t");
			for (String word : splited) {
				context.write(new Text(word), new LongWritable(1));
				System.out.println("Mapper输出<"+word+","+1+">");
			}
		};
	}

二、Reducer类的实现

	/**
	 * KEYIN	即k2		表示行中出现的单词
	 * VALUEIN	即v2		表示行中出现的单词的次数
	 * KEYOUT	即k3		表示文本中出现的不同单词
	 * VALUEOUT	即v3		表示文本中出现的不同单词的总次数
	 *
	 */
	static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
		protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
			//显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
			System.out.println("MyReducer输入分组<"+k2.toString()+",...>");
			long times = 0L;
			for (LongWritable count : v2s) {
				times += count.get();
				//显示次数表示输入的k2,v2的键值对数量
				System.out.println("MyReducer输入键值对<"+k2.toString()+","+count.get()+">");
			}
			ctx.write(k2, new LongWritable(times));
		};
	}

三、Combiner的类实现

	static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
		protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
			//显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
			System.out.println("Combiner输入分组<"+k2.toString()+",...>");
			long times = 0L;
			for (LongWritable count : v2s) {
				times += count.get();
				//显示次数表示输入的k2,v2的键值对数量
				System.out.println("Combiner输入键值对<"+k2.toString()+","+count.get()+">");
			}

			ctx.write(k2, new LongWritable(times));
			//显示次数表示输出的k2,v2的键值对数量
			System.out.println("Combiner输出键值对<"+k2.toString()+","+times+">");
		};
	}

四、完整代码

package combine;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 问:为什么使用Combiner?
 * 答:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短。
 *
 * 问:为什么Combiner不作为MR运行的标配,而是可选步骤哪?
 * 答:因为不是所有的算法都适合使用Combiner处理,例如求平均数。
 *
 * 问:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作哪?
 * 答:combiner操作发生在map端的,处理一个任务所接收的文件中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据。
 *
 */
public class WordCountApp {
	static final String INPUT_PATH = "hdfs://liuyazhuang:9000/hello";
	static final String OUT_PATH = "hdfs://liuyazhuang:9000/out";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		if(fileSystem.exists(outPath)){
			fileSystem.delete(outPath, true);
		}

		final Job job = new Job(conf , WordCountApp.class.getSimpleName());
		//1.1指定读取的文件位于哪里
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
		//job.setInputFormatClass(TextInputFormat.class);

		//1.2 指定自定义的map类
		job.setMapperClass(MyMapper.class);
		//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
		//job.setMapOutputKeyClass(Text.class);
		//job.setMapOutputValueClass(LongWritable.class);

		//1.3 分区
		//job.setPartitionerClass(HashPartitioner.class);
		//有一个reduce任务运行
		//job.setNumReduceTasks(1);

		//1.4 TODO 排序、分组

		//1.5 规约
		job.setCombinerClass(MyCombiner.class);

		//2.2 指定自定义reduce类
		job.setReducerClass(MyReducer.class);
		//指定reduce的输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		//2.3 指定写出到哪里
		FileOutputFormat.setOutputPath(job, outPath);
		//指定输出文件的格式化类
		//job.setOutputFormatClass(TextOutputFormat.class);

		//把job提交给JobTracker运行
		job.waitForCompletion(true);
	}

	/**
	 * KEYIN	即k1		表示行的偏移量
	 * VALUEIN	即v1		表示行文本内容
	 * KEYOUT	即k2		表示行中出现的单词
	 * VALUEOUT	即v2		表示行中出现的单词的次数,固定值1
	 */
	static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
		protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
			final String[] splited = v1.toString().split("\t");
			for (String word : splited) {
				context.write(new Text(word), new LongWritable(1));
				System.out.println("Mapper输出<"+word+","+1+">");
			}
		};
	}

	/**
	 * KEYIN	即k2		表示行中出现的单词
	 * VALUEIN	即v2		表示行中出现的单词的次数
	 * KEYOUT	即k3		表示文本中出现的不同单词
	 * VALUEOUT	即v3		表示文本中出现的不同单词的总次数
	 *
	 */
	static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
		protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
			//显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
			System.out.println("MyReducer输入分组<"+k2.toString()+",...>");
			long times = 0L;
			for (LongWritable count : v2s) {
				times += count.get();
				//显示次数表示输入的k2,v2的键值对数量
				System.out.println("MyReducer输入键值对<"+k2.toString()+","+count.get()+">");
			}
			ctx.write(k2, new LongWritable(times));
		};
	}

	static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
		protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
			//显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
			System.out.println("Combiner输入分组<"+k2.toString()+",...>");
			long times = 0L;
			for (LongWritable count : v2s) {
				times += count.get();
				//显示次数表示输入的k2,v2的键值对数量
				System.out.println("Combiner输入键值对<"+k2.toString()+","+count.get()+">");
			}

			ctx.write(k2, new LongWritable(times));
			//显示次数表示输出的k2,v2的键值对数量
			System.out.println("Combiner输出键值对<"+k2.toString()+","+times+">");
		};
	}
}

五、运行结果

时间: 2024-10-11 11:35:55

Hadoop之——Combiner编程的相关文章

Hadoop中Combiner的使用

文章转载于:http://blog.csdn.net/ipolaris/article/details/8723782 Hadoop中Combiner的使用 在MapReduce中,当map生成的数据过大时,带宽就成了瓶颈,怎样精简压缩传给Reduce的数据,有不影响最终的结果呢.有一种方法就是使用Combiner,Combiner号称本地的Reduce,Reduce最终的输入,是Combiner的输出.下面以<Hadoop in action>中的专利数据为例.我们打算统计每个国家的专利数目

Hadoop的Combiner

在许多MapReduce应用的场景中,如果能在向reducer分发mapper结果之前做一下"本地化Reduce".一wordcount为例子,如果作业处理中的文件单词中"the"出现了574次,存储并shuffling一次("the",574)key/valuthe对比许多次("the",1)更有效.这个过程叫做合并(Combiner). hadoop 通过扩展MapReduce框架,在mapper何reducer之间增加了

Hadoop HDFS Java编程

import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apac

对于Hadoop的MapReduce编程makefile

根据近期需要hadoop的MapReduce程序集成到一个大的应用C/C++书面框架.在需求make当自己主动MapReduce编译和打包的应用. 在这里,一个简单的WordCount1一个例子详细的实施细则,注意:hadoop版本号2.4.0. 源码包括两个文件.一个是WordCount1.java是详细的对单词计数实现的逻辑.第二个是CounterThread.java.当中简单的当前处理的行数做一个统计和打印.代码分别见附1. 编写makefile的关键是将hadoop提供的jar包的路径

0基础搭建Hadoop大数据处理-编程

Hadoop的编程可以是在Linux环境或Winows环境中,在此以Windows环境为示例,以Eclipse工具为主(也可以用IDEA).网上也有很多开发的文章,在此也参考他们的内容只作简单的介绍和要点总结. Hadoop是一个强大的并行框架,它允许任务在其分布式集群上并行处理.但是编写.调试Hadoop程序都有很大难度.正因为如此,Hadoop的开发者开发出了Hadoop Eclipse插件,它在Hadoop的开发环境中嵌入了Eclipse,从而实现了开发环境的图形化,降低了编程难度.在安装

[Hadoop入门] - 1 Ubuntu系统 Hadoop介绍 MapReduce编程思想

Ubuntu系统 (我用到版本号是140.4) ubuntu系统是一个以桌面应用为主的Linux操作系统,Ubuntu基于Debian发行版和GNOME桌面环境.Ubuntu的目标在于为一般用户提供一个最新的.同时又相当稳定的主要由自由软件构建而成的操作系统,它可免费使用,并带有社团及专业的支持应. 作为Hadoop大数据开发测试环境, 建议大家不要在windows上安装CgyWin来学习或研究, 直接用Vmware+ubuntu来学习. 下载 www.vmware.com这里下载vmware,

基础搭建Hadoop大数据处理-编程

Hadoop的编程可以是在Linux环境或Winows环境中,在此以Windows环境为示例,以Eclipse工具为主(也可以用IDEA).网上也有很多开发的文章,在此也参考他们的内容只作简单的介绍和要点总结. Hadoop是一个强大的并行框架,它允许任务在其分布式集群上并行处理.但是编写.调试Hadoop程序都有很大难度.正因为如此,Hadoop的开发者开发出了Hadoop Eclipse插件,它在Hadoop的开发环境中嵌入了Eclipse,从而实现了开发环境的图形化,降低了编程难度.在安装

myeclipse连接hadoop集群编程及问题解决

原以为搭建一个本地编程测试hadoop程序的环境很简单,没想到还是做得焦头烂额,在此分享步骤和遇到的问题,希望大家顺利. 一.要实现连接hadoop集群并能够编码的目的需要做如下准备: 1.远程hadoop集群(我的master地址为192.168.85.2) 2.本地myeclipse及myeclipse连接hadoop的插件 3.本地hadoop(我用的是hadoop-2.7.2) 先下载插件hadoop-eclipse-plugin,我用的是hadoop-eclipse-plugin-2.

Hadoop 使用Combiner提高Map/Reduce程序效率

众所周知,Hadoop框架使用Mapper将数据处理成一个<key,value>键值对,再网络节点间对其进行整理(shuffle),然后使用Reducer处理数据并进行最终输出. 在上述过程中,我们看到至少两个性能瓶颈: 如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可.这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率. 使用专利中的国家一项来阐述数据倾斜这个定义.这样的数据远