Hadoop Reading Notes (10): Understanding Combiner Reduction from the Counters in MapReduce

Hadoop reading notes series: http://blog.csdn.net/caicongyang/article/category/2166855

1. Combiner

Q: What is a combiner?

A: A combiner runs on the mapper side and performs a local reduction on the map output, so less data is shipped to the reducer, transfer time is shorter, and the job finishes sooner. A combiner never spans mappers (only the reducer receives output from multiple mappers). Not every algorithm can be combined safely; computing an average is the classic example.
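To illustrate that caveat, the plain-Java sketch below (a hypothetical standalone class, not part of the job that follows) shows why an "average" combiner gives a wrong answer, while a "sum" combiner, as used in the word count below, is safe: addition is associative, averaging is not.

public class AverageCombinerPitfall {
	static double avg(double[] xs) {
		double sum = 0;
		for (double x : xs) {
			sum += x;
		}
		return sum / xs.length;
	}

	public static void main(String[] args) {
		double[] mapper1 = {1, 2, 3}; // values seen by one mapper
		double[] mapper2 = {4};       // values seen by another mapper

		// averaging the per-mapper averages, as a combiner would force us to do
		double combined = (avg(mapper1) + avg(mapper2)) / 2; // (2.0 + 4.0) / 2 = 3.0
		// averaging all values in one pass, as a single reducer would
		double trueAvg = avg(new double[] {1, 2, 3, 4});     // 10 / 4 = 2.5

		System.out.println("with combiner: " + combined + ", without: " + trueAvg);
	}
}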

2. Code

WordCount.java

package combine;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
 * <p>
 * Title: WordCount.java
 * Package: combine
 * </p>
 * <p>
 * Description:
 *  Q: What is a combiner?
 *  A: A combiner runs on the mapper side and performs a local reduction on the map
 *  output, so less data is shipped to the reducer, transfer time is shorter, and the
 *  job finishes sooner. A combiner never spans mappers (only the reducer receives
 *  output from multiple mappers), and not every algorithm can be combined safely,
 *  e.g. computing an average.
 * </p>
 * @author Tom.Cai
 * @created 2014-11-26 10:47:32 PM
 * @version V1.0
 */
public class WordCount {
	private static final String INPUT_PATH = "hdfs://192.168.80.100:9000/hello";
	private static final String OUT_PATH = "hdfs://192.168.80.100:9000/out";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		Path outPath = new Path(OUT_PATH);
		// delete any existing output directory so the job can be rerun
		if (fileSystem.exists(outPath)) {
			fileSystem.delete(outPath, true);
		}
		Job job = new Job(conf, WordCount.class.getSimpleName());
		//1.1 set the input path
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		//1.2 set the input format
		job.setInputFormatClass(TextInputFormat.class);
		// set the custom Mapper class and its output key/value types
		job.setMapperClass(MyMapper.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		//1.3 set the partitioner and the number of reduce tasks
		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);
		//1.4 sorting and grouping (framework defaults)

		//1.5 combine: reuse the reducer as a combiner for a map-side local reduction
		job.setCombinerClass(MyReducer.class);

		//2.2 set the Reducer class and its output key/value types
		job.setReducerClass(MyReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		//2.3 set the output path and format
		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		job.setOutputFormatClass(TextOutputFormat.class);

		job.waitForCompletion(true);
	}

	static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// split each input line on tabs and emit <word, 1> for every token
			String[] splited = value.toString().split("\t");
			for (String word : splited) {
				context.write(new Text(word), new LongWritable(1));
			}
		}
	}

	static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
		@Override
		protected void reduce(Text key, Iterable<LongWritable> value, Context context) throws IOException, InterruptedException {
			// sum the counts for one word; used both as the combiner and as the reducer
			long count = 0L;
			for (LongWritable times : value) {
				count += times.get();
			}
			context.write(key, new LongWritable(count));
		}
	}

}
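The post does not show the contents of the /hello input file, but a hypothetical two-line, tab-separated file consistent with the counters below (2 map input records, 4 map output records, 3 distinct words) would be:

hello	world
hello	hadoop

With the combiner enabled, the map side pre-sums the two "hello" records into one, so the reducer receives 3 records instead of 4 and writes hadoop=1, hello=2, world=1.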

3. Counters with the combiner enabled:

14/12/01 21:26:41 INFO mapred.JobClient: Counters: 19
14/12/01 21:26:41 INFO mapred.JobClient:   File Output Format Counters
14/12/01 21:26:41 INFO mapred.JobClient:     Bytes Written=20
14/12/01 21:26:41 INFO mapred.JobClient:   FileSystemCounters
14/12/01 21:26:41 INFO mapred.JobClient:     FILE_BYTES_READ=346
14/12/01 21:26:41 INFO mapred.JobClient:     HDFS_BYTES_READ=40
14/12/01 21:26:41 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=128546
14/12/01 21:26:41 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=20
14/12/01 21:26:41 INFO mapred.JobClient:   File Input Format Counters
14/12/01 21:26:41 INFO mapred.JobClient:     Bytes Read=20
14/12/01 21:26:41 INFO mapred.JobClient:   Map-Reduce Framework
14/12/01 21:26:41 INFO mapred.JobClient:     Map output materialized bytes=50
14/12/01 21:26:41 INFO mapred.JobClient:     Map input records=2
14/12/01 21:26:41 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/12/01 21:26:41 INFO mapred.JobClient:     Spilled Records=6
14/12/01 21:26:41 INFO mapred.JobClient:     Map output bytes=52
14/12/01 21:26:41 INFO mapred.JobClient:     Total committed heap usage (bytes)=532807680
14/12/01 21:26:41 INFO mapred.JobClient:     SPLIT_RAW_BYTES=97
14/12/01 21:26:41 INFO mapred.JobClient:     Combine input records=4
14/12/01 21:26:41 INFO mapred.JobClient:     Reduce input records=3
14/12/01 21:26:41 INFO mapred.JobClient:     Reduce input groups=3
14/12/01 21:26:41 INFO mapred.JobClient:     Combine output records=3
14/12/01 21:26:41 INFO mapred.JobClient:     Reduce output records=3
14/12/01 21:26:41 INFO mapred.JobClient:     Map output records=4

4. Counters without the combiner:

14/12/01 21:35:27 INFO mapred.JobClient: Counters: 19
14/12/01 21:35:27 INFO mapred.JobClient:   File Output Format Counters
14/12/01 21:35:27 INFO mapred.JobClient:     Bytes Written=20
14/12/01 21:35:27 INFO mapred.JobClient:   FileSystemCounters
14/12/01 21:35:27 INFO mapred.JobClient:     FILE_BYTES_READ=362
14/12/01 21:35:27 INFO mapred.JobClient:     HDFS_BYTES_READ=40
14/12/01 21:35:27 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=128090
14/12/01 21:35:27 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=20
14/12/01 21:35:27 INFO mapred.JobClient:   File Input Format Counters
14/12/01 21:35:27 INFO mapred.JobClient:     Bytes Read=20
14/12/01 21:35:27 INFO mapred.JobClient:   Map-Reduce Framework
14/12/01 21:35:27 INFO mapred.JobClient:     Map output materialized bytes=66
14/12/01 21:35:27 INFO mapred.JobClient:     Map input records=2
14/12/01 21:35:27 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/12/01 21:35:27 INFO mapred.JobClient:     Spilled Records=8
14/12/01 21:35:27 INFO mapred.JobClient:     Map output bytes=52
14/12/01 21:35:27 INFO mapred.JobClient:     Total committed heap usage (bytes)=366034944
14/12/01 21:35:27 INFO mapred.JobClient:     SPLIT_RAW_BYTES=97
14/12/01 21:35:27 INFO mapred.JobClient:     Combine input records=0
14/12/01 21:35:27 INFO mapred.JobClient:     Reduce input records=4
14/12/01 21:35:27 INFO mapred.JobClient:     Reduce input groups=3
14/12/01 21:35:27 INFO mapred.JobClient:     Combine output records=0
14/12/01 21:35:27 INFO mapred.JobClient:     Reduce output records=3
14/12/01 21:35:27 INFO mapred.JobClient:     Map output records=4
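Side by side, the counters that actually differ between the two runs (values copied from the dumps above):

Counter                          with combiner    without combiner
Combine input records            4                0
Combine output records           3                0
Reduce input records             3                4
Spilled Records                  6                8
Map output materialized bytes    50               66
FILE_BYTES_READ                  346              362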

5. Summary

Comparing the two counter dumps: with the combiner, Combine input records rises from 0 to 4 and Combine output records from 0 to 3, so Reduce input records drops from 4 to 3 and Map output materialized bytes from 66 to 50. Less data is shipped from the mapper side to the reducer side, the shuffle transfer shrinks, and the overall job time goes down.

Everyone is welcome to discuss and learn together!

If you find it useful, take it!

Record and share, and let us grow together! Feel free to check out my other posts; my blog: http://blog.csdn.net/caicongyang
