HRegionServer Flush操作源码分析

Flush操作是将HBase中的数据存到硬盘上的过程,具体的flush的流程图如下,本文主要简要分析flush的过程相关源码。

Flush 任务提交

每当HRegion完成数据插入的操作的时候,就会进行检查此时是否需要进行一次flush,flush是将HRegion缓存的数据存储到磁盘的过程:

long addedSize = doMiniBatchMutation(batchOp);
long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
if (isFlushSize(newSize)) {
    requestFlush();
}

本文主要分析flush的过程以及涉及到得相关数据结构,在requestFlush内部调用:

this.rsServices.getFlushRequester().requestFlush(this);实际是调用了MemStoreFlusher具体执行flush的操作:

  public void requestFlush(HRegion r) {
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has no delay so it will be added at the top of the flush
        // queue.  It‘ll come out near immediately.
        FlushRegionEntry fqe = new FlushRegionEntry(r);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      }
    }
  }

MemStoreFlushRequeter有两个数据结构管理者需要flush的任务,private BlockingQueue<FlushQueueEntry>flushQueue Map<HRegion, FlushRegionEntry> regionsInQueue flushQueue相当于需要flush的工作队列,而regionsInQueue则是于保存已经在队列中的region的信息,上面的一段代码表示当当前请求flush的region没有记录在flushQueue中的时候则加入。其中FlushRegionEntry是一个flushQueue的单元数据结构

到这里flush request的请求就提交结束了,接下来等待MemStore中的FlushHander线程取出region并执行flush的任务。

Flush的任务执行前期准备

1.FlushHandler从flushQueue中取出FlushRegionEntry并执行

flushRegion(final FlushRegionEntry fqe)

这里首先判断当前region中是否含有过多的storefile的文件,如果是的话,需要首先进行storefile的合并操作(这里有必要解释一下HRegion中的数据组织),然后重新加入队列,否则的话直接对region执行flush操作:

isTooManyStoreFiles(region)
this.server.compactSplitThread.requestSystemCompaction(
                  region, Thread.currentThread().getName());
                        this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
else
return flushRegion(region, false);

2.flushRegion函数内部的主要执行逻辑如下,首先notifyFlushRequest只是进行一些flush线程数量的统计,region.flashcache具体负责flush的工作。执行完之后会根据返回值进行相关的辅助操作

 notifyFlushRequest(region, emergencyFlush);
 HRegion.FlushResult flushResult = region.flushcache();
 boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
 boolean shouldSplit = region.checkSplit() != null;
 if (shouldSplit) {
    this.server.compactSplitThread.requestSplit(region);
 } else if (shouldCompact) {
     server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
 }
if (flushResult.isFlushSucceeded()) {
   long endTime = EnvironmentEdgeManager.currentTime();
   server.metricsRegionServer.updateFlushTime(endTime - startTime);
}

Flush的任务执行过程

flushcahe内部调用 FlushResult fs = internalFlushcache(status);实际执行flush操作,StoreFlushContext的实现为StoreFlusherImpl,为每个HStore建一个StoreFlusherImpl,它为对应的HStore执行着具体非flush的操作。flush的具体实现包括三个步骤:

1.快照

 public void prepare() {
      this.snapshot = memstore.snapshot();
      this.cacheFlushCount = snapshot.getCellsCount();
      this.cacheFlushSize = snapshot.getSize();
      committedFiles = new ArrayList<Path>(1);
    }

2.将memestore中的数据写入.tmp文件中

   public void flushCache(MonitoredTask status) throws IOException {
      tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
    }

3.将.tmp文件写入对应的cf下面的对应的文件中去,并用StoreFile保存相应的HFile的文件信息

    public boolean commit(MonitoredTask status) throws IOException {
      if (this.tempFiles == null || this.tempFiles.isEmpty()) {
        return false;
      }
      List<StoreFile> storeFiles = new ArrayList<StoreFile>(this.tempFiles.size());
      for (Path storeFilePath : tempFiles) {
        try {
          storeFiles.add(HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status));
        } catch (IOException ex) {
          LOG.error("Failed to commit store file " + storeFilePath, ex);
          // Try to delete the files we have committed before.
          for (StoreFile sf : storeFiles) {
            Path pathToDelete = sf.getPath();
            try {
              sf.deleteReader();
            } catch (IOException deleteEx) {
              LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex);
              Runtime.getRuntime().halt(1);
            }
          }
          throw new IOException("Failed to commit the flush", ex);
        }
      }

      for (StoreFile sf : storeFiles) {
        if (HStore.this.getCoprocessorHost() != null) {
          HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
        }
        committedFiles.add(sf.getPath());
      }

      HStore.this.flushedCellsCount += cacheFlushCount;
      HStore.this.flushedCellsSize += cacheFlushSize;

      // Add new file to store files.  Clear snapshot too while we have the Store write lock.
      return HStore.this.updateStorefiles(storeFiles, snapshot.getId());
    }

至此HBase的flush的操作过程就完成了。

时间: 2024-10-10 06:28:08

HRegionServer Flush操作源码分析的相关文章

Django rest framework 权限操作(源码分析二)

知识回顾 这一篇是基于上一篇写的,上一篇谢了认证的具体流程,看懂了上一篇这一篇才能看懂, 当用户访问是 首先执行dispatch函数,当执行当第二部时: #2.处理版本信息 处理认证信息 处理权限信息 对用户的访问频率进行限制 self.initial(request, *args, **kwargs) 进入到initial方法: def initial(self, request, *args, **kwargs): """ Runs anything that needs

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(一)

如下面的代码所示,是HBase Put操作的简单代码实例,关于代码中的Connection connection = ConnectionFactory.createConnection(conf),已近在前一篇博 HBase1.0.0源码分析之Client启动连接流程,中介绍了链接的相关流程以及所启动的服务信息. TableName tn = TableName.valueOf("test010"); try (Connection connection = ConnectionFa

Linq特取操作之ElementAt,Single,Last,First源码分析

Linq特取操作之ElementAt,Single,Last,First源码分析 一:linq的特取操作 First/FirstOrDefault, Last/LastOrDefault, ElementAt/ElementAtOrDefault, Single/SingleOrDefault 二:First/FirstOrDefault 介绍 解释: 用于返回序列中的第一个值 异常: 如果当前集合没有值的话,如果你取第一个值,会抛出throw Error.NoElements();异常. pu

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二)

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二) 1.通过mutate(put)操作,将单个put操作添加到缓冲操作中,这些缓冲操作其实就是Put的父类的一个List的集合.如下: private List<Row> writeAsyncBuffer = new LinkedList<>(); writeAsyncBuffer.add(m); 当writeAsyncBuffer满了之后或者是人为的调用backgroundFlushCommits操作促使缓冲池中的

storm操作zookeeper源码分析-cluster.clj

storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中).backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState.clojure中的protocol可以看成java中的接口,封装了一组方法.ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协

jQuery源码分析系列(38) : 队列操作

Queue队列,如同data数据缓存与Deferred异步模型一样,都是jQuery库的内部实现的基础设施 Queue队列是animate动画依赖的基础设施,整个jQuery中队列仅供给动画使用 Queue队列 队列是一种特殊的线性表,只允许在表的前端(队头)进行删除操作(出队),在表的后端(队尾)进行插入操作(入队).队列的特点是先进先出(FIFO-first in first out),即最先插入的元素最先被删除. 为什么要引入队列? 我们知道代码的执行流有异步与同步之分,例如 var a

集合的操作出现的ConcurrentModificationException(源码分析)

摘要: 为了保证线程安全,在迭代器迭代的过程中,线程是不能对集合本身进行操作(修改,删除,增加)的,否则会抛出ConcurrentModificationException的异常. 示例: 1 public static void main(String[] args) { 2 Collection num = new ArrayList<String>(); 3 num.add("One"); 4 num.add("Two"); 5 num.add(&

Netty源码分析第2章(NioEventLoop)----&gt;第6节: 执行selector操作

Netty源码分析第二章: NioEventLoop 第六节: 执行select操作 分析完了selector的创建和优化的过程, 这一小节分析select相关操作 跟到跟到NioEventLoop的run方法: protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE

jQuery 源码分析(十三) 数据操作模块 DOM属性 详解

jQuery的属性操作模块总共有4个部分,本篇说一下第2个部分:DOM属性部分,用于修改DOM元素的属性的(属性和特性是不一样的,一般将property翻译为属性,attribute翻译为特性) DOM属性的静态方法接口如下: prop(elem, name, value)    ;设置或读取DOM属性,有两种用法,如下 ·$.prop(elem,name,value)      ;传入第三个参数表示设置elem元素的name属性值为value ·$.prop(elem,name,)