大数据时代之hadoop(五):hadoop 分布式计算框架(MapReduce)

hadoop的核心分为两块,一是分布式存储系统-hdfs,这个我已经在上一章节大致讲了一下,另一个就是hadoop的计算框架-mapreduce

mapreduce其实就是一个移动式的基于key-value形式的分布式计算框架

其计算分为两个阶段,map阶段和reduce阶段,都是对数据的处理,由于其入门非常简单,但是若想理解其中各个环节及实现细节还是有一定程度的困难,因此我计划在本文中只是挑几个mapreduce的核心来进行分析讲解。

1、MapReduce驱动程序默认值

编写mapreduce程序容易入手的其中一个原因就在于它提供了一些了的默认值,而这些默认值刚好就是供开发环境设置而设定的。虽然容易入手,但还是的
理解mapreduce的精髓,因为它是mapreduce的引擎,只有理解了mapreduce的核心,当你在编写mapreduce程序的时候,你所
编写的程序才是最终稳重的,想要的程序。废话少说,见下面代码:

[java] view plaincopyprint?

  1. public int run(String[] args) throws IOException {
  2. JobConf conf = new JobConf();
  3. /**
  4. *默认的输入格式,即mapper程序要处理的数据的格式,hadoop支持很多种输入格式,下面会详细讲解,
  5. *但TextInputFormat是最常使用的(即普通文本文件,key为LongWritable-文件中每行的开始偏移量,value为Text-文本行)。
  6. **/
  7. conf.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
  8. /**
  9. *真正的map任务数量取决于输入文件的大小以及文件块的大小
  10. **/
  11. conf.setNumMapTasks(1);
  12. /**
  13. *默认的mapclass,如果我们不指定自己的mapper class时,就使用这个IdentityMapper 类
  14. **/
  15. conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
  16. /**
  17. * map 任务是由MapRunner负责运行的,MapRunner是MapRunnable的默认实现,它顺序的为每一条记录调用一次Mapper的map()方法,详解代码  --重点
  18. */
  19. conf.setMapRunnerClass(org.apache.hadoop.mapred.MapRunner.class);
  20. /**
  21. * map任务输出结果的key 和value格式
  22. */
  23. conf.setMapOutputKeyClass(org.apache.hadoop.io.LongWritable.class);
  24. conf.setMapOutputValueClass(org.apache.hadoop.io.Text.class);
  25. /**
  26. * HashPartitioner 是默认的分区实现,它对map 任务运行后的数据进行分区,即把结果数据划分成多个块(每个分区对应一个reduce任务)。
  27. * HashPartitioner是对每条 记录的键进行哈希操作以决定该记录应该属于哪个分区。
  28. *
  29. */
  30. conf.setPartitionerClass(org.apache.hadoop.mapred.lib.HashPartitioner.class);
  31. /**
  32. * 设置reduce任务个数
  33. */
  34. conf.setNumReduceTasks(1);
  35. /**
  36. *默认的reduce class,如果我们不指定自己的reduce class时,就使用这个IdentityReducer 类
  37. **/
  38. conf.setReducerClass(org.apache.hadoop.mapred.lib.IdentityReducer.class);
  39. /**
  40. * 任务最终输出结果的key 和value格式
  41. */
  42. conf.setOutputKeyClass(org.apache.hadoop.io.LongWritable.class);
  43. conf.setOutputValueClass(org.apache.hadoop.io.Text.class);
  44. /**
  45. * 最终输出到文本文件类型中
  46. */
  47. conf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);/*]*/
  48. JobClient.runJob(conf);
  49. return 0;
  50. }

我要说的大部分都包含在了代码的注释里面,除此之外,还有一点:由于java的泛型机制有很多限制:类型擦除导致运行过程中类型信息并非一直可见,所以hadoop需要明确设定map,reduce输入和结果类型

上面比较重要的就是MapRunner这个类,它是map任务运行的引擎,默认实现如下:

[java] view plaincopyprint?

  1. public class MapRunner<K1, V1, K2, V2>
  2. implements MapRunnable<K1, V1, K2, V2> {
  3. private Mapper<K1, V1, K2, V2> mapper;
  4. private boolean incrProcCount;
  5. @SuppressWarnings("unchecked")
  6. public void configure(JobConf job) {
  7. //通过反射方式取得map 实例
  8. this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
  9. //increment processed counter only if skipping feature is enabled
  10. this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 &&
  11. SkipBadRecords.getAutoIncrMapperProcCount(job);
  12. }
  13. public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
  14. Reporter reporter)
  15. throws IOException {
  16. try {
  17. // allocate key & value instances that are re-used for all entries
  18. K1 key = input.createKey();
  19. V1 value = input.createValue();
  20. while (input.next(key, value)) {
  21. // map pair to output
  22. //循环调用map函数
  23. mapper.map(key, value, output, reporter);
  24. if(incrProcCount) {
  25. reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
  26. SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
  27. }
  28. }
  29. } finally {
  30. mapper.close();
  31. }
  32. }
  33. protected Mapper<K1, V1, K2, V2> getMapper() {
  34. return mapper;
  35. }
  36. }

要相信,有些时候还是看源码理解的更快!

2、shuffle

shuffle过程其实就是从map的输出到reduce的输入过程中所经历的步骤,堪称mapreduce的“心脏”,分为3个阶段,map端分区、reduce端复制、reduce排序(合并)阶段。

2.1、map端分区

由于在mapreduce计算中,有多个map任务和若干个reduce任何,而且各个任务都可能处于不同的机器里面,所以如何从map任务的输出到reduce的输入是一个难点。

map函数在产生输出时,并不是简单的写到磁盘中,而是利用缓冲的形式写入到内存,并出于效率进行预排序,过程如下图:

写磁盘之前,线程首先根据reduce的个数将输出数据划分成响应的分区(partiton)。在每个分区中,后台线程按键进行内排序,如果有个一combiner,它会在排序后的输出上运行。

2.2、reduce端复制阶段

由于map任务的输出文件写到了本地磁盘上,并且划分成reduce个数的分区(每一个reduce需要一个分区),由于map任务完成的时间可能不同,因此只要一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段。如上图所示。

2.3、reduce端排序(合并)阶段

复制完所有map输出后,reduce任务进入排序阶段(sort phase),这个阶段将合并map输出,维持其顺序排序,如上图所示。

3、输入与输出格式

随着时间的增加,数据的增长也是指数级的增长,且数据的格式也越来越多,对大数据的处理也就越来越困难,为了适应能够处理各种各样的数据,hadoop提供了一系列的输入和输出格式控制,其目的很简单,就是能够解析各种输入文件,并产生需要的输出格式数据

但是不管处理哪种格式的数据,都要与mapreduce结合起来,才能最大化的发挥hadoop的有点。

这部分也是hadoop的核心啊!

3.1、输入分片与记录

在讲HDFS的时候,说过,一个输入分片就是由单个map任务处理的输入块一个分片的大小最好与hdfs的块大小相同

每个分片被划分成若干个记录,每个记录就是一个键值对,map一个接一个的处理每条记录

数据库常见中,一个输入分片可以对应一个表的若干行,而一条记录对应一行(DBInputFormat)。

输入分片在hadoop中表示为InputSplit接口,有InputFormat创建的

InputFormat负责产生输入分片并将他们分割成记录,其只是一个接口,具体任务有具体实现去做的

3.2、FileInputFormat

FileInputFormat是所有使用文件作为其数据源的InputFormat实现的基类,它提供了两个功能:一个定义哪些文件包含在作业的输入中;一个为输入文件产生分片的实现。把分片割成基类的作业有其子类实现,FileInputFormat是个抽象类

FileInputFormat实现了把文件分区的功能,但它是怎么来实现了呢?需要先说三个参数:


属性名称


类型


默认值


描述


mapred.min.split.size


Int


1


一个文件分片的最小字节数


mapred.max.split.size


Long


Long.MAX_VALUE


一个文件分片的最大字节数


dfs.block.size


long


64M


HDFS中块大小

分片的大小有一个公式计算(参考FileInputFomat类的computeSplitSize()方法)

max(minimumSize,min(maximumSize,blockSize))

默认情况下: minimumSize  <  blockSize < maximumSize

FileInputFormat只分割大文件,即文件大小超过块大小的文件

FileInputFormat生成的InputSplit是一整个文件(文件太小,未被分区,整个文件当成一个分区,供map任务处理)或该文件的一部分(文件大,被分区)

3.3、常用的InputFormat实现

小文件与CombineFileInputFormat

虽然hadoop适合处理大文件,但在实际的情况中,大量的小文件处理是少不了的,因此hadoop提供了一个CombineFileInputFormat,它针对小文件而设计的,它把多个文件打包到一个分片中一般每个mapper可以处理更多的数据

TextInputFormat


     hadoop默认的InputFormat,每个记录的键是文件中行的偏移量,值为行内容

KeyValueInputFormat

适合处理配置文件,文件中行中为key value格式的,如key=value类型的文件  ,key即为行中的key,value即为行中的value。

NLineInputFormat

也是为处理文本文件而开发的,它的特点是为每个map任务收到固定行数的输入,其他与TextInputFormat相似。

SequenceFileInputFormat(二进制输入)

hadoop的顺序文件格式存储格式存储二进制的键值对序列,由于顺序文件里面存储的就是map结构的数据,所以刚好可以有SequenceFileInputFormat 来进行处理。

DBInputFormat

顾名思义,用于使用jdbc从关系数据库中读取数据。

多种输入

MultipleInputs类可以用来处理多种输入格式的数据,如输入数据中包含文本类型和二进制类型的,这个时候就可以用 MultipleInputs来指定某个文件有哪种输入类型和哪个map函数来解析。

3.4、输出格式

既然有输入格式,就有输出格式,与输入格式对应。

 默认的输出格式是TextOutputFormat,它把记录写成文本行,键值对可以是任意类型, 键值对中间默认用制表符分割

3.5、hadoop特性

除了上面几点之外,还有计数器、排序、连接等需要关注,详细待后续吧。。。

时间: 2024-10-07 05:27:06

大数据时代之hadoop(五):hadoop 分布式计算框架(MapReduce)的相关文章

大数据时代之hadoop(六):hadoop 生态圈(pig,hive,hbase,ZooKeeper,Sqoop)

大数据时代之hadoop(一):hadoop安装 大数据时代之hadoop(二):hadoop脚本解析 大数据时代之hadoop(三):hadoop数据流(生命周期) 大数据时代之hadoop(四):hadoop 分布式文件系统(HDFS) 大数据时代之hadoop(五):hadoop 分布式计算框架(MapReduce) hadoop是有apache基金会所开发的分布式系统基础架构,其主要提供了两方面的功能:分布式存储和分布式计算.其中分布式存储是分布式计算的基础,在hadoop的实现里面,提

决胜大数据时代:Hadoop&amp;Yarn&amp;Spark企业级最佳实践(8天完整版脱产式培训版本)

Hadoop.Yarn.Spark是企业构建生产环境下大数据中心的关键技术,也是大数据处理的核心技术,是每个云计算大数据工程师必修课. 课程简介 大数据时代的精髓技术在于Hadoop.Yarn.Spark,是大数据时代公司和个人必须掌握和使用的核心内容. Hadoop.Yarn.Spark是Yahoo!.阿里淘宝等公司公认的大数据时代的三大核心技术,是大数据处理的灵魂,是云计算大数据时代的技术命脉之所在,以Hadoop.Yarn.Spark为基石构建起来云计算大数据中心广泛运行于Yahoo!.阿

大数据时代到底Hadoop和Spark谁是王者!

在现在这个大数据时代,Hadoop和Spark是最潮流的两个词汇,Hadoop是一种分布式计算框架,由Google提出,主要用于搜索领域,解决海量数据的计算问题,Hadoop中的MapReduce包括两个阶段:Mapper阶段和Reducer阶段,用户只需要实现map函数和reduce函数即可实现分布式计算,非常简单.而近几年Spark新兴框架的产生,以不可挡之势席卷中国,其核心内部结构RDD以超强的弹性机制更加的引人注目!越来越多的人认为Spark终有一天要取代Hadoop,但是事实究竟如何呢

Hadoop大数据时代:Hadoop&amp;YarnSpark企业级最佳实践 (4天)

Hadoop.Yarn.Spark是企业构建生产环境下大数据中心的关键技术,也是大数据处理的核心技术,是每个云计算大数据工程师必修课. 大数据时代的精髓技术在于Hadoop.Yarn.Spark,是大数据时代公司和个人必须掌握和使用的核心内容. Hadoop.Yarn.Spark是Yahoo!.阿里淘宝等公司公认的大数据时代的三大核心技术,是大数据处理的灵魂,是云计算大数据时代的技术命脉之所在,以Hadoop.Yarn.Spark为基石构建起来云计算大数据中心广泛运行于Yahoo!.阿里淘宝.腾

大数据项目实践:基于hadoop+spark+mongodb+mysql开发医院临床知识库系统

一.前言 从20世纪90年代数字化医院概念提出到至今的20多年时间,数字化医院(Digital Hospital)在国内各大医院飞速的普及推广发展,并取得骄人成绩.不但有数字化医院管理信息系统(HIS).影像存档和通信系统(PACS).电子病历系统(EMR)和区域医疗卫生服务(GMIS)等成功实施与普及推广,而且随着日新月异的计算机技术和网络技术的革新,进一步为数字化医院带来新的交互渠道譬如:远程医疗服务,网上挂号预约. 随着IT技术的飞速发展,80%以上的三级医院都相继建立了自己的医院信息系统

大数据架构师基础:hadoop家族,Cloudera产品系列等各种技术

大数据我们都知道hadoop,可是还会各种各样的技术进入我们的视野:Spark,Storm,impala,让我们都反映不过来.为了能够更好的架构大数据项目,这里整理一下,供技术人员,项目经理,架构师选择合适的技术,了解大数据各种技术之间的关系,选择合适的语言. 我们可以带着下面问题来阅读本文章: 1.hadoop都包含什么技术 2.Cloudera公司与hadoop的关系是什么,都有什么产品,产品有什么特性 3. Spark与hadoop的关联是什么? 4. Storm与hadoop的关联是什么

大数据系列(3)——Hadoop集群完全分布式坏境搭建

前言 上一篇我们讲解了Hadoop单节点的安装,并且已经通过VMware安装了一台CentOS 6.8的Linux系统,咱们本篇的目标就是要配置一个真正的完全分布式的Hadoop集群,闲言少叙,进入本篇的正题. 技术准备 VMware虚拟机.CentOS 6.8 64 bit 安装流程 我们先来回顾上一篇我们完成的单节点的Hadoop环境配置,已经配置了一个CentOS 6.8 并且完成了java运行环境的搭建,Hosts文件的配置.计算机名等诸多细节. 其实完成这一步之后我们就已经完成了Had

大数据系列(2)——Hadoop集群坏境CentOS安装

前言 前面我们主要分析了搭建Hadoop集群所需要准备的内容和一些提前规划好的项,本篇我们主要来分析如何安装CentOS操作系统,以及一些基础的设置,闲言少叙,我们进入本篇的正题. 技术准备 VMware虚拟机.CentOS 6.8 64 bit 安装流程 因为我的笔记本是Window7操作系统,然后内存配置,只有8G,内存配置太低了,当然为了演示,我会将Hadoop集群中的主节点分配2GB内存,然后剩余的三个节点都是1GB配置. 所有的节点存储我都设置为50GB. 在安装操作系统之前,我们需要

当不再炒作大数据的时候,大数据时代就真的来了

从2015年开始,大数据就已经被移出了Gartner的新兴技术炒作曲线."Big Data"(大数据)一词最早于2011年8月出现在Gartner新兴技术炒作曲线中,当时Gartner预计大数据技术需要2年到5年才能进入企业的实际生产型应用中.从那以后,大数据就迅速被市场热炒,最终在2015年彻底在Gartner新兴技术炒作曲线中消失. 进入2016年,大数据已经进入了实际的企业生产应用,在切实推动企业向数字化转型.另一家市场调查公司IDC则强调,在未来5年中,全球的数据驱动型企业将获

跟上节奏 大数据时代十大必备IT技能(转)

新的想法诞生新的技术,从而造出许多新词,云计算.大数据.BYOD.社交媒体……在互联网时代,各种新词层出不穷,让人应接不暇.这些新的技术,这些新兴应用和对应的IT发展趋势,使得IT人必须了解甚至掌握最新的IT技能. 新的想法诞生新的技术,从而造出许多新词,云计算.大数据.BYOD.社交媒体.3D打印机.物联网……在互联网时代,各种新词层出不穷,让人应接不暇.这些新的技术,这些新兴应用和对应的IT发展趋势,使得IT人必须了解甚至掌握最新的IT技能.另一方面,云计算和大数据乃至其他助推各个行业发展的