Hadoop源代码分析(四零)

有了上面的基础,我们可以来解剖DFSOutputStream了。先看构造函数:

privateDFSOutputStream(String src, longblockSize, Progressable progress,

intbytesPerChecksum) throws IOException

DFSOutputStream(String src, FsPermissionmasked, boolean overwrite,

shortreplication, long blockSize,Progressable progress,

intbuffersize, intbytesPerChecksum) throwsIOException

DFSOutputStream(String src, intbuffersize, Progressable progress,

LocatedBlock lastBlock, FileStatusstat,

intbytesPerChecksum) throwsIOException {

这些构造函数的参数主要有:文件名src;进度回调函数progress(预留接口,目前未使用);数据块大小blockSize;Block副本数replication;每个校验chunk的大小bytesPerChecksum;文件权限masked;是否覆盖原文件标记overwrite;文件状态信息stat;文件的最后一个Block信息lastBlock;buffersize(?未见引用)。
后面两个构造函数会调用第一个构造函数,这个函数会调用父类的构造函数,并设置对象的src,blockSize,progress和checksum属性。
第二个构造函数会调用namenode.create方法,在文件空间中建立文件,并启动DataStreamer,它被DFSClient的create方法调用。第三个构造函数被DFSClient的append方法调用,显然,这种情况比价复杂,文件拥有一些数据块,添加数据往往添加在最后的数据块上。同时,append方法调用时,Client已经知道了最后一个Block的信息和文件的一些信息,如FileStatus中包含的Block大小,文件权限位等等。结合这些信息,构造函数需要计算并设置一些对象成员变量的值,并试图从可能的错误中恢复(调用processDatanodeError),最后启动DataStreamer。
我们先看正常流程,前面已经分析过,通过FSOutputSummer,HDFS客户端能将流转换成package,这个包是通过writeChunk,发送出去的,下面是它们的调用关系。

<ignore_js_op>

在检查完一系列的状态以后,writeChunk先等待,直到dataQueue中未发送的包小于门限值。如果现在没有可用的Packet对象,则创建一个Packet对象,往Packet中写数据,包括校验值和数据。如果数据包被写满,那么,将它放入发送队列dataQueue中。writeChunk的过程比较简单,这里的写入,也只是把数据写到本地队列,等待DataStreamer发送,没有实际写到DataNode上。
createBlockOutputStream用于建立到第一个DataNode的连接,它的声明如下:

private booleancreateBlockOutputStream(DatanodeInfo[] nodes, String client,

booleanrecoveryFlag)

nodes是所有接收数据的DataNode列表,client就是客户端名称,recoveryFlag指示是否是为错误恢复建立的连接。createBlockOutputStream很简单,打开到第一个DataNode的连接,然后发送下面格式的数据包,并等待来自DataNode的Ack。如果出错,记录出错的DataNode在nodes中的位置,设置errorIndex并返回false。
<ignore_js_op>

当recoveryFlag指示为真时,意味着这次写是一次恢复操作,对于DataNode来说,这意味着为写准备的临时文件(在tmp目录中)可能已经存在,需要进行一些特殊处理,具体请看FSDataset的实现。
当Client写数据需要一个新的Block的时候,可以调用nextBlockOutputStream方法。

privateDatanodeInfo[] nextBlockOutputStream(String client) throwsIOException

这个方法的实现很简单,首先调用locateFollowingBlock(包含了重试和出错处理),通过namenode.addBlock获取一个新的数据块,返回的是DatanodeInfo列表,有了这个列表,就可以建立写数据的pipe了。下一个大动作就是调用上面的createBlockOutputStream,建立到DataNode的连接了。
有了上面的准备,我们来分析processDatanodeError,它的主要流程是:

l          参数检查;

l          关闭可能还打开着的blockStream和blockReplyStream;

l          将未收到应答的数据块(在ackQueue中)挪到dataQueue中;

l          循环执行:

1.      计算目前还活着的DataNode列表;

2.      选择一个主DataNode,通过DataNode RPC的recoverBlock方法启动它上面的恢复过程;

3.      处理可能的出错;

4.      处理恢复后Block可能的变化(如Stamp变化);

5.      调用createBlockOutputStream到DataNode的连接。

l          启动ResponseProcessor。

这个过程涉及了DataNode上的recoverBlock方法和createBlockOutputStream中可能的Block恢复,是一个相当耗资源的方法,当系统出错的概率比较小,而且数据块上能恢复的数据很多(平均32M),还是值得这样做的。
写的流程就分析到着,接下来我们来看流的关闭,这个过程也涉及了一系列的方法,它们的调用关系如下:

<ignore_js_op>

flushInternal会一直等待到发送队列(包括可能的currentPacket)和应答队列都为空,这意味着数据都被DataNode顺利接收。
sync作用和UNIX的sync类似,将写入数据持久化。它首先调用父类的flushBuffer方法,将可能还没拷贝到DFSOutputStream的数据拷贝回来,然后调用flushInternal,等待所有的数据都写完。然后调用namenode.fsync,持久化命名空间上的数据。
closeInternal比较复杂一点,它首先调用父类的flushBuffer方法,将可能还没拷贝到DFSOutputStream的数据拷贝回来,然后调用flushInternal,等待所有的数据都写完。接着结束两个工作线程,关闭socket,最后调用amenode.complete,通知NameNode结束一次写操作。close方法先调用closeInternal,然后再本地的leasechecker中移除对应的信息。

更多精彩内容请关注:http://bbs.superwu.cn

关注超人学院微信二维码:

时间: 2024-10-19 09:48:41

Hadoop源代码分析(四零)的相关文章

Hadoop源代码分析

关键字: 分布式云计算 Google的核心竞争技术是它的计算平台.Google的大牛们用了下面5篇文章,介绍了它们的计算设施. GoogleCluster:http://research.google.com/archive/googlecluster.html Chubby:http://labs.google.com/papers/chubby.html GFS:http://labs.google.com/papers/gfs.html BigTable:http://labs.googl

Hadoop源代码分析(完整版)-转载

Hadoop源代码分析(一) http://blog.csdn.net/huoyunshen88/article/details/8611629 关键字: 分布式云计算 Google的核心竞争技术是它的计算平台.Google的大牛们用了下面5篇文章,介绍了它们的计算设施. GoogleCluster:http://research.google.com/archive/googlecluster.html Chubby:http://labs.google.com/papers/chubby.h

Hadoop源代码分析(MapTask辅助类 I)

Hadoop源代码分析(MapTask辅助类 I)MapTask的辅劣类主要针对Mapper的输入和输出.首先我们来看MapTask中用的的Mapper输入,在类图中,返部分位于右上角.MapTask.TrackedRecordReader是一个Wrapper,在原有输入RecordReader的基础上,添加了收集上报统计数据的功能.MapTask.SkippingRecordReader也是一个Wrapper,它在MapTask.TrackedRecordReader的基础上,添加了忽略部分输

Hadoop 源代码分析(二四)FSNamesystem

下面轮到FSNamesystem 出场了.FSNamesystem.java 一共有4573 行,而整个namenode 目录下所有的Java 程序总共也只有16876行,把FSNamesystem 搞定了,NameNode 也就基本搞定.FSNamesystem 是NameNode 实际记录信息的地方,保存在FSNamesystem 中的数据有:文件名数据块列表(存放在FSImage 和日志中)合法的数据块列表(上面关系的逆关系)数据块DataNode(只保存在内存中,根据DataNode 发

Hadoop源代码分析(三二)

搞定ClientProtocol,接下来是DatanodeProtocol部分.接口如下: public DatanodeRegistration register(DatanodeRegistration nodeReg ) throws IOException 用亍DataNode向NameNode登记.输入和输出参数都是DatanodeRegistration,类图如下: 前面讨论DataNode的时候,我们已绊讲过了DataNode的注册过程,我们来看NameNode的过程.下面是主要步

Hadoop源代码分析(MapReduce概论)

大家都熟悉文件系统,在对HDFS进行分析前,我们并没有花很多的时间去介绍HDFS的背景,毕竟大家对文件系统的还是有一定的理解的,而且也有很好的文档.在分析Hadoop的MapReduce部分前,我们还是先了解系统是如何工作的,然后再进入我们的分析部分.下面的图来自http://horicky.blogspot.com/2008/11/hadoop-mapreduce-implementation.html,是我看到的讲MapReduce最好的图. 以Hadoop带的wordcount为例子(下面

Hadoop源代码分析(包hadoop.mapred中的MapReduce接口)

前面已经完成了对org.apache.hadoop.mapreduce的分析,这个包提供了Hadoop MapReduce部分的应用API,用于用户实现自己的MapReduce应用.但这些接口是给未来的MapReduce应用的,目前MapReduce框架还是使用老系统(参考补丁HADOOP-1230).下面我们来分析org.apache.hadoop.mapred,首先还是从mapred的MapReduce框架开始分析,下面的类图(灰色部分为标记为@Deprecated的类/接口): 我们把包m

Hadoop源代码分析(包mapreduce.lib.input)

接下来我们按照MapReduce过程中数据流动的顺序,来分解org.apache.hadoop.mapreduce.lib.*的相关内容,并介绍对应的基类的功能.首先是input部分,它实现了MapReduce的数据输入部分.类图如下: 类图的右上角是InputFormat,它描述了一个MapReduceJob的输入,通过InputFormat,Hadoop可以: l          检查MapReduce输入数据的正确性: l          将输入数据切分为逻辑块InputSplit,这

Hadoop源代码分析(mapreduce.lib.partition/reduce/output)

Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类. Mapper的结果,可能送到可能的Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已.Mapper最终处理的结果对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同