MemStore刷写线程—MemStoreFlusher源代码分析

在HBase中表由一个或多个Region组成,而Region由一个或者多个Store组成,Store又由一个MenStore和若干个StoreFile组成。无论是向HBase写入数据还是请求读数据,都首先经过MemStore,对于写请求来说就是将数据直接写入MemStore,对于读请求来说就是先检查MenStore中是否包含相应的数据,如果有则直接读取该数据,否则在StoreFile中检索并读取数据。当写入MemStore中的数据达到一定的数量时,就需要将其中的数据刷写到StoreFile中,这是由后台线程自动完成的,负责该任务的主要Java类是MemStoreFlusher,本篇文章将结合该类的源代码学习HBase如何决定是否刷写数据到StoreFile中的,而不关注MemStoreFlusher如何被实例化及启动的(相应的代码位于HRegionServer.java中)以及写入StoreFile的代码,而仅仅关注HBase在满足什么条件的情况将触发刷写。

官方文档将MenStoreFlusher解释为一个线程,而实际上该类既没有继承自Thread也没有实现Runnable接口,但定义了FlushHandler内部类,该类继承自HasThread,而HasThread实现了Runnable接口,在稍后的学习中将会看到FlushHandler就是负责刷写数据到StoreFile中的线程,并会根据参数hbase.hstore.flusher.count的值(默认为2)决定启动该线程的数量。该参数具有比较重要的意义,如果该值较小,则会导致刷写队列中包含过多的待刷写的Region,而如果该值较大的话,则会有较多并行执行刷写的线程,则会增加HDFS的负担,进而引起HBase更加频繁地执行Compaction。在MenStoreFlusher中刷写队列是由下面的数据结构定义的:

private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<FlushQueueEntry>();
private final Map<HRegion, FlushRegionEntry> regionsInQueue = new HashMap<HRegion, FlushRegionEntry>();

这两个数据结构必须一起使用,若某个FlushQueueEntry在二者中的一个,则在另一个中也必须能够找到该FlushQueueEntry。注意FlushRegionEntry实现了FlushQueueEntry接口,而后者继承自java.util.concurrent.Delayed接口。另一个实现了FlushQueueEntry的类为WakeupFlushThread类,该类的主要用作占位符插入到刷写队列中以确保刷写线程不会休眠。FlushRegionEntry类保存了请求刷写的Region、重试次数以及在刷写队列中存在的时间。刷写队列flushQueue的类型为DelayQueue,保存在该队列中的对象必须实现了Delayed接口,且位于该队列中的对象只有在其延迟过期后才可以被取出,位于队列头部的是延迟过期最长的对象,如果没有延迟过期,队列没有头部,poll操作将返回null。当调用对象的getDelay(TimeUnit.NANOSECONDS)方法返回值小于等于0时,该对象的延迟过期。由于DelayQueue实现了BlockingQueue接口,该接口是线程安全的,因此该队列也就支持BlockingQueue具有的阻塞入列和出列,即当队列为空时,将等待队列直到队列不为空再提取对象,当队列为满时,将等待队列有空闲位置时再插入对象。FlushRegionEntry的该方法实现如下:

@Override
 public long getDelay(TimeUnit unit) {
      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(), TimeUnit.MILLISECONDS);
 }

FlushRegionEntry除了实现getDelay方法外,还定义了requeue()方法:

 public FlushRegionEntry requeue(final long when) {
      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
      this.requeueCount++;
      return this;
 }

稍后会介绍为什么会使用DelayQueue这样的数据结构定义刷写队列,现在继续看MemStoreFlusher的源代码。在MemStoreFlusher的构造函数中,读取hbase-site.xml中设置的与刷写相关的参数并赋值给相应的变量,并实例化了刷写线程FlushHandler:

public MemStoreFlusher(final Configuration conf, final HRegionServer server) {
    super();
    this.server = server;
    //hbase.server.thread.wakefrequency,默认值10s
    this.threadWakeFrequency =conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
    //hbase.regionserver.global.memstore.size的值,默认为0.4,且该值必须小于等于0.8,大于0,即(0.0,0.8]
    float globalMemStorePercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
    this.globalMemStoreLimit = (long) (max * globalMemStorePercent);
    //hbase.regionserver.global.memstore.size.lower.limit
    this.globalMemStoreLimitLowMarkPercent =  HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf,globalMemStorePercent);
    //RS Xmx * hbase.regionserver.global.memstore.size * hbase.regionserver.global.memstore.size.lower.limit
    this.globalMemStoreLimitLowMark =  (long) (this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);
    //默认值为90s,如果任何一个Store总的StoreFile数量大于hbase.hstore.blockingStoreFiles值,HRegion将阻塞更新直到该参数
    //设置的时间到达或者Compaction完成。在该参数设置的时间到期后,即使Compaction没有完成,HRegion也将不再阻塞更新
    this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
    int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2);
    this.flushHandlers = new FlushHandler[handlerCount];
  }

MemStoreFlusher定义了三个刷写方法,分别为:flushOneForGlobalPressure、flushRegion(final FlushRegionEntry fqe)和flushRegion(finalHRegion region, final boolean emergencyFlush)其中前两个方法最终调用了最后一个方法,也就是只有最后一个方法执行了实际的刷写操作,前两个方法都在FlushHandler刷写线程中调用,分别用于当RegionServer中的所有MemStore的大小超过了RS Xmx *hbase.regionserver.global.memstore.size *hbase.regionserver.global.memstore.size.lower.limit进行的刷写和当某个Region需要执行刷写操作。

首先看看flushOneForGlobalPressure方法的作用,从该方法的名称就可以看出,该方法用于缓解全局MemStore缓存的压力,在FlushHandler刷写线程的run方法中,当刷写队列中没有要求立刻执行刷写的Region时,刷写线程会判断所有MemStore的大小是否大参数设置的阈值,如果是,则执行该方法。在该方法中,刷写是按照Region的大小进行的,但不一定对最大的Region执行刷写,还需要进一步判断。判断主要是由方法getBiggestMemstoreRegion(SortedMap<Long, HRegion> regionsBySize, Set<HRegion>excludedRegions, boolean checkStoreFileCount)完成的,该方法的第三个参数决定是否检查当前Region的StoreFile文件数量(决定是否执行Compact操作),如果当前Region的StoreFile数量超过了参数hbase.hstore.blockingStoreFiles设置的值(默认值为10),且checkStoreFileCount的值为true,则当前Region不执行刷写,其它不执行刷写的情况还包括当前Region正在执行刷写或者当前Region不可写。在flushOneForGlobalPressure中,对getBiggestMemstoreRegion连续调用两次,一次检查StoreFile的数量,一次不检查,这样的话,不检查StoreFile数量的调用返回的Region的大小总是大于等于检查StoreFile数量的调用返回的Region,在源代码中,前者声明为bestAnyRegion,后者声明为bestFlushableRegion。如果当前Region包含有过多的StoreFile,则bestFlushableRegion的大小小于当前Region,否则bestAnyRegion和bestFlushableRegion都为当前Region,因而二者也相等。如果bestFlushableRegion不为null(bestAnyRegion肯定不为null),且拥有过多StoreFile的bestAnyRegion的大小比bestFlushableRegion大2倍多,则选择bestAnyRegion为要刷写的Region,这是因为如果选择较小的bestFlushableRegion为要刷写的Region则会导致产生较多较小的StoreFile,从而引起Compaction。如果bestFlushableRegion为null,则bestAnyRegion为要刷写的Region,如果bestFlushableRegion且其大小大于等于bestAnyRegion的一半,则bestFlushableRegion为要刷写的Region。在确定了要执行刷写的Region后,将会调用flushRegion(regionToFlush, true)方法执行刷写操作。如果刷写不成功,则重复上述的步骤,并将当前执行刷写的Region排除在判断挑选之外。

在刷写线程中调用的另一个刷写方法是flushRegion(final FlushRegionEntry fqe),该方法用于检查指定的Region是否有过多的StoreFile,如果否的话则调用flushRegion(finalHRegion region, final boolean emergencyFlush)执行刷写,否则的话再根据该Region是否等待了足够长的时间来决定执行刷写还是重新入列。这里足够长的时间指的是参数hbase.hstore.blockingWaitTime设置的值(默认为90s),该参数用于设置阻塞Region更新的时间,当Region的StoreFile数量超过了参数hbase.hstore.blockingStoreFiles设置的值,Region将阻塞更新而执行Compaction操作,当Compaction完成或者阻塞的时间超过了hbase.hstore.blockingWaitTime设置的值,Region将不再阻塞更新。如果该Region等待了足够的时间,即使该Region拥有过多的StoreFile,也将会对该Region执行刷写操作。如果该Region还没有等待hbase.hstore.blockingWaitTime设置的值,则先判断该Region重新入列的次数,如果该Region是第一次从刷写队列中取出(即该队列没有重复入列),再判断该Region是否需要执行split操作,如果不需要执行split操作,则请求执行Compaction(由于存在太多StoreFile)。对于没有等待足够长时间的Region,将该队列重新入列(此时重新入列的次数将会加1),并设置延迟时间为百分之一hbase.hstore.blockingWaitTime的值,这样该Region至少要在刷写队列中等待百分之一hbase.hstore.blockingWaitTime才能够被再次取出,进而给合并线程时间来完成对StoreFile的合并,当该Region再次有机会执行刷写时,就有可能StoreFile的数量降低到hbase.hstore.blockingStoreFiles之下,进而可刷写数据到StoreFile中。从对该方法的描述中,可以知道为什么使用DelayQueue做为刷写队列的实现,就是为了让刷写队列中的Region必须等待设置的时间,以让合并线程有时间完成合并StoreFile的任务。

上述两个方法都没有实现实际的刷写任务,而将刷写任务交给了flushRegion(final HRegion region, finalboolean emergencyFlush)方法。该方法的第二个参数的值由调用者决定,在上述的第一个方法中为true(因为所有MemStore的大小已经超过了设置的阈值),第二个方法中为false(因为仅仅是某个Region需要执行刷写操作)。如果第二个方法的参数为true,则将该Region从刷写队列中移除,如果为false,则DelayQueue的poll方法移除该Region。在该方法中调用了HRegion的flushcache方法(此处不关心具体是如何刷写数据到StoreFile,仅关注MemStoreFlusher中的实现,后续会详细研究flushcache的具体实现),flushcache的返回结果为HRegion.FlushResult,可能的值为:FLUSHED_NO_COMPACTION_NEEDED、FLUSHED_COMPACTION_NEEDED、CANNOT_FLUSH_MEMSTORE_EMPTY、CANNOT_FLUSH,接着根据返回结果判断是否要执行Compaction和Split,只能执行二者中的一种操作,无论Compaction还是Split都是由CompactSplitThread类完成的。当刷写成功完成后,将会记录刷写的完成时间。

在MemStoreFlusher中还有一个方法用于阻塞Region的更新,该方法就是reclaimMemStoreMemory。该方法将检查当前RegionServer中所有MemStore的大小是否超过了参数hbase.regionserver.global.memstore.size设置的值,如果超过了该值,则会阻塞更新5秒钟,当但并不影响刷写线程的执行。由于当所有MemStore的值高于MemStore的最高水位时自然大于MemStore的低水位,所以在刷写线程的run方法中同样会调用flushOneForGlobalPressure方法以选择合适的Region并对其所拥有的MemStore进行刷写。每当对Region进行修改操作时都将调用reclaimMemStoreMemory方法,这将导致检查所有MemStore的大小,并在条件满足时阻塞更新操作。

总结,本篇文章仅基于MemStoreFlusher的源代码及少量其他类的源代码对刷写线程及所涉及的方法进行了详细学习,仅包含了部分触发刷写MemStore的原因,比如当所有的MemStore的大小超过低水位或者最高水位的情况,以及对单个Region执行刷写的情况,对后者有一点需要补充的时,按照Hbase Reference Guide及hbase-default.xml的说明,当Region的某个MemStore大于hbase.hregion.memstore.flush.size的值时,会对该Region执行刷写,而从HRegion的源代码中发现是当前Region中所有的MemStore的大小超过了该参数的时请求刷写该Region,这一点还需要再进一步阅读相关源代码来确定真实情况。

时间: 2024-10-20 07:33:55

MemStore刷写线程—MemStoreFlusher源代码分析的相关文章

HBase源代码分析之MemStore的flush发起时机、推断条件等详情(二)

在<HBase源代码分析之MemStore的flush发起时机.推断条件等详情>一文中,我们具体介绍了MemStore flush的发起时机.推断条件等详情.主要是两类操作.一是会引起MemStore数据大小变化的Put.Delete.Append.Increment等操作,二是会引起HRegion变化的诸如Regin的分裂.合并以及做快照时的复制拷贝等.相同会触发MemStore的flush流程.同一时候.在<HBase源代码分析之compact请求发起时机.推断条件等详情(一)>

HBase源代码分析之HRegion上MemStore的flsuh流程(二)

继上篇<HBase源代码分析之HRegion上MemStore的flsuh流程(一)>之后.我们继续分析下HRegion上MemStore flush的核心方法internalFlushcache().它的主要流程如图所看到的: 当中.internalFlushcache()方法的代码例如以下: /** * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the

HBase源代码分析之HRegionServer上MemStore的flush处理流程(二)

继上篇文章<HBase源代码分析之HRegionServer上MemStore的flush处理流程(一)>遗留的问题之后,本文我们接着研究HRegionServer上MemStore的flush处理流程.重点讲述下怎样选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是怎样发起的. 我们先来看下第一个问题:怎样选择一个HRegion进行flush以缓解MemStore压力.上文中我们讲到过flush处理线程假设从flushQueue队列中拉取出的一个

从源代码分析Universal-Image-Loader中的线程池

一般来讲一个网络访问就需要App创建一个线程来执行,但是这也导致了当网络访问比较多的情况下,线程的数目可能积聚增多,虽然Android系统理论上说可以创建无数个线程,但是某一时间段,线程数的急剧增加可能导致系统OOM.在UIL中引入了线程池这种技术来管理线程.合理利用线程池能够带来三个好处.第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗.第二:提高响应速度.当任务到达时,任务可以不需要等到线程创建就能立即执行.第三:提高线程的可管理性.线程是稀缺资源,如果无限制的创建,

Kafka SocketServer源代码分析

Kafka SocketServer源代码分析 标签: kafka 本文将详细分析Kafka SocketServer的相关源码. 总体设计 Kafka SocketServer是基于Java NIO来开发的,采用了Reactor的模式,其中包含了1个Acceptor负责接受客户端请求,N个Processor负责读写数据,M个Handler来处理业务逻辑.在Acceptor和Processor,Processor和Handler之间都有队列来缓冲请求. kafka.network.Accepto

UiAutomator喷射事件的源代码分析

上一篇文章<UiAutomator源代码分析之UiAutomatorBridge框架>中我们把UiAutomatorBridge以及它相关的类进行的描写叙述,往下我们会尝试依据两个实例将这些类给串联起来,我准备做的是用例如以下两个非常有代表性的实例: 注入事件 获取控件 这一篇文章我们会通过分析UiDevice的pressHome这种方法来分析UiAutomator是怎样注入事件的,下一篇文章会描写叙述怎样获取控件,敬请期待. 1. UiObject.pressHome顺序图 首先我们看一下我

Jafka源代码分析——LogManager

在Kafka中,LogManager负责管理broker上所有的Log(每一个topic-partition为一个Log).通过阅读源代码可知其具体完成的功能如下: 1. 按照预设规则对消息队列进行清理. 2. 按照预设规则对消息队列进行持久化(flush操作). 3. 连接ZooKeeper进行broker.topic.partition相关的ZooKeeper操作. 4. 管理broker上所有的Log. 下面一一对这些功能的实现进行详细的解析. 一.对于Log的管理 LogManager包

Android 消息处理源代码分析(2)

Android 消息处理源代码分析(1)点击打开链接 继续接着分析剩下的类文件 Looper.java public final class Looper { final MessageQueue mQueue; //消息队列 final Thread mThread; //Looper联系的线程 public static void prepare() { prepare(true); } private static void prepare(boolean quitAllowed) { /

Android应用Activity、Dialog、PopWindow、Toast窗体加入机制及源代码分析

[工匠若水 http://blog.csdn.net/yanbober 转载烦请注明出处.尊重劳动成果] 1 背景 之所以写这一篇博客的原因是由于之前有写过一篇<Android应用setContentView与LayoutInflater载入解析机制源代码分析>.然后有人在文章以下评论和微博私信中问我关于Android应用Activity.Dialog.PopWindow载入显示机制是咋回事,所以我就写一篇文章来分析分析吧(本文以Android5.1.1 (API 22)源代码为基础分析),以