Map/Reduce简单样例----wordcount

1.1 MapReduce编程模型  

  MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。简单地说,MapReduce就是"任务的分解与结果的汇总"。

  在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker;另一个是TaskTracker,JobTracker是用于调度工作的,TaskTracker是用于执行工作的。一个Hadoop集群中只有一台JobTracker。

  在分布式计算中,MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题,把处理过程高度抽象为两个函数:map和reduce,map负责把任务分解成多个任务,reduce负责把分解后多任务处理的结果汇总起来。

  需要注意的是,用MapReduce来处理的数据集(或任务)必须具备这样的特点:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。

1.2 MapReduce处理过程

  在Hadoop中,每个MapReduce任务都被初始化为一个Job,每个Job又可以分为两种阶段:map阶段和reduce阶段。这两个阶段分别用两个函数表示,即map函数和reduce函数。map函数接收一个<key,value>形式的输入,然后同样产生一个<key,value>形式的中间输出,Hadoop函数接收一个如<key,(list of values)>形式的输入,然后对这个value集合进行处理,每个reduce产生0或1个输出,reduce的输出也是<key,value>形式的。

                            MapReduce处理大数据集的过程

2、运行WordCount程序

  单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版"Hello World",该程序的完整代码可以在Hadoop安装包的"src/examples"目录下找到。单词计数主要完成功能是:统计一系列文本文件中每个单词出现的次数,如下图所示。

  1)源代码程序

  1 package org.apache.hadoop.examples;
  2
  3 import java.io.IOException;
  4
  5 import java.util.StringTokenizer;
  6
  7 import org.apache.hadoop.conf.Configuration;
  8
  9 import org.apache.hadoop.fs.Path;
 10
 11 import org.apache.hadoop.io.IntWritable;
 12
 13 import org.apache.hadoop.io.Text;
 14
 15 import org.apache.hadoop.mapreduce.Job;
 16
 17 import org.apache.hadoop.mapreduce.Mapper;
 18
 19 import org.apache.hadoop.mapreduce.Reducer;
 20
 21 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 22
 23 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 24
 25 import org.apache.hadoop.util.GenericOptionsParser;
 26
 27 public class WordCount {
 28
 29   public static class TokenizerMapper
 30
 31       extends Mapper<Object, Text, Text, IntWritable>{
 32
 33       private final static IntWritable one = new IntWritable(1);
 34
 35       private Text word = new Text();
 36
 37
 38
 39       public void map(Object key, Text value, Context context)
 40
 41         throws IOException, InterruptedException {
 42
 43         StringTokenizer itr = new StringTokenizer(value.toString());
 44
 45         while (itr.hasMoreTokens()) {
 46
 47         word.set(itr.nextToken());
 48
 49         context.write(word, one);
 50
 51       }
 52
 53     }
 54
 55   }
 56
 57   public static class IntSumReducer
 58
 59       extends Reducer<Text,IntWritable,Text,IntWritable> {
 60
 61       private IntWritable result = new IntWritable();
 62
 63       public void reduce(Text key, Iterable<IntWritable> values,Context context)
 64
 65            throws IOException, InterruptedException {
 66
 67         int sum = 0;
 68
 69         for (IntWritable val : values) {
 70
 71            sum += val.get();
 72
 73         }
 74
 75       result.set(sum);
 76
 77       context.write(key, result);
 78
 79     }
 80
 81   }
 82
 83
 84
 85   public static void main(String[] args) throws Exception {
 86
 87     Configuration conf = new Configuration();
 88
 89     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 90
 91     if (otherArgs.length != 2) {
 92
 93       System.err.println("Usage: wordcount <in> <out>");
 94
 95       System.exit(2);
 96
 97     }
 98
 99     Job job = new Job(conf, "word count");
100
101     job.setJarByClass(WordCount.class);
102
103     job.setMapperClass(TokenizerMapper.class);
104
105     job.setCombinerClass(IntSumReducer.class);
106
107     job.setReducerClass(IntSumReducer.class);
108
109     job.setOutputKeyClass(Text.class);
110
111     job.setOutputValueClass(IntWritable.class);
112
113     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
114
115     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
116
117     System.exit(job.waitForCompletion(true) ? 0 : 1);
118
119 }
120
121 }

  2)Map过程

 1 public static class TokenizerMapper
 2
 3   extends Mapper<Object, Text, Text, IntWritable>{
 4
 5   private final static IntWritable one = new IntWritable(1);
 6
 7   private Text word = new Text();
 8
 9   public void map(Object key, Text value, Context context)
10
11     throws IOException, InterruptedException {
12
13     StringTokenizer itr = new StringTokenizer(value.toString());
14
15     while (itr.hasMoreTokens()) {
16
17       word.set(itr.nextToken());
18
19       context.write(word, one);
20
21   }
22
23 }

  Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并将<word,1>作为map方法的结果输出,其余的工作都交有MapReduce框架处理。

  3)Reduce过程

 1 public static class IntSumReducer
 2
 3   extends Reducer<Text,IntWritable,Text,IntWritable> {
 4
 5   private IntWritable result = new IntWritable();
 6
 7   public void reduce(Text key, Iterable<IntWritable> values,Context context)
 8
 9      throws IOException, InterruptedException {
10
11     int sum = 0;
12
13     for (IntWritable val : values) {
14
15       sum += val.get();
16
17     }
18
19     result.set(sum);
20
21     context.write(key, result);
22
23   }
24
25 }

  Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。Map过程输出<key,values>中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。

  4)执行MapReduce任务

  

 1 public static void main(String[] args) throws Exception {
 2
 3   Configuration conf = new Configuration();
 4
 5   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 6
 7   if (otherArgs.length != 2) {
 8
 9     System.err.println("Usage: wordcount <in> <out>");
10
11     System.exit(2);
12
13   }
14
15   Job job = new Job(conf, "word count");
16
17   job.setJarByClass(WordCount.class);
18
19   job.setMapperClass(TokenizerMapper.class);
20
21   job.setCombinerClass(IntSumReducer.class);
22
23   job.setReducerClass(IntSumReducer.class);
24
25   job.setOutputKeyClass(Text.class);
26
27   job.setOutputValueClass(IntWritable.class);
28
29   FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
30
31   FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
32
33   System.exit(job.waitForCompletion(true) ? 0 : 1);
34
35 }

  在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine和Reduce过程中的处理。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输出和输入路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。

3、WordCount处理过程

  1)将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,如图4-1所示。这一步由MapReduce框架自动完成,其中偏移量(即key值)包括了回车所占的字符数(Windows和Linux环境会不同)。

                        

                                      图4-1 分割过程

  2)将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对,如图4-2所示。

                        

                                    图4-2 执行map方法

  3)得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。如图4-3所示。

                   

                                图4-3 Map端排序及Combine过程

  4)Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,如图4-4所示

                     

                                  图4-4 Reduce端排序及输出结果

4、MapReduce新旧改变  

    • 新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,Mapper和Reducer是抽象类。
    • 新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API则是放在org.apache.hadoop.mapred中的。
    • 新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。例如,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。
    • 新的API同时支持"推"和"拉"式的迭代。在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer。"拉"式的一个有用的例子是分批处理记录,而不是一个接一个。
    • 新的API统一了配置。旧的API有一个特殊的JobConf对象用于作业配置,这是一个对于Hadoop通常的Configuration对象的扩展。在新的API中,这种区别没有了,所以作业配置通过Configuration来完成。作业控制的执行由Job类来负责,而不是JobClient,它在新的API中已经荡然无存。
时间: 2024-08-06 08:48:58

Map/Reduce简单样例----wordcount的相关文章

Junit的最简单样例:Hello world!

我的技术博客经常被流氓网站恶意爬取转载.请移步原文:http://www.cnblogs.com/hamhog/p/3824934.html,享受整齐的排版.有效的链接.正确的代码缩进.更好的阅读体验. 不多说了,贴两段代码. HelloWorld类: public class HelloWorld { public void main() { System.out.println(helloWorld()); } public static String helloWorld() { retu

自己定义隐式转换和显式转换c#简单样例

自己定义隐式转换和显式转换c#简单样例 (出自朱朱家园http://blog.csdn.net/zhgl7688) 样例:对用户user中,usernamefirst name和last name进行转换成合成一个限定长度为10个字符新name. 自己定义隐式转换: namespace transduction { public partial class transductionForm : Form { public transductionForm() { InitializeCompon

JsPlumb简单样例

JsPlumb简单样例: <!DOCTYPE html> <html> <head>     <script src="jquery-1.9.0.js"></script>     <script src="jquery-ui-1.9.2-min.js"></script>     <script src="jquery.jsPlumb-1.4.0-all.js&qu

velocity简单样例

velocity简单样例整体实现须要三个步骤,详细例如以下: 1.创建一个Javaproject 2.导入须要的jar包 3.创建须要的文件 ============================================ 1.创建一个Javaproject 名称:JKTest,例如以下: watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG92ZV9qaw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA

最大熵算法及简单样例

近期在学模式识别,正在看Introduction to Pattern Recognition这本书,挺不错的一本书.好.以下和大家一起来学习最大熵算法. 首先,最大熵算法是干什么的呢?通常是用来预计一个分布,至于把分布预计出来之后用来干什么,那要视详细问题而定. 那这里的"熵"是什么意思呢?它是指信息熵,一个分布的均匀程度能够用熵的大小来衡量.熵越大,就越均匀.而最大熵就是要求在满足特定约束下,分布是什么样的时候.熵最大.也就是越均匀越好. 为什么在满足特定约束下越均匀越好?由于你已

VC6 鼠标钩子 最简单样例

Windows系统是建立在事件驱动的机制上的,说穿了就是整个系统都是通过消息的传递来实现的.而钩子是Windows系统中非常重要的系统接口,用它能够截获并处理送给其它应用程序的消息,来完毕普通应用程序难以实现的功能.钩子能够监视系统或进程中的各种事件消息,截获发往目标窗体的消息并进行处理.这样,我们就能够在系统中安装自己定义的钩子,监视系统中特定事件的发生,完毕特定的功能,比方截获键盘.鼠标的输入,屏幕取词,日志监视等等.可见,利用钩子能够实现很多特殊而实用的功能.因此,对于高级编程人员来说,掌

基于Spring-SpringMVC-Mybatis的简单样例

复习下 好久没搞过撸过代码了! 这个样例包括一个完整的增删改查! 源代码地址http://download.csdn.net/detail/wangdianyong/8909903

使用SALT-API进入集成开发的简单样例

测试的时候,可以CURL -K,但真正作集成的时候,却是不可以的. 必须,不可以让TOKEN满天飞吧. 现在进入这个阶段了.写个样例先: import salt import salt.auth import salt.log import saltapi opts = salt.client.LocalClient().opts auth = salt.auth.LoadAuth(opts) lowstate = {'username':'XXX','password':'XXX','eaut

Spring Ajax一个简单样例

配置不说了.要在前面helloworld的样例基础上弄. 相同在hello下新建ajax.jsp <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ page isELIgnored ="false" %> <!DOCTYPE html PUBLIC "-//