MapReduce教程(一)基于MapReduce框架开发<转>

1 MapReduce编程

1.1 MapReduce简介

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算,用于解决海量数据的计算问题。

MapReduce分成了两个部分:

1、映射(Mapping)对集合里的每个目标应用同一个操作。即,如果你想把表单里每个单元格乘以二,那么把这个函数单独地应用在每个单元格上的操作就属于mapping。

2、化简(Reducing)遍历集合中的元素来返回一个综合的结果。即,输出表单里一列数字的和这个任务属于reducing。

你向MapReduce框架提交一个计算作业时,它会首先把计算作业拆分成若干个Map任务,然后分配到不同的节点上去执行,

每一个Map任务处理输入数据中的一部分,当Map任务完成后,它会生成一些中间文件,这些中间文件将会作为Reduce任务的输入数据。

Reduce任务的主要目标就是把前面若干个Map的输出汇总到一起并输出。

MapReduce的伟大之处就在于编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。

1.2 MapReduce运行原理

 

MapReduce论文流程图 - 1.1

 

一切都是从最上方的user program开始的,user program链接了MapReduce库,实现了最基本的Map函数和Reduce函数。图中执行的顺序都用数字标记了。

1、MapReduce库先把user program的输入文件划分为M份(M为用户定义),每一份通常有16MB到64MB,如图左方所示分成了split0~4;然后使用fork将用户进程拷贝到集群内其它机器上。

2、user program的副本中有一个称为master,其余称为worker,master是负责调度的,为空闲worker分配作业(Map作业3或者Reduce作业),worker的数量也是可以由用户指定的。

3、被分配了Map作业的worker,开始读取对应分片的输入数据,Map作业数量是由M决定的,和split一一对应;Map作业从输入数据中抽取出键值对,每一个键值对都作为参数传递给map函数,map函数产生的中间键值对被缓存在内存中。

4、缓存的中间键值对会被定期写入本地磁盘,而且被分为R个区,R的大小是由用户定义的,将来每个区会对应一个Reduce作业;这些中间键值对的位置会被通报给master,master负责将信息转发给Reduce worker。

5、master通知分配了Reduce作业的worker它负责的分区在什么位置(肯定不止一个地方,每个Map作业产生的中间键值对都可能映射到所有R个不同分区),当Reduce worker把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。因为不同的键可能会映射到同一个分区也就是同一个Reduce作业(谁让分区少呢),所以排序是必须的。

6、reduce worker遍历排序后的中间键值对,对于每个唯一的键,都将键与关联的值传递给reduce函数,reduce函数产生的输出会添加到这个分区的输出文件中。

7、当所有的Map和Reduce作业都完成了,master唤醒正版的user program,MapReduce函数调用返回user program的代码

8、所有执行完毕后,MapReduce输出放在了R个分区的输出文件中(分别对应一个Reduce作业)。用户通常并不需要合并这R个文件,而是将其作为输入交给另一个MapReduce程序处理。整个过程中,输入数据是来自底层分布式文件系统(GFS)的,中间数据是放在本地文件系统的,最终输出数据是写入底层分布式文件系统(GFS)的。而且我们要注意Map/Reduce作业和map/reduce函数的区别:Map作业处理一个输入数据的分片,可能需要调用多次map函数来处理每个输入键值对;Reduce作业处理一个分区的中间键值对,期间要对每个不同的键调用一次reduce函数,Reduce作业最终也对应一个输出文件。

HadoopMapReduce模型实现图– 1.2

1.3 输入与输出

Map/Reduce框架运转在<key, value>键值对上,也就是说,框架把作业的输入看为是一组<key, value>键值对,同样也产出一组 <key, value>键值对做为作业的输出,这两组键值对的类型可能不同。

框架需要对key和value的类(classes)进行序列化操作,因此,这些类需要实现Writable接口。另外,为了方便框架执行排序操作,key类必须实现 WritableComparable接口。

一个Map/Reduce作业的输入和输出类型如下所示:

(input) <k1, v1> -> map -> <k2, v2>-> combine -> <k2, v2> -> reduce -> <k3, v3> (output)。

1.4 Writable接口

Writable接口是一个实现了序列化协议的序列化对象。

在Hadoop中定义一个结构化对象都要实现Writable接口,使得该结构化对象可以序列化为字节流,字节流也可以反序列化为结构化对象。


Java基本类型


Writable使用序列化大小


字节


布尔型


BooleanWritable


1


字节型


ByteWritable


1


整型


IntWritable


4


整型


VIntWritable


1-5


浮点型


FloatWritable


4


长整型


LongWritable


8


长整型


VLongWritable


1-9


双精度浮点型


DoubleWritable


8


Text类型对应


java的string

2 MapReduce编程

2.1 准备数据

1、    在/home路径下,新建words.txt文档,文档内容如下:

hello tom

hello jerry

hello kitty

hello world

hello tom

2、    上传到hdfs文件服务器/hadoop目录下:

执行命令:hadoop fs -put /home/words.txt /hadoop/words.txt

执行命令:hadoop fs -cat /hadoop/words.txt

2.2 WordCount v1.0代码编写

WordCount是一个简单的应用,它可以计算出指定数据集中每一个单词出现的次数。

1、    在pom.xml引入Jar包:

[html] view plain copy

  1. <!-- 引入hadoop-common Jar包 -->
  2. <dependency>
  3. <groupId>org.apache.hadoop</groupId>
  4. <artifactId>hadoop-common</artifactId>
  5. <version>2.7.1</version>
  6. </dependency>
  7. <!-- 引入hadoop-mapreduce-client-core Jar包 -->
  8. <dependency>
  9. <groupId>org.apache.hadoop</groupId>
  10. <artifactId>hadoop-mapreduce-client-core</artifactId>
  11. <version>2.7.1</version>
  12. </dependency>

2、    WCMapper代码编写:

[java] view plain copy

  1. package com.hadoop.mapreduce;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. /*
  7. * 继承Mapper类需要定义四个输出、输出类型泛型:
  8. * 四个泛型类型分别代表:
  9. * KeyIn        Mapper的输入数据的Key,这里是每行文字的起始位置(0,11,...)
  10. * ValueIn      Mapper的输入数据的Value,这里是每行文字
  11. * KeyOut       Mapper的输出数据的Key,这里是每行文字中的单词"hello"
  12. * ValueOut     Mapper的输出数据的Value,这里是每行文字中的出现的次数
  13. *
  14. * Writable接口是一个实现了序列化协议的序列化对象。
  15. * 在Hadoop中定义一个结构化对象都要实现Writable接口,使得该结构化对象可以序列化为字节流,字节流也可以反序列化为结构化对象。
  16. * LongWritable类型:Hadoop.io对Long类型的封装类型
  17. */
  18. public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  19. /**
  20. * 重写Map方法
  21. */
  22. @Override
  23. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
  24. throws IOException, InterruptedException {
  25. // 获得每行文档内容,并且进行折分
  26. String[] words = value.toString().split(" ");
  27. // 遍历折份的内容
  28. for (String word : words) {
  29. // 每出现一次则在原来的基础上:+1
  30. context.write(new Text(word), new LongWritable(1));
  31. }
  32. }
  33. }

3、    WCReducer代码编写:

[java] view plain copy

  1. package com.hadoop.mapreduce;
  2. import java.io.IOException;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Reducer;
  6. /*
  7. * 继承Reducer类需要定义四个输出、输出类型泛型:
  8. * 四个泛型类型分别代表:
  9. * KeyIn        Reducer的输入数据的Key,这里是每行文字中的单词"hello"
  10. * ValueIn      Reducer的输入数据的Value,这里是每行文字中的次数
  11. * KeyOut       Reducer的输出数据的Key,这里是每行文字中的单词"hello"
  12. * ValueOut     Reducer的输出数据的Value,这里是每行文字中的出现的总次数
  13. */
  14. public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  15. /**
  16. * 重写reduce方法
  17. */
  18. @Override
  19. protected void reduce(Text key, Iterable<LongWritable> values,
  20. Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
  21. long sum = 0;
  22. for (LongWritable i : values) {
  23. // i.get转换成long类型
  24. sum += i.get();
  25. }
  26. // 输出总计结果
  27. context.write(key, new LongWritable(sum));
  28. }
  29. }

4、    WordCount代码编写:

[java] view plain copy

  1. package com.hadoop.mapreduce;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. public class WordCount {
  11. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  12. // 创建job对象
  13. Job job = Job.getInstance(new Configuration());
  14. // 指定程序的入口
  15. job.setJarByClass(WordCount.class);
  16. // 指定自定义的Mapper阶段的任务处理类
  17. job.setMapperClass(WCMapper.class);
  18. job.setMapOutputKeyClass(Text.class);
  19. job.setMapOutputValueClass(LongWritable.class);
  20. // 数据HDFS文件服务器读取数据路径
  21. FileInputFormat.setInputPaths(job, new Path("/hadoop/words.txt"));
  22. // 指定自定义的Reducer阶段的任务处理类
  23. job.setReducerClass(WCReducer.class);
  24. // 设置最后输出结果的Key和Value的类型
  25. job.setOutputKeyClass(Text.class);
  26. job.setOutputValueClass(LongWritable.class);
  27. // 将计算的结果上传到HDFS服务
  28. FileOutputFormat.setOutputPath(job, new Path("/hadoop/wordsResult"));
  29. // 执行提交job方法,直到完成,参数true打印进度和详情
  30. job.waitForCompletion(true);
  31. System.out.println("Finished");
  32. }
  33. }

2.3 生成JAR包

1、    选择hdfs项目->右击菜单->Export…,在弹出的提示框中选择Java下的JAR file:

2、    设置导出jar名称和路径,选择Next>:

3、    设置程序的入口,设置完成后,点击Finish:

4、    成生wc.jar如下文件,如下图:

2.4 执行JAR运行结果

1、    在开Xft软件,将D:盘的wc.jar上传到Linux/home路径下:

2、    执行命令

切换目录命令:cd /home/

执行JAR包命令:hadoop jar wc.jar

3、    查看执行结果

执行命令:hadoop fs -ls /hadoop/wordsResult

执行命令:hadoop fs -cat /hadoop/wordsResult/part-r-00000

--以上为《MapReduce教程(一)基于MapReduce框架开发》,如有不当之处请指出,我后续逐步完善更正,大家共同提高。谢谢大家对我的关注。

转自 http://blog.csdn.net/yuan_xw/article/details/50532368

时间: 2024-12-20 05:45:32

MapReduce教程(一)基于MapReduce框架开发<转>的相关文章

Hadoop伪分布安装详解+MapReduce运行原理+基于MapReduce的KNN算法实现

本篇博客将围绕Hadoop伪分布安装+MapReduce运行原理+基于MapReduce的KNN算法实现这三个方面进行叙述. (一)Hadoop伪分布安装 1.简述Hadoop的安装模式中–伪分布模式与集群模式的区别与联系. Hadoop的安装方式有三种:本地模式,伪分布模式,集群(分布)模式,其中后两种模式为重点,有意义 伪分布:如果Hadoop对应的Java进程都运行在一个物理机器上,称为伪分布 分布:如果Hadoop对应的Java进程运行在多台物理机器上,称为分布.[集群就是有主有从] 伪

一个基于SSM框架开发的高并发电商秒杀Web系统

0 前言 一个基于SSM框架的高并发秒杀系统采用IDEA+Maven+SSM+Mysql+Redis+Jetty.Bootstrap/Jquery开发. 通过这个小项目,理清了基于SSM框架开发Web应用的流程以及常见的避坑方法,并在最后简单采用了Redis缓存以及Mysql Procedure对项目进行了高并发优化. 接下来从DAO层.Service层.Web层开发以及高并发优化4个方面梳理整个项目开发过程. 源码地址https://github.com/Allegr0/seckill 项目准

基于ssm框架开发的零食商城源码

很多朋友说要分享一些基于ssm框架开发的项目,在休闲时间搭建和撸一个以ssm框架开发的零食商城源码,详情如下,希望大家能够见解和学习. 首先ssm定义是框架集由Spring.MyBatis两个开源框架整合而成(SpringMVC是Spring中的部分内容),在开发上前后分离,耦合度小,且开发方便快速,效率较高.大家可以把我分享的项目下载下来二次学习或者开发,同时也可用于毕设. 系统分为前后太两大部分,包含管理员.普通用户权限,具有一系列的包含权限,用户信息,商品信息,订单信息,个人中新,购物车下

【分享】后盾网原创视频,仿京东商城(基于HDPHP框架开发,适合提高)视频教程(PHP实战)

下载地址 链接:http://pan.baidu.com/disk/home#list/path=/ 用户[email protected] 密码:redbaidu 如需要全部课程请扫描下面二维码或者关注微信公众号 redbaidu 课时:32课时 知识点:织梦(DEDECMS)万能仿站课程,从零开始,详细.系统的讲解了整套织梦万能仿站技术,内容涵盖了理论讲解.实践演示.实战操作等方面.本套教程包含两个实战案例,讲解通俗易懂,深入浅出,适合各层次水平的学员学习.目前网站建设行业,一个普通的企业站

基于Vue框架开发的仿饿了么前端小应用

主要使用vue框架进行开发.使用最新的框架版本,修正了vue1.0到vue2.0过度过程出现的几处bug. 视频教程则是黄轶老师的<vuejs高仿饿了么APP>. 源代码地址:https://github.com/waihoyu/sell 原文地址:https://www.cnblogs.com/waihoyu/p/9350175.html

基于struts2框架开发的学生管理系统

学生管理系统采用struts2框架作为后台开发框架,jsp实现页面数据的展示,数据库采用mysql.功能介绍:包含学生信息管理,班级信息管理,年级信息管理,系统信息管理等功能.数据库模型设置如下:CREATE TABLE t_class (classId int(11) NOT NULL auto_increment,className varchar(20) default NULL,gradeId int(11) default NULL,classDesc text,PRIMARY KEY

java毕设--基于ssh框架开发的个人博客系统

联系qq:2835777178   有兴趣者可以联系我,也可先查看项目运行视频再决定 项目部分功能界面 一.博客主页面 二.关于我 三.个人日记 四.用户登录界面 五.登录后主界面 六.个人资料管理界面 在这里其他界面就不粘贴啦,如有需要联系上面的qq

0018 基于DRF框架开发(多表增删改的实现)

一个接口同时更新多个表的步骤如下: 先针对每个表写一个序列化器 在视图中接收数据,并根据数据分别调用不同的序列化器. 1 新增主从表 在视图中先接收数据,把主从表的数据分别放在不同的字典里,把主表字典传入主表序列化器,如果主表数据存在,则获取主表ID,如果主表记录不存在,则先新增主表记录,完成后得到该记录的ID,再把ID加入到从表字典中,调用从表序列化器更新. 2 修改主从表 在视图中接收数据,把主从表的数据分别放在不同的字典里,先调用主表序列化器更新主表,再调用从表序列化器更新从表. 3 删除

MVVM开源框架Knot.js 教程2 - 大幅改变前端框架开发体验的Debugger

Knotjs教程系列 1.CBS初步 2.Knot.js Debugger(本文) ....持续增加中 Knot.js 教程2 - 改变前端框架开发体验的Debugger Debugger只是一个方便开发的附属工具,按道理说是不值得单独为之写一篇文章的.不过Knot.js的Debugger绝对值得一篇文章. 有过框架开发体验的朋友一定多少都有过和框架搏斗的经验.一个小小的设置错误,由于你对框架的不够熟悉,导致出错后完全摸不到头脑.或者被迫在一堆陌生的代码中跟踪尝试找出问题,或者只有上网到处拉人提