hadoop核心组件(一)

  hadoop的核心组件:hdfs(分布式文件系统)、mapreduce(分布式计算框架)、Hive(基于hadoop的数据仓库)、HBase(分布式列存数据库)、Zookeeper(分布式协作服务)、Sqoop(数据同步工具)和Flume(日志手机工具)

hdfs(分布式文件系统):

由client、NameNode、DataNode组成

  • client负责切分文件,并与NameNode交互,获取文件位置;与DataNode交互,读取和写入数据
  • NameNode是Master节点,管理HDFS的名称空间和数据块映射信息,配置副本策略,处理客户端请求
  • DataNode是Slave节点,存储实际数据,汇报存储信息给NameNode
  • DataNode与NameNode保持心跳,提交block列表

在hadoop1.x的时候还有Secondary NameNode,负责辅助NameNode,分担其工作量;定期合并fsimage和fsedits,推送给NameNode;紧急情况下,可辅助恢复NameNode

存储模型

(1)文件线性切割成Block  offset

(2)Block分散存储在集群节点中,Block是HDFS的基本存储单元,默认大小是64M

(3)单一文件Block大小一致,文件与文件可以不一致

(4)Block可设置副本数(小于节点数),分散在不同节点

(5)文件上传可以设置Block大小和副本数

(6)已上传的文件Block副本数可以调整,大小不变

(7)只支持一次写入多次读取,同一时刻只有一个写入者

(8)可以append追加数据

架构模型

(1)NameNode节点保存文件元数据

(2)DataNode节点保存文件Block数据

(3)DataNode与NameNode保持心跳,提交Block列表

(4)HdfsClient与NameNode交互元数据信息

(5)HdfsClient与DataNode交互文件Block数据

hdfs结构

一、NameNode(不会与磁盘发生交换)

(1)基于内存存储

  • 只存在内存中
  • 持久化
    • 启动后, 元数据(metadate)信息加载到内存
    • metadata的磁盘文件名为”fsimage”
    • Block的位置信息不会保存到fsimage
    • (journalNode的作用是存放EditLog的)edits记录对metadata的操作日志

(2)功能

  • 接收客户端读写
  • 收集DataNode汇报的block列表信息

(3) metadata

  • 文件ownership, permissions(文件所有权、权限)
  • 文件大小, 时间
  • (block列表,block偏移量)--->会持久化, 位置信息--->不会持久化(启动时候由DataNode汇报过来)
  • block每个副本位置(dataNode上报)

二、DataNode

(1)本地文件形式存储block

(2)存储Block的元数据信息文件

(3)启动DN时会向NN汇报block信息

(4)通过向NameNode发送心跳(3秒一次),如果NameNode 10分钟没有收到,则认为已经lost,并copy其上的block到其它DN

三、SecondaryNameNode/Qurom Journal Manager

合并时机

fs.checkpoint.period  3600s

fs.checkpoint.size  64MB

四、ZooKeeper Failover Controller(HDFS 2.0 HA)

(1)监控NameNode健康状态

(2)向Zookeeper注册NameNode

(3)NameNode挂掉后,ZKFC为NameNode竞争锁,获得ZKFC 锁的NameNode变为active

5、Block副本放置位置

(1)第一个副本:放置在上传文件的DN;如果是集群外提交,则随机挑选一台磁盘不太满,CPU不太忙的节点

(2)第二个副本:放置在于第一个副本不同的机架的节点上

(3)第三个副本:与第二个副本相同机架的节点

(4)更多副本:随机节点

6、安全模式

(1)NameNode启动, fsimage载入内存, 执行edits

(2)成功建立元数据映射后, 创建新的fsimage文件(无需SNN)和空的edits

(3)检查副本数, 数量正常后,过若干时间, 解除安全模式

7、优缺点

优点:

高容错性(多副本, 自动恢复)

适合批处理(计算移动, 数据位置暴露给计算框架(block))

适合大数据处理(GB TB PB级数据)

可构建在廉价机器上

高吞吐

缺点:

高延迟

小文件存取(占用namenode内存, 寻道时间超过读取时间)

并发写入、文件随机修改(一个文件一个写入者, 只能append)

hdfs写流程

client切分文件与NanmeNode交互,获取DataNode列表,验证DataNode后连接DataNode,各节点之间两两交互,确定可用后,client以更小单位流式传输数据;

Block传输数据结束后,DataNode向NameNode汇报Block信息,DataNode向Client汇报完成,Client向NameNode汇报完成,获取去下一个Block存放的DataNode列表,

循环以上步骤,最终client汇报完成,NameNode会在写流程更新文件状态。

hdfs读流程

client与NameNode交互,获取Block存放的DataNode列表(Block副本的位置信息),线性和DataNode交互,获取Block,最终合并为一个文件,其中,在Block副本列表中按距离择优选取DataNode节点获取Block块。

mapreduce(分布式计算框架)

MR运行原理:

1、客户端提交作业之前,检查输入输出路径,首先创建切片列表

反射出作业中设置的input对象,默认是TextInputFormat类

通过input类得到切片列表(getSpilits()方法)

最小值 minSize 默认为1,如果设置就取设置的值

最大值 maxSize 默认为long的最大值

根据输入路径取出文件,获取每个文件的所有block列表,接着创建splits列表(包含文件名,偏移量,长度和位置信息)

切片大小根据最大最小值取,默认为block的大小

一个split对应一个map

提交作业到集群(submitJob()方法)

2、mapInput:

input.initialize    输入初始化

拿到taskContext(上下文)

创建mapper(默认为Mapper类,一般取用户设置的)

获取InputFomat类(输入格式化的类)

获取split

根据以上信息创建input(NewTrackingRecordReader)

input初始化

获取split的开始和结束位置和文件,开启对文件的IO流,将起始偏移量个IO设置一下

如果不是第一个切片(split),每次读取放弃第一行(跳过第一行数据),只有第一个切片才会读取第一行数据

mapper.run

3、output:

MapOutputBuffer初始化

环形缓冲区的阈值0.8、大小(100M)    默认值

sorter :QuickSort算法

反射获取比较器 OutputKeyComparator

排序,溢写,一些一次触发一次combiner

溢写达到3次的时候还会触发一次combiner

通过反射获取Partitioner类,默认为HashPartitoner

write(k,v)

collector.collect(key,value,partition)

output.close()

merger

如果numSplits<minSpillsForCombiner 判断溢写的次数是不是小于设置的合并的溢写次数(默认是3),成立的话combiner

4、reduce:

shuffle:copy

sort:SecondarySort

reduce

1、mapreduce shuffle

(1)maptask的输入是hdfs上的block块,maptask只读取split,block与split的对应关系默认是一对一

(2)进过map端的运行后,输出的格式为key/value,Mapreduce提供接口partition,他的作用是根据maptask输出的key hash后与reduce数量取模,

来决定当前的输出对应到哪个reduce处理,也可以自定义partition

(3)map运行后的数据序列化到缓冲区,默认这个缓冲区大小为100M,作用是收集这个map的结果,当数据达到溢写比例(默认是spill.percent=0.8)后,所定这80M的内存,

对这80M内存中的key做排序(sort),maptask的输出结果还可以往剩下的20M内存中写,互不影响。之后执行溢写的线程会往磁盘中写数据。每次溢写都会产生一个溢写小文件,

map执行完后,会合并这些溢写小文件,这个过程叫Merge。

(4)如果客户端设置了Combiner,那么会优化MapReduce的中间结果,合并map端的数据(相当于reduce端的预处理),Combiner不能改变最终的计算结果。

(5)reduce在执行之前就是从各个maptask执行完后的溢写文件中拿到所对应的数据,然后做合并(Merge),最终形成的文件作为reduce的输入文件,这个过程是归并排序。

最后就是reduce计算,把结果放到hdfs上面。

hdfs参数调优

io.file.buffer.size:4096 (core-default.xml) SequenceFiles在读写中可以使用缓存大小,可减少I/O次数;在大型Hadoop cluster,建议可设定为65536-131072
dfs.blockes:134217728( hdfs-default.xml ) hdfs中一个文件的Block块的大小,CDH5中默认为128M;设置太大影响map同时计算的数量,设置较少会浪费map个数资源
mapred.reduce.tasks(mapreduce.job.reduces):1 默认启动的reduce数
mapreduce.task.io.sort.factor:10 reduce task中合并文件时,一次合并的文件数据
mapred.child.java.opts:-Xmx200m jvm启动子线程可以使用的最大内存
mapred.reduce.parallel.copies:5 Reduce copy数据的线程数量,默认值是5
mapreduce.tasktracker.http.threads:40 map和reduce是通过http进行传输的,这个设置传输的并行线程数
mapreduce.map.output.compress:flase map输出是否进行压缩,如果压缩就会多耗cpu,但是减少传输时间,如果不压缩,就需要较多的传输带宽。配合 mapreduce.map.output.compress.codec使用,默认是 org.apache.hadoop.io.compress.DefaultCodec,可以根据需要设定数据压缩方式。
mapreduce.tasktracker.tasks.reduce.maximum:2 一个tasktracker并发执行的reduce数,建议为cpu核数
mapreduce.map.sort.spill.percent:0.8 溢写比例
min.num.spill.for.combine:3 spill的文件达到设置的参数进行combiner

避免推测执行

mapred.map.tasks.speculative.execution=true
mapred.reduce.tasks.speculative.execution=true

自定义partition

适当添加combiner

自定义reduce端的grouping Comparator

- mapred.reduce.tasks:手动设置reduce个数

- mapreduce.map.output.compress:map输出结果是否压缩

- mapreduce.map.output.compress.codec

- mapreduce.output.fileoutputformat.compress:job输出结果是否压缩

- mapreduce.output.fileoutputformat.compress.type

- mapreduce.output.fileoutputformat.compress.codec

9、调优文件以及参数

一、调优的目的

充分的利用机器的性能,更快的完成mr程序的计算任务。甚至是在有限的机器条件下,能够支持运行足够多的mr程序。

二、调优的总体概述

从mr程序的内部运行机制,我们可以了解到一个mr程序由mapper和reducer两个阶段组成,其中mapper阶段包括数据的读取、map处理以及写出操作(排序和合并/sort&merge),而reducer阶段包含mapper输出数据的获取、数据合并(sort&merge)、reduce处理以及写出操作。那么在这七个子阶段中,能够进行较大力度的进行调优的就是map输出、reducer数据合并以及reducer个数这三个方面的调优操作。也就是说虽然性能调优包括cpu、内存、磁盘io以及网络这四个大方面,但是从mr程序的执行流程中,我们可以知道主要有调优的是内存、磁盘io以及网络。在mr程序中调优,主要考虑的就是减少网络传输和减少磁盘IO操作,故本次课程的mr调优主要包括服务器调优、代码调优、mapper调优、reducer调优以及runner调优这五个方面。

三、服务器调优

服务器调优主要包括服务器参数调优和jvm调优。在本次项目中,由于我们使用hbase作为我们分析数据的原始数据存储表,所以对于hbase我们也需要进行一些调优操作。除了参数调优之外,和其他一般的java程序一样,还需要进行一些jvm调优。

hdfs调优

1. dfs.datanode.failed.volumes.tolerated: 允许发生磁盘错误的磁盘数量,默认为0,表示不允许datanode发生磁盘异常。当挂载多个磁盘的时候,可以修改该值。

2. dfs.replication: 复制因子,默认3

3. dfs.namenode.handler.count: namenode节点并发线程量,默认10

4. dfs.datanode.handler.count:datanode之间的并发线程量,默认10。

5. dfs.datanode.max.transfer.threads:datanode提供的数据流操作的并发线程量,默认4096。

一般将其设置为linux系统的文件句柄数的85%~90%之间,查看文件句柄数语句ulimit -a,修改vim /etc/security/limits.conf, 不能设置太大文件末尾,添加

* soft nofile 65535

* hard nofile 65535

注意:句柄数不能够太大,可以设置为1000000以下的所有数值,一般不设置为-1。

异常处理:当设置句柄数较大的时候,重新登录可能出现unable load session的提示信息,这个时候采用单用户模式进行修改操作即可。

单用户模式:

启动的时候按‘a‘键,进入选择界面,然后按‘e‘键进入kernel修改界面,然后选择第二行‘kernel...‘,按‘e‘键进行修改,在最后添加空格+single即可,按回车键回到修改界面,最后按‘b‘键进行单用户模式启动,当启动成功后,还原文件后保存,最后退出(exit)重启系统即可。

6. io.file.buffer.size: 读取/写出数据的buffer大小,默认4096,一般不用设置,推荐设置为4096的整数倍(物理页面的整数倍大小)。

mapreduce调优

1. mapreduce.task.io.sort.factor: mr程序进行合并排序的时候,打开的文件数量,默认为10个.

2. mapreduce.task.io.sort.mb: mr程序进行合并排序操作的时候或者mapper写数据的时候,内存大小,默认100M

3. mapreduce.map.sort.spill.percent: mr程序进行flush操作的阀值,默认0.80。

4. mapreduce.reduce.shuffle.parallelcopies:mr程序reducer copy数据的线程数,默认5。

5. mapreduce.reduce.shuffle.input.buffer.percent: reduce复制map数据的时候指定的内存堆大小百分比,默认为0.70,适当的增加该值可以减少map数据的磁盘溢出,能够提高系统性能。

6. mapreduce.reduce.shuffle.merge.percent:reduce进行shuffle的时候,用于启动合并输出和磁盘溢写的过程的阀值,默认为0.66。如果允许,适当增大其比例能够减少磁盘溢写次数,提高系统性能。同mapreduce.reduce.shuffle.input.buffer.percent一起使用。

7. mapreduce.task.timeout:mr程序的task执行情况汇报过期时间,默认600000(10分钟),设置为0表示不进行该值的判断。

四、代码调优

代码调优,主要是mapper和reducer中,针对多次创建的对象,进行代码提出操作。这个和一般的java程序的代码调优一样。

五、mapper调优

mapper调优主要就是就一个目标:减少输出量。我们可以通过增加combine阶段以及对输出进行压缩设置进行mapper调优。

combine介绍:

实现自定义combine要求继承reducer类,特点:

以map的输出key/value键值对作为输入输出键值对,作用是减少网络输出,在map节点上就合并一部分数据。

比较适合,map的输出是数值型的,方便进行统计。

压缩设置:

在提交job的时候分别设置启动压缩和指定压缩方式。

六、reducer调优

reducer调优主要是通过参数调优和设置reducer的个数来完成。

reducer个数调优:

要求:一个reducer和多个reducer的执行结果一致,不能因为多个reducer导致执行结果异常。

规则:一般要求在hadoop集群中的执行mr程序,map执行完成100%后,尽量早的看到reducer执行到33%,可以通过命令hadoop job -status job_id或者web页面来查看。

原因: map的执行process数是通过inputformat返回recordread来定义的;而reducer是有三部分构成的,分别为读取mapper输出数据、合并所有输出数据以及reduce处理,其中第一步要依赖map的执行,所以在数据量比较大的情况下,一个reducer无法满足性能要求的情况下,我们可以通过调高reducer的个数来解决该问题。

优点:充分利用集群的优势。

缺点:有些mr程序没法利用多reducer的优点,比如获取top n的mr程序。

七、runner调优

runner调优其实就是在提交job的时候设置job参数,一般都可以通过代码和xml文件两种方式进行设置。

1~8详见ActiveUserRunner(before和configure方法),9详解TransformerBaseRunner(initScans方法)

1. mapred.child.java.opts: 修改childyard进程执行的jvm参数,针对map和reducer均有效,默认:-Xmx200m

2. mapreduce.map.java.opts: 需改map阶段的childyard进程执行jvm参数,默认为空,当为空的时候,使用mapred.child.java.opts。

3. mapreduce.reduce.java.opts:修改reducer阶段的childyard进程执行jvm参数,默认为空,当为空的时候,使用mapred.child.java.opts。

4. mapreduce.job.reduces: 修改reducer的个数,默认为1。可以通过job.setNumReduceTasks方法来进行更改。

5. mapreduce.map.speculative:是否启动map阶段的推测执行,默认为true。其实一般情况设置为false比较好。可通过方法job.setMapSpeculativeExecution来设置。

6. mapreduce.reduce.speculative:是否需要启动reduce阶段的推测执行,默认为true,其实一般情况设置为fase比较好。可通过方法job.setReduceSpeculativeExecution来设置。

7. mapreduce.map.output.compress:设置是否启动map输出的压缩机制,默认为false。在需要减少网络传输的时候,可以设置为true。

8. mapreduce.map.output.compress.codec:设置map输出压缩机制,默认为org.apache.hadoop.io.compress.DefaultCodec,推荐使用SnappyCodec(在之前版本中需要进行安装操作,现在版本不太清楚,安装参数:http://www.cnblogs.com/chengxin1982/p/3862309.html)

9. hbase参数设置

由于hbase默认是一条一条数据拿取的,在mapper节点上执行的时候是每处理一条数据后就从hbase中获取下一条数据,通过设置cache值可以一次获取多条数据,减少网络数据传输。

源码:

1、设置map端的数量:mapreduce.input.fileinputformat.split.minsize

位置FileInputFormat.getSplits()方法

(1)输入文件size巨大,但不是小文件

减小map的数量:增大mapred.min.split.size的值

(2)输入文件数量巨大,且都是小文件

使用FileInputFormat衍生的CombineFileInputFormat将多个input path合并成一个InputSplit送给mapper处理,从而减少mapper的数量

2、增加Map-Reduce job 启动时创建的Mapper数量

可以通过减小每个mapper的输入做到,即减小blockSize或者减小mapred.min.split.size的值,设置blockSize一般不可行

原文地址:https://www.cnblogs.com/asura7969/p/8319127.html

时间: 2024-10-27 01:47:25

hadoop核心组件(一)的相关文章

hadoop基础概念之Hadoop核心组件

认知和学习Hadoop,我们必须得了解Hadoop的构成,我根据自己的经验通过Hadoop构件.大数据处理流程,Hadoop核心三个方面进行一下介绍: Hadoop组件 由图我们可以看到Hadoop组件由底层的Hadoop核心构件以及上层的Hadoop生态系统共同集成,而上层的生态系统都是基于下层的存储和计算来完成的. 首先我们来了解一下核心构件:Mapreduce和HDFS.核心组件的产生都是基于Google的思想来的,Google的GFS带来了我们现在所认识的HDFS,Mapreduce带来

Hadoop核心组件:四步通晓HDFS

Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件上的分布式文件系统,它提供高吞吐量来访问应用程序的数据,适合那些有着超大数据集的应用程序,那么在实际应用中我们如何来操作使用呢? 一.HDFS操作方式: 1.  命令行操作 –FsShell:$ hdfs dfs 2. 其他的计算框架-如spark 通过URI,比如: hdfs://nnhost:port/file-,调用HDFS的协议.主机.端口或者对外提供的服务媒.以及文件,在spark的程序中实现对HDFS的访问. 3.  其

Hadoop核心组件

1.Hadoop生态系统 2.HDFS(Hadoop分布式文件系统) 源自于Google的GFS论文,发表于2003年10月,HDFS是GFS克隆版. 是Hadoop体系中数据存储管理的基础.它是一个高度容错的系统,能检测和应对硬件故障,用于在低成本的通用硬件上运行.HDFS简化了文件的一致性模型,通过流式数据访问,提供高吞吐量应用程序数据访问功能,适合带有大型数据集的应用程序. Client:切分文件:访问HDFS:与NameNode交互,获取文件位置信息:与DataNode交互,读取和写入数

hadoop核心组件(三)

HBase简介: HBase---Hadoop DataBase,是一个高可靠.高性能.面向列.可存储.实时读写的分布式数据库 利用HBase HDFS作为其文件存储系统 HBase数据模型: (1)RowKey: 决定一行数据,按照字典顺序排序,RowKey只能存储64K字节数据 (2)Column Family列族 & qualifier列: HBase表中某个列都归属某个列族,列族必须作为表模式(schema)定义的一部分预先给出. 列名以列族作为前缀,每个列族都可以有多个列成员(clou

Hadoop核心组件之MapReduce

## MapReduce概述- Google MapReduce的克隆版本- 优点:海量数据的离线处理,易开发,易运行- 缺点:实时流式计算Hadoop MapReduce是一个软件框架,用于轻松编写应用程序,以可靠,容错的方式在大型集群(数千个节点)的商用硬件上并行处理大量数据(多TB数据集) ## MapReduce编程模型**思想:分而治之**MapReduce作业通常将输入数据集拆分为独立的块,这些块由map任务以完全并行的方式处理.框架对map的输出进行排序,然后输入到reduce任务

hadoop核心组件zookeeper简介与特点

zookeeper是一个针对大型分布式系统的可靠协调系统. 1)它提供的功能包括:配置维护,名字服务,分布式同步,组服务等: 2)它的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效,功能稳定的系统提供给用户: 特点: 1,最终一致性:为客户端展示同一试图 2,可靠性:如果一条消息被一台服务器接受,那么它将被所有的服务器接受. 3,实时性:zookeeper不能保证两个客户端同时得到刚刚更新的数据,如果需要最新数据,应该在读数据之前调用sync()接口. 4,等待无关(wait-f

大数据以及Hadoop相关概念介绍

一.大数据的基本概念 1.1.什么是大数据 大数据指的就是要处理的数据是TB级别以上的数据.大数据是以TB级别起步的.在计算机当中,存放到硬盘上面的文件都会占用一定的存储空间,例如: 文件占用的存储空间代表的就是该文件的大小,在计算机当中,文件的大小可以采用以下单位来表示,各个单位之间的转换关系如下: 平时我们在我们自己的电脑上面常见的就是Byte.KB.MB.GB这几种,那么究竟什么是大数据呢,大数据的起步是以TB级别开始的,1TB=1024GB,而我们处理的数据可能会到达PB级别,1PB=1

Hadoop深入浅出-001

Doc By xvGe  Hadoop深入浅出-001 什么是Hadoop? The Apache Hadoop project develops open-source software for reliable,scalable,distributed,computing. Hadoop解决的问题: --海量数据存储 --海量数据分析 --资源管理调度 作者:Doug Cutting ********************************* (1)hadoop核心组件及文件系统概念

《写给大忙人的Hadoop 》读书笔记(二)Hadoop介绍

本文从Hadoop是什么.能做什么.谁使用三个主要问题去讲解.对于重点知识如MapReduce.YARN.Hadoop生态系统内的其他组件(HBase.Pig.Hive.Zookeeper等)的介绍和使用这里不做详细说明,需要大家对这些内容扩展并深入学习. 1.Hadoop是什么及功能 1.2002年,Hadoop作为一个支持Web搜索引擎的单独软件项目被创立,如今已成为一个用来分析大量不同类型数据的工具和应用程序(如HBase,Hive等)的生态系统. Hadoop数据处理的一些功能如下: 1