Hadoop 框架基础(四)

** Hadoop 框架基础(四)

上一节虽然大概了解了一下 mapreduce,徒手抓了海胆,不对,徒手写了 mapreduce 代码,也运行了出来。但是没有做更深入的理解和探讨。

那么……

本节目标:

* 深入了解 mapreduce 过程

* 成功部署 Hadoop 集群

** mapreduce 原理

想要了解 mapreduce 原理,我们必须搞清楚处理数据时的每一个重要阶段,首先,贴上一张官方的图:

我们依次讨论每一个过程以及该过程对应的作用:

我先在这里假设一个情景,我现在有一个 10G 大小的 words.txt,里面存放的是 N 多个英文单词。

这 10G 大小的文件分为若干个 128M 的文件块 block 分布存储于若干个服务器。

好,现在我要统计这 10G 文件中的单词出现频率。

** input split

一个 split 会对应一个 map 任务。

一般来讲,split 的个数和 block 的个数相同,当然也可以多个 block 属于同一个 split,但是后者会产生大量的网络和磁盘 IO,原因在于一个 split 对应一个 map 任务,一个 map 任务肯定跑在某一台机器上,如果某个 split 所包含的多个 block 分布于不同的机器,首先需要做的操作就是把其他机器的 block 拷贝到运行 map 任务的机器上,这会耗费一定时间,所以,默认情况下,一个 block 对应一个 split,源码中设定如下:

mapreduce.input.fileinputformat.split.minsize == 0

mapreduce.input.fileinputformat.split.maxsize == 10000

splitSize=max(minSize,min(maxSize, blockSize)),此为默认 split 大小

如果要修改,则如下方式:

recordSize 表示一个记录的大小,分块要保证数据的完整性,所以:

int blockSize = Integer.parseInt(x); //x 表示你希望的 split 大小

int splitSize = blockSize / recordSize * recordSize;

conf.setLong("mapred.max.split.size",splitSize);

conf.setLong("mapred.min.split.size",splitSize);

** map

此时输入的到 map 中的数据形式大致为:

<0, cat one hadoop element...>  ---> 调用一次 map

<30, dog two one hadoop....>  ---> 调用一次 map

……

省略号表示后边还有,其中 0,30 表示的是偏移量,每次从当前 split 中读取 1 行数据,比如第一次读取第一行,偏移量为 0~29,第二次是第二行数据,偏移量是 30~?,以此类推。每次读取都执行一次 map 任务,并调用一次 map 方法。map 阶段结束,输出结果形式大致为:

<cat , 1>  <one, 1>  <hadoop, 1>  <element, 1> …… 等等


从此进入 shuffle 阶段

** buffer in memory

这是一个状态描述,表明此刻在内存中开始操作,buffer 在这里是内存中的一个环形数组。

之所以用环形数组来存放数据,是因为这样可以最大化的利用存储空间。

这个环形数组中存放数据分为两个类别:

1、元数据区(Kvmeta):

里面存放的每组数据都包含:

** value 的起始位置

** key 的起始位置

** partition 值

** value 的长度

2、数据区(Kvbuffer):

里面存放的每组数据都包含:

** key 值,例如 <cat ,1> 中的 cat

** value 值,例如 <cat, 1> 中的 1

注意:

* 以上两个区域的分界点为 0,即 0 以上存储数据区内容,0 以下存储元数据区的内容。

* 这个环形 buffer 虽然实际为一个字节数组,但抽象为一个 IntBuffer,无论哪个区域中的数据,每组数据中的每个元素都占用 4 个字节,也就是每组中的每个元素的存储,数组下标都将移动 4 位 (因为一个 int 为 4 个字节)。

* partition

分区的意义在于把一系列相似的单词分为同一个区。即单词归类处理,这样不同机器上的不同 map 任务输出的单词可以依据分区递交给相同的 reduce 做处理。

注意:

* 相关类: HashPartitioner

* 这里的 “相似”,指的是:键(此例中为单词)的 hash 值在某一个范围内

* sort

map 排序阶段,在 buffer 中把数据按照 partion 和 key 两个关键字做升序排序,这个排序只需要移动 “元数据区” 中的每组数据顺序即可。排序结果是 “元数据区” 中的每组数据按照 partition 分区聚集在一起,同一个 partition 分区内的 key 按照字典顺序排序。

* combine(可选)

结合阶段,可以在 map 阶段简化数据输出,减少后边 spill 溢写过程中,spill 溢写文件的大小,例如:可以将 <cat, 1> <cat, 1 > 这样的数据在 map 阶段合并为 < cat, 2 > 这样的数据作为 map 输出,默认没有开启。

* spill

溢写阶段,当内存中的环形存储结构占用率达到一定程度(默认占用 80% 时,则开始溢写),则将环形数据区中的所有内容,刷入到当前本地硬盘能够存的下这些数据的目录中,以使内容腾出空间供后边继续使用。

相同的 partition 分区的数据写入到同一个文件中,类似:“spill10.out”,“spill11.out”这样的文件,每一个 partition 分区所产生的文件的存放位置和一些相关信息,存放在另一个 “元数据” 文件中,类似“spill10.out.index”,“spill11.out.index”(注意,这个元数据文件和刚才说的元数据区不是一码事)。

这个元数据文件包含:

** 起始位置

** 原始数据长度

** 压缩之后的数据长度

** crc32 的校验数据

该文件的作用是:标记某个 partition 对应的文件在哪个目录,哪个索引中存放。

注意:

* spill10.out.index 这样的文件不一定会产生,如果内存中放得下(针对这个文件数据的存放,内存只提供 1M 空间可用),就放在内存中。

* 内存占用达到 80%,开始溢写,那么此时 map 任务还在进行,还在往内存里添加数据,新的数据的起始点(0 点)为剩余空间的中间部分,然后数据区和元数据区分别往两边递增即可,溢写后释放内存后也不必改变什么,继续写入即可。

** map merge

map 融合阶段,将溢写阶段产生的多个文件,根据所属分区,把具有相同 partition 分区的 “元数据(从 spill10.out.index 这样的文件中读取的)” 放置于同一个 segment 列表中,最后根据 segment 列表,把数据从 spill 溢写出来的文件一个一个中读取出来,写入到 file.out 文件中,同时将这一批段的数据索引(元数据分区等)写入到 file.out.index 文件中,最终生成两个文件,file.out 和 file.out.index,其中包含了多段数据,每段数据对应一个分区。

** compress (可选)

map 压缩阶段,对 map merge 阶段产生的文件进行压缩处理,以便于在后边的网络传输过程中减少网络 IO 压力,提升效率。

至此,map 端的 shuffle 过程结束。

** sort merge

reduce 任务会根据分区数据段拉取每个 map 任务产生的数据,拉取后,因为可能涉及到多个 map 产生的数据,所以要进行排序,一边 copy 一边排序,最后把多个 map 产生的具有相同分区的数据合并为一个分区数据段,这个 merge 过程和 map 的 merge 算法过程一样。

在此完成 shuffle 阶段



** reduce

对于本例而言,此时产生的某个分区中的某个单词形式大概如下:

<cat, [1, 1, 1, 1, 1, 1]>,在调用 reduce 方法时,进行 values 各个元素的叠加操作即可。

** output

统计完成后,输出数据到文件目录,文件格式为 part-r-00000 这样形式的文件,存放于 HDFS 中。文件中 key 和 value 默认的分隔符为:\t

** Hadoop 集群部署

之前我们在 yarn 框架中运行 mapreduce 任务,或者操作 hdfs,其中的各种节点都是运行在一台虚拟机上的,现在我们要将 hadoop 部署在一个多台虚拟机构成的完全分布式集群中(全部都在一个机器节点上的叫做伪分布式,比如之前的方式)。部署前,我们先勾画一下各个节点的部署结构,如下图所示:

描述:

3 台机器共有进程:HDFS 的 datanode,yarn 的 nodemanager

其中,HDFS 的 namenode 开在 z01 这台机器上,secondarynamenode 开在 z03 这台机器上

YARN 的 resourcemanager 开在 z02 这台机器上。

注:SecondaryNameNode 是用来协助 NameNode 整合 fsimage 和 edits 的。

一、准备系统环境

1、修改主机名

# vi /etc/hostname

2、主机名和 ip 地址的映射

# vi /etc/hosts,我的机器修改如图,注意,三台机器都要这么设置:

3、关闭防火墙和 selinux

请跳转至 Linux 基础 04 查看相关方法。

4、创建普通用户

# useradd 用户名,如果已经有普通用户,则无需再次创建

# echo 666666 | passwd --stdin 用户名

5、配置静态 IP 和 DNS

请参看 Linux 基础 01 内容

6、把后面两个虚拟机的系统启动级别改成 “字符模式”(就是没有桌面,这样可以减少虚拟机负担,加速系统启动和运行)

# cat /etc/inittab,内容如图所示:

根据文件中的提示,可以使用命令:

systemctl set-default multi-user.target,来设置无界面启动 linux

systemctl set-default graphical.target,来设置有界面启动 linux

7、卸载服务器 JDK

请参看 Linux 基础 02 中的内容

二、配置 NTP 时间服务器

对于我们当前这种案例,主要目标是把 z01 这台服务器设置为时间服务器,剩下的 z02,z03 这两台机器同步 z01 的时间,我们需要这样做的原因是因为,整个集群架构中的时间,要保持一致。

** 检查当前系统时区,使用命令:

# date -R,如图:

注意这里,如果显示的时区不是 + 0800,你可以删除 localtime 文件夹后,再关联一个正确时区的链接过去,命令如下:

# rm -rf /etc/localtime

# ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

** 同步时间

# ntpdate pool.ntp.org

** 检查软件包

查看 ntp 软件包是否已安装,使用命令:

# rpm -qa | grep ntp,如图,红框中的内容:

如果没有红框中的内容,则可以使用命令:

# yum -y install ntp,来进行安装

** 修改 ntp 配置文件

# vi /etc/ntp.conf

去掉下面这行前面的# , 并把网段修改成自己的网段:

restrict 192.168.122.0 mask 255.255.255.0 nomodify notrap

注释掉以下几行:

#server 0.centos.pool.ntp.org

#server 1.centos.pool.ntp.org

#server 2.centos.pool.ntp.org

把下面两行前面的 #号去掉, 如果没有这两行内容, 需要手动添加

server  127.127.1.0    # local clock

fudge  127.127.1.0 stratum 10

最后,如图所示:

** 重启 ntp 服务

# systemctl start  ntpd.service,注意,如果是 centOS7 以下的版本,使用命令:service ntpd start

# systemctl enable ntpd.service,注意,如果是 centOS7 以下的版本,使用命令:chkconfig ntpd on

** z03,z03 去同步 z01 这台时间服务器时间

首先需要关闭这两台计算机的 ntp 服务

# systemctl stop  ntpd.service,centOS7 以下,则:service ntpd stop

# systemctl disable ntpd.service,centOS7 以下,则:chkconfig ntpd off

# systemctl status ntpd,查看 ntp 服务状态

# pgrep ntpd,查看 ntp 服务进程 id

同步第一台服务器 z01 的时间:

# ntpdate z01,如图:

** 制定计划任务, 周期性同步时间

# crontab -e

*/10 * * * * /usr/sbin/ntpdate z01,如图所示:

重启定时任务:

# systemctl restart  crond.service,centOS7 以下使用:service crond restart,z03 这台机器的配置同理

三、配置无密钥登录

配置 hadoop 集群,首先需要配置集群中的各个主机的 ssh 无密钥访问

在 z01 上,通过如下命令,生成一对公私钥对

$ ssh-keygen -t rsa,一顿回车操作,这条命令执行完毕后(注意使用普通用户执行该命令),会在 / home/z/.ssh / 目录下生成两个文件:id_rsa 和 id_rsa.pub,如图所示:

生成之后呢,把 z01 生成的公钥拷贝给 z01,z02,z03 这三台机器,对,没错,包含当前机器。

$ ssh-copy-id z01

$ ssh-copy-id z02

$ ssh-copy-id z03

完成后,z02 机器如图(z03 同理):

以上完成了 z01 生成私钥,公钥并把公钥拷贝给 z01,z02,z03 三台机器的过程,z02,z03 这两台机器也需要进行如上操作。全部完成后,我们可以在任意一台机器上,无密钥的连接到另外一台机器,比如,我们在 z01 连接到 z02 这台机器,使用命令:

$ ssh z02,如图:

这样就成功的在 z01 的机器登录到 z02 机器了。

四、安装配置 JDK

使用 root 用户,在后面两台机器上创建 / opt/modules 文件夹,并使该文件夹的所属改为普通用户。

接着便可以使用远程命令 scp,把已经在 z01 中安装好的 jdk 目录拷贝给另外两台机器。

$ scp -r /opt/modules/jdk1.7.0_67/ z02:/opt/modules/

$ scp -r /opt/modules/jdk1.7.0_67/ z03:/opt/modules/

注意中间有空格分开。配置完成后,记得去 z02,z03 修改 / etc/profile 环境变量

五、安装配置 Hadoop

** 首先,需要先删除 z01 中的 / opt/modules/hadoop-2.5.0/data 目录,执行命令:

$ rm -rf /opt/modules/hadoop-2.5.0/data

** 在如下文件中,修改 JAVA_HOME

hadoop-env.sh  yarn-env.sh  mapred-env.sh

export JAVA_HOME=/opt/modules/jdk1.8.0_121

** 修改 HDFS 默认地址、HDFS 临时存储路径

涉及文件:core-site.xml

fs.defaultFS:hdfs://z01:8020

hadoop.tmp.dir:/opt/modules/hadoop-2.5.0/data

如图:

** 声明哪些服务器是 datanode

涉及文件:slaves

z01

z02

z03

如图:

** 修改数据存放的副本数,SecondaryNameNode 节点地址

涉及文件:hdfs-site.xml

dfs.replication:3

dfs.namenode.secondary.http-address:z03:50090

dfs.namenode.http-address:z01:50070

dfs.permissions.enabled:false

如图:

**resourcemanager 节点配置,以及一些其他配置

涉及文件:yarn-site.xml

yarn.resourcemanager.hostname:z02

yarn.nodemanager.aux-services:mapreduce_shuffle

yarn.log-aggregation-enable:true

yarn.log-aggregation.retain-seconds:86400

如图:

** jobhistory 服务以及其他设置

涉及文件:mapred-site.xml

mapreduce.framework.name:yarn

mapreduce.jobhistory.address:z01:10020

mapreduce.jobhistory.webapp.address:z01:19888

如图:

** 配置好后,拷贝 hadoop 安装目录给其他服务器

$ rm -rf /opt/modules/hadoop-2.5.0/share/doc/,删除该文档目录,以减少远程拷贝的体积

$ scp -r /opt/modules/hadoop-2.5.0/ z02:/opt/modules/

$ scp -r/opt/modules/ hadoop-2.5.0/ z03:/opt/modules/

全部搞定后,接下来我们就可以启动这个分布式系统了

六、启动 Hadoop

** 在 z01 需要先格式化 hdfs 的 namenode:

$ bin/hdfs namenode -format

** 使用 start 的脚本启动集群中所有的 hdfs 服务,包含 namenode 和 datanode 节点

$ sbin/start-dfs.sh

** 在 z02 中启动 yarn 服务,包含 resourcemanager 和 nodemanager,注意,如果 resourcemanger 和 namenode 服务不在同一台机器上,那么启动 resourcemanager 服务必须在所在的机器启动,这里参看我们之前设定的集群配置图,所以需要在 z02 机器上执行如下命令:

$ sbin/start-yarn.sh

启动完成后,分别查看 z01,z02,z03 机器的 jps,如下图:

z01:

z02:

z03:

在对比一下之前的集群配置图,是符合我们的期望的。

** 总结

本节主要深入讨论 mapreduce 的运算原理及过程,以及如何配置一个 hadoop 完全分布式集群。



个人微博:http://weibo.com/seal13

QQ 大数据技术交流群(广告勿入):476966007


作者:Z尽际链接:https://www.jianshu.com/p/0ad52ec23309來源:简书著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

原文地址:https://www.cnblogs.com/wzlbigdata/p/8277644.html

时间: 2024-10-04 13:39:57

Hadoop 框架基础(四)的相关文章

Hadoop框架基础(五)

** Hadoop框架基础(五) 已经部署了Hadoop的完全分布式集群,我们知道NameNode节点的正常运行对于整个HDFS系统来说非常重要,如果NameNode宕掉了,那么整个HDFS就要整段垮掉了,所以人类思考,能不能让世界多一点爱:),我们能不能弄个备用的NameNode,一旦正在使用的NameNode原地爆炸了,另一台备用的NameNode能立刻代替原先NameNode的位置,继续让HDFS系统正常运行?(同理,ResourceManager也是可以的.) 世界果然充满爱,动物管理员

HBase框架基础(四)

* HBase框架基础(四) 上一节我们介绍了如何使用HBase搞一些MapReduce小程序,其主要作用呢是可以做一些数据清洗和分析或者导入数据的工作,这一节我们来介绍如何使用HBase与其他框架进行搭配使用. * HBase与Hive 在开始HBase与Hive搭配使用前,我们复习一下这两个框架的特点: Hive: ** 数据仓库 ** 用于数据分析,数据清洗等等 ** 基于MapReduce ** 延迟高,离线使用 HBase: ** 面向列存储的非关系型数据库 ** 存储数据 ** 基于

hadoop rpc基础

第一部分: hadoop rpc基础 RPC,远程程序调用,分布式计算中C/S模型的一个应用实例. 同其他RPC框架一样,Hadoop分为四个部分: 序列化层:支持多种框架实现序列化与反序列化 函数调用层:利用java反射与动态代理实现 网络传输层:基于TCP/IP的Socket机制 服务的处理框架:基于Reactor模式的事件驱动IO模型 Hadoop RPC主要对外提供2种接口 public static ProtocolProxy getProxy/waitForProxy: 构造一个客户

Hive框架基础(二)

* Hive框架基础(二) 我们继续讨论hive框架 * Hive的外部表与内部表 内部表:hive默认创建的是内部表 例如: create table table001 (name string , age string) location '/input/table_data'; 此时:会在HDFS上新建一个table001表的数据存放地 接着执行: load data inpath'/input/data 'into table table001;(注意,load关键字后没有跟local关

框架基础:关于ajax设计方案(三)---集成ajax上传技术

之前发布了ajax的通用解决方案,核心的ajax发布请求,以及集成了轮询.这次去外国网站逛逛,然后发现了ajax level2的上传文件,所以就有了把ajax的上传文件集成进去的想法,ajax方案的level2的改进就不介绍了,不清楚的可到前几篇博客去看看.我们直接切入主题. 概念介绍: 1. js的FormData:js中在新的版本中已经支持了FormData对象,可以初始化一个空的form,或者初始化已经存在的form,浏览器测试代码. 2. 浏览器的支持:浏览器已支持input=file的

框架基础:ajax设计方案(三)---集成ajax上传技术

之前发布了ajax的通用解决方案,核心的ajax发布请求,以及集成了轮询.这次去外国网站逛逛,然后发现了ajax level2的上传文件,所以就有了把ajax的上传文件集成进去的想法,ajax方案的level2的改进就不介绍了,不清楚的可到前几篇博客去看看.我们直接切入主题. 概念介绍: 1. js的FormData:js中在新的版本中已经支持了FormData对象,可以初始化一个空的form,或者初始化已经存在的form,浏览器测试代码. 2. 浏览器的支持:浏览器已支持input=file的

从hadoop框架与MapReduce模式中谈海量数据处理

前言 几周前,当我最初听到,以致后来初次接触Hadoop与MapReduce这两个东西,我便稍显兴奋,觉得它们很是神秘,而神秘的东西常能勾起我的兴趣,在看过介绍它们的文章或论文之后,觉得Hadoop是一项富有趣味和挑战性的技术,且它还牵扯到了一个我更加感兴趣的话题:海量数据处理. 由此,最近凡是空闲时,便在看“Hadoop”,“MapReduce”“海量数据处理”这方面的论文.但在看论文的过程中,总觉得那些论文都是浅尝辄止,常常看的很不过瘾,总是一个东西刚要讲到紧要处,它便结束了,让我好生“愤懑

从Hadoop框架与MapReduce模式中谈海量数据处理(含淘宝技术架构)

从hadoop框架与MapReduce模式中谈海量数据处理 前言 几周前,当我最初听到,以致后来初次接触Hadoop与MapReduce这两个东西,我便稍显兴奋,认为它们非常是神奇,而神奇的东西常能勾起我的兴趣,在看过介绍它们的文章或论文之后,认为Hadoop是一项富有趣味和挑战性的技术,且它还牵扯到了一个我更加感兴趣的话题:海量数据处理. 由此,近期凡是空暇时,便在看"Hadoop","MapReduce""海量数据处理"这方面的论文.但在看论

MVC系列——MVC源码学习:打造自己的MVC框架(四:自定义视图)

前言:通过之前的三篇介绍,我们基本上完成了从请求发出到路由匹配.再到控制器的激活,再到Action的执行这些个过程.今天还是趁热打铁,将我们的View也来完善下,也让整个系列相对完整,博主不希望烂尾.对于这个系列,通过学习源码,博主也学到了很多东西,在此还是把博主知道的先发出来,供大家参考. 本文原创地址:http://www.cnblogs.com/landeanfen/p/6019719.html MVC源码学习系列文章目录: MVC系列——MVC源码学习:打造自己的MVC框架(一) MVC