hbase(0.94) get、scan源码分析

简介

本文是需要用到hbase timestamp性质时研究源码所写.内容有一定侧重.且个人理解不算深入,如有错误请不吝指出.

如何看源码

hbase依赖很重,没有独立的client包.所以目前如果在maven中指定如下:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.94-adh3u9.9</version>
    <exclusions>
        <exclusion>
            <groupId>org.jruby</groupId>
            <artifactId>jruby-complete</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

可以看到其会把整个hbase的源码都下载下来.这一点在查看源码上是比较方便的.

入口

本文以get为例.代码的入口位于org.apache.hadoop.hbase.client.HTable的get方法:

  public Result get(final Get get) throws IOException {
    try {
    startTrace(get);
    ServerCallable<Result> serverCallable = new ServerCallable<Result>(
        connection, tableName, get.getRow(), operationTimeout) {
      public Result call() throws IOException {
        // rpc调用服务端进行查询
        return server.get(location.getRegionInfo().getRegionName(), get);
      }
    };
    return executeServerCallable(serverCallable);
    } finally {
      endTrace(TableOperationMetricType.GET, 1);
    }
  }

其中在实际执行server.get时,会通过反射调用一个rpc接口真正和服务器进行沟通.所以如果把断点打在这个函数的里面,会发现无法断住.

路由

这里其实有一个很重要的问题.就是在执行get的时候.用户只会传入rowkey等信息,这里hbase是如何根据rowkey确认该数据所在region的.由上述代码可见location.getRegionInfo().getRegionName()即获取到了regionname.这一块的细节逻辑未深入研究.

regionserver逻辑

  public Result get(byte[] regionName, Get get) throws IOException {
    checkOpen();
    final long startTime = System.nanoTime();
    ReadMetricsData metricsData = null;
    try {
      // 确认当前请求真正的region
      HRegion region = getRegion(regionName);
      checkReadEnabled(region.getTableDesc().getNameAsString());
      HBaseServer.setRegionInfoForCurCall(region.getRegionInfo()
          .getTableNameAsString(), region.getRegionInfo().getEncodedName());
      if (!region.getRegionInfo().isMetaTable()) {
        metricsData = new ReadMetricsData();
        ReadMetricsData.setCurReadMetricsData(metricsData);
      }
      // 真正执行查询,第二个参数是一个递增的整数.用于实现mvcc,即多版本并发控制
      // 简单描述就是查询时会依靠这个数字来确定读取的数据版本,避免出现读取put
      // 多个列、列族时,get到其插入一半的数据.
      Result r = region.get(get, getLockFromId(get.getLockId()));
      if (get.isSupportEncodingResult()) {
        r.setKVEncoding(KVEncoding.FASTPREFIX);
      }
      int dataLen = (int) r.getWritableSize();
      long took = System.nanoTime() - startTime;
      this.addReadMetricsCount(region, dataLen,
          HBaseServer.getRemoteAddress(), 1, (int) (took / MS_CONVERTTO_NS));
      if (metricsData != null) {
        this.metrics.getLatencies.updateReadMetricsData(metricsData, took);
      }
      return r;
    } catch (Throwable t) {
      this.metrics.failedReadRequests.inc();
      throw convertThrowableToIOE(cleanup(t));
    } finally {
      ReadMetricsData.setCurReadMetricsData(null);
    }
  }

region逻辑

上文代码最终会调用:HRegion.get(Get get, boolean withCoprocessor)方法.

  private List<KeyValue> get(Get get, boolean withCoprocessor)
  throws IOException {
    long now = EnvironmentEdgeManager.currentTimeMillis();

    List<KeyValue> results = new ArrayList<KeyValue>();

    // pre-get CP hook
    // Coprocessor 为hbase中的协处理器概念.其可由clinet发往hbaseserver,在执行put、get前后
    // 执行一些逻辑.如进行简单运算、筛选.
    if (withCoprocessor && (coprocessors != null)) {
       if (coprocessors.preGet(get, results)) {
         return results;
       }
    }

    // 注意这里.所有的get都会转化为一个scan
    Scan scan = new Scan(get);

    RegionScanner scanner = null;
    try {
      // 构造scanner
      scanner = getScanner(scan);
      // 从scanner中取出数据放入results
      scanner.next(results);
    } finally {
      if (scanner != null)
        scanner.close();
    }

    // post-get CP hook
    // 协处理器的后置处理调用
    if (withCoprocessor && (coprocessors != null)) {
      coprocessors.postGet(get, results);
    }

    // do after lock
    final long after = EnvironmentEdgeManager.currentTimeMillis();
    this.opMetrics.updateGetMetrics(get.familySet(), after - now);

    return results;
  }

对于Scan scan = new Scan(get);,在scan的构造方法中可见:

  public Scan(Get get) {
    this.startRow = get.getRow();
    this.stopRow = get.getRow();
    this.filter = get.getFilter();
    this.cacheBlocks = get.getCacheBlocks();
    this.maxVersions = get.getMaxVersions();
    this.tr = get.getTimeRange();
    this.familyMap = get.getFamilyMap();
  }

也就是对于一个get.实际上是把其当做一个scan进行查询的.所以这里可以推断出.一个get和一个startrow、stoprow均相同的scan,在执行效率上是不会有差异的.

其次是关于timerange这一项.hbase中一个get、scan.可以设置timerange或者timestamp.其中timerange是指只查询某个时间范围的数据.而timestamp是指只查询某个时间点的数据.

而如果不设置,则会默认查询所有数据.这一块的逻辑实现就在get、scan的构造方法和set方法中.

  • 如果用户没有setTimeStamp、setTimeRange

    // get、scan均会调用默认无参数构造方法构造其tr.
    private TimeRange tr = new TimeRange();
  • 如果用户进行了设置
      public Get setTimeRange(long minStamp, long maxStamp)
      throws IOException {
        tr = new TimeRange(minStamp, maxStamp);
        return this;
      }
    
      public Get setTimeStamp(long timestamp) {
        try {
          tr = new TimeRange(timestamp, timestamp+1);
        } catch(IOException e) {
          // Will never happen
        }
        return this;
      }

可见timestamp是一种特殊的timerange,其构造方法为[timestamp,timestamp+1)的range.

scanner存在的意义

hbase的scanner并非为了使用设计模式而强行加入一个scanner做数据查询.这里的scanner的必要性主要在于其逻辑、物理存储特性.这里简单描述就是,在hbase中一个region是由多个store的.每个store才是真正的存储的逻辑最小单元.而一个store里面又有一个memstore(内存),零个或多个storefile(一般是位于HDFS的HFile文件).

由此,有个很明显的问题:在用户的一次查询中,用户的输入是一个rowkey,而这个rowkey的不同列族的数据,可能在不同的store中.而即使确定了一个store,可能数据在memstore中(尚未flush到硬盘),也有可能已经在storefile中了.

进一步也就需要一个机制从这些不同的逻辑、物理的存储媒介中遍历查询数据并且做一个合并.

scanner 的构造

暂无

scanner的遍历逻辑

上文中的scanner.next(results);最终会执行到HRegion.nextInternal(int limit)方法.

代码:

    private boolean nextInternal(int limit) throws IOException {
      RpcCallContext rpcCall = HBaseServer.getCurrentCall();
      while (true) {
        if (rpcCall != null) {
          // If a user specifies a too-restrictive or too-slow scanner, the
          // client might time out and disconnect while the server side
          // is still processing the request. We should abort aggressively
          // in that case.
          rpcCall.throwExceptionIfCallerDisconnected();
        }

        KeyValue current = this.storeHeap.peek();
        byte[] currentRow = null;
        int offset = 0;
        short length = 0;
        if (current != null) {
          currentRow = current.getBuffer();
          offset = current.getRowOffset();
          length = current.getRowLength();
        }
        if (isStopRow(currentRow, offset, length)) {
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRow(results);
          }
          if (filter != null && filter.filterRow()) {
            results.clear();
          }

          return false;
        } else if (filterRowKey(currentRow, offset, length)) {
          nextRow(currentRow, offset, length);
        } else {
          KeyValue nextKv;
          do {
            this.storeHeap.next(results, limit - results.size());
            if (limit > 0 && results.size() == limit) {
              if (this.filter != null && filter.hasFilterRow()) {
                throw new IncompatibleFilterException(
                  "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
              }
              return true; // we are expecting more yes, but also limited to how many we can return.
            }
            nextKv = this.storeHeap.peek();
          } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));

          final boolean stopRow = nextKv == null || isStopRow(nextKv.getBuffer(), nextKv.getRowOffset(), nextKv.getRowLength());

          // now that we have an entire row, lets process with a filters:

          // first filter with the filterRow(List)
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRow(results);
          }

          if (results.isEmpty() || filterRow()) {
            // this seems like a redundant step - we already consumed the row
            // there‘re no left overs.
            // the reasons for calling this method are:
            // 1. reset the filters.
            // 2. provide a hook to fast forward the row (used by subclasses)
            nextRow(currentRow, offset, length);

            // This row was totally filtered out, if this is NOT the last row,
            // we should continue on.

            if (!stopRow) continue;
          } else if (this.remainingOffset > 0) {
            this.remainingOffset--;
            nextRow(currentRow, offset, length);
            if (!stopRow) continue;
          }
          return !stopRow;
        }
      }
    }

简化版:

    private boolean nextInternal(int limit) throws IOException {
      while (true) {
        // 获取heap顶的KeyValue
        KeyValue current = this.storeHeap.peek();
        byte[] currentRow = null;
        int offset = 0;
        short length = 0;
        if (current != null) {
          currentRow = current.getBuffer();
          offset = current.getRowOffset();
          length = current.getRowLength();
        }
        if (isStopRow(currentRow, offset, length)) {
          // 如果是结束行
          return false;
        } else if (filterRowKey(currentRow, offset, length)) {
          // 如果filter过滤掉了.则直接看下一行数据
          nextRow(currentRow, offset, length);
        } else {
          KeyValue nextKv;
          do {
            // 核心代码
            this.storeHeap.next(results, limit - results.size());
            nextKv = this.storeHeap.peek();
          } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
        }
      }
    }

这里省去了边缘控制、过滤逻辑等内容.主要关注其核心逻辑.

全部代码中最核心的有两行:KeyValue current = this.storeHeap.peek();this.storeHeap.next(results, limit - results.size());.

这里的storeHeap是hbase维护的一个二叉堆(优先队列).这个堆里面存储的元素,是scanner.每个scanner都会持有当前scanner目前最新的、尚未返回的keyvalue.这个二叉堆的排序方式就是根据每个scanner当前keyvalue的rowkey进行排序.

每次执行查询的时候.首先会用KeyValue current = this.storeHeap.peek();取出堆顶的scanner的当前keyvalue.进行一些逻辑判断(主要是判断rowkey,如判断是否超过limit、是否到了stoprow、是否被filter过滤等).如果该keyvalue全部通过,也就是认为其应该被本次查询查到.会执行一次this.storeHeap.next(results, limit - results.size());注意这里才是有可能把当前keyvalue放入查询结果的地方.(不一定会放入,next方法中还有针对value的判断逻辑,比如比较timestamp是否正确).

scanner的排序

上文提到了scanner会依靠其当前元素rowkey进行排序.可以在类KeyValueHeap的构造方法中看到端倪.

  KeyValueHeap(List<? extends KeyValueScanner> scanners,
      KVScannerComparator comparator) throws IOException {
    this.comparator = comparator;
    if (!scanners.isEmpty()) {
      this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
          this.comparator);
      for (KeyValueScanner scanner : scanners) {
        if (scanner.peek() != null) {
          this.heap.add(scanner);
        } else {
          scanner.close();
        }
      }
      this.current = pollRealKV();
    }

  }

毫无疑问.从scanner中取出元素也会影响KeyValueHeap中二叉堆的排序.故其可以保证二叉堆的堆顶的scanner的当前keyvalue一直是离上个遍历到的rowkey最近的keyvalue.

二叉堆的next逻辑

上文中的this.storeHeap.next(results, limit - results.size());最终会执行到:StoreScanner.next(List<KeyValue> outResult, int limit).其源码很长:

  public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {

    if (checkReseek()) {
      return true;
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      close();
      return false;
    }

    KeyValue peeked = this.heap.peek();
    if (peeked == null) {
      close();
      return false;
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row
    byte[] row = peeked.getBuffer();
    int offset = peeked.getRowOffset();
    short length = peeked.getRowLength();
    if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
      matcher.setRow(row, offset, length);
    }

    KeyValue kv;
    KeyValue prevKV = null;
    List<KeyValue> results = new ArrayList<KeyValue>();

    // Only do a sanity-check if store and comparator are available.
    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    long cumulativeMetric = 0;
    try {
      LOOP: while ((kv = this.heap.peek()) != null) {
        // Check that the heap gives us KVs in an increasing order.
        checkScanOrder(prevKV, kv, comparator);
        prevKV = kv;
        ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
        switch (qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:

          Filter f = matcher.getFilter();
          results.add(f == null ? kv : f.transform(kv));

          if (limit > 0 && results.size() == limit) {
            if (this.mustIncludeColumn != null) {
                throw new DoNotRetryIOException("Assistant " + this.assistant
                    + " incompatible with scan where limit=" + limit);
            }
          }

          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(kv)) {
              filterRowIfMissingMustIncludeColumn(results);
              outResult.addAll(results);
              return false;
            }
            seekToNextRow(kv);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            reseek(matcher.getKeyForNextColumn(kv));
          } else {
            this.heap.next();
          }

          cumulativeMetric += kv.getLength();
          if (limit > 0 && (results.size() == limit)) {
            break LOOP;
          }
          continue;

        case DONE:
          filterRowIfMissingMustIncludeColumn(results);
          // copy jazz
          outResult.addAll(results);
          return true;

        case DONE_SCAN:
          close();
          filterRowIfMissingMustIncludeColumn(results);
          // copy jazz
          outResult.addAll(results);

          return false;

        case SEEK_NEXT_ROW:
          // This is just a relatively simple end of scan fix, to
          // short-cut end
          // us if there is an endKey in the scan.
          if (!matcher.moreRowsMayExistAfter(kv)) {
            filterRowIfMissingMustIncludeColumn(results);
            outResult.addAll(results);
            return false;
          }

          seekToNextRow(kv);
          break;

        case SEEK_NEXT_COL:
          reseek(matcher.getKeyForNextColumn(kv));
          break;

        case SKIP:
          this.heap.next();
          break;

        case SEEK_NEXT_USING_HINT:
          KeyValue nextKV = matcher.getNextKeyHint(kv);
          if (nextKV != null) {
            reseek(nextKV);
          } else {
            heap.next();
          }
          break;

        default:
          throw new RuntimeException("UNEXPECTED");
        }
      }
    } finally {
      RegionMetricsStorage.incrNumericMetric(metricNameGetSize,
          cumulativeMetric);
    }

    if (!results.isEmpty()) {
      filterRowIfMissingMustIncludeColumn(results);
      // copy jazz
      outResult.addAll(results);
      return true;
    }

    // No more keys
    close();
    return false;
  }

简化版:

  public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {

    KeyValue peeked = this.heap.peek();
    if (peeked == null) {
      close();
      return false;
    }

    byte[] row = peeked.getBuffer();
    int offset = peeked.getRowOffset();
    short length = peeked.getRowLength();
    if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) {
      matcher.setRow(row, offset, length);
    }

    KeyValue kv;
    List<KeyValue> results = new ArrayList<KeyValue>();

    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    long cumulativeMetric = 0;
    try {
      // 获取本store当前的keyvalue
      LOOP: while ((kv = this.heap.peek()) != null) {
        // 执行匹配.这里的匹配规则部分从用户的filter中来
        ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
        switch (qcode) {
        case INCLUDE:
        case INCLUDE_AND_SEEK_NEXT_ROW:
        case INCLUDE_AND_SEEK_NEXT_COL:
          // 针对当前keyvalue应该包含在用户的查询结果的情况
          Filter f = matcher.getFilter();
          results.add(f == null ? kv : f.transform(kv));

          if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
            if (!matcher.moreRowsMayExistAfter(kv)) {
              filterRowIfMissingMustIncludeColumn(results);
              outResult.addAll(results);
              return false;
            }
            seekToNextRow(kv);
          } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
            reseek(matcher.getKeyForNextColumn(kv));
          } else {
            this.heap.next();
          }

          cumulativeMetric += kv.getLength();
          if (limit > 0 && (results.size() == limit)) {
            // 查到了limit的数量.跳出循环
            break LOOP;
          }
          continue;

        case DONE:
        case DONE_SCAN:
        case SEEK_NEXT_ROW:
        case SEEK_NEXT_COL:
        case SKIP:
        case SEEK_NEXT_USING_HINT:
        // 这些都是查询过程中的各种情况.会有针对的处理

        default:
          throw new RuntimeException("UNEXPECTED");
        }
      }
    } finally {
      RegionMetricsStorage.incrNumericMetric(metricNameGetSize,
          cumulativeMetric);
    }

    // 放入结果中
    if (!results.isEmpty()) {
      filterRowIfMissingMustIncludeColumn(results);
      outResult.addAll(results);
      return true;
    }
    close();
    return false;
  }

上述代码可以看到整个next的核心逻辑.其实就是把当前store的当前keyvalue取出.用一个matcher做比较.看看该数据是否应该是用户查询的结果.

查询到的数据的多种可能

上文中的枚举值.这里举个例子.比如查询到一行数据的第一列是应该被查询到的.按理就应该查询其列族的第二列.而如果发现该行数据被删除掉了.那么不用查询其第二列了.也不用查询下个store了.应该直接查询下一行数据.

原文地址:https://www.cnblogs.com/dsj2016/p/10296191.html

时间: 2024-07-29 15:58:24

hbase(0.94) get、scan源码分析的相关文章

RxJava1.0 flatMap方法的源码分析

RxJava1.0 flatMap方法的源码分析 package com.yue.test; import java.awt.Cursor; import java.util.ArrayList; import java.util.List; import com.yue.bean.Course; import com.yue.bean.Student; import rx.Observable; import rx.Subscription; import rx.Observable.OnSu

HBase createTable 的服务器端实现源码分析

HBase的所有请求调用都是通过RPC的机制进行的,RPCServer监听到请求之后会解析请求内容,然后根据解析的方法以及参数调用服务器端实际的方法,这也是远程代理模式的经典做法,createTable的请求最终实现是在HMaster中的,但是实际的表的建立过程是在CreateTableHandler类中的,接下来主要就HBase中表的建立过程进行详细分析. 1. HMaster的createTable实现 如下代码所示,是HMaster中的createTable的流程代码: public vo

AppWidget源码分析---接口类

最近项目中接触到AppWidget,相对来说这部分比较简单,所以趁着空余时间详细阅读了AppWidget的源码.这篇文章主要是从源码上分析AppWidget中API类的相关原理,相关类的简单功能介绍和实现原理.关于使用,建议看指导文档. 简述 AppWidget相关的API类(供我们应用开发者使用的类)主要有: AppWidgetProvider:继承这个类,来提供Appwidget. AppWidgetManager:提供了AppWidget的管理接口,比如更新,绑定AppWidget id,

仿网易新闻导航栏PagerSlidingTabStrip源码分析

转载请注明本文出自Cym的博客(http://blog.csdn.net/cym492224103),谢谢支持!   前言 最近工作比较忙,所以现在才更新博文,对不住大家了~!言归正传,我们来说说这个PagerSlidingTabStrip,它是配合ViewPager使用的导航栏,网易新闻就是用的这个导航,我们仔细观察这个导航栏不仅他是跟着ViewPager滑动而滑动,而且指示器还会随着标题的长度而动态的变化长度. · 下载地址: Github:https://github.com/astuet

jquery2源码分析系列目录

学习jquery的源码对于提高前端的能力很有帮助,下面的系列是我在网上看到的对jquery2的源码的分析.等有时间了好好研究下.我们知道jquery2开始就不支持IE6-8了,从jquery2的源码中可以学到很多w3c新的标准( 如html5,css3,ECMAScript).原文地址是:http://www.cnblogs.com/aaronjs/p/3279314.html 关于1.x.x版的jquery源码分析系列,本博客也转载了一个地址http://www.cnblogs.com/jav

ym——Android仿网易新闻导航栏PagerSlidingTabStrip源码分析

转载请注明本文出自Cym的博客(http://blog.csdn.net/cym492224103),谢谢支持! 前言 最近工作比较忙,所以现在才更新博文,对不住大家了~!言归正传,我们来说说这个PagerSlidingTabStrip,它是配合ViewPager使用的导航栏,网易新闻就是用的这个导航,我们仔细观察这个导航栏不仅他是跟着ViewPager滑动而滑动,而且指示器还会随着标题的长度而动态的变化长度,还可以改变多种样式哦~! · 下载地址: Github:https://github.

死磕 java集合之DelayQueue源码分析

问题 (1)DelayQueue是阻塞队列吗? (2)DelayQueue的实现方式? (3)DelayQueue主要用于什么场景? 简介 DelayQueue是java并发包下的延时阻塞队列,常用于实现定时任务. 继承体系 从继承体系可以看到,DelayQueue实现了BlockingQueue,所以它是一个阻塞队列. 另外,DelayQueue还组合了一个叫做Delayed的接口,DelayQueue中存储的所有元素必须实现Delayed接口. 那么,Delayed是什么呢? public

Solr4.8.0源码分析(7)之Solr SPI

Solr4.8.0源码分析(7)之Solr SPI 查看Solr源码时候会发现,每一个package都会由对应的resources. 如下图所示: 一时对这玩意好奇了,看了文档以后才发现,这个services就是java SPI机制.首先介绍下java SPI机制,然后再结合Solr谈一下SPI. 1. JAVA SPI 当服务的提供者,提供了服务接口的一种实现之后,在jar包的META-INF/services/目录里同时创建一个以服务接口命名的文件.该文件里就是实现该服务接口的具体实现类.而

hbase split 源码分析之split策略

在工作中接触到split,于是查看了这块的源代码,先看到了split的策略,今天就说说这个吧,后续还会有split的其他源码分析和compact相关的源码分析. 看了很多其他人的博客,很多都是转发的,原创的也都没有注明是哪个版本.其实给很多读者造成混淆,我这里是基于Hbase-0.98.13  版本作为分析的,注意:不同版本的此部分源码很可能不一样. 在这个版本中使用的split策略是IncreasingToUpperBoundRegionSplitPolicy.确切来说他是0.94版本以后的策