HDFS Source Code Analysis (6) ----- Leases

Preface

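Let me start with something we have all done: borrowing books from a library. Readers who love reading may, like me, prefer to borrow books. Borrowing is convenient, but it has one drawback: the loan period, that is, the deadline. My university had a rule that anyone holding an overdue book could not borrow another one, so to borrow a new book you first had to return the overdue one. The experience itself is completely ordinary, but the point is that the lease mechanism in the HDFS distributed file system follows almost the same pattern, with returning the overdue book corresponding to lease recovery. The rest of this article explains in detail what a lease is.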

Leases and Lease-Related Classes

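A lease can be understood simply as a short-term permission granted to the lease holder, that is, the client, such as the credential for writing a file. Whenever HDFS adds or removes a block, it checks and updates the lease, which is how it keeps track of the operations on each file. The lease-related classes are: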

1. LeaseManager: the lease management class, essentially the steward of all leases. It maintains several collections that map leases in different ways.

2. LeaseManager.Lease: the lease entity class, the concrete representation of a lease. Its fields and methods are described in detail below.

3. FSNamesystem: the namesystem class. This large, catch-all class calls many lease-related methods, so it is included as well.

Lease

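Let's begin with the smallest class, Lease, which is an inner class of LeaseManager. The first thing to understand is that a lease is a credential that lets a client write files. It must therefore record the lease holder and the list of files the lease covers, and it also needs a timestamp so that lease expiration can be detected. The class's fields are as follows: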

/************************************************************
   * A Lease governs all the locks held by a single client.
   * For each client there's a corresponding lease, whose
   * timestamp is updated when the client periodically
   * checks in.  If the client dies and allows its lease to
   * expire, all the corresponding locks can be released.
   *************************************************************/
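   // Each lease is held by exactly one client
  class Lease implements Comparable<Lease> {
    // The client that holds this lease
    private final String holder;
    // Time of the most recent renewal
    private long lastUpdate;
    // Files opened under this lease, i.e. every file this client has open for writing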
    private final Collection<String> paths = new TreeSet<String>();

    /** Only LeaseManager object can create a lease */
    private Lease(String holder) {
      this.holder = holder;
      renew();
    }
.....

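Here Lease keeps all files opened by the client in the paths collection, and the constructor is initialized with the holder's name. Note the call to renew(), which updates the lease's timestamp: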

/** Only LeaseManager object can renew a lease */
    // Set the last-update time to the current time
    private void renew() {
      this.lastUpdate = FSNamesystem.now();
    }
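The excerpt elides the rest of Lease. To give a rough picture of what the remaining members do, here is a standalone, simplified sketch (not the Hadoop source): leases are ordered by lastUpdate, with ties broken by holder, and two predicates test the soft and hard limits. The class name SimpleLease, the explicit `now` parameter (in place of FSNamesystem.now()) and the two limit values are assumptions chosen for illustration; in Hadoop the limits come from FSConstants.LEASE_SOFTLIMIT_PERIOD and LEASE_HARDLIMIT_PERIOD, as the LeaseManager fields show.

```java
import java.util.Collection;
import java.util.TreeSet;

/**
 * Simplified stand-in for LeaseManager.Lease (a sketch, not the Hadoop source).
 * The clock is passed in explicitly so the behaviour is easy to test.
 */
class SimpleLease implements Comparable<SimpleLease> {
  static final long SOFT_LIMIT_MS = 60_000L;       // assumed soft limit: 1 minute
  static final long HARD_LIMIT_MS = 60 * 60_000L;  // assumed hard limit: 1 hour

  final String holder;                              // client that owns the lease
  long lastUpdate;                                  // time of the last renewal (ms)
  final Collection<String> paths = new TreeSet<>(); // files open for writing

  SimpleLease(String holder, long now) {
    this.holder = holder;
    renew(now);
  }

  /** Record the client's latest check-in. */
  void renew(long now) { lastUpdate = now; }

  /** Past the soft limit: another client's create request may trigger recovery. */
  boolean expiredSoftLimit(long now) { return now - lastUpdate > SOFT_LIMIT_MS; }

  /** Past the hard limit: the NameNode's monitor thread recovers the lease itself. */
  boolean expiredHardLimit(long now) { return now - lastUpdate > HARD_LIMIT_MS; }

  /** Least recently renewed first; ties broken by holder name. */
  @Override
  public int compareTo(SimpleLease o) {
    int byTime = Long.compare(lastUpdate, o.lastUpdate);
    return byTime != 0 ? byTime : holder.compareTo(o.holder);
  }
}
```

Ordering by lastUpdate is what allows all leases to live in one sorted set with the least recently renewed lease first.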

That is enough about Lease for now. Next comes the lease manager class, LeaseManager.

LeaseManager

As a manager, it naturally has a few more fields:

/**
 * LeaseManager does the lease housekeeping for writing on files.
 * This class also provides useful static methods for lease recovery.
 *
 * Lease Recovery Algorithm
 * 1) Namenode retrieves lease information
 * 2) For each file f in the lease, consider the last block b of f
 * 2.1) Get the datanodes which contains b
 * 2.2) Assign one of the datanodes as the primary datanode p

 * 2.3) p obtains a new generation stamp form the namenode
 * 2.4) p get the block info from each datanode
 * 2.5) p computes the minimum block length
 * 2.6) p updates the datanodes, which have a valid generation stamp,
 *      with the new generation stamp and the minimum block length
 * 2.7) p acknowledges the namenode the update results

 * 2.8) Namenode updates the BlockInfo
 * 2.9) Namenode removes f from the lease
 *      and removes the lease once all files have been removed
 * 2.10) Namenode commit changes to edit log
 * Lease manager: provides the many methods related to file leases
 */
public class LeaseManager {
  public static final Log LOG = LogFactory.getLog(LeaseManager.class);

  private final FSNamesystem fsnamesystem;

  // Lease soft limit
  private long softLimit = FSConstants.LEASE_SOFTLIMIT_PERIOD;
  // Lease hard limit
  private long hardLimit = FSConstants.LEASE_HARDLIMIT_PERIOD;

  //
  // Used for handling lock-leases
  // Mapping: leaseHolder -> Lease
  // Map from lease holder to lease, kept in a TreeMap
  private SortedMap<String, Lease> leases = new TreeMap<String, Lease>();
  // Set of: Lease
  // All leases, ordered from least to most recently renewed
  private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();

  //
  // Map path names to leases. It is protected by the sortedLeases lock.
  // The map stores pathnames in lexicographical order.
  // Map from file path to lease
  private SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();
.....

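Reading from the top, there are first two timeouts, the soft limit and the hard limit. They are used to check leases in different scenarios, which are analyzed later. Next, the manager maintains three lease mappings: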

1. Lease holder to its lease

2. The set of all leases

3. File path (that is, an open file) to its lease

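At first glance, the design aims to locate the target lease quickly for follow-up operations: whether the starting point is a client name, a file path, or the need to find the oldest lease, one of the collections answers directly. A system of this scale holds a great many lease records, because many files are being written at any given moment. The sorted containers (TreeMap and TreeSet) provide O(log n) lookups and also keep their entries in order: sortedLeases is ordered by last-renewal time, and sortedLeasesByPath keeps paths in lexicographical order, as its comment states. Now back to the class, to look at a few typical lease operations.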
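To make the three mappings concrete, the following hypothetical mini-model (the class LeaseIndexes and its method open are made up for illustration) builds the same three sorted collections for two clients. It shows that sortedLeases.first() yields the least recently renewed lease and that sortedLeasesByPath iterates paths in lexicographical order.

```java
import java.util.Comparator;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical mini-model of LeaseManager's three indexes (illustration only). */
class LeaseIndexes {
  static final class Lease {
    final String holder;
    final long lastUpdate;
    Lease(String holder, long lastUpdate) { this.holder = holder; this.lastUpdate = lastUpdate; }
  }

  // 1. holder -> lease (cf. LeaseManager.leases)
  final SortedMap<String, Lease> leases = new TreeMap<>();
  // 2. all leases, least recently renewed first (cf. LeaseManager.sortedLeases)
  final SortedSet<Lease> sortedLeases = new TreeSet<>(
      Comparator.comparingLong((Lease l) -> l.lastUpdate).thenComparing((Lease l) -> l.holder));
  // 3. open file path -> lease, lexicographic by path (cf. LeaseManager.sortedLeasesByPath)
  final SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<>();

  /** Register one client's lease together with the files it has open for writing. */
  void open(String holder, long lastUpdate, String... paths) {
    Lease lease = new Lease(holder, lastUpdate);
    leases.put(holder, lease);
    sortedLeases.add(lease);
    for (String p : paths) {
      sortedLeasesByPath.put(p, lease);
    }
  }
}
```

Each question ("whose lease is this?", "which lease covers this file?", "which lease is oldest?") is then a single lookup on one of the three collections.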

/** @return the lease containing src */
  // Get the lease that covers the given path
  public Lease getLeaseByPath(String src) {return sortedLeasesByPath.get(src);}

This is a direct lookup in one of the collections, which is easy to follow. Next is the method that adds a new lease record:

/**
   * Adds (or re-adds) the lease for the specified file.
   * Add the lease information for the specified file
   */
  synchronized Lease addLease(String holder, String src) {
    // Look up the holder's existing lease
    Lease lease = getLease(holder);
    if (lease == null) {
      // No lease yet: create one
      lease = new Lease(holder);
      // Add it to the lease collections
      leases.put(holder, lease);
      sortedLeases.add(lease);
    } else {
      // The holder already has a lease: renew it
      renewLease(lease);
    }
    // Add a new path-to-lease mapping
    sortedLeasesByPath.put(src, lease);
    // Add the new path to the lease's own path list
    lease.paths.add(src);
    return lease;
  }

When a new lease record is added, all related collections must be updated together. The corresponding removal method:

/**
   * Remove the specified lease and src.
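   * Remove the specified path, and the lease once it has no paths left
   */
  synchronized void removeLease(Lease lease, String src) {
    // Remove the path-to-lease mapping
    sortedLeasesByPath.remove(src);
    // Remove the path from the lease itself
    if (!lease.removePath(src)) {
      LOG.error(src + " not found in lease.paths (=" + lease.paths + ")");
    }

    if (!lease.hasPath()) {
      // The lease has no paths left: remove it, keyed by its holder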
      leases.remove(lease.holder);
      if (!sortedLeases.remove(lease)) {
        LOG.error(lease + " not found in sortedLeases");
      }
    }
  }
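The essential invariant of addLease and removeLease is that the three collections always agree: a lease stays in leases and sortedLeases exactly as long as it still covers at least one path. The simplified sketch below reproduces that bookkeeping so it can be exercised in isolation. MiniLeaseManager is hypothetical, takes an explicit timestamp instead of reading the clock, and omits the logging of the original.

```java
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical, simplified model of LeaseManager.addLease/removeLease (not Hadoop code). */
class MiniLeaseManager {
  static final class Lease implements Comparable<Lease> {
    final String holder;
    long lastUpdate;
    final Set<String> paths = new TreeSet<>();
    Lease(String holder, long now) { this.holder = holder; this.lastUpdate = now; }
    @Override public int compareTo(Lease o) {
      int c = Long.compare(lastUpdate, o.lastUpdate);
      return c != 0 ? c : holder.compareTo(o.holder);
    }
  }

  final SortedMap<String, Lease> leases = new TreeMap<>();
  final SortedSet<Lease> sortedLeases = new TreeSet<>();
  final SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<>();

  synchronized Lease addLease(String holder, String src, long now) {
    Lease lease = leases.get(holder);
    if (lease == null) {                 // first file for this client: new lease
      lease = new Lease(holder, now);
      leases.put(holder, lease);
      sortedLeases.add(lease);
    } else {                             // existing lease: renew (re-insert with new time)
      sortedLeases.remove(lease);
      lease.lastUpdate = now;
      sortedLeases.add(lease);
    }
    sortedLeasesByPath.put(src, lease);
    lease.paths.add(src);
    return lease;
  }

  synchronized void removeLease(Lease lease, String src) {
    sortedLeasesByPath.remove(src);
    lease.paths.remove(src);
    if (lease.paths.isEmpty()) {         // last open file closed: drop the lease entirely
      leases.remove(lease.holder);
      sortedLeases.remove(lease);
    }
  }
}
```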

Leases in Practice

Now let's follow an actual file operation to see the role leases play, for example FSNamesystem's file-creation method startFile:

/**
   * Create a new file entry in the namespace.
   *
   * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
   *
   * @throws IOException if file name is invalid
   *         {@link FSDirectory#isValidToCreate(String)}.
   * The namesystem creates a new file
   */
  void startFile(String src, PermissionStatus permissions,
                 String holder, String clientMachine,
                 boolean overwrite, boolean createParent, short replication, long blockSize
                ) throws IOException {
    // Delegate to startFileInternal
    startFileInternal(src, permissions, holder, clientMachine, overwrite, false,
                      createParent, replication, blockSize);
    getEditLog().logSync();
    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
      final HdfsFileStatus stat = dir.getFileInfo(src);
      logAuditEvent(UserGroupInformation.getCurrentUser(),
                    Server.getRemoteIp(),
                    "create", src, null, stat);
    }
  }

Following the call further:

private synchronized void startFileInternal(String src,
                                              PermissionStatus permissions,
                                              String holder,
                                              String clientMachine,
                                              boolean overwrite,
                                              boolean append,
                                              boolean createParent,
                                              short replication,
                                              long blockSize
                                              ) throws IOException {
    if (NameNode.stateChangeLog.isDebugEnabled()) {
      NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
          + ", holder=" + holder
          + ", clientMachine=" + clientMachine
          + ", createParent=" + createParent
          + ", replication=" + replication
          + ", overwrite=" + overwrite
          + ", append=" + append);
    }

    if (isInSafeMode())
      throw new SafeModeException("Cannot create file" + src, safeMode);
    if (!DFSUtil.isValidName(src)) {
      throw new IOException("Invalid file name: " + src);
    }
.....

    if (!createParent) {
      verifyParentDir(src);
    }

    try {
      INode myFile = dir.getFileINode(src);
      // Lease recovery happens here
      recoverLeaseInternal(myFile, src, holder, clientMachine, false);
     ....

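Because the client is (re)creating this file, the NameNode first checks whether the file is already under construction and, if so, whether its existing lease must be recovered: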

// Lease recovery
  private void recoverLeaseInternal(INode fileInode,
      String src, String holder, String clientMachine, boolean force)
  throws IOException {
    if (fileInode != null && fileInode.isUnderConstruction()) {
      INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
      //
      // If the file is under construction , then it must be in our
      // leases. Find the appropriate lease record.
      //
      // Get the lease of the requesting client by its name
      Lease lease = leaseManager.getLease(holder);
      //
      // We found the lease for this file. And surprisingly the original
      // holder is trying to recreate this file. This should never occur.
      //
      if (!force && lease != null) {
        // If this client's lease already covers the path, it may not create the file again
        Lease leaseFile = leaseManager.getLeaseByPath(src);
        if (leaseFile != null && leaseFile.equals(lease)) {
          throw new AlreadyBeingCreatedException(
                    "failed to create file " + src + " for " + holder +
                    " on client " + clientMachine +
                    " because current leaseholder is trying to recreate file.");
        }
      }
      //
      // Find the original holder.
      // Get the lease of the client currently writing the file
      lease = leaseManager.getLease(pendingFile.clientName);
      if (lease == null) {
        throw new AlreadyBeingCreatedException(
                    "failed to create file " + src + " for " + holder +
                    " on client " + clientMachine +
                    " because pendingCreates is non-null but no leases found.");
      }
      if (force) {
        // close now: no need to wait for soft lease expiration and
        // close only the file src
        LOG.info("recoverLease: recover lease " + lease + ", src=" + src +
                 " from client " + pendingFile.clientName);
        // force is set: release the lease immediately
        internalReleaseLeaseOne(lease, src);
      } else {
        //
        // If the original holder has not renewed in the last SOFTLIMIT
        // period, then start lease recovery.
        //
        // Otherwise, start recovery only if the soft limit has expired
        if (lease.expiredSoftLimit()) {
          LOG.info("startFile: recover lease " + lease + ", src=" + src +
              " from client " + pendingFile.clientName);
          internalReleaseLease(lease, src);
        }
        throw new AlreadyBeingCreatedException(
            "failed to create file " + src + " for " + holder +
            " on client " + clientMachine +
            ", because this file is already being created by " +
            pendingFile.getClientName() +
            " on " + pendingFile.getClientMachine());
      }
    }

  }

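This is where the soft limit comes in. Note that even when the soft limit has expired and recovery is started, the method still throws AlreadyBeingCreatedException, so the caller has to retry later. Here is the recovery method it calls: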
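The branches of recoverLeaseInternal can be summarized as a small decision table. The sketch below is a hypothetical restatement: RecoverDecision, decide and the Outcome names are invented, and the same-holder check is simplified to "the requester is the file's writer and has a lease", whereas the real code compares the requester's lease with the lease found by path.

```java
/** Hypothetical decision table mirroring recoverLeaseInternal's branches (illustration only). */
class RecoverDecision {
  enum Outcome {
    PROCEED,              // file not under construction: nothing to recover
    REJECT_SAME_HOLDER,   // the current writer tries to recreate its own file
    REJECT_NO_LEASE,      // file under construction, but its writer has no lease
    RELEASE_NOW,          // force == true: release this file's lease immediately
    RECOVER_THEN_REJECT,  // soft limit expired: recovery starts, caller must retry
    REJECT_STILL_HELD     // writer is still renewing its lease
  }

  static Outcome decide(boolean underConstruction, String requester, String writer,
                        boolean writerHasLease, boolean force, boolean softLimitExpired) {
    if (!underConstruction) return Outcome.PROCEED;
    if (!force && writerHasLease && requester.equals(writer)) return Outcome.REJECT_SAME_HOLDER;
    if (!writerHasLease) return Outcome.REJECT_NO_LEASE;
    if (force) return Outcome.RELEASE_NOW;
    return softLimitExpired ? Outcome.RECOVER_THEN_REJECT : Outcome.REJECT_STILL_HELD;
  }
}
```

Only PROCEED and RELEASE_NOW let the caller continue; every other outcome corresponds to an AlreadyBeingCreatedException in the source.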

/**
   * This is invoked when a lease expires. On lease expiry,
   * all the files that were written from that dfsclient should be
   * recovered.
   * Perform lease recovery
   */
  void internalReleaseLease(Lease lease, String src) throws IOException {
    if (lease.hasPath()) {
      // make a copy of the paths because internalReleaseLeaseOne removes
      // pathnames from the lease record.
      String[] leasePaths = new String[lease.getPaths().size()];
      lease.getPaths().toArray(leasePaths);
      for (String p: leasePaths) {
        internalReleaseLeaseOne(lease, p);
      }
    } else {
      internalReleaseLeaseOne(lease, src);
    }
  }

It recovers the files in the lease's open-file list one at a time:

/**
   * Move a file that is being written to be immutable.
   * @param src The filename
   * @param lease The lease for the client creating the file
   */
  void internalReleaseLeaseOne(Lease lease, String src) throws IOException {
    assert Thread.holdsLock(this);

    LOG.info("Recovering lease=" + lease + ", src=" + src);

    INodeFile iFile = dir.getFileINode(src);
    if (iFile == null) {
      final String message = "DIR* NameSystem.internalReleaseCreate: "
        + "attempt to release a create lock on "
        + src + " file does not exist.";
      NameNode.stateChangeLog.warn(message);
      throw new IOException(message);
    }
    if (!iFile.isUnderConstruction()) {
      final String message = "DIR* NameSystem.internalReleaseCreate: "
        + "attempt to release a create lock on "
        + src + " but file is already closed.";
      NameNode.stateChangeLog.warn(message);
      throw new IOException(message);
    }

    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
    .....
    // start lease recovery of the last block for this file.
    pendingFile.assignPrimaryDatanode();
    // Finally, reassign the lease to the NameNode's recovery holder
    Lease reassignedLease = reassignLease(
      lease, src, HdfsConstants.NN_RECOVERY_LEASEHOLDER, pendingFile);
    leaseManager.renewLease(reassignedLease);
  }
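internalReleaseLeaseOne only starts recovery of the file's last block by assigning a primary datanode; the remaining work follows steps 2.3 to 2.7 of the LeaseManager javadoc and happens on the datanodes. The sketch below models steps 2.4 to 2.6 on the primary in plain Java. It is not Hadoop code: Replica, Result and recover are hypothetical, and treating "valid generation stamp" as "not older than the block's current stamp" is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of recovery steps 2.4-2.6 on the primary datanode (not Hadoop code). */
class BlockRecoverySketch {
  static final class Replica {
    final String datanode;
    final long generationStamp;
    final long length;
    Replica(String datanode, long generationStamp, long length) {
      this.datanode = datanode; this.generationStamp = generationStamp; this.length = length;
    }
  }

  static final class Result {
    final List<String> updatedDatanodes;
    final long newGenerationStamp;
    final long newLength;
    Result(List<String> updatedDatanodes, long newGenerationStamp, long newLength) {
      this.updatedDatanodes = updatedDatanodes;
      this.newGenerationStamp = newGenerationStamp;
      this.newLength = newLength;
    }
  }

  /** @param newStamp the stamp obtained from the NameNode in step 2.3 */
  static Result recover(List<Replica> replicas, long blockStamp, long newStamp) {
    List<String> updated = new ArrayList<>();
    long minLength = Long.MAX_VALUE;
    for (Replica r : replicas) {                  // step 2.4: block info from each datanode
      if (r.generationStamp < blockStamp) continue; // assumed: older stamp = stale replica
      minLength = Math.min(minLength, r.length);    // step 2.5: minimum block length
      updated.add(r.datanode);
    }
    if (updated.isEmpty()) throw new IllegalStateException("no valid replica to recover");
    return new Result(updated, newStamp, minLength); // step 2.6: applied to valid replicas
  }
}
```

Truncating to the minimum length keeps only bytes that every surviving replica is known to have.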

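The last step is lease reassignment. Note that the call in internalReleaseLeaseOne passes four arguments: it goes through a reassignLease method in FSNamesystem (not shown here), which hands off to the three-argument LeaseManager.reassignLease below. The reassignment itself is simple: remove the old lease entry for the path, add a lease for the new holder, and then update its timestamp via renewLease: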

/**
   * Reassign lease for file src to the new holder.
   * Lease reassignment: equivalent to a remove followed by an add
   */
  synchronized Lease reassignLease(Lease lease, String src, String newHolder) {
    assert newHolder != null : "new lease holder is null";
    if (lease != null) {
      removeLease(lease, src);
    }
    return addLease(newHolder, src);
  }

synchronized void renewLease(Lease lease) {
    if (lease != null) {
      // First remove the lease from the sorted set
      sortedLeases.remove(lease);
      // Update the timestamp
      lease.renew();
      // Then add it back
      sortedLeases.add(lease);
    }
  }
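renewLease removes the lease from sortedLeases before changing lastUpdate because lastUpdate is the TreeSet's sort key. If the key changed while the element was still inside the set, the set's internal ordering would no longer match, and later lookups or removals could miss the element. The demo below (the RenewDemo class is hypothetical) applies the same remove, update, re-add pattern.

```java
import java.util.TreeSet;

/** Why renewLease removes, updates, then re-adds (hypothetical demo, not Hadoop code). */
class RenewDemo {
  static final class Lease implements Comparable<Lease> {
    final String holder;
    long lastUpdate;
    Lease(String holder, long lastUpdate) { this.holder = holder; this.lastUpdate = lastUpdate; }
    @Override public int compareTo(Lease o) {
      int c = Long.compare(lastUpdate, o.lastUpdate);
      return c != 0 ? c : holder.compareTo(o.holder);
    }
  }

  /** Same pattern as LeaseManager.renewLease: never change the sort key inside the set. */
  static void renew(TreeSet<Lease> sorted, Lease lease, long now) {
    sorted.remove(lease);     // found via the OLD timestamp
    lease.lastUpdate = now;   // change the key while the lease is outside the set
    sorted.add(lease);        // re-inserted at the position for the NEW timestamp
  }
}
```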

Operations on individual blocks also go through a lease check, for example abandoning a block:

/**
   * The client would like to let go of the given block
   */
  public synchronized boolean abandonBlock(Block b, String src, String holder
      ) throws IOException {
    //
    // Remove the block from the pending creates list
    //
    NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                  +b+"of file "+src);
    if (isInSafeMode()) {
      throw new SafeModeException("Cannot abandon block " + b +
                                  " for fle" + src, safeMode);
    }
    // Check the lease before removing the block; throws an exception if the check fails
    INodeFileUnderConstruction file = checkLease(src, holder);
    dir.removeBlock(src, file, b);
    NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
                                    + b
                                    + " is removed from pendingCreates");
    return true;
  }
// make sure that we still have the lease on this file.
  private INodeFileUnderConstruction checkLease(String src, String holder)
                                                      throws IOException {
    INodeFile file = dir.getFileINode(src);
    // Call the overloaded method of the same name
    checkLease(src, holder, file);
    return (INodeFileUnderConstruction)file;
  }

And here is the core check:

  // The core lease-checking logic
  private void checkLease(String src, String holder, INode file)
                                                     throws IOException {
    // Throw if the file being operated on does not exist (or is a directory)
    if (file == null || file.isDirectory()) {
      Lease lease = leaseManager.getLease(holder);
      throw new LeaseExpiredException("No lease on " + src +
                                      " File does not exist. " +
                                      (lease != null ? lease.toString() :
                                       "Holder " + holder +
                                       " does not have any open files."));
    }

    // If the file is not under construction, no lease can exist for it, so throw as well
    if (!file.isUnderConstruction()) {
      Lease lease = leaseManager.getLease(holder);
      throw new LeaseExpiredException("No lease on " + src +
                                      " File is not open for writing. " +
                                      (lease != null ? lease.toString() :
                                       "Holder " + holder +
                                       " does not have any open files."));
    }

    INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
    // Check that the file's writer matches the lease holder
    if (holder != null && !pendingFile.getClientName().equals(holder)) {
      throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
          + pendingFile.getClientName() + " but is accessed by " + holder);
    }
  }

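The method mainly checks that the lease holder matches the file's writer and that the file is open for writing. If a check fails, it blocks the operation by throwing an exception.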
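The three checks can be restated as a small standalone function. In this hypothetical sketch, LeaseCheck and its boolean/String parameters stand in for the INode queries, a null writer means the file is not under construction, and the messages are shortened versions of those in the source.

```java
/** Hypothetical, simplified restatement of FSNamesystem.checkLease's checks. */
class LeaseCheck {
  static final class LeaseExpired extends Exception {
    LeaseExpired(String msg) { super(msg); }
  }

  /**
   * @param writer client name recorded on the file, or null if the file is not
   *               under construction (not open for writing)
   */
  static void check(String src, String holder, boolean exists, boolean isDirectory,
                    String writer) throws LeaseExpired {
    if (!exists || isDirectory)                       // 1. the file must exist
      throw new LeaseExpired("No lease on " + src + " File does not exist.");
    if (writer == null)                               // 2. it must be open for writing
      throw new LeaseExpired("No lease on " + src + " File is not open for writing.");
    if (holder != null && !writer.equals(holder))     // 3. the caller must be the writer
      throw new LeaseExpired("Lease mismatch on " + src + " owned by " + writer
          + " but is accessed by " + holder);
  }
}
```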

Lease Monitoring

LeaseManager also runs a lease-monitoring thread of its own:

  // Thread that monitors lease expiration
  class Monitor implements Runnable {
    final String name = getClass().getSimpleName();

    /** Check leases periodically. */
    public void run() {
      for(; fsnamesystem.isRunning(); ) {
        synchronized(fsnamesystem) {
          // Run checkLeases
          checkLeases();
        }

        try {
          Thread.sleep(2000);
        } catch(InterruptedException ie) {
          if (LOG.isDebugEnabled()) {
            LOG.debug(name + " is interrupted", ie);
          }
        }
      }
    }
  }

It checks the leases periodically and recovers the ones that have expired:

  /** Check the leases beginning from the oldest. */
  synchronized void checkLeases() {
    for(; sortedLeases.size() > 0; ) {
      // Start from the oldest lease (earliest last update)
      final Lease oldest = sortedLeases.first();
      // If even the oldest lease is within the hard limit, so is every other lease
      if (!oldest.expiredHardLimit()) {
        return;
      }

      // Reaching here means this lease has exceeded the hard limit
      LOG.info("Lease " + oldest + " has expired hard limit");

      final List<String> removing = new ArrayList<String>();
      // need to create a copy of the oldest lease paths, because
      // internalReleaseLease() removes paths corresponding to empty files,
      // i.e. it needs to modify the collection being iterated over
      // causing ConcurrentModificationException
      // Copy the paths held by this lease
      String[] leasePaths = new String[oldest.getPaths().size()];
      oldest.getPaths().toArray(leasePaths);
      for(String p : leasePaths) {
        try {
          // Release the lease on this path
          fsnamesystem.internalReleaseLeaseOne(oldest, p);
        } catch (IOException e) {
          // If releasing fails, add the path to the removal list
          LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
          removing.add(p);
        }
      }

      // Remove the lease entries for the paths that could not be released
      for(String p : removing) {
        removeLease(oldest, p);
      }
    }
  }

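Each pass starts from the oldest lease. Because sortedLeases is ordered by last-update time, the loop can stop at the first lease that has not exceeded the hard limit.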
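The monitor's scan can be modeled in a few lines. In this hypothetical single-threaded sketch (ExpiryScan, releaseOne and the explicit now/hardLimit arguments are assumptions), releaseOne stands in for internalReleaseLeaseOne and simply drops the path, whereas the real method reassigns it to the NameNode's recovery holder. It shows why the paths are copied before iterating and why the scan can stop at the first lease still within the hard limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical single-threaded model of LeaseManager.checkLeases (not Hadoop code). */
class ExpiryScan {
  static final class Lease implements Comparable<Lease> {
    final String holder;
    final long lastUpdate;
    final Set<String> paths = new TreeSet<>();
    Lease(String holder, long lastUpdate) { this.holder = holder; this.lastUpdate = lastUpdate; }
    @Override public int compareTo(Lease o) {
      int c = Long.compare(lastUpdate, o.lastUpdate);
      return c != 0 ? c : holder.compareTo(o.holder);
    }
  }

  final TreeSet<Lease> sortedLeases = new TreeSet<>();
  final List<String> released = new ArrayList<>();   // paths passed to releaseOne, in order

  /** Stand-in for internalReleaseLeaseOne: drops the path, and the lease once it is empty. */
  void releaseOne(Lease lease, String path) {
    released.add(path);
    lease.paths.remove(path);                          // mutates the lease's own collection
    if (lease.paths.isEmpty()) sortedLeases.remove(lease);
  }

  void checkLeases(long now, long hardLimit) {
    while (!sortedLeases.isEmpty()) {
      Lease oldest = sortedLeases.first();             // least recently renewed lease
      if (now - oldest.lastUpdate <= hardLimit) return; // every remaining lease is newer
      if (oldest.paths.isEmpty()) {                    // nothing to release: just drop it
        sortedLeases.remove(oldest);
        continue;
      }
      // Iterate over a copy: iterating oldest.paths directly while releaseOne
      // removes from it would throw ConcurrentModificationException.
      for (String p : new ArrayList<>(oldest.paths)) {
        releaseOne(oldest, p);
      }
    }
  }
}
```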

For the analysis of the complete code, see https://github.com/linyiqun/hadoop-hdfs. Analyses of other parts of the HDFS code will follow.

References

Cai Bin et al., 《Hadoop技术内幕》 (Hadoop Internals): HDFS architecture design and implementation principles.

Copyright notice: this is an original article by the blogger and may not be reproduced without permission.

Date: 2024-12-14 02:18:34
