对于hdfs的管理,需要namenode及datanode。其中namenode用于记录哪些文件分成了哪几个block(默认是64M),以及这些block分别在哪些datanode结点上,并且负
责将客户端上传到hdfs的文件分成n个block,并选择合适的datanode进行存储。可以配置hadoop文件块备份数,如设置成3。
datanode负责将某个block写入到所在的磁盘上,并定时发送心跳到namenode。如果某台datanode挂了,namenode需要将这台datanode上所有block的副本数在没有这台
datanode后依旧为n份(这里为3)。如:datanode上有block A,这时这台datanode挂了,那么hadoop会选择一台含有block A副本的datanode将block A副本增加到另外一台datanode。
对于map reduce的管理,有一个中心job任务控制进程,即:JobTracker。和n个taskTracker。
JobTraker负责将用户提交的job请求,分解成n个task,然后交给相应的taskTracker进行处理,如果某台taskTracker结点挂了,JobTracker负责将它的任务放在一个
健康的结点上进行处理。
有了这几个组件,看下具体请求处理流程:
当前hadoop block大小配置成64M。jobTracker及namenode在一台机子V上,另外三台机子作为datanode及map reduce计算的机子,分别为A,B,C。副本数为3。这时A,B,C三台机子上都会有R1,R2两个block(R1和R2是文件R切分的两个block)。
整个处理流程如图:
这个图包含了四个角色:客户端job client进程,HDFS(含namenode与datanode),jobTracker,taskTracker。
客户端job负责指明要处理的文件是什么,最终reduce输出到什么地方等等,并且与namenode合作,得到一组要处理的数据分片,即split。
HDFS保证数据的可用性,这部分有namenode(单个)与datanode(可以有多个)来完成HDFS数据的维护,保证向HDFS写的文件的可用性。
JobTracker则作为任务的协调者出现,负责将单个客户端提交的job,分解成多个task,并指派到具体的某个taskTracker中。
taskTracker是一个进程,目前单独运行在一个虚拟机,每次新起一个map或reduce任务都需要单独启一个JVM进程,现在公司用的是这种形式,优点是由于每次新的map或reduce完全是在新的jvm进程中运行,不需要关心数据的状态性。缺点是每次都起一个JVM进程会存在一定开销。
在了解了这几个角色的职责后,以下按照上图分析具体过程。
1.首先客户端运行一个job,分别指明了要处理的Map类与Reduce类,输入文件在哪,输出到哪。接着向jobTracker请求说我要运行一个job。
2.jobTracker将这个请求放入一个队列。当jobTracker从队列取到这个job请求并进行处理时,生成一个job id表明一个job开始,记录下,并返回到客户端。
这时在交给客户端job id时,可能会发生丢失传送,tcp重试多次,如果还是没有成功,估计就将job id取消并标为失败。
当然没有传送成功job id给客户端,也有可能是客户端断网,这时客户端再次连接到JobTracker后,由于之前已经将这个job id标为失败,因此客户端收到这个消息后
也将取消这个job。(目前未验证)
3.客户端拿到这个job id后,需要完成将输入文件分成逻辑上n个分片split,以便交给m个Map进行并行处理。这时需要与namenode通信取得这个文件都在哪几个datanode结点上。假设我们要处理文件是65M,采用默认的HDFS block大小64M。这时将切分成2个split。
客户端将分片split及配置信息上传到HDFS。并通知JobTracker,说我已经准备好配置信息及计算好分片split了。
4.JobTracker接收到信息后,开始将split分片尽量分给单独的map。比如共2个分片,有三个taskTracker结点(datanode结点),这时会通知其中两个taskTracker结点,告诉它们要新启一个JVM进程处理map任务了。并将相应的split id及split及配置信息(客户端上传的)在HDFS中的地址告诉它们。
5.taskTracker接收到JobTracker命令后需要新启一个JVM(当然可以配置成不新启)。
然后进行map。
6.在进入map函数前,map进程会先调用InputFormater的getSplits方法,从HDFS根据传过来的split ID取出需要的分片数据及相应配置信息,并封装成一个List<InputSplit>。其中每个InputSplit代表一个数据分片抽象,含数据分片长度,分片文件在哪,以及读取方法。
接着需要将这些InputSplit转成对应Map的key,value。这里Map的inputFormat采用TextInputFormat。这时input key为行号,value为这行数据。
将所有这些InputSplit都转成key,value集合后,开启map方法处理。处理完成后,将数据进行partion,含对数据以key进行排序。这个过程应该会根据reduce数量,
提前按key对reduce数量取模,然后将数据写入相应磁盘文件,如临时文件0,临时文件1(对应两个reduce)。
6.当两个taskTracker都完成了map任务,它们分别通知JobTracker。JobTracker开启分配x个reduce结点进行处理(取决与你配置最多可以启多少个reduce与当前split数),这里为2个reduce。
7.启动好两个reduce结点后,jobTracker有义务告诉这两个reduce从每个map结点取出自己需要的数据.
8.当某个reduce取到所有map结点数据后,将这些数据合成一个大文件,然后按key进行排序组合,相同的key同一组value会单独进行处理。
如我们文件中,数据如下,第一列为key,第二列为value:
kit 26
kit 100
kit 33
ro 1
这时,假设key为kit的三行数据被流转到reduce1,这时reduce1会按key排序,将key为kit的三行数据作为一个集合。处理时reduce对应的value就是一个Iterable集合(含26,100,33)。
接着进行reduce业务逻辑处理,处理完成后将结果按配置写入到相应的HDFS文件中。
hadoop基础一