HBase源代码分析之MemStore的flush发起时机、推断条件等详情(二)

《HBase源代码分析之MemStore的flush发起时机、推断条件等详情》一文中,我们具体介绍了MemStore flush的发起时机、推断条件等详情。主要是两类操作。一是会引起MemStore数据大小变化的Put、Delete、Append、Increment等操作,二是会引起HRegion变化的诸如Regin的分裂、合并以及做快照时的复制拷贝等。相同会触发MemStore的flush流程。同一时候。在《HBase源代码分析之compact请求发起时机、推断条件等详情(一)》一文中,我们讲到了针对compact。在HRegionServer内部存在一个工作线程compactionChecker,它会周期性的工作。以检查是否达到可以发起compact请求的条件。那么,回过头来,我们再看MemStore
flush,它是不是也存在一个后台工作线程。可以周期性的工作,以检查是否达到可以发起flush请求的条件呢?本文,我们就之前《HBase源代码分析之MemStore的flush发起时机、推断条件等详情》一文,做一个关于MemStore flush后台检查线程等内容的补充。

在HRegionServer中,有一个和合并检查线程compactionChecker一样的Chore--periodicFlusher,它也是类似于compactionChecker的后台工作线程。它负责周期性的检查MemStore,查看是否达到发起MemStore flush的条件。其定义例如以下:

  /*
   * Check for flushes
   * 检查刷新请求
   */
  Chore periodicFlusher;

它也是一个继承自Chore的工作线程。关于Chore的介绍。在《HBase源代码分析之compact请求发起时机、推断条件等详情(一)》一文中我已经讲过了。这里不再做介绍。

而periodicFlusher的初始化。自然同compactionChecker一样,也是在HRegionServer的initializeThreads()方法中完毕的,代码例如以下:

    this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);

非常easy,构造一个PeriodicMemstoreFlusher对象,并且其工作频率也是通过HRegionServer的threadWakeFrequency决定的。

那么这个periodicFlusher究竟是什么样的实现类,其工作原理是什么样子的呢?莫慌,让我为大家一一道来。

首先看下PeriodicMemstoreFlusher的定义、成员变量与构造方法。代码例如以下:

  static class PeriodicMemstoreFlusher extends Chore {
    final HRegionServer server;
    final static int RANGE_OF_DELAY = 20000; //millisec
    final static int MIN_DELAY_TIME = 3000; //millisec
    public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
      // cacheFlushInterval为flush的时间间隔
      super(server.getServerName() + "-MemstoreFlusherChore", cacheFlushInterval, server);
      this.server = server;
    }
  }

通过成员变量和构造方法。我们能够看到,比較重要的就是线程中HRegionServer的实例server以及线程工作频率。

另外它还提供了两个定值MIN_DELAY_TIME与RANGE_OF_DELAY,有什么用呢。继续看它的chore()方法:

    @Override
    protected void chore() {
      // 循环HRegionSever上的onlineRegions
      for (HRegion r : this.server.onlineRegions.values()) {

    	// HRegion为null的话直接跳过
    	if (r == null)
          continue;

    	// 调用HRegion上的shouldFlush()方法,推断能否够进行flush
        if (r.shouldFlush()) {
          // 获取RegionServer上的MemStoreFlusher类型的memstore内存刷新管理对象
          FlushRequester requester = server.getFlushRequester();
          if (requester != null) {
        	// 随机延迟时间:20s内的一个随机时间+3s的基础时间
            long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
            LOG.info(getName() + " requesting flush for region " + r.getRegionNameAsString() +
                " after a delay of " + randomDelay);
            //Throttle the flushes by putting a delay. If we don't throttle, and there
            //is a balanced write-load on the regions in a table, we might end up
            //overwhelming the filesystem with too many flushes at once.
            // 通过设置一个延迟时间控制flush。防止Region上多个flush同一时间并发进行
            requester.requestDelayedFlush(r, randomDelay);
          }
        }
      }
    }
  }

通过chore()方法我们知道,periodicFlusher线程周期性的对HRegionServer上全部在线Region进行检測,调用其shouldFlush()方法进行检測,假设该Region须要flush memstore,获取RegionServer上的MemStoreFlusher类型的memstore内存刷新管理对象,发起flush请求。

须要注意的是。该flush请求携带一个固定加随机的延迟时间,其算法为:

long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;

MIN_DELAY_TIME就是我们上面提到的固定值3秒,然后再加上一个20s内的一个随机数。为什么要这么做呢?试想下。假设马上提交一个flush请求,或者在3秒后马上提交一个flush请求,是不是非常easy就产生一个风暴。引起系统性能瓶颈呢?

关于怎样提交一个flush请求。前面的文章已经介绍过了,不再赘述。

这里我们介绍下HRegion的shouldFlush()方法,代码例如以下:

  /**
   * Should the memstore be flushed now
   * memstore如今是否应该被flush
   */
  boolean shouldFlush() {
    // This is a rough measure.
	// 这里是一个粗略的測量
	// 上次flush之后。sequenceId的增长超过flushPerChanges。即发起一次flush
	// 次数限制通过參数hbase.regionserver.flush.per.changes配置。默觉得30000000(3千万)
	// 也就是该Region上数据的修改次数,不管增、改、删等,超过一定的次数,即发起一次flush
	// 意味着会兼顾HRegion上的写请求及时flush到磁盘上
    if (this.lastFlushSeqId > 0
          && (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get())) {
      return true;
    }

    // hbase.regionserver.optionalcacheflushinterval參数小于等于0,不会触发flush
    //
    if (flushCheckInterval <= 0) { //disabled
      return false;
    }
    long now = EnvironmentEdgeManager.currentTime();
    //if we flushed in the recent past, we don't need to do again now
    // 时间间隔未超过hbase.regionserver.optionalcacheflushinterval配置的时间间隔
    // 默觉得3600000ms,即1小时
    if ((now - getLastFlushTime() < flushCheckInterval)) {
      return false;
    }

    //since we didn't flush in the recent past, flush now if certain conditions
    //are met. Return true on first such memstore hit.
    // 检測每一个列簇,当当中一个列簇超过flushCheckInterval没有flush时。发起flush
    for (Store s : this.getStores().values()) {
      if (s.timeOfOldestEdit() < now - flushCheckInterval) {
        // we have an old enough edit in the memstore, flush
        return true;
      }
    }
    return false;
  }

推断的逻辑比較清晰,概括例如以下:

1、首先。上次flush之后。sequenceId的增长超过flushPerChanges。即发起一次flush:

次数限制flushPerChanges是通过參数hbase.regionserver.flush.per.changes配置,默觉得30000000(3千万),这个sequenceId的增长该Region上数据的修改次数,不管增、删、改或者append、increment等,它是对HRegion数据变动的一个考虑,即便是MemStore不大。数据变动的频繁了,也须要进行flush。以减少宕机后拆分日志的工作量;

2、再看參数hbase.regionserver.optionalcacheflushinterval:

參数小于等于0。不会触发flush,时间间隔未超过參数l配置的时间间隔的话,也不会触发flush。这个參数默觉得3600000ms,即1小时;

3、当超过參数配置的时间间隔。再检測每一个列簇,当当中一个列簇超过flushCheckInterval没有flush时。发起flush。也就是说它有足够久的数据没有被flush。

以上就是HRegionServer内部PeriodicMemstoreFlusher工作线程periodicFlusher的所有内容。

同一时候。在上面针对每一个HRegion的循环,以及后面针对每一个HStore的推断,我们能够发现,flush还是以Region为最小单位进行的。

即便是某个列簇下MemStore过大或者过旧,另外一个MemStore还比較小或者比較新的话,它还是跟着那个过大或者过旧的列簇一起flush。这也是HBase饱受诟病的列簇不能过多的原因之中的一个。

在HBase1.1.2版本号中,有对于MemStore
flush的改进,改成了以HStore,即列簇为单位进行。

此乃后话,我们以后再做分析。

时间: 2024-10-12 12:52:12

HBase源代码分析之MemStore的flush发起时机、推断条件等详情(二)的相关文章

HBase源代码分析之HRegionServer上MemStore的flush处理流程(二)

继上篇文章<HBase源代码分析之HRegionServer上MemStore的flush处理流程(一)>遗留的问题之后,本文我们接着研究HRegionServer上MemStore的flush处理流程.重点讲述下怎样选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是怎样发起的. 我们先来看下第一个问题:怎样选择一个HRegion进行flush以缓解MemStore压力.上文中我们讲到过flush处理线程假设从flushQueue队列中拉取出的一个

HBase源代码分析之HRegion上MemStore的flsuh流程(二)

继上篇<HBase源代码分析之HRegion上MemStore的flsuh流程(一)>之后.我们继续分析下HRegion上MemStore flush的核心方法internalFlushcache().它的主要流程如图所看到的: 当中.internalFlushcache()方法的代码例如以下: /** * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the

区块链教程Fabric1.0源代码分析Peer peer根命令入口及加载子命令二

区块链教程Fabric1.0源代码分析Peer peer根命令入口及加载子命令二.flogging,即:fabric logging,为Fabric基于第三方包go-logging封装的日志包,go-logging使用方法参考:github.com/op/go-logging如下代码为flogging包的初始化函数: func init() { ????logger = logging.MustGetLogger(pkgLogID) //创建仅在flogging包内代码使用的logging.Lo

MemStore刷写线程—MemStoreFlusher源代码分析

在HBase中表由一个或多个Region组成,而Region由一个或者多个Store组成,Store又由一个MenStore和若干个StoreFile组成.无论是向HBase写入数据还是请求读数据,都首先经过MemStore,对于写请求来说就是将数据直接写入MemStore,对于读请求来说就是先检查MenStore中是否包含相应的数据,如果有则直接读取该数据,否则在StoreFile中检索并读取数据.当写入MemStore中的数据达到一定的数量时,就需要将其中的数据刷写到StoreFile中,这

Hadoop源代码分析

关键字: 分布式云计算 Google的核心竞争技术是它的计算平台.Google的大牛们用了下面5篇文章,介绍了它们的计算设施. GoogleCluster:http://research.google.com/archive/googlecluster.html Chubby:http://labs.google.com/papers/chubby.html GFS:http://labs.google.com/papers/gfs.html BigTable:http://labs.googl

转:RTMPDump源代码分析

0: 主要函数调用分析 rtmpdump 是一个用来处理 RTMP 流媒体的开源工具包,支持 rtmp://, rtmpt://, rtmpe://, rtmpte://, and rtmps://.也提供 Android 版本. 最近研究了一下它内部函数调用的关系. 下面列出几个主要的函数的调用关系. RTMPDump用于下载RTMP流媒体的函数Download: 用于建立网络连接(NetConnect)的函数Connect: 用于建立网络流(NetStream)的函数 rtmpdump源代码

Spark SQL之External DataSource外部数据源(二)源代码分析

上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)演示样例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了Exte

Jafka源代码分析——LogManager

在Kafka中,LogManager负责管理broker上所有的Log(每一个topic-partition为一个Log).通过阅读源代码可知其具体完成的功能如下: 1. 按照预设规则对消息队列进行清理. 2. 按照预设规则对消息队列进行持久化(flush操作). 3. 连接ZooKeeper进行broker.topic.partition相关的ZooKeeper操作. 4. 管理broker上所有的Log. 下面一一对这些功能的实现进行详细的解析. 一.对于Log的管理 LogManager包

Hadoop源代码分析(MapTask辅助类 I)

Hadoop源代码分析(MapTask辅助类 I)MapTask的辅劣类主要针对Mapper的输入和输出.首先我们来看MapTask中用的的Mapper输入,在类图中,返部分位于右上角.MapTask.TrackedRecordReader是一个Wrapper,在原有输入RecordReader的基础上,添加了收集上报统计数据的功能.MapTask.SkippingRecordReader也是一个Wrapper,它在MapTask.TrackedRecordReader的基础上,添加了忽略部分输