/************************************************************ * A Lease governs all the locks held by a single client. * For each client there‘s a corresponding lease, whose * timestamp is updated when the client periodically * checks in. If the client dies and allows its lease to * expire, all the corresponding locks can be released. *************************************************************/ //每条租约记录信息,只能被单一的客户端占有 class Lease implements Comparable<Lease> { //租约信息客户持有者 private final String holder; //租约最后更新时间 private long lastUpdate; //此租约内所打开的文件,维护一个客户端打开的所有文件 private final Collection<String> paths = new TreeSet<String>(); /** Only LeaseManager object can create a lease */ private Lease(String holder) { this.holder = holder; renew(); } .....
/** Only LeaseManager object can renew a lease */ //根据租约最后的检测时间 private void renew() { this.lastUpdate = FSNamesystem.now(); }
/**
 * LeaseManager does the lease housekeeping for writing on files.
 * This class also provides useful static methods for lease recovery.
 *
 * Lease Recovery Algorithm
 * 1) Namenode retrieves lease information
 * 2) For each file f in the lease, consider the last block b of f
 * 2.1) Get the datanodes which contains b
 * 2.2) Assign one of the datanodes as the primary datanode p
 * 2.3) p obtains a new generation stamp from the namenode
 * 2.4) p get the block info from each datanode
 * 2.5) p computes the minimum block length
 * 2.6) p updates the datanodes, which have a valid generation stamp,
 *      with the new generation stamp and the minimum block length
 * 2.7) p acknowledges the namenode the update results
 * 2.8) Namenode updates the BlockInfo
 * 2.9) Namenode removes f from the lease
 *      and removes the lease once all files have been removed
 * 2.10) Namenode commit changes to edit log
 *
 * The lease manager groups together the methods that deal with file leases.
 */
public class LeaseManager {
  public static final Log LOG = LogFactory.getLog(LeaseManager.class);

  private final FSNamesystem fsnamesystem;

  // Soft limit of a lease, initialised from FSConstants.LEASE_SOFTLIMIT_PERIOD.
  private long softLimit = FSConstants.LEASE_SOFTLIMIT_PERIOD;
  // Hard limit of a lease, initialised from FSConstants.LEASE_HARDLIMIT_PERIOD.
  private long hardLimit = FSConstants.LEASE_HARDLIMIT_PERIOD;

  //
  // Used for handling lock-leases
  // Mapping: leaseHolder -> Lease
  // (lease holder name to its lease, kept in a TreeMap)
  //
  private SortedMap<String, Lease> leases = new TreeMap<String, Lease>();
  // Set of: Lease
  // (every lease known to this manager)
  private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();

  //
  // Map path names to leases. It is protected by the sortedLeases lock.
  // The map stores pathnames in lexicographical order.
  //
  private SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();
  .....
/** @return the lease containing src */ //根据路径获取租约 public Lease getLeaseByPath(String src) {return sortedLeasesByPath.get(src);}
/** * Adds (or re-adds) the lease for the specified file. * 添加指定文件的租约信息 */ synchronized Lease addLease(String holder, String src) { //根据用户名获取其租约 Lease lease = getLease(holder); if (lease == null) { //如果租约为空 lease = new Lease(holder); //加入租约集合中 leases.put(holder, lease); sortedLeases.add(lease); } else { //如果存在此用户的租约,则进行租约更新 renewLease(lease); } //加入一条新的路径到租约的映射信息 sortedLeasesByPath.put(src, lease); //在此租约路径映射信息中加入新路径 lease.paths.add(src); return lease; }
/** * Remove the specified lease and src. * 移除值指定路径以及租约 */ synchronized void removeLease(Lease lease, String src) { //移动掉指定路径的映射信息 sortedLeasesByPath.remove(src); //租约内部移除此路径 if (!lease.removePath(src)) { LOG.error(src + " not found in lease.paths (=" + lease.paths + ")"); } if (!lease.hasPath()) { //根据租约持有者移除指定租约 leases.remove(lease.holder); if (!sortedLeases.remove(lease)) { LOG.error(lease + " not found in sortedLeases"); } } }
/** * Create a new file entry in the namespace. * * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long) * * @throws IOException if file name is invalid * {@link FSDirectory#isValidToCreate(String)}. * 命名系统打开一个新的文件 */ void startFile(String src, PermissionStatus permissions, String holder, String clientMachine, boolean overwrite, boolean createParent, short replication, long blockSize ) throws IOException { //调用startFileInternal方法 startFileInternal(src, permissions, holder, clientMachine, overwrite, false, createParent, replication, blockSize); getEditLog().logSync(); if (auditLog.isInfoEnabled() && isExternalInvocation()) { final HdfsFileStatus stat = dir.getFileInfo(src); logAuditEvent(UserGroupInformation.getCurrentUser(), Server.getRemoteIp(), "create", src, null, stat); } }
/**
 * Internal part of opening a file for create or append.
 * The visible portion logs the request at debug level, rejects it while in
 * safe mode or when the name is invalid, verifies the parent directory when
 * {@code createParent} is false, and then runs lease recovery (non-forced)
 * for the existing inode of {@code src}.
 * NOTE(review): parts of this method are elided ("....."), so this
 * description only covers the statements shown here.
 *
 * @throws IOException if the namesystem is in safe mode, the file name is
 *         invalid, or one of the called helpers fails
 */
private synchronized void startFileInternal(String src,
    PermissionStatus permissions,
    String holder,
    String clientMachine,
    boolean overwrite,
    boolean append,
    boolean createParent,
    short replication,
    long blockSize
    ) throws IOException {
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
        + ", holder=" + holder
        + ", clientMachine=" + clientMachine
        + ", createParent=" + createParent
        + ", replication=" + replication
        + ", overwrite=" + overwrite
        + ", append=" + append);
  }

  // NOTE(review): the message below has no space between "file" and src.
  if (isInSafeMode())
    throw new SafeModeException("Cannot create file" + src, safeMode);
  if (!DFSUtil.isValidName(src)) {
    throw new IOException("Invalid file name: " + src);
  }
  .....
  // The parent directory is only verified when the caller did not ask for
  // it to be created.
  if (!createParent) {
    verifyParentDir(src);
  }

  try {
    INode myFile = dir.getFileINode(src);
    // Lease recovery is performed here (force == false).
    recoverLeaseInternal(myFile, src, holder, clientMachine, false);
    ....
//租约恢复操作 private void recoverLeaseInternal(INode fileInode, String src, String holder, String clientMachine, boolean force) throws IOException { if (fileInode != null && fileInode.isUnderConstruction()) { INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode; // // If the file is under construction , then it must be in our // leases. Find the appropriate lease record. // //根据客户端名称,取出其租约 Lease lease = leaseManager.getLease(holder); // // We found the lease for this file. And surprisingly the original // holder is trying to recreate this file. This should never occur. // if (!force && lease != null) { //如果租约记录中已经存在此文件路径,不允许重复创建记录操作 Lease leaseFile = leaseManager.getLeaseByPath(src); if (leaseFile != null && leaseFile.equals(lease)) { throw new AlreadyBeingCreatedException( "failed to create file " + src + " for " + holder + " on client " + clientMachine + " because current leaseholder is trying to recreate file."); } } // // Find the original holder. //取出客户端的租约记录 lease = leaseManager.getLease(pendingFile.clientName); if (lease == null) { throw new AlreadyBeingCreatedException( "failed to create file " + src + " for " + holder + " on client " + clientMachine + " because pendingCreates is non-null but no leases found."); } if (force) { // close now: no need to wait for soft lease expiration and // close only the file src LOG.info("recoverLease: recover lease " + lease + ", src=" + src + " from client " + pendingFile.clientName); //如果设置了强制执行参数,直接进行租约恢复操作 internalReleaseLeaseOne(lease, src); } else { // // If the original holder has not renewed in the last SOFTLIMIT // period, then start lease recovery. 
// //如果没有设置,判断是否软超时,来进行租约恢复 if (lease.expiredSoftLimit()) { LOG.info("startFile: recover lease " + lease + ", src=" + src + " from client " + pendingFile.clientName); internalReleaseLease(lease, src); } throw new AlreadyBeingCreatedException( "failed to create file " + src + " for " + holder + " on client " + clientMachine + ", because this file is already being created by " + pendingFile.getClientName() + " on " + pendingFile.getClientMachine()); } } }
/** * This is invoked when a lease expires. On lease expiry, * all the files that were written from that dfsclient should be * recovered. * 进行租约恢复操作 */ void internalReleaseLease(Lease lease, String src) throws IOException { if (lease.hasPath()) { // make a copy of the paths because internalReleaseLeaseOne removes // pathnames from the lease record. String[] leasePaths = new String[lease.getPaths().size()]; lease.getPaths().toArray(leasePaths); for (String p: leasePaths) { internalReleaseLeaseOne(lease, p); } } else { internalReleaseLeaseOne(lease, src); } }
/**
 * Move a file that is being written to be immutable.
 * The visible portion checks that {@code src} exists and is still under
 * construction, assigns a primary datanode to the pending file, reassigns
 * the lease on {@code src} to {@code HdfsConstants.NN_RECOVERY_LEASEHOLDER}
 * and renews the reassigned lease.
 * NOTE(review): part of the body is elided ("....."), so steps between the
 * cast and the primary-datanode assignment are not documented here.
 *
 * @param src The filename
 * @param lease The lease for the client creating the file
 * @throws IOException if {@code src} does not exist or is already closed
 */
void internalReleaseLeaseOne(Lease lease, String src) throws IOException {
  // The caller must already hold this object's monitor.
  assert Thread.holdsLock(this);

  LOG.info("Recovering lease=" + lease + ", src=" + src);

  INodeFile iFile = dir.getFileINode(src);
  // The file must exist ...
  if (iFile == null) {
    final String message = "DIR* NameSystem.internalReleaseCreate: "
        + "attempt to release a create lock on "
        + src + " file does not exist.";
    NameNode.stateChangeLog.warn(message);
    throw new IOException(message);
  }
  // ... and must still be open for writing.
  if (!iFile.isUnderConstruction()) {
    final String message = "DIR* NameSystem.internalReleaseCreate: "
        + "attempt to release a create lock on "
        + src + " but file is already closed.";
    NameNode.stateChangeLog.warn(message);
    throw new IOException(message);
  }

  INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
  .....
  // start lease recovery of the last block for this file.
  pendingFile.assignPrimaryDatanode();
  // Finally, reassign the lease to the namenode's recovery holder and renew it.
  Lease reassignedLease = reassignLease(
      lease, src, HdfsConstants.NN_RECOVERY_LEASEHOLDER, pendingFile);
  leaseManager.renewLease(reassignedLease);
}
/** * Reassign lease for file src to the new holder. * 租约重分配方法,等价于先移除后添加的方法 */ synchronized Lease reassignLease(Lease lease, String src, String newHolder) { assert newHolder != null : "new lease holder is null"; if (lease != null) { removeLease(lease, src); } return addLease(newHolder, src); } synchronized void renewLease(Lease lease) { if (lease != null) { //首先进行列表租约移除 sortedLeases.remove(lease); //更新时间 lease.renew(); //再进行添加 sortedLeases.add(lease); } }
/** * The client would like to let go of the given block */ public synchronized boolean abandonBlock(Block b, String src, String holder ) throws IOException { // // Remove the block from the pending creates list // NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " +b+"of file "+src); if (isInSafeMode()) { throw new SafeModeException("Cannot abandon block " + b + " for fle" + src, safeMode); } //移除块操作时进行租约检查,如果出现不符号要求的时候会抛异常 INodeFileUnderConstruction file = checkLease(src, holder); dir.removeBlock(src, file, b); NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: " + b + " is removed from pendingCreates"); return true; }
// make sure that we still have the lease on this file. private INodeFileUnderConstruction checkLease(String src, String holder) throws IOException { INodeFile file = dir.getFileINode(src); //继续调用同名方法 checkLease(src, holder, file); return (INodeFileUnderConstruction)file; }
//下面是租约检查的核心逻辑方法 private void checkLease(String src, String holder, INode file) throws IOException { //如果正在操作的文件不存在,抛异常 if (file == null || file.isDirectory()) { Lease lease = leaseManager.getLease(holder); throw new LeaseExpiredException("No lease on " + src + " File does not exist. " + (lease != null ? lease.toString() : "Holder " + holder + " does not have any open files.")); } //如果文件没有被打开,说明一定没有对应的租约记录存在,也抛异常 if (!file.isUnderConstruction()) { Lease lease = leaseManager.getLease(holder); throw new LeaseExpiredException("No lease on " + src + " File is not open for writing. " + (lease != null ? lease.toString() : "Holder " + holder + " does not have any open files.")); } INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file; //判断文件所有者和客户端租约持有者是否一致 if (holder != null && !pendingFile.getClientName().equals(holder)) { throw new LeaseExpiredException("Lease mismatch on " + src + " owned by " + pendingFile.getClientName() + " but is accessed by " + holder); } }
//租约过期监控检测线程 class Monitor implements Runnable { final String name = getClass().getSimpleName(); /** Check leases periodically. */ public void run() { for(; fsnamesystem.isRunning(); ) { synchronized(fsnamesystem) { //执行checkLeases方法 checkLeases(); } try { Thread.sleep(2000); } catch(InterruptedException ie) { if (LOG.isDebugEnabled()) { LOG.debug(name + " is interrupted", ie); } } } } }
/** Check the leases beginning from the oldest. */ synchronized void checkLeases() { for(; sortedLeases.size() > 0; ) { //获取距离目前最晚的租约时间开始 final Lease oldest = sortedLeases.first(); //如果最晚的时间是否超过硬超时时间 if (!oldest.expiredHardLimit()) { return; } //到了这步,说明已经发生租约超时 LOG.info("Lease " + oldest + " has expired hard limit"); final List<String> removing = new ArrayList<String>(); // need to create a copy of the oldest lease paths, becuase // internalReleaseLease() removes paths corresponding to empty files, // i.e. it needs to modify the collection being iterated over // causing ConcurrentModificationException //获取此租约管理的文件路径 String[] leasePaths = new String[oldest.getPaths().size()]; oldest.getPaths().toArray(leasePaths); for(String p : leasePaths) { try { //进行租约释放 fsnamesystem.internalReleaseLeaseOne(oldest, p); } catch (IOException e) { // 如果是租约释放失败的情况加入移除列表中 LOG.error("Cannot release the path "+p+" in the lease "+oldest, e); removing.add(p); } } //进行移除租约记录的remove操作 for(String p : removing) { removeLease(oldest, p); } } }