在DataNode的本地存储空间上,与存储服务密切相关的,比如创建数据块,恢复数据块,数据块校验等相关的代码都在org.apache.hadoop.hdfs.server.datanode.fsdataset包下(代码版本CDH5.1)
首先说下org.apache.hadoop.hdfs.server.datanode.fsdataset下的主要接口,FsDatasetSpi接口的方法比较多,主要分三类。第一类主要是和数据块相关的,如创建rbw状态和temporary状态的block,追加block,恢复block,提交block,缓存block,打开block的输出流等。第二类主要是目录管理方面的,获取volume列表,获取block对应的volume,trash目录相关的管理等。第三类主要是FsDataset自身的健康检查,资源回收等。FsDataSetSpi对应cdh3x版本中的FsDataSet接口。
FsVolumeSpi接口主要是用于volume管理,方法比较少,代码如下
public interface FsVolumeSpi { //获取volume下的存储UUID public String getStorageID(); //获取BlockPool的列表 public String[] getBlockPoolList(); //获取可用的存储空间大小 public long getAvailable() throws IOException; //获取volume的基本路径(current目录的父目录路径) public String getBasePath(); //获取volume的绝对路径 public String getPath(String bpid) throws IOException; //获取block的提交目录 public File getFinalizedDir(String bpid) throws IOException; //获取存储类型 public StorageType getStorageType(); }
RollingLogs接口用于对DataNode上的EditLog进行管理,内部包含两个重要的接口。LineIterator用于遍历EditLog的内容,Appender接口用于追加内容到EditLog。
VolumeChoosingPolicy接口提供写入block副本的volume选择策略,实现类有两个,和接口在同一个包下,分别是RoundRobinVolumeChoosingPolicy和AvailableSpaceVolumeChoosingPolicy。
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl主要包括相关接口的实现类和一些工具类。FsDataSet将存储空间分为三个级别,LDir(cdh3x的FSDir),FsVolumeImpl(cdh3x的FsVolume)和FsVolumeList(cdh3x的FsVolumeSet)。LDir代表current目录下的子目录,FsVolumeImpl代表${dfs.datanode.data.dir}中的一项,DataNode可以有多个数据目录,FsVolumeList负责管理FsVolumeImpl对象。
以创建数据块为入口,详细分析下代码的执行流程,在HDFS支持Append特性之前,一个block副本在DataNode的状态要么是finalized,要么是temporary。temporary状态的block会在DataNode重启过程中被删除。但支持了Append之后,HDFS必须为正在构建中的block提供更强的可持久化支持,一些temporary的block副本必须在DataNode重启过程中持久存在。所以在后续的版本中代码的变动比较大。详情参考https://issues.apache.org/jira/browse/HDFS-265。
在cdh3x中,FsDataSet中的writeToBlock方法用于选择block写入的volume,并创建写入的输出流,新版本中将这个方法拆分为三个方法,分别是createRbw,createTemporary和append。详情见https://issues.apache.org/jira/browse/HDFS-543。rbw状态代表一个block副本刚刚被创建或者被Append,temporary则代表DataNode间block的备份和reblance。下面详细分析下客户端写入文件创建block输出流的过程。首先看一段cdh3x的代码以做对比
BlockReceiver(Block block, DataInputStream in, String inAddr, String myAddr, boolean isRecovery, String clientName, DatanodeInfo srcDataNode, DataNode datanode) throws IOException { //通过调用writeToBlock方法创建流对象 streams = datanode.data.writeToBlock(block, isRecovery, clientName == null || clientName.length() == 0); this.finalized = false; if (streams != null) { //获取输出流对象 this.out = streams.dataOut; //获取校验信息输出流对象 this.checksumOut = new DataOutputStream(new BufferedOutputStream(streams.checksumOut, SMALL_BUFFER_SIZE)); } }
cdh3x中通过调用writeToBlock来创建对应的输出流,而在新代码中获取输出流对象的逻辑相对复杂一些。代码如下
//如果是DataNode节点发起的数据块复制或者移动 if (isDatanode) { //创建temporary状态的replica replicaInfo = datanode.data.createTemporary(block); } else { //如果是客户端发起的,根据所处的阶段执行不同的操作 switch (stage) { //pipeline启动创建 case PIPELINE_SETUP_CREATE: //创建rbw状态的replicaInfo replicaInfo = datanode.data.createRbw(block); //向NameNode已经创建block replica的消息 datanode.notifyNamenodeReceivingBlock(block, replicaInfo.getStorageUuid()); break; case PIPELINE_SETUP_STREAMING_RECOVERY: //获取rbw恢复的replicaInfo replicaInfo = datanode.data.recoverRbw(block, newGs, minBytesRcvd, maxBytesRcvd ); //设置block新版本号 block.setGenerationStamp(newGs); break; case PIPELINE_SETUP_APPEND: //block追加操作 replicaInfo = datanode.data.append(block, newGs, minBytesRcvd); if (datanode.blockScanner != null) { //通过blockScanner删除旧的block datanode.blockScanner.deleteBlock(block.getBlockPoolId(),block.getLocalBlock()); } //设置block的新版本号 block.setGenerationStamp(newGs); //向NameNode发送变更消息 datanode.notifyNamenodeReceivingBlock(block, replicaInfo.getStorageUuid()); break; case PIPELINE_SETUP_APPEND_RECOVERY: //获取append恢复的replicaInfo replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd); if (datanode.blockScanner != null) { //通过blockScanner删除旧的block datanode.blockScanner.deleteBlock(block.getBlockPoolId(), block.getLocalBlock()); } //设置block的新版本号 block.setGenerationStamp(newGs); //向NameNode发送变更消息 datanode.notifyNamenodeReceivingBlock(block, replicaInfo.getStorageUuid()); break; case TRANSFER_RBW: case TRANSFER_FINALIZED: //DataNode之间传输创建的replicaInfo replicaInfo = datanode.data.createTemporary(block); break; default: throw new IOException("Unsupported stage " + stage + " while receiving block " + block + " from " + inAddr); } } this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ? datanode.getDnConf().dropCacheBehindWrites : cachingStrategy.getDropBehind(); this.syncBehindWrites = datanode.getDnConf().syncBehindWrites; final boolean isCreate = isDatanode || isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; //创建输出流对象 streams = replicaInfo.createStreams(isCreate, requestedChecksum); //获取校验相关的信息 this.clientChecksum = requestedChecksum; this.diskChecksum = streams.getChecksum(); this.needsChecksumTranslation = !clientChecksum.equals(diskChecksum); this.bytesPerChecksum = diskChecksum.getBytesPerChecksum(); this.checksumSize = diskChecksum.getChecksumSize(); //获取数据输出流 this.out = streams.getDataOut();
由于支持了Append操作,block副本所处的状态更多了,代码实现上也更加复杂。下面详细分析下FsDatasetImpl类下的createRbw方法
public synchronized ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException { //从volumeMap中获取replicaInfo,因为是新创建的,如果在volumeMap存在,则不能继续创建 ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); if (replicaInfo != null) { throw new ReplicaAlreadyExistsException("Block " + b + " already exists in state " + replicaInfo.getState() + " and thus cannot be created."); } //根据相应的规则获取要写入的volume FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes()); //在得到的volume下创建rbw文件 File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock()); //创建ReplicaBeingWritten对象,此对象继承自ReplicaInfo ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), b.getGenerationStamp(), v, f.getParentFile()); //将新创建的replicaInfo加入到volumeMap中 volumeMap.add(b.getBlockPoolId(), newReplicaInfo); return newReplicaInfo; }
在选择写入的volume的时候,用户可以根据dfs.datanode.fsdataset.volume.choosing.policy属性来选择使用何种策略来选择volume。默认提供两种策略,分别是根据volume可用空间和轮询的方法来选择volume,对应的实现类分别是AvailableSpaceVolumeChoosingPolicy和RoundRobinVolumeChoosingPolicy,它们都继承自VolumeChoosingPolicy接口,用户也可以根据自己的需求来自定义选择策略。