HDFS源码分析EditLog之读取操作符

《HDFS源码分析EditLog之获取编辑日志输入流》一文中,我们详细了解了如何获取编辑日志输入流EditLogInputStream。在我们得到编辑日志输入流后,是不是就该从输入流中获取数据来处理呢?答案是显而易见的!在《HDFS源码分析之EditLogTailer》一文中,我们在讲编辑日志追踪同步时,也讲到了如下两个连续的处理流程:

4、从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据

5、调用文件系统镜像FSImage实例image的loadEdits(),利用编辑日志输入流集合streams,加载编辑日志至目标namesystem中的文件系统镜像FSImage,并获得编辑日志加载的大小editsLoaded;

可见,我们在获得编辑日志输入流EditLogInputStream的集合streams后,就需要调用FSImage的loadEdits()方法,利用编辑日志输入流集合streams,加载编辑日志至目标namesystem中的文件系统镜像FSImage。而HDFS是如何从编辑日志输入流中读取数据的呢?本文,我们将进行详细的探究!

首先,在加载编辑日志的主要类FSEditLogLoader中,其核心方法loadEditRecords()中有如下一段代码:

      while (true) {
        try {
          FSEditLogOp op;
          try {

        	// 从编辑日志输入流in中读取操作符op
            op = in.readOp();

            // 如果操作符op为空,直接跳出循环,并返回
            if (op == null) {
              break;
            }
          } catch (Throwable e) {
            // ...省略部分代码
          }

          // ...省略部分代码

          try {
            // ...省略部分代码
            long inodeId = applyEditLogOp(op, fsDir, startOpt,
                in.getVersion(true), lastInodeId);
            if (lastInodeId < inodeId) {
              lastInodeId = inodeId;
            }
          } catch (RollingUpgradeOp.RollbackException e) {
            // ...省略部分代码
          } catch (Throwable e) {
            // ...省略部分代码
          }
          // ...省略部分代码
        } catch (RollingUpgradeOp.RollbackException e) {
          // ...省略部分代码
        } catch (MetaRecoveryContext.RequestStopException e) {
          // ...省略部分代码
        }
      }

它会从编辑日志输入流in中读取一个操作符op,然后调用applyEditLogOp()方法,将操作符作用于内存元数据FSNamesystem。那么问题来了,这个操作符如何从数据流中被读取并解析的呢?

接下来,我们就看下如何从编辑日志输出流EditLogInputStream中读取一个操作符,我们先看其readOp()方法,代码如下:

  /**
   * Read an operation from the stream
   * @return an operation from the stream or null if at end of stream
   * @throws IOException if there is an error reading from the stream
   */
  public FSEditLogOp readOp() throws IOException {
    FSEditLogOp ret;

    // 如果缓存的cachedOp不为null,返回缓存的cachedOp,并将其清空
    if (cachedOp != null) {
      ret = cachedOp;
      cachedOp = null;
      return ret;
    }

    // 如果缓存的cachedOp为null,调用nextOp()进行处理
    return nextOp();
  }

很简单,如果缓存的cachedOp不为null,返回缓存的cachedOp,并将其清空,如果缓存的cachedOp为null,则调用nextOp()进行处理。而EditLogInputStream中nextOp()是一个抽象方法,我们需要看其子类的实现方法,下面就以EditLogFileInputStream为例,看下其nextOp()方法:

  @Override
  protected FSEditLogOp nextOp() throws IOException {
    return nextOpImpl(false);
  }

继续追踪nextOpImpl()方法,代码如下:

  private FSEditLogOp nextOpImpl(boolean skipBrokenEdits) throws IOException {
    FSEditLogOp op = null;

    // 根据编辑日志文件输入流的状态判断:
    switch (state) {
    case UNINIT:// 如果为未初始化状态UNINIT
      try {
    	// 调用init()方法进行初始化
        init(true);
      } catch (Throwable e) {
        LOG.error("caught exception initializing " + this, e);
        if (skipBrokenEdits) {
          return null;
        }
        Throwables.propagateIfPossible(e, IOException.class);
      }

      // 检测编辑日志文件输入流状态,此时不应为UNINIT
      Preconditions.checkState(state != State.UNINIT);

      // 再次调用nextOpImpl()方法
      return nextOpImpl(skipBrokenEdits);
    case OPEN:// 如果为打开OPEN状态

      // 调用FSEditLogOp.Reader的readOp()方法,读取操作符
      op = reader.readOp(skipBrokenEdits);
      if ((op != null) && (op.hasTransactionId())) {
        long txId = op.getTransactionId();
        if ((txId >= lastTxId) &&
            (lastTxId != HdfsConstants.INVALID_TXID)) {
          //
          // Sometimes, the NameNode crashes while it's writing to the
          // edit log.  In that case, you can end up with an unfinalized edit log
          // which has some garbage at the end.
          // JournalManager#recoverUnfinalizedSegments will finalize these
          // unfinished edit logs, giving them a defined final transaction
          // ID.  Then they will be renamed, so that any subsequent
          // readers will have this information.
          //
          // Since there may be garbage at the end of these "cleaned up"
          // logs, we want to be sure to skip it here if we've read everything
          // we were supposed to read out of the stream.
          // So we force an EOF on all subsequent reads.
          //
          long skipAmt = log.length() - tracker.getPos();
          if (skipAmt > 0) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("skipping " + skipAmt + " bytes at the end " +
                  "of edit log  '" + getName() + "': reached txid " + txId +
                  " out of " + lastTxId);
            }
            tracker.clearLimit();
            IOUtils.skipFully(tracker, skipAmt);
          }
        }
      }
      break;
      case CLOSED: // 如果为关闭CLOSED状态,直接返回null
        break; // return null
    }
    return op;
  }

nextOpImpl()方法的大体处理逻辑如下:

根据编辑日志文件输入流的状态判断:

1、如果为未初始化状态UNINIT,调用init()方法进行初始化,然后检测编辑日志文件输入流状态,此时不应为UNINIT,最后再次调用nextOpImpl()方法;

2、如果为打开OPEN状态,调用FSEditLogOp.Reader的readOp()方法,读取操作符op;

3、如果为关闭CLOSED状态,直接返回null。

我们重点关注下FSEditLogOp.Reader的readOp()方法,代码如下:

    /**
     * Read an operation from the input stream.
     *
     * Note that the objects returned from this method may be re-used by future
     * calls to the same method.
     *
     * @param skipBrokenEdits    If true, attempt to skip over damaged parts of
     * the input stream, rather than throwing an IOException
     * @return the operation read from the stream, or null at the end of the
     *         file
     * @throws IOException on error.  This function should only throw an
     *         exception when skipBrokenEdits is false.
     */
    public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
      while (true) {
        try {

          // 调用decodeOp()方法
          return decodeOp();
        } catch (IOException e) {
          in.reset();
          if (!skipBrokenEdits) {
            throw e;
          }
        } catch (RuntimeException e) {
          // FSEditLogOp#decodeOp is not supposed to throw RuntimeException.
          // However, we handle it here for recovery mode, just to be more
          // robust.
          in.reset();
          if (!skipBrokenEdits) {
            throw e;
          }
        } catch (Throwable e) {
          in.reset();
          if (!skipBrokenEdits) {
            throw new IOException("got unexpected exception " +
                e.getMessage(), e);
          }
        }
        // Move ahead one byte and re-try the decode process.
        if (in.skip(1) < 1) {
          return null;
        }
      }
    }

继续追踪decodeOp()方法,代码如下:

    /**
     * Read an opcode from the input stream.
     * 从输入流中读取一个操作符code
     *
     * @return   the opcode, or null on EOF.
     *
     * If an exception is thrown, the stream's mark will be set to the first
     * problematic byte.  This usually means the beginning of the opcode.
     */
    private FSEditLogOp decodeOp() throws IOException {
      limiter.setLimit(maxOpSize);
      in.mark(maxOpSize);

      if (checksum != null) {
        checksum.reset();
      }

      byte opCodeByte;
      try {

    	// 从输入流in中读取一个byte,即opCodeByte
        opCodeByte = in.readByte();
      } catch (EOFException eof) {
        // EOF at an opcode boundary is expected.
        return null;
      }

      // 将byte类型的opCodeByte转换成FSEditLogOpCodes对象opCode
      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
      if (opCode == OP_INVALID) {
        verifyTerminator();
        return null;
      }

      // 根据FSEditLogOpCodes对象opCode从cache中获取FSEditLogOp对象op
      FSEditLogOp op = cache.get(opCode);
      if (op == null) {
        throw new IOException("Read invalid opcode " + opCode);
      }

      // 如果支持编辑日志长度,从输入流读入一个int,
      if (supportEditLogLength) {
        in.readInt();
      }

      if (NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
        // Read the txid
    	// 如果支持事务ID,读入一个long,作为事务ID,并在FSEditLogOp实例op中设置事务ID
        op.setTransactionId(in.readLong());
      } else {
    	// 如果不支持事务ID,在FSEditLogOp实例op中设置事务ID为-12345
        op.setTransactionId(HdfsConstants.INVALID_TXID);
      }

      // 从输入流in中读入其他域,并设置入FSEditLogOp实例op
      op.readFields(in, logVersion);

      validateChecksum(in, checksum, op.txid);
      return op;
    }

decodeOp()方法的逻辑很简单:

1、从输入流in中读取一个byte,即opCodeByte,确定操作类型;

2、将byte类型的opCodeByte转换成FSEditLogOpCodes对象opCode;

3、根据FSEditLogOpCodes对象opCode从cache中获取FSEditLogOp对象op,这样我们就得到了操作符对象;

4、如果支持编辑日志长度,从输入流读入一个int;

5、如果支持事务ID,读入一个long,作为事务ID,并在FSEditLogOp实例op中设置事务ID,否则在FSEditLogOp实例op中设置事务ID为-12345;

6、调用操作符对象op的readFields()方法,从输入流in中读入其他域,并设置入FSEditLogOp实例op。

接下来,我们再看下操作符对象的readFields()方法,因为不同种类的操作符肯定包含不同的属性,所以它们的readFields()方法肯定也各不相同。下面,我们就以操作符AddCloseOp为例来分析,其readFields()方法如下:

    @Override
    void readFields(DataInputStream in, int logVersion)
        throws IOException {

      // 读取长度:如果支持读入长度,从输入流in读取一个int,赋值给length
      if (!NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
        this.length = in.readInt();
      }

      // 读取节点ID:如果支持读入节点ID,从输入流in读取一个long,赋值给inodeId,否则inodeId默认为0
      if (NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.ADD_INODE_ID, logVersion)) {
        this.inodeId = in.readLong();
      } else {
        // The inodeId should be updated when this editLogOp is applied
        this.inodeId = INodeId.GRANDFATHER_INODE_ID;
      }

      // 版本兼容性校验
      if ((-17 < logVersion && length != 4) ||
          (logVersion <= -17 && length != 5 && !NameNodeLayoutVersion.supports(
              LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion))) {
        throw new IOException("Incorrect data format."  +
                              " logVersion is " + logVersion +
                              " but writables.length is " +
                              length + ". ");
      }

      // 读取路径:从输入流in读取一个String,赋值给path
      this.path = FSImageSerialization.readString(in);

      // 读取副本数、修改时间:如果支持读取副本数、修改时间,分别从输入流读取一个short、long,
      // 赋值给replication、mtime
      if (NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
        this.replication = FSImageSerialization.readShort(in);
        this.mtime = FSImageSerialization.readLong(in);
      } else {
        this.replication = readShort(in);
        this.mtime = readLong(in);
      }

      // 读取访问时间:如果支持读取访问时间,从输入流读取一个long,赋值给atime,否则atime默认为0
      if (NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.FILE_ACCESS_TIME, logVersion)) {
        if (NameNodeLayoutVersion.supports(
            LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
          this.atime = FSImageSerialization.readLong(in);
        } else {
          this.atime = readLong(in);
        }
      } else {
        this.atime = 0;
      }

      // 读取数据块大小:如果支持读取数据块大小,从输入流读取一个long,赋值给blockSize
      if (NameNodeLayoutVersion.supports(
          LayoutVersion.Feature.EDITLOG_OP_OPTIMIZATION, logVersion)) {
        this.blockSize = FSImageSerialization.readLong(in);
      } else {
        this.blockSize = readLong(in);
      }

      // 调用readBlocks()方法读取数据块,赋值给数据块数组blocks
      this.blocks = readBlocks(in, logVersion);

      // 从输入流读入权限,赋值给permissions
      this.permissions = PermissionStatus.read(in);

      // 如果是ADD操作,需要额外处理客户端名称clientName、客户端机器clientMachine、覆盖写标志overwrite等属性
      if (this.opCode == OP_ADD) {
        aclEntries = AclEditLogUtil.read(in, logVersion);
        this.xAttrs = readXAttrsFromEditLog(in, logVersion);
        this.clientName = FSImageSerialization.readString(in);
        this.clientMachine = FSImageSerialization.readString(in);
        if (NameNodeLayoutVersion.supports(
            NameNodeLayoutVersion.Feature.CREATE_OVERWRITE, logVersion)) {
          this.overwrite = FSImageSerialization.readBoolean(in);
        } else {
          this.overwrite = false;
        }
        if (NameNodeLayoutVersion.supports(
            NameNodeLayoutVersion.Feature.BLOCK_STORAGE_POLICY, logVersion)) {
          this.storagePolicyId = FSImageSerialization.readByte(in);
        } else {
          this.storagePolicyId = BlockStoragePolicySuite.ID_UNSPECIFIED;
        }
        // read clientId and callId
        readRpcIds(in, logVersion);
      } else {
        this.clientName = "";
        this.clientMachine = "";
      }
    }

这个没有什么特别好讲的,依次读入操作符需要的,在输入流中依次存在的属性即可。

不过,我们仍然需要重点讲解下读入数据块的readBlocks()方法,代码如下:

    private static Block[] readBlocks(
        DataInputStream in,
        int logVersion) throws IOException {

      // 读取block数目numBlocks,占一个int
      int numBlocks = in.readInt();

      // 校验block数目numBlocks,应大于等于0,小于等于1024 * 1024 * 64
      if (numBlocks < 0) {
        throw new IOException("invalid negative number of blocks");
      } else if (numBlocks > MAX_BLOCKS) {
        throw new IOException("invalid number of blocks: " + numBlocks +
            ".  The maximum number of blocks per file is " + MAX_BLOCKS);
      }

      // 构造block数组blocks,大小即为numBlocks
      Block[] blocks = new Block[numBlocks];

      // 从输入流中读取numBlocks个数据块
      for (int i = 0; i < numBlocks; i++) {

    	// 构造数据块Block实例blk
    	Block blk = new Block();

    	// 调用Block的readFields()方法,从输入流读入数据块
        blk.readFields(in);

        // 将数据块blk放入数据块数组blocks
        blocks[i] = blk;
      }

      // 返回数据块数组blocks
      return blocks;
    }

很简单,先从输入流读取block数目numBlocks,确定一共需要读取多少个数据块,然后构造block数组blocks,大小即为numBlocks,最后从输入流中读取numBlocks个数据块,每次都是先构造数据块Block实例blk,调用Block的readFields()方法,从输入流读入数据块,然后将数据块blk放入数据块数组blocks。全部数据块读取完毕后,返回数据块数组blocks。

我们再看下数据块Block的readFields()方法,如下:

  @Override // Writable
  public void readFields(DataInput in) throws IOException {
    readHelper(in);
  }

继续看readHelper()方法,如下:

  final void readHelper(DataInput in) throws IOException {
    // 从输入流读取一个long,作为数据块艾迪blockId
	this.blockId = in.readLong();
	// 从输入流读取一个long,作为数据块大小numBytes
    this.numBytes = in.readLong();
    // 从输入流读取一个long,作为数据块产生的时间戳generationStamp
    this.generationStamp = in.readLong();

    // 校验:数据块大小numBytes应大于等于0
    if (numBytes < 0) {
      throw new IOException("Unexpected block size: " + numBytes);
    }
  }

从输入流依次读入数据块艾迪blockId、数据块大小numBytes、数据块产生的时间戳generationStamp即可,三者均为long类型。

总结

时间: 2024-08-14 18:57:59

HDFS源码分析EditLog之读取操作符的相关文章

Hadoop HDFS源码分析 关于数据块的类

Hadoop HDFS源码分析 关于数据块的类 1.BlocksMap 官方代码中的注释为: /** * This class maintains the map from a block to its metadata. * block's metadata currently includes blockCollection it belongs to and * the datanodes that store the block. */ BlocksMap数据块映射,管理名字节点上的数据

HDFS源码分析之LightWeightGSet

LightWeightGSet是名字节点NameNode在内存中存储全部数据块信息的类BlocksMap需要的一个重要数据结构,它是一个占用较低内存的集合的实现,它使用一个数组array存储元素,使用linked lists来解决冲突.它没有实现重新哈希分区,所以,内部的array不会改变大小.这个类不支持null元素,并且不是线程安全的.它在BlocksMap中的初始化如下: this.blocks = new LightWeightGSet<Block, BlockInfo>(capaci

HDFS源码分析数据块校验之DataBlockScanner

DataBlockScanner是运行在数据节点DataNode上的一个后台线程.它为所有的块池管理块扫描.针对每个块池,一个BlockPoolSliceScanner对象将会被创建,其运行在一个单独的线程中,为该块池扫描.校验数据块.当一个BPOfferService服务变成活跃或死亡状态,该类中的blockPoolScannerMap将会更新. 我们先看下DataBlockScanner的成员变量,如下: // 所属数据节点DataNode实例 private final DataNode

Hadoop-06-RPC机制以及HDFS源码分析

1.RPC机制 1.1.概述 RPC--远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议.RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据.在OSI网络通信模型中,RPC跨越了传输层和应用层.RPC使得开发包括网络分布式多程序在内的应用程序更加容易. RPC采用客户机/服务器模式.请求程序就是一个客户机,而服务提供程序就是一个服务器.首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息.在服务器端,

Hbase写入hdfs源码分析

版权声明:本文由熊训德原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/258 来源:腾云阁 https://www.qcloud.com/community 本文档从源码角度分析了,hbase作为dfs client写入hdfs的hadoop sequence文件最终刷盘落地的过程.之前在<wal线程模型源码分析>中描述wal的写过程时说过会写入hadoop sequence文件,hbase为了保证数据的安全性,一般都

HDFS源码分析(一)-----INode文件节点

前言 在linux文件系统中,i-node节点一直是一个非常重要的设计,同样在HDFS中,也存在这样的一个类似的角色,不过他是一个全新的类,INode.class,后面的目录类等等都是他的子类.最近学习了部分HDFS的源码结构,就好好理一理这方面的知识,帮助大家更好的从深层次了解Hadoop分布式系统文件. HDFS文件相关的类设计 在HDFS中与文件相关的类主要有这么几个 1.INode--这个就是最底层的一个类,抽象类,提炼一些文件目录共有的属性. 2.INodeFile--文件节点类,继承

【Spring源码分析】配置文件读取流程

前言 Spring配置文件读取流程本来是和http://www.cnblogs.com/xrq730/p/6285358.html一文放在一起的,这两天在看Spring自定义标签的时候,感觉对Spring配置文件读取流程还是研究得不够,因此将Spring配置文件读取流程部分从之前的文章拆出来单独成为一文. 为了看一下Spring配置文件加载流程,先定义一个bean.xml: 1 <?xml version="1.0" encoding="UTF-8"?>

Hadoop HDFS源码分析 读取命名空间镜像和编辑日志数据

读取命名空间镜像和编辑日志数据 1.读取命名空间镜像 类FSImage是 命名空间镜像的java实现,在源码中,英文注释为, /** * FSImage handles checkpointing and logging of the namespace edits. * */ FSImage.loadFSImage(FSNamesystem, StartupOption, MetaRecoveryContext) 读取命名空间镜像. 1 private boolean loadFSImage(

HDFS源码分析(二)-----元数据备份机制

前言 在Hadoop中,所有的元数据的保存都是在namenode节点之中,每次重新启动整个集群,Hadoop都需要从这些持久化了的文件中恢复数据到内存中,然后通过镜像和编辑日志文件进行定期的扫描与合并,ok,这些稍微了解Hadoop的人应该都知道,这不就是SecondNameNode干的事情嘛,但是很多人只是了解此机制的表象,内部的一些实现机理估计不是每个人都又去深究过,你能想象在写入编辑日志的过程中,用到了双缓冲区来加大并发量的写吗,你能想象为了避免操作的一致性性,作者在写入的时候做过多重的验