EditLog源码分析之获取编辑日志输入流

《HDFS源码分析之EditLogTailer》一文中,我们详细了解了编辑日志跟踪器EditLogTailer的实现,介绍了其内部编辑日志追踪线程EditLogTailerThread的实现,及其线程完成编辑日志跟踪所依赖的最重要的方法,执行日志追踪的doTailEdits()方法。在该方法的处理流程中,首先需要从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据。那么这个编辑日志输入流集合streams是如何获取的呢?本文我们将进行详细研究。

在doTailEdits()方法中,获取编辑日志输入流的代码如下:

// 从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据
streams = editLog.selectInputStreams(lastTxnId + 1, 0, null, false);  

这个editLog便是一个文件系统编辑日志FSEditLog实例,我们看下它的selectInputStreams()方法,代码如下:

  /**
   * Select a list of input streams.
   *
   * @param fromTxId first transaction in the selected streams
   * @param toAtLeastTxId the selected streams must contain this transaction
   * @param recovery recovery context
   * @param inProgressOk set to true if in-progress streams are OK
   */
  public Collection<EditLogInputStream> selectInputStreams(
      long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
      boolean inProgressOk) throws IOException {

	// 创建编辑日志输入流EditLogInputStream列表streams
    List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();

    // 在Object对象journalSetLock上使用synchronized进行同步
    synchronized(journalSetLock) {

      // 检测journalSet状态
      Preconditions.checkState(journalSet.isOpen(), "Cannot call " +
          "selectInputStreams() on closed FSEditLog");

      // 调用三个参数的selectInputStreams()方法,传入空的streams列表,从fromTxId事务ID开始,
      // 编辑日志同步时,标志位inProgressOk为false
      selectInputStreams(streams, fromTxId, inProgressOk);
    }

    try {
      // 数据监测
      checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);
    } catch (IOException e) {
      if (recovery != null) {
        // If recovery mode is enabled, continue loading even if we know we
        // can't load up to toAtLeastTxId.
        LOG.error(e);
      } else {
        closeAllStreams(streams);
        throw e;
      }
    }
    return streams;
  }

它首先会创建编辑日志输入流EditLogInputStream列表streams,并在Object对象journalSetLock上使用synchronized进行同步,检测journalSet状态,随后调用三个参数的selectInputStreams()方法,传入空的streams列表,从fromTxId事务ID开始,编辑日志是否可处于处理过程中的标志位inProgressOk,编辑日志同步时,标志位inProgressOk为false,最后,调用checkForGaps()方法进行相关数据监测。

我们继续看下三个参数的selectInputStreams()方法,代码如下:

  @Override
  public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxId, boolean inProgressOk) throws IOException {

	// 调用JournalSet的同名方法
    journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
  }

它其实是调用JournalSet的同名方法。JournalSet是什么呢?它是Journal集合的管理者,而Journal就是日志的意思,它是Hadoop HA中EditLog在JournalNode上的组织形式。我们看下JournalSet的selectInputStreams()方法,代码如下:

  /**
   * In this function, we get a bunch of streams from all of our JournalManager
   * objects.  Then we add these to the collection one by one.
   * 在这个方法内,我们从所有JournalManager对象中得到一堆输入流。接着我们把它们一个接一个的添加到集合中。
   *
   * @param streams          The collection to add the streams to.  It may or
   *                         may not be sorted-- this is up to the caller.
   * @param fromTxId         The transaction ID to start looking for streams at
   * @param inProgressOk     Should we consider unfinalized streams?
   */
  @Override
  public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxId, boolean inProgressOk) throws IOException {
    final PriorityQueue<EditLogInputStream> allStreams =
        new PriorityQueue<EditLogInputStream>(64,
            EDIT_LOG_INPUT_STREAM_COMPARATOR);

    // 遍历journals,得到每个JournalAndStream实例jas
    for (JournalAndStream jas : journals) {

      // 如果jas不可用,记录日志,跳过
      if (jas.isDisabled()) {
        LOG.info("Skipping jas " + jas + " since it's disabled");
        continue;
      }

      // 利用jas得到JournalManager实例,然后调用其selectInputStreams()方法,获得输入流,并放入allStreams输入流集合
      try {
        jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
      } catch (IOException ioe) {
        LOG.warn("Unable to determine input streams from " + jas.getManager() +
            ". Skipping.", ioe);
      }
    }

    // 过滤掉无用的编辑日志输入流
    chainAndMakeRedundantStreams(streams, allStreams, fromTxId);
  }

这个方法的大体处理流程如下:

1、遍历JournalSet中的journals,得到每个JournalAndStream实例jas:

1.1、如果jas不可用,记录日志,跳过;

1.2、利用jas得到JournalManager实例,然后调用其selectInputStreams()方法,获得输入流,并放入allStreams输入流集合;

2、调用chainAndMakeRedundantStreams()方法,过滤掉无用的编辑日志输入流。

首先,我们来看下JournalManager实例的selectInputStreams()方法,我们以FileJournalManager为例,代码如下:

  @Override
  synchronized public void selectInputStreams(
      Collection<EditLogInputStream> streams, long fromTxId,
      boolean inProgressOk) throws IOException {

	// 先调用StorageDirectory的getCurrentDir()方法获得其current目录,
	// 然后再调用matchEditLogs()方法,获得编辑日志文件EditLogFile列表elfs
    List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
    LOG.debug(this + ": selecting input streams starting at " + fromTxId +
        (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
        "from among " + elfs.size() + " candidate file(s)");

    // 调用addStreamsToCollectionFromFiles()方法,根据编辑日志文件列表elfs获得输入流,并添加到输入流列表streams
    addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk);
  }

这个方法的处理流程如下:

1、先调用StorageDirectory的getCurrentDir()方法获得其current目录,然后再调用matchEditLogs()方法,获得编辑日志文件EditLogFile列表elfs;

2、调用addStreamsToCollectionFromFiles()方法,根据编辑日志文件列表elfs获得输入流,并添加到输入流列表streams。

我们再看下matchEditLogs()方法,代码如下:

  /**
   * returns matching edit logs via the log directory. Simple helper function
   * that lists the files in the logDir and calls matchEditLogs(File[])
   *
   * @param logDir
   *          directory to match edit logs in
   * @return matched edit logs
   * @throws IOException
   *           IOException thrown for invalid logDir
   */
  public static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
    return matchEditLogs(FileUtil.listFiles(logDir));
  }

  static List<EditLogFile> matchEditLogs(File[] filesInStorage) {
    return matchEditLogs(filesInStorage, false);
  }

  private static List<EditLogFile> matchEditLogs(File[] filesInStorage,
      boolean forPurging) {

	// 创建编辑日志文件EditLogFile列表ret
    List<EditLogFile> ret = Lists.newArrayList();

    // 遍历filesInStorage,对每个文件进行处理
    for (File f : filesInStorage) {

      // 获得文件名
      String name = f.getName();

      // Check for edits
      // 根据文件名,利用正则表达式,检测其是否为编辑日志edits log
      // 正则表达式为edits_(\d+)-(\d+)
      // 原edits_(\\d+)-(\\d+)中d前面的两个\中第一个只是转义符
      // 文件名类似如下:edits_0000000000001833048-0000000000001833081
      // 第一串数字为起始事务ID,第二串数字为终止事务ID
      Matcher editsMatch = EDITS_REGEX.matcher(name);

      if (editsMatch.matches()) {// 文件名匹配正则表达式的话,说明其是我们需要寻找的编辑日志文件
        try {

          // 获取起始事务ID:正则表达式中第一个匹配的是起始事务ID
          long startTxId = Long.parseLong(editsMatch.group(1));

          // 获取终止事务ID:正则表达式中第二个匹配的是终止事务ID
          long endTxId = Long.parseLong(editsMatch.group(2));

          // 利用文件f、起始事务艾迪startTxId、终止事务艾迪endTxId构造编辑日志文件EditLogFile实例,
          // 并添加到ret列表中
          ret.add(new EditLogFile(f, startTxId, endTxId));
          continue;
        } catch (NumberFormatException nfe) {
          LOG.error("Edits file " + f + " has improperly formatted " +
                    "transaction ID");
          // skip
        }
      }

      // Check for in-progress edits
      // 检测正在处理中的编辑日志
      // 正则表达式为edits_inprogress_(\d+)
      // 原edits_inprogress_(\\d+)中d前面的两个\中第一个只是转义符
      Matcher inProgressEditsMatch = EDITS_INPROGRESS_REGEX.matcher(name);
      if (inProgressEditsMatch.matches()) {
        try {
          long startTxId = Long.parseLong(inProgressEditsMatch.group(1));
          ret.add(
              new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true));
          continue;
        } catch (NumberFormatException nfe) {
          LOG.error("In-progress edits file " + f + " has improperly " +
                    "formatted transaction ID");
          // skip
        }
      }
      if (forPurging) {
        // Check for in-progress stale edits
        Matcher staleInprogressEditsMatch = EDITS_INPROGRESS_STALE_REGEX
            .matcher(name);
        if (staleInprogressEditsMatch.matches()) {
          try {
            long startTxId = Long.valueOf(staleInprogressEditsMatch.group(1));
            ret.add(new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID,
                true));
            continue;
          } catch (NumberFormatException nfe) {
            LOG.error("In-progress stale edits file " + f + " has improperly "
                + "formatted transaction ID");
            // skip
    }
        }
      }
    }
    return ret;
  }

我们看下最核心的两个参数的matchEditLogs()方法,它的处理流程为:

1、创建编辑日志文件EditLogFile列表ret;

2、遍历filesInStorage,对每个文件进行处理:

2.1、获得文件名name;

2.2、根据文件名name,利用正则表达式,检测其是否为编辑日志edits log:

利用的正则表达式为edits_(\d+)-(\d+),原edits_(\\d+)-(\\d+)中d前面的两个\中第一个只是转义符,文件名类似如下:edits_0000000000001833048-0000000000001833081,第一串数字为起始事务ID,第二串数字为终止事务ID;

2.3、文件名匹配正则表达式的话,说明其是我们需要寻找的编辑日志文件:

2.3.1、获取起始事务艾迪startTxId:正则表达式中第一个匹配的是起始事务ID;

2.3.2、获取终止事务艾迪endTxId:正则表达式中第二个匹配的是终止事务ID;

2.3.3、利用文件f、起始事务艾迪startTxId、终止事务艾迪endTxId构造编辑日志文件EditLogFile实例,并添加到ret列表中;

2.3.4、continue,继续循环下一个文件;

2.4、根据文件名name,利用正则表达式,检测其是否为正在处理中的编辑日志edits log:

利用的正则表达式为edits_inprogress_(\d+),原edits_inprogress_(\\d+)中d前面的两个\中第一个只是转义符,文件名类似如下:edits_inprogress_0000000000001853186,后面的字符串为起始事务ID;

2.5、文件名匹配正则表达式的话,说明其是我们需要寻找的正在处理中的编辑日志文件:

2.5.1、获取起始事务艾迪startTxId:正则表达式中第一个匹配的是起始事务ID;

2.5.2、利用文件f、起始事务艾迪startTxId、终止事务艾迪-12345构造编辑日志文件EditLogFile实例,并添加到ret列表中;

2.5.3、continue,继续循环下一个文件;

3、返回编辑日志文件EditLogFile列表ret。

再回到FileJournalManager的selectInputStreams()方法,我们看下它的第二步:调用addStreamsToCollectionFromFiles()方法,根据编辑日志文件列表elfs添加输入流列表streams,代码如下:

  static void addStreamsToCollectionFromFiles(Collection<EditLogFile> elfs,
      Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk) {

	// 遍历EditLogFile集合elfs,针对每个EditLogFile实例elf进行如下处理:
	for (EditLogFile elf : elfs) {

	  // 如果elf处于处理过程中:
	  if (elf.isInProgress()) {

		// 如果不需要获取处于处理过程中的编辑日志,直接跳过
		if (!inProgressOk) {
          LOG.debug("passing over " + elf + " because it is in progress " +
              "and we are ignoring in-progress logs.");
          continue;
        }

		// 校验编辑日志,校验不成功的话,直接跳过
        try {
          elf.validateLog();
        } catch (IOException e) {
          LOG.error("got IOException while trying to validate header of " +
              elf + ".  Skipping.", e);
          continue;
        }
      }

	  // 如果elf的最后事务艾迪lastTxId小于我们获取编辑日志的起始事务艾迪fromTxId,直接跳过
      if (elf.lastTxId < fromTxId) {
        assert elf.lastTxId != HdfsConstants.INVALID_TXID;
        LOG.debug("passing over " + elf + " because it ends at " +
            elf.lastTxId + ", but we only care about transactions " +
            "as new as " + fromTxId);
        continue;
      }

      // 利用elf中的文件file、起始事务艾迪firstTxId、终止事务艾迪lastTxId、编辑日志文件是否处于处理过程中标志位isInProgress,
      // 构造编辑日志文件输入流EditLogFileInputStream实例elfis
      EditLogFileInputStream elfis = new EditLogFileInputStream(elf.getFile(),
            elf.getFirstTxId(), elf.getLastTxId(), elf.isInProgress());
      LOG.debug("selecting edit log stream " + elf);

      // 将elfis加入到输入流列表streams
      streams.add(elfis);
    }
  }

它的处理流程如下:

遍历EditLogFile集合elfs,针对每个EditLogFile实例elf进行如下处理:

1、如果elf处于处理过程中,同时如果不需要获取处于处理过程中的编辑日志,直接跳过,否则校验编辑日志,校验不成功的话,直接跳过,成功则继续;

2、如果elf的最后事务艾迪lastTxId小于我们获取编辑日志的起始事务艾迪fromTxId,直接跳过,否则继续;

3、利用elf中的文件file、起始事务艾迪firstTxId、终止事务艾迪lastTxId、编辑日志文件是否处于处理过程中标志位isInProgress,构造编辑日志文件输入流EditLogFileInputStream实例elfis;

4、将elfis加入到输入流列表streams。

时间: 2024-10-25 15:50:48

EditLog源码分析之获取编辑日志输入流的相关文章

UiAutomator源码分析之获取控件信息

根据上一篇文章<UiAutomator源码分析之注入事件>开始时提到的计划,这一篇文章我们要分析的是第二点: 如何获取控件信息 我们在测试脚本中初始化一个UiObject的时候通常是像以下这个样子: UiObject appsTab = new UiObject(new UiSelector().text("Apps")); appsTab.click() 那么这个过程发生了什么呢?这就是我们接下来要说的事情了. 1. 获取控件信息顺序图 这里依然是一个手画的不规范的顺序图

HDFS源码分析EditLog之读取操作符

在<HDFS源码分析EditLog之获取编辑日志输入流>一文中,我们详细了解了如何获取编辑日志输入流EditLogInputStream.在我们得到编辑日志输入流后,是不是就该从输入流中获取数据来处理呢?答案是显而易见的!在<HDFS源码分析之EditLogTailer>一文中,我们在讲编辑日志追踪同步时,也讲到了如下两个连续的处理流程: 4.从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据 5.调用文件系统镜像FSImage

Robotium源码分析之Instrumentation进阶

在分析Robotium的运行原理之前,我们有必要先搞清楚Instrumentation的一些相关知识点,因为Robotium就是基于Instrumentation而开发出来的一套自动化测试框架.鉴于之前本人已经转载和编写了Instrumentation的一些文章,所以建议大家如果没有看过的还是翻看下先对Instrumentation有个基本的理解.然后带着疑问再来看这篇文章看是否能帮上自己. 既然是分析Instrumentation,那么我们必须要先看下Instrumentation 这个类的类

死磕 java并发包之AtomicInteger源码分析

问题 (1)什么是原子操作? (2)原子操作和数据库的ACID有啥关系? (3)AtomicInteger是怎么实现原子操作的? (4)AtomicInteger是有什么缺点? 简介 AtomicInteger是java并发包下面提供的原子类,主要操作的是int类型的整型,通过调用底层Unsafe的CAS等方法实现原子操作. 还记得Unsafe吗?点击链接直达[死磕 java魔法类之Unsafe解析] 原子操作 原子操作是指不会被线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不

【Spring源码分析】原型Bean实例化过程、byName与byType及FactoryBean获取Bean源码实现

原型Bean加载过程 之前的文章,分析了非懒加载的单例Bean整个加载过程,除了非懒加载的单例Bean之外,Spring中还有一种Bean就是原型(Prototype)的Bean,看一下定义方式: 1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi=&qu

8. SOFAJRaft源码分析— JRaft是如何实现日志复制的?

前言 前几天和腾讯的大佬一起吃饭聊天,说起我对SOFAJRaft的理解,我自然以为我是很懂了的,但是大佬问起了我那SOFAJRaft集群之间的日志是怎么复制的? 我当时哑口无言,说不出是怎么实现的,所以这次来分析一下SOFAJRaft中日志复制是怎么做的. Leader发送探针获取Follower的LastLogIndex Leader 节点在通过 Replicator 和 Follower 建立连接之后,要发送一个 Probe 类型的探针请求,目的是知道 Follower 已经拥有的的日志位置

9. SOFAJRaft源码分析— Follower如何通过Snapshot快速追上Leader日志?

前言 引入快照机制主要是为了解决两个问题: JRaft新节点加入后,如何快速追上最新的数据 Raft 节点出现故障重新启动后如何高效恢复到最新的数据 Snapshot 源码分析 生成 Raft 节点的快照文件 如果用户需开启 SOFAJRaft 的 Snapshot 机制,则需要在其客户端中设置配置参数类 NodeOptions 的"snapshotUri"属性(即为:Snapshot 文件的存储路径),配置该属性后,默认会启动一个定时器任务("JRaft-SnapshotT

java使用websocket,并且获取HttpSession,源码分析

一:本文使用范围 此文不仅仅局限于spring boot,普通的spring工程,甚至是servlet工程,都是一样的,只不过配置一些监听器的方法不同而已. 本文经过作者实践,确认完美运行. 二:Spring boot使用websocket 2.1:依赖包 websocket本身是servlet容器所提供的服务,所以需要在web容器中运行,像我们所使用的tomcat,当然,spring boot中已经内嵌了tomcat. websocket遵循了javaee规范,所以需要引入javaee的包 <

Flume NG源码分析(五)使用ThriftSource通过RPC方式收集日志

上一篇说了利用ExecSource从本地日志文件异步的收集日志,这篇说说采用RPC方式同步收集日志的方式.笔者对Thrift比较熟悉,所以用ThriftSource来介绍RPC的日志收集方式. 整体的结构图如下: 1. ThriftSource包含了一个Thrift Server,以及一个Thrift Service服务的实现.这里的Thrift Service是由ThriftSourceProtocol定义 2. 应用程序调用Thrift Service的客户端,以RPC的方式将日志发送到Th