HDFS读写数据块--${dfs.data.dir}选择策略

  最近工作需要,看了HDFS读写数据块这部分。不过可能跟网上大部分帖子不一样,本文主要写了${dfs.data.dir}的选择策略,也就是block在DataNode上的放置策略。我主要是从我们工作需要的角度来读这部分代码的。

  

1 hdfs-site.xml
2 <property>
3   <name>dfs.data.dir</name>
4   <value>/mnt/datadir1/data,/mnt/datadir2/data,/mnt/datadir3/data</value>
5 </property>

  所谓${dfs.data.dir}的选择策略,就是当DataNode配置有多个${dfs.data.dir}目录时(如上面的配置),该选择哪个目录来存放block。一般多个硬盘分别挂载到不同的${dfs.data.dir}下,所以存储block是要决定block该放到哪个磁盘上。

  创建文件总共有两步:

  1、在写block之前,需要与NameNode通信来生成文件(INodeFile、INodeFileUnderConstruction)。首先在DFSClient端的create()方法中发起写请求,然后通过RPC由NameNode最终调用FSNameSystem的startFileInternal()方法来创建文件。

  1   private void startFileInternal(String src,
  2                                               PermissionStatus permissions,
  3                                               String holder,
  4                                               String clientMachine,
  5                                               boolean overwrite,
  6                                               boolean append,
  7                                               boolean createParent,
  8                                               short replication,
  9                                               long blockSize
 10                                               ) throws IOException {
 11     if (NameNode.stateChangeLog.isDebugEnabled()) {
 12       NameNode.stateChangeLog.debug("DIR* startFile: src=" + src
 13           + ", holder=" + holder
 14           + ", clientMachine=" + clientMachine
 15           + ", createParent=" + createParent
 16           + ", replication=" + replication
 17           + ", overwrite=" + overwrite
 18           + ", append=" + append);
 19     }
 20
 21     FSPermissionChecker pc = getPermissionChecker();
 22     synchronized (this) {
 23       if (isInSafeMode())
 24         throw new SafeModeException("Cannot create " + src, safeMode);
 25       if (!DFSUtil.isValidName(src)) {
 26         throw new IOException("Invalid name: " + src);
 27       }
 28
 29       // Verify that the destination does not exist as a directory already.
 30       boolean pathExists = dir.exists(src);
 31       if (pathExists && dir.isDir(src)) {
 32         throw new IOException("Cannot create "+ src + "; already exists as a directory");
 33       }
 34
 35       if (isPermissionEnabled) {
 36         if (append || (overwrite && pathExists)) {
 37           checkPathAccess(pc, src, FsAction.WRITE);
 38         } else {
 39           checkAncestorAccess(pc, src, FsAction.WRITE);
 40         }
 41       }
 42
 43       if (!createParent) {
 44         verifyParentDir(src);
 45       }
 46
 47       try {
 48         INode myFile = dir.getFileINode(src); //根据路径寻找该文件
 49         recoverLeaseInternal(myFile, src, holder, clientMachine, false);
 50
 51         try {
 52           verifyReplication(src, replication, clientMachine);
 53         } catch (IOException e) {
 54           throw new IOException("failed to create " + e.getMessage());
 55         }
 56         if (append) {//若是追加操作
 57           if (myFile == null) {
 58             throw new FileNotFoundException("failed to append to non-existent "
 59                 + src + " on client " + clientMachine);
 60           } else if (myFile.isDirectory()) {
 61             throw new IOException("failed to append to directory " + src
 62                 + " on client " + clientMachine);
 63           }
 64         } else if (!dir.isValidToCreate(src)) {
 65           if (overwrite) {//允许覆盖原来的文件
 66             delete(src, true);
 67           } else {
 68             throw new IOException("failed to create file " + src
 69                 + " on client " + clientMachine
 70                 + " either because the filename is invalid or the file exists");
 71           }
 72         }
 73
 74         DatanodeDescriptor clientNode = host2DataNodeMap
 75             .getDatanodeByHost(clientMachine);
 76
 77         if (append) {
 78           //
 79           // Replace current node with a INodeUnderConstruction.
 80           // Recreate in-memory lease record.
 81           //
 82           INodeFile node = (INodeFile) myFile;
 83           INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
 84               node.getLocalNameBytes(), node.getReplication(),
 85               node.getModificationTime(), node.getPreferredBlockSize(),
 86               node.getBlocks(), node.getPermissionStatus(), holder,
 87               clientMachine, clientNode);
 88           dir.replaceNode(src, node, cons);
 89           leaseManager.addLease(cons.clientName, src);
 90
 91         } else {
 92           // Now we can add the name to the filesystem. This file has no
 93           // blocks associated with it.
 94           //
 95           checkFsObjectLimit();
 96
 97           // increment global generation stamp
 98           long genstamp = nextGenerationStamp();
 99           INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
100               replication, blockSize, holder, clientMachine, clientNode,
101               genstamp);
102           if (newNode == null) {
103             throw new IOException("DIR* startFile: Unable to add to namespace");
104           }
105           leaseManager.addLease(newNode.clientName, src);
106           if (NameNode.stateChangeLog.isDebugEnabled()) {
107             NameNode.stateChangeLog.debug("DIR* startFile: "
108                                        +"add "+src+" to namespace for "+holder);
109           }
110         }
111       } catch (IOException ie) {
112         NameNode.stateChangeLog.warn("DIR* startFile: "
113                                      +ie.getMessage());
114         throw ie;
115       }
116     }
117   }

startFileInternal()

  该方法的主要内容如下:

  1)首先做一些检查(安全模式、权限、该路径是否已经以文件夹形式存在等)

  2)若不是追加操作:

  生成generation stamp(针对每个文件生成一个);并构造INodeFileUnderConstruction对象(preferredBlockSize);将这个文件添加到filesystem;添加租约(即有时间限制的写锁);

若是追加操作:

  将src下的INodeFile替换成INodeFileUnderConstruction;添加租约;

  2、在NameNode端生成文件之后,client向NameNode申请block,并将其写入到DataNode。在上面的工作完成后,就启动DataStreamer线程来向DataNode中写入block。整个流程如下:

  1)一些前期检查

  2)向NameNode申请block(与NameNode有一次通信)

    a. 根据副本放置策略,选择N个DataNode作为block的放置位置;

    b. 随机生成一个不重复的blockID;

    c. 把该block添加到对应的文件;

  3)将目标DN组织成pipeline,并向第一个DN发送Packet

  选择其中几个比较重要的方法分析下:

 1  /**
 2    * The client would like to obtain an additional block for the indicated
 3    * filename (which is being written-to).  Return an array that consists
 4    * of the block, plus a set of machines.  The first on this list should
 5    * be where the client writes data.  Subsequent items in the list must
 6    * be provided in the connection to the first datanode.
 7    *
 8    * Make sure the previous blocks have been reported by datanodes and
 9    * are replicated.  Will return an empty 2-elt array if we want the
10    * client to "try again later".
11    */
12   //向NameNode申请block
13   public LocatedBlock getAdditionalBlock(String src,
14                                          String clientName,
15                                          HashMap<Node, Node> excludedNodes
16                                          ) throws IOException {
17     long fileLength, blockSize;
18     int replication;
19     DatanodeDescriptor clientNode = null;
20     Block newBlock = null;
21
22     NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: "
23                                   +src+" for "+clientName);
24
25     synchronized (this) {
26       if (isInSafeMode()) {//check safemode first for failing-fast
27         throw new SafeModeException("Cannot add block to " + src, safeMode);
28       }
29       // have we exceeded the configured limit of fs objects.
30       checkFsObjectLimit();
31
32       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
33
34       //
35       // If we fail this, bad things happen!
36       //
37       if (!checkFileProgress(pendingFile, false)) {
38         throw new NotReplicatedYetException("Not replicated yet:" + src);
39       }
40       fileLength = pendingFile.computeContentSummary().getLength();
41       blockSize = pendingFile.getPreferredBlockSize();
42       clientNode = pendingFile.getClientNode();
43       replication = (int)pendingFile.getReplication();
44     }
45
46     // choose targets for the new block to be allocated.
47     //选择副本存放的位置
48     DatanodeDescriptor targets[] = replicator.chooseTarget(src,
49                                                            replication,
50                                                            clientNode,
51                                                            excludedNodes,
52                                                            blockSize);
53     if (targets.length < this.minReplication) {
54       throw new IOException("File " + src + " could only be replicated to " +
55                             targets.length + " nodes, instead of " +
56                             minReplication);
57     }
58
59     // Allocate a new block and record it in the INode.
60     synchronized (this) {
61       if (isInSafeMode()) { //make sure it is not in safemode again.
62         throw new SafeModeException("Cannot add block to " + src, safeMode);
63       }
64       INode[] pathINodes = dir.getExistingPathINodes(src);
65       int inodesLen = pathINodes.length;
66       checkLease(src, clientName, pathINodes[inodesLen-1]);
67       INodeFileUnderConstruction pendingFile  = (INodeFileUnderConstruction)
68                                                 pathINodes[inodesLen - 1];
69
70       if (!checkFileProgress(pendingFile, false)) {
71         throw new NotReplicatedYetException("Not replicated yet:" + src);
72       }
73
74       // allocate new block record block locations in INode.
75       //分配block,并随机生成一个不重复的blockID,然后在INode中记录该block
76       newBlock = allocateBlock(src, pathINodes);
77       pendingFile.setTargets(targets);
78
79       for (DatanodeDescriptor dn : targets) {
80         dn.incBlocksScheduled();
81       }
82       dir.persistBlocks(src, pendingFile);
83     }
84     if (persistBlocks) {
85       getEditLog().logSync();
86     }
87
88     // Create next block
89     LocatedBlock b = new LocatedBlock(newBlock, targets, fileLength);
90     if (isAccessTokenEnabled) {
91       b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
92           EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
93     }
94     return b;
95   }

getAdditionalBlock

  上面的方法还涉及到了块的选择策略,这个留在下一篇再说。下面这个图来总结下上面方法的调用层次:

  最后重点说一下block在DataNode上的存储策略。其调度层次如下:

  首先说一下其中涉及到的数据结构:

1  class FSVolume {    //卷信息,代表${dfs.data.dir}
2     private File currentDir;      //存放block,即${dfs.data.dir}/current
3     private FSDir dataDir;        //表示currentDir有哪些块文件
4     private File tmpDir;          //存放一些临时文件,即${dfs.data.dir}/tmp
5     private File blocksBeingWritten;    //放置正在写的block,即${dfs.data.dir}/ blocksBeingWritten
6     private File detachDir;       //是否写分离,即${dfs.data.dir}/detach
7     private DF usage;
8     private DU dfsUsage;
9     private long reserved;
1   static class FSVolumeSet {  //卷信息集合,代表多个${dfs.data.dir}
2     FSVolume[] volumes = null;    //代表多个FSVolume,并将其组织成一个数组
3     int curVolume = 0;            //指示当前正在使用哪一个FSVolume  

  FSVolumeSet 代表多个${dfs.data.dir}目录的集合,它将这些目录组织成一个数组volumes,然后用curVolume来指示当前正在使用的是哪个${dfs.data.dir}目录。${dfs.data.dir}的选择策略如下:

  当有多个${dfs.data.dir}时,DataNode顺序地从volumes选择一个FSVolume用来存放block(先放在blocksBeingWritten目录下,写完后再转移到current目录下);每次写完一个block, curVolume增1。以此实现多个${dfs.data.dir}目录的轮流写。该策略在FSDataSet.FSVolumeSet的getNextVolume()方法中实现

 1    synchronized FSVolume getNextVolume(long blockSize) throws IOException {
 2
 3       if(volumes.length < 1) {
 4         throw new DiskOutOfSpaceException("No more available volumes");
 5       }
 6
 7       // since volumes could‘ve been removed because of the failure
 8       // make sure we are not out of bounds
 9       if(curVolume >= volumes.length) {
10         curVolume = 0;
11       }
12
13       int startVolume = curVolume;
14
15       while (true) {
16         FSVolume volume = volumes[curVolume];
17         curVolume = (curVolume + 1) % volumes.length;    //增1
18         if (volume.getAvailable() > blockSize) { return volume; }
19         if (curVolume == startVolume) {
20           throw new DiskOutOfSpaceException("Insufficient space for an additional block");
21         }
22       }
23     }

  本文基于hadoop1.2.1

  如有错误,还请指正

  参考文章:《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 董西成

  转载请注明出处:

时间: 2024-10-25 20:06:16

HDFS读写数据块--${dfs.data.dir}选择策略的相关文章

Hadoop Datanode节点无法启动(All directories in dfs.data.dir are invalid)

Hadoop Datanode节点无法启动(All directories in dfs.data.dir are invalid) java.io.IOException: All directories in dfs.datanode.data.dir are invalid: "/usr/local/hadoop-2.4.0/dfs/data" at org.apache.hadoop.hdfs.server.datanode.DataNode.checkStorageLocat

${mapred.local.dir}选择策略--Map Task存放中间结果

上篇说了block在DataNode配置有多个${dfs.data.dir}时的存储策略,本文主要介绍TaskTracker在配置有多个${mapred.local.dir}时的选择策略. 1 mapred-site.xml 2 <property> 3 <name>mapred.local.dir</name> 4 <value>/mnt/localdir1/local,/mnt/localdir2/local,/mnt/localdir3/local&l

Hadoop -- HDFS 读写数据

一.HDFS读写文件过程 1.读取文件过程 1)       初始化FileSystem,然后客户端(client)用FileSystem的open()函数打开文件 2)       FileSystem用RPC调用元数据节点,得到文件的数据块信息,对于每一个数据块,元数据节点返回保存数据块的数据节点的地址. 3)       FileSystem返回FSDataInputStream给客户端,用来读取数据,客户端调用stream的read()函数开始读取数据. 4)       DFSInpu

DataNode引用计数磁盘选择策略

前言 在HDFS中,所有的数据都是存在各个DataNode上的.而这些DataNode上的数据都是存放于节点机器上的各个目录中的,而一般每个目录我们会对应到1个独立的盘,以便我们把机器的存储空间基本用上.这么多的节点,这么多块盘,HDFS在进行写操作时如何进行有效的磁盘选择呢,选择不当必然造成写性能下降,从而影响集群整体的性能.本文来讨论一下目前HDFS中存在的几个磁盘选择策略的特点和不足,然后针对其不足,自定义1个新的磁盘选择策略. HDFS现有磁盘选择策略 上文前言中提到,随着节点数的扩增,

如何选择容器,主要从存放要求和读写数据效率两方面考虑

1 ,存放要求 无序:set,不能重复. 有序:List,允许重复 “key-value”对:Map 2 ,读写效率 Hash:两者都最高 Array:读快改慢 Linked:读慢改快 Tree:加入元素可排序使用 如何选择容器,主要从存放要求和读写数据效率两方面考虑,布布扣,bubuko.com

HDFS上读写数据的流程解释

文件的读取 文件读取的过程如下: 1)解释一  客户端(client)用FileSystem的open()函数打开文件.  DistributedFileSystem用RPC调用元数据节点,得到文件的数据块信息.  对于每一个数据块,元数据节点返回保存数据块的数据节点的地址.  DistributedFileSystem返回FSDataInputStream给客户端,用来读取数据.  客户端调用stream的read()函数开始读取数据.  DFSInputStream连接保存此文件

大数据系列文章-Hadoop的HDFS读写流程(二)

在介绍HDFS读写流程时,先介绍下Block副本放置策略. Block副本放置策略 第一个副本:放置在上传文件的DataNode:如果是集群外提交,则随机挑选一台磁盘不太满,CPU不太忙的节点. 第二个副本:放置在与第一个副本不同的机架的节点上. 第三个副本:与第二个副本相同机架的节点. 更多副本:随机节点. HDFS写流程 客户端发请求给NameNode,我想保存一个文件A,这时候在NameNode会有一个标识,标识为A_copy(文件不可用). 根据副本放置策略,返回三个副本的可放置位置列表

Hadoop之HDFS(HDFS的数据流读写数据) (面试开发重点)

1 HDFS写数据流程 1.1 剖析文件写入 HDFS写数据流程,如图所示 1)客户端通过Distributed FileSystem模块向NameNode请求上传文件,NameNode检查目标文件是否已存在,父目录是否存在. 2)NameNode返回是否可以上传. 3)客户端请求第一个 Block上传到哪几个DataNode服务器上. 4)NameNode返回3个DataNode节点,分别为dn1.dn2.dn3. 5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn

Hadoop_08_客户端向HDFS读写(上传)数据流程

1.HDFS的工作机制: HDFS集群分为两大角色:NameNode.DataNode (Secondary Namenode) NameNode负责管理整个文件系统的元数据 DataNode 负责管理用户的文件数据块(只管接收保存,不负责切片) 文件会按照固定的大小(blocksize)128M切成若干块后分布式存储在若干台datanode上 每一个文件块可以有多个副本,并存放在不同的datanode上 Datanode会定期向Namenode汇报自身所保存的文件block信息,而nameno