HBase源代码分析之HRegionServer上MemStore的flush处理流程(二)

继上篇文章《HBase源代码分析之HRegionServer上MemStore的flush处理流程(一)》遗留的问题之后,本文我们接着研究HRegionServer上MemStore的flush处理流程。重点讲述下怎样选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是怎样发起的。

我们先来看下第一个问题:怎样选择一个HRegion进行flush以缓解MemStore压力。上文中我们讲到过flush处理线程假设从flushQueue队列中拉取出的一个FlushQueueEntry为为空,或者为WakeupFlushThread,而且通过isAboveLowWaterMark()方法推断全局MemStore的大小高于限制值得低水平线,调用flushOneForGlobalPressure()方法,依照一定策略,flush一个HRegion的MemStore,减少MemStore的大小。预防OOM等异常情况的发生。

以下。我们重点分析下flushOneForGlobalPressure()方法,代码例如以下:

/**
   * The memstore across all regions has exceeded the low water mark. Pick
   * one region to flush and flush it synchronously (this is called from the
   * flush thread)
   *
   * 全部region的memstore已超过最低水平。
   * 选择一个region同步刷新。
   * 被flush线程调用
   *
   * @return true if successful
   */
  private boolean flushOneForGlobalPressure() {

	// 获取RegionServer上的在线Region,依据Region的memstoreSize大小倒序排列。得到regionsBySize
    SortedMap<Long, HRegion> regionsBySize =
        server.getCopyOfOnlineRegionsSortedBySize();

    // 构造被排除的Region集合excludedRegions
    Set<HRegion> excludedRegions = new HashSet<HRegion>();

    boolean flushedOne = false;// 标志位
    while (!flushedOne) {// 循环一次,没有选中的话,再循环,直到选中或者没有可选的Region

      // Find the biggest region that doesn't have too many storefiles
      // (might be null!)
      // 选择一个Memstore最大的而且不含太多storefiles的region作为最有可能被选中的region,即bestFlushableRegion
      HRegion bestFlushableRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, true);

      // Find the biggest region, total, even if it might have too many flushes.
      // 选择一个Memstore最大的region,即便是它包括太多storefiles,作为终于可以被选中的备份方案,即bestAnyRegion
      HRegion bestAnyRegion = getBiggestMemstoreRegion(
          regionsBySize, excludedRegions, false);

      // 在内存上阈值之上可是没有可以flush的region的话,直接返回false
      if (bestAnyRegion == null) {
        LOG.error("Above memory mark but there are no flushable regions!");
        return false;
      }

      HRegion regionToFlush;

      // 选择须要flush的region
      // 假设bestAnyRegion的的memstore大小大于bestFlushableRegion的两倍。则选取bestAnyRegion
      if (bestFlushableRegion != null &&
          bestAnyRegion.memstoreSize.get() > 2 * bestFlushableRegion.memstoreSize.get()) {
        // Even if it's not supposed to be flushed, pick a region if it's more than twice
        // as big as the best flushable one - otherwise when we're under pressure we make
        // lots of little flushes and cause lots of compactions, etc, which just makes
        // life worse!
        if (LOG.isDebugEnabled()) {
          LOG.debug("Under global heap pressure: " +
            "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " +
            "store files, but is " +
            StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) +
            " vs best flushable region's " +
            StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) +
            ". Choosing the bigger.");
        }
        regionToFlush = bestAnyRegion;
      } else {// 否则。优先选取bestFlushableRegion
        if (bestFlushableRegion == null) {
          regionToFlush = bestAnyRegion;
        } else {
          regionToFlush = bestFlushableRegion;
        }
      }

      // 检測状态:被选中Region的memstoreSize必须大于0
      Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);

      LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");

      // 调用flushRegion()方法。针对单个Region,进行MemStore的flush
      flushedOne = flushRegion(regionToFlush, true);
      if (!flushedOne) {// flush失败则加入到excludedRegions集合中。避免下次再被选中
        LOG.info("Excluding unflushable region " + regionToFlush +
          " - trying to find a different region to flush.");
        excludedRegions.add(regionToFlush);
      }
    }
    return true;
  }

我们来总结下这种方法的处理逻辑。例如以下:

1、获取RegionServer上的在线Region。依据Region的memstoreSize大小倒序排列,得到regionsBySize。

2、构造被排除的Region集合excludedRegions;

3、标志位flushedOne设置为false;

4、循环。直到标志位flushedOne为true,即存在Region被选中,或者根本没有可选的Region:

4.1、循环regionsBySize,选择一个Memstore最大的而且不含太多storefiles的region作为最有可能被选中的region,即bestFlushableRegion:

4.1.1、假设当前region在excludedRegions列表中,直接跳过;

4.1.2、假设当前region的写状态为正在flush,或者当前region的写状态不是写启用,直接跳过;

4.1.3、假设须要检查StoreFile数目,且包括太多StoreFiles。也直接跳过;

4.1.4、否则返回该region;

4.2、循环regionsBySize。选择一个Memstore最大的region,即便是它包括太多storefiles,作为终于能够被选中的备份方案,即bestAnyRegion:

4.2.1、假设当前region在excludedRegions列表中。直接跳过;

4.2.2、假设当前region的写状态为正在flush,或者当前region的写状态不是写启用,直接跳过;

4.2.3、否则返回该region。

4.3、在内存上阈值之上可是没有可以flush的region的话。直接返回false。

4.4、选择须要flush的region:

4.4.1、假设bestAnyRegion的的memstore大小大于bestFlushableRegion的两倍。则选取bestAnyRegion;

4.4.2、否则,优先选取bestFlushableRegion;

4.5、检測状态:被选中Region的memstoreSize必须大于0。

4.6、调用flushRegion()方法,针对单个Region。进行MemStore的flush;

4.7、flush失败则加入到excludedRegions集合中,避免下次再被选中。

以上就是依照一定策略选择一个HRegion进行MemStore的flush以缓解MemStore压力的方法。那么,剩下的flush指定HRegion的问题就同接下来我们将要讲的HRegion的flush是怎样发起的一致了。我们先看下带一个參数的flushRegion()方法。代码例如以下:

/*
   * A flushRegion that checks store file count.  If too many, puts the flush
   * on delay queue to retry later.
   *
   * 一个待刷新的Region首先会检測store file的数目,假设太多。会把该region的刷新推迟并稍后再试,否则马上刷新。
   *
   * @param fqe
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the region was
   * not flushed.
   */
  private boolean flushRegion(final FlushRegionEntry fqe) {
    HRegion region = fqe.region;
    if (!region.getRegionInfo().isMetaRegion() &&
        isTooManyStoreFiles(region)) {// 假设Region不是MetaRegion且Region上有太多的StoreFiles

      if (fqe.isMaximumWait(this.blockingWaitTime)) {
    	// 假设已堵塞指定时间。记录日志并运行刷新
        LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) +
          "ms on a compaction to clean up 'too many store files'; waited " +
          "long enough... proceeding with flush of " +
          region.getRegionNameAsString());
      } else {
        // If this is first time we've been put off, then emit a log message.
    	// 假设是第一次推迟,并对该HRegion请求分裂或系统合并。记录一条日志信息
        if (fqe.getRequeueCount() <= 0) {
          // Note: We don't impose blockingStoreFiles constraint on meta regions
          // 注意:我们不强加blockingstorefiles约束元区域
          LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
            "store files; delaying flush up to " + this.blockingWaitTime + "ms");

          // 对该HRegion先请求分裂Split,分裂不成功的话再请求系统合并SystemCompaction
          if (!this.server.compactSplitThread.requestSplit(region)) {
            try {
              this.server.compactSplitThread.requestSystemCompaction(
                  region, Thread.currentThread().getName());
            } catch (IOException e) {
              LOG.error(
                "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
                RemoteExceptionHandler.checkIOException(e));
            }
          }
        }

        // Put back on the queue.  Have it come back out of the queue
        // after a delay of this.blockingWaitTime / 100 ms.
        // 再放回队列,等待900ms(參数可配置)后。再从队列中取出来
        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
        // Tell a lie, it's not flushed but it's ok
        // 佯言。该Region没有被flush,可是应该返回true
        return true;
      }
    }

    // 调用两个參数的flushRegion()方法。通知HRegion运行flush
    return flushRegion(region, false);
  }

这个带一个參数的flushRegion()方法。实际上是在拿到一个待flush的HRegion的封装体FlushRegionEntry类型的fqe后,对其做一些必要的推断,决定是直接进行flush还是推后运行,且在第一次推后前。假设须要。则做分裂或系统合并处理。详细处理逻辑例如以下:

1、假设Region不是MetaRegion且Region上有太多的StoreFiles:

1.1、通过isMaximumWait()推断堵塞时间,已堵塞达到或超过指定时间。记录日志并运行flush,跳到2,结束。

1.2、假设是第一次推迟。记录一条日志信息。然后对该HRegion先请求分裂Split。分裂不成功的话再请求系统合并SystemCompaction。

1.3、再将fqe放回到队列flushQueue。添加延迟时间900ms(參数可配置),等到到期后再从队列中取出来进行处理;

1.4、佯言,该Region被推迟进行flush,结果还不确定,所以应该返回true;

2、调用两个參数的flushRegion()方法,通知HRegion运行flush。

怎样进行堵塞时间的推断呢?非常easy。推断当前时间减去创建时间是否大于指定时间就OK了。代码例如以下:

/**
     * @param maximumWait
     * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
     */
    public boolean isMaximumWait(final long maximumWait) {
      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
    }

好了,是时候该分析这个带有两个參数的flushRegion()方法了。先上代码。再做分析:

/*
   * Flush a region.
   * @param region Region to flush.
   * @param emergencyFlush Set if we are being force flushed. If true the region
   * needs to be removed from the flush queue. If false, when we were called
   * from the main flusher run loop and we got the entry to flush by calling
   * poll on the flush queue (which removed it).
   *
   * @return true if the region was successfully flushed, false otherwise. If
   * false, there will be accompanying log messages explaining why the region was
   * not flushed.
   *
   * 刷新region
   */
  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
    long startTime = 0;
    synchronized (this.regionsInQueue) {

      // 先从regionsInQueue中移除相应的HRegion信息
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null) {
        // 获取flush的開始时间startTime
    	startTime = fqe.createTime;
      }
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
    	// 须要从flushQueue队列中移除,假设不是紧急刷新,fqe将通过flushQueue.poll被移除
    	// 由于假设是flush线程处理的,run()方法会周期性的从flushQueue队列取feq,而且假设取出的为null或者WakeupFlushThread,
        // 它会在MemStore位于低水平线上时。依照一定策略选择一个HRegion。包装成fqe进行flush。以减少MemStore,避免OOM等风险。
    	// 此时。假设fqe位于flushQueue中,须要被移除,移除的推断就是这个emergencyFlush是否为true,
    	// 由于通过线程在到期的正常情况下进行处理的,会传入false,而为减少风险进行紧急flush的,会传入true,此时就须要从队列中移除,也是为了避免做反复工作
        flushQueue.remove(fqe);
     }
    }

    // 获取flush的開始时间startTime
    if (startTime == 0) {
      // Avoid getting the system time unless we don't have a FlushRegionEntry;
      // shame we can't capture the time also spent in the above synchronized
      // block
      startTime = EnvironmentEdgeManager.currentTime();
    }

    // 上读锁,意味着与其它拥有读锁的线程不冲突,能够同步进行,而与拥有写锁的线程相互排斥
    lock.readLock().lock();
    try {

      // 通过监听器Listener通知flush请求者flush的type
      notifyFlushRequest(region, emergencyFlush);

      // 调用HRegion的flushcache()方法,运行MemStore的flush
      HRegion.FlushResult flushResult = region.flushcache();

      // 依据flush的结果,推断下一步该做怎样处理

      // 推断是否应该进行合并compact
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size

      // 检測是否应该进行分裂split
      boolean shouldSplit = region.checkSplit() != null;

      // 必要的情况下,先进行split,再进行system compact
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }

      // 假设flush成功,获取flush结束时间。计算耗时,记录HRegion上的度量信息
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        server.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" +
        (region != null ?

(" for region " + Bytes.toStringBinary(region.getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      // 释放读锁
      lock.readLock().unlock();

      // 唤醒堵塞的其它线程
      wakeUpIfBlocking();
    }
    return true;
  }

带有两个參数的flushRegion()方法大体逻辑例如以下:

1、首选处理regionsInQueue集合和flushQueue队列:

1.1、先从regionsInQueue中移除相应的HRegion信息,这个不管是否紧急flush,都是必需要做的;

1.2、获取flush的開始时间startTime;

1.3、假设是紧急刷新,须要从flushQueue队列中移除相应的fqe,假设不是紧急刷新,fqe将通过flushQueue.poll被移除。

2、假设startTime为null,获取flush的開始时间startTime。

3、上读锁,意味着与其它拥有读锁的线程不冲突。能够同步进行,而与拥有写锁的线程相互排斥(后期将会写专门的文章分析HBase内部各流程中锁的应用)。

4、通过监听器Listener通知flush请求者flush的type;

5、调用HRegion的flushcache()方法。运行MemStore的flush。并获得flush结果;

6、依据flush的结果,推断下一步该做怎样处理:

6.1、依据flush结果推断是否应该进行合并compact。即标志位shouldCompact;

6.2、调用HRegion的checkSplit()方法检測是否应该进行分裂split。即标志位shouldSplit;

6.3、通过两个标志位推断,必要的情况下,先进行split,再进行system compact;

7、假设flush成功,获取flush结束时间,计算耗时,记录HRegion上的度量信息。

8、最后释放读锁。唤醒堵塞的其它线程。

这里,先有必要解释下对flushQueue的特殊处理,假设是紧急刷新,须要从flushQueue队列中移除相应的fqe,假设不是紧急刷新,fqe将通过flushQueue.poll被移除。

由于假设是flush线程处理的,run()方法会周期性的从flushQueue队列取feq,而且假设取出的为null或者WakeupFlushThread,它会在MemStore位于低水平线上时。依照一定策略选择一个HRegion,包装成fqe进行flush,以减少MemStore,避免OOM等风险,此时,假设fqe位于flushQueue中。须要被移除,移除的推断就是这个emergencyFlush是否为true。由于通过线程在到期的正常情况下进行处理的。会传入false,而为减少风险进行紧急flush的。会传入true,此时就须要从队列中移除,也是为了避免做反复工作。

通过监听器Listener通知flush请求者flush的type也非常easy。也做凝视了,不再解释。代码例如以下:

private void notifyFlushRequest(HRegion region, boolean emergencyFlush) {

	// 默认类型为 FlushType.NORMAL
	FlushType type = FlushType.NORMAL;

	// 假设是紧急刷新,跟是否在高水位线上来确定type,高水位线上为FlushType.ABOVE_HIGHER_MARK。低水位线上为FlushType.ABOVE_LOWER_MARK
	if (emergencyFlush) {
      type = isAboveHighWaterMark() ?

FlushType.ABOVE_HIGHER_MARK : FlushType.ABOVE_LOWER_MARK;
    }

	// 针对监听器逐个加入region、type
    for (FlushRequestListener listener : flushRequestListeners) {
      listener.flushRequested(type, region);
    }
  }

最后再说说这个flush结果FlushResult,它是HRegion中的一个静态内部类。包括一个Result枚举,当中包括的flush结果例如以下:

1、FLUSHED_NO_COMPACTION_NEEDED:flush成功,可是不须要运行compact;

2、FLUSHED_COMPACTION_NEEDED:flush成功,同一时候须要运行compact;

3、CANNOT_FLUSH_MEMSTORE_EMPTY:无法进行flush。由于MemStore为空;

4、CANNOT_FLUSH:无法进行flush。

推断flush是否成功。则就是看result是否为FLUSHED_NO_COMPACTION_NEEDED或FLUSHED_COMPACTION_NEEDED。推断是否须要进行compact,则就是看result是否为FLUSHED_COMPACTION_NEEDED。

相关代码例如以下:

    /**
     * Convenience method, the equivalent of checking if result is
     * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_NO_COMPACTION_NEEDED.
     * @return true if the memstores were flushed, else false.
     */
    public boolean isFlushSucceeded() {
      return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
          .FLUSHED_COMPACTION_NEEDED;
    }

    /**
     * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
     * @return True if the flush requested a compaction, else false (doesn't even mean it flushed).
     */
    public boolean isCompactionNeeded() {
      return result == Result.FLUSHED_COMPACTION_NEEDED;
    }

至此,HRegionServer上MemStore的flush处理流程所有分析完成。末尾关于split、compact,兴许会有专门的文章进行介绍,敬请关注本人博客。谢谢!

时间: 2024-08-01 02:44:34

HBase源代码分析之HRegionServer上MemStore的flush处理流程(二)的相关文章

HBase源代码分析之HRegion上MemStore的flsuh流程(二)

继上篇<HBase源代码分析之HRegion上MemStore的flsuh流程(一)>之后.我们继续分析下HRegion上MemStore flush的核心方法internalFlushcache().它的主要流程如图所看到的: 当中.internalFlushcache()方法的代码例如以下: /** * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the

HBase源代码分析之MemStore的flush发起时机、推断条件等详情(二)

在<HBase源代码分析之MemStore的flush发起时机.推断条件等详情>一文中,我们具体介绍了MemStore flush的发起时机.推断条件等详情.主要是两类操作.一是会引起MemStore数据大小变化的Put.Delete.Append.Increment等操作,二是会引起HRegion变化的诸如Regin的分裂.合并以及做快照时的复制拷贝等.相同会触发MemStore的flush流程.同一时候.在<HBase源代码分析之compact请求发起时机.推断条件等详情(一)>

HBase二次开发之搭建HBase调试环境,如何远程debug HBase源代码

版本 HDP:3.0.1.0 HBase:2.0.0 一.前言 之前的文章也提到过,最近工作中需要对HBase进行二次开发(参照HBase的AES加密方法,为HBase增加SMS4数据加密类型).研究了两天,终于将开发流程想清楚并搭建好了debug环境,所以就迫不及待地想写篇文章分享给大家. 二.思路 首先看到这个需求,肯定是需要先实现HBase配置AES加密<HBase配置AES加密>,或者还可以再继续了解实现SMS4加密算法<Java版SMS4加密解密算法>.等到这些都完成之后

【JUnit4.10源代码分析】0导航

JUnit是由GOF 之一的Erich Gamma和 Kent Beck 编写的一个开源的单元测试框架,yqj2065分析JUnit源代码的主要目的是 学习其中对设计模式的运用. JUnit也是一个学习Java编程. 学习框架设计 和研究如何应对版本升级和接口变化的案例. NetBeans IDE 7.4 (Build 201310111528) 的测试库为JUnit4.10,因而在前面对JUnit4.8.2源代码分析的基础上,yqj2065将采用较正规的方式介绍JUnit4.10源代码. 10

Spark SQL之External DataSource外部数据源(二)源代码分析

上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)演示样例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了Exte

转:RTMPDump源代码分析

0: 主要函数调用分析 rtmpdump 是一个用来处理 RTMP 流媒体的开源工具包,支持 rtmp://, rtmpt://, rtmpe://, rtmpte://, and rtmps://.也提供 Android 版本. 最近研究了一下它内部函数调用的关系. 下面列出几个主要的函数的调用关系. RTMPDump用于下载RTMP流媒体的函数Download: 用于建立网络连接(NetConnect)的函数Connect: 用于建立网络流(NetStream)的函数 rtmpdump源代码

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

pomelo源代码分析(一)

千里之行始于足下,一直说想了解pomelo,对pomelo有兴趣,但一直迟迟没有去碰,尽管对pomelo进行源代码分析,在网络上肯定不止我一个,已经有非常优秀的前辈走在前面,如http://golanger.cn/,在阅读Pomelo代码的时候,已经连载到了11篇了,在我的源代码分析參考了该博客,当然,也会添?我对pomelo的理解,借此希望能提高一下自己对node.js的了解和学习一些优秀的设计. 开发环境:win7 调试环境:webstorm5.0 node.js版本号:v0.8.21 源代

Jafka源代码分析——随笔

Kafka是一个分布式的消息中间件,可以粗略的将其划分为三部分:Producer.Broker和Consumer.其中,Producer负责产生消息并负责将消息发送给Kafka:Broker可以简单的理解为Kafka集群中的每一台机器,其负责完成消息队列的主要功能(接收消息.消息的持久化存储.为Consumer提供消息.消息清理.....):Consumer从Broker获取消息并进行后续的操作.每个broker会有一个ID标识,该标识由人工在配置文件中配置. Kafka中的消息隶属于topic