Hadoop世界中的HelloWorld之WordCount具体分析

MapReduce 应用举例:单词计数

WorldCount可以说是MapReduce中的helloworld了,下面来看看hadoop中的例子worldcount对其进行的处理过程,也能对mapreduce的执行过程有一个清晰的认识,特别是对于每一个阶段的函数执行所产生的键值对

单词 计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数,如下图所示。下面将 通过分析源代码帮助读者摸清 MapReduce 程序的基本结构。

图 3-1 单词计数

WordCount 详细的执行步骤如下:

(1) 将文件拆分成 splits,由于测试用的文件较小,所以每个文件为一个 split,并将文件按行分割形成<key, value>对,如图 3-2 所示。这一步由 MapReduce 框架自动完成,其中偏 移量(即 key 值)包括了回车所占的字符数(Windows 和 Linux 环境下会不同)。

图 3-2 分割过程

(2) 将分割好的<key, value>对交给用户定义的 map 方法进行处理,生成新的<key, value> 对,如图 3-3 所示。

图 3-3 执行 map

(3) 得到 map 方法输出的<key, value>对后,Mapper 会将它们按照 key 值进行排序,并 执行 Combine 过程,将 key 值相同的 value 值累加,得到 Mapper 的最终输出结果。

图 3-4 map 端排序以及 combine 过程

(4) Reducer 先对从 Mapper 接收的数据进行排序,再交由用户自定义的 reduce 方法进行 处理,得到新的<key, value>对,并作为 WordCount 的输出结果,如图 3-5 所示。

图 3-5 reduce 端排序以及输出结果

以上就是wordcount在mapreduce中执行的具体细节,这里面对于中间的键值对产生描述的很详细,这是理解mapreduce很好的资料;

下面来看看hadoop源码中提供的这一源代码:这份代码我的注释很详细,但是运行时需要导入很多包,还要给Eclipse配置hadoop的环境,这里主要是分析worldcount的源码;

 19 import java.io.IOException;
 20 import java.util.StringTokenizer;
 21
 22 import org.apache.hadoop.conf.Configuration;
 23
 24 import org.apache.hadoop.fs.Path;
 25 import org.apache.hadoop.io.IntWritable;
 26 import org.apache.hadoop.io.Text;
 27 import org.apache.hadoop.mapreduce.Job;
 28 import org.apache.hadoop.mapreduce.Mapper;
 29 import org.apache.hadoop.mapreduce.Reducer;
 30 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 32 import org.apache.hadoop.util.GenericOptionsParser;
 33
 34 public class WordCount {
 35
 36     public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
 37
 38         private final static IntWritable one = new IntWritable(1);// 初始的单词都是1次,即使重复
 39         private Text word = new Text();// word表示单词
 40         /*
 41          * 重写map方法,读取初试划分的每一个键值对,即行偏移量和一行字符串,key为偏移量,value为该行字符串
 42          */
 43
 44         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 45             /*
 46              * 因为每一行就是一个spilt,并会为之生成一个mapper,所以我们的参数,key就是偏移量,value就是一行字符串
 47              */
 48             StringTokenizer itr = new StringTokenizer(value.toString());// value是一行的字符串,这里将其切割成多个单词
 49             while (itr.hasMoreTokens()) {// 多个单词
 50                 word.set(itr.nextToken());// 每个word
 51                 context.write(word, one);// one代表1,最开始每个单词都是1次,context直接将<word,1>写到本地磁盘上
 52                 // write函数直接将两个参数封装成<key,value>
 53             }
 54         }
 55     }
 56
 57     public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 58         private IntWritable result = new IntWritable();
 59
 60         /*
 61          * 重写reduce函数,key为单词,values是reducer从多个mapper中得到数据后进行排序并将相同key组
 62          * 合成<key.list<V>>中的list<V>,也就是说明排序这些工作都是mapper和reducer自己去做的,
 63          * 我们只需要专注与在map和reduce函数中处理排序处理后的结果
 64          */
 65         public void reduce(Text key, Iterable<IntWritable> values, Context context)
 66                 throws IOException, InterruptedException {
 67             /*
 68              * 因为在同一个spilt对应的mapper中,会将其进行combine,使得其中单词(key)不重复,然后将这些键值对按照
 69              * hash函数分配给对应的reducer,reducer进行排序,和组合成list,然后再调用的用户自定义的这个函数,
 70              * 所以有values
 71              * 这一Iterable对象,说明,这个reducer排序后有多少个键值对,就会有多少次调用这个算法,每一次都会进行写,
 72              * 并且key在整个 并行的多个节点中是唯一的
 73              *
 74              */
 75             int sum = 0;
 76             for (IntWritable val : values) {
 77                 sum += val.get();
 78             }
 79             result.set(sum);
 80             context.write(key, result);
 81         }
 82     }
 83
 84     public static void main(String[] args) throws Exception {
 85         Configuration conf = new Configuration();
 86         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 87         if (otherArgs.length < 2) {
 88             System.err.println("Usage: wordcount <in> [<in>...] <out>");
 89             System.exit(2);
 90         }
 91         @SuppressWarnings("deprecation")
 92         Job job = new Job(conf, "word count");
 93         job.setJarByClass(WordCount.class);// 本次作业的job
 94         job.setMapperClass(TokenizerMapper.class);// map函数
 95         job.setCombinerClass(IntSumReducer.class);// combine的实现个reduce函数一样,都是将相同的单词组合成一个键值对
 96         job.setReducerClass(IntSumReducer.class);// reduce函数
 97         job.setOutputKeyClass(Text.class);// 键key的类型,
 98         job.setOutputValueClass(IntWritable.class);// value的类型
 99         for (int i = 0; i < otherArgs.length - 1; ++i) {
100             FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//输入输出参数的获取,说明可以是多个输入文件
101         }
102         FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//参数的最后一个是输出文件
103         System.exit(job.waitForCompletion(true) ? 0 : 1);
104     }
105 }
时间: 2024-10-23 18:44:46

Hadoop世界中的HelloWorld之WordCount具体分析的相关文章

Hadoop版Helloworld之wordcount运行示例

1.编写一个统计单词数量的java程序,并命名为wordcount.java,代码如下: import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoo

Hadoop学习笔记(1):WordCount程序的实现与总结

开篇语: 这几天开始学习Hadoop,花费了整整一天终于把伪分布式给搭好了,激动之情无法言表······ 搭好环境之后,按着书本的代码,实现了这个被誉为Hadoop中的HelloWorld的程序--WordCount,以此开启学习Hadoop的篇章. 本篇旨在总结WordCount程序的基本结构和工作原理,有关环境的搭建这块,网上有很多的教程,大家可以自行找谷歌或百度. 何为MapReduce: 在开始看WordCount的代码之前,先简要了解下什么是MapReduce.HDFS和MapRedu

互联网世界中的C语言——我的golang学习笔记:1(基础语法快速过)

前言 学习任何知识都会有一个学习背景 最近,我们团队乃至我司整个云服务,上go的呼声越来越高!新服务已经开始用go开发,部分现有Java版的服务重构为go也只是时间问题而已,故相关技术积累势在必行!在云网络的分布式服务乃至在一切高并发,分布式后台服务中,golang都有着很大的优势. 据我对国内互联网行业的实际考察,了解,目前国内主流互联网公司都在积极投入go的怀抱…… 青云更是全栈使用了go…… 还有火的一塌糊涂的docker. 它为云而生. 它为并发而生. 还有go的安全.简洁.高效 有良好

在Spark shell中基于HDFS文件系统进行wordcount交互式分析

Spark是一个分布式内存计算框架,可部署在YARN或者MESOS管理的分布式系统中(Fully Distributed),也可以以Pseudo Distributed方式部署在单个机器上面,还可以以Standalone方式部署在单个机器上面.运行Spark的方式有interactive和submit方式.本文中所有的操作都是以interactive方式操作以Standalone方式部署的Spark.具体的部署方式,请参考Hadoop Ecosystem. HDFS是一个分布式的文件管理系统,其

hadoop之运行官方实例二--WordCount

1.在hadoop-2.9.2目录下新建一个wcinput:mkdir wcinput 2.在wcinput下新建一个文件:touch wc.input 3.vim wc.input,在wc.input中输入: hadoop yarn hadoop mapreduce gong gong 4.回到hadoop-2.9.2目录下,输入:hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.9.2.jar wordcount wc

比较Apache Hadoop生态系统中不同的文件格式和存储引擎的性能

这篇文章提出了在Apache Hadoop生态系统中对比一些当前流行的数据格式和可用的存储引擎的性能:Apache Avro,Apache Parquet,Apache HBase和Apache Kudu空间效率,提取性能,分析扫描以及随机数据查找等领域.这有助于理解它们中的每一个如何(何时)改善你的大数据工作负载的处理能力. 引言 最初把Hadoop文件格式和存储引擎做比较的想法是在初始系统修订版之一的驱动下完成的 --这个系统是在CERN中大规模调节Hadoop-ATLAS EventInd

显示Hadoop文件系统中一组路径的文件信息

//显示Hadoop文件系统中一组路径的文件信息 //我们可以用这个程序来显示一组路径集目录列表的并集 package com; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.

FileSystem以标准输出格式显示Hadoop文件中的文件

//通过FileSystem API读取数据 //这里是以FileSystem以标准输出格式显示Hadoop文件中的文件 package com; import java.io.InputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import o

使用seek()方法,将Hadoop文件系统中的一个文件在标准输出上显示两次

//使用seek()方法,将Hadoop文件系统中的一个文件在标准输出上显示两次 package com; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apac