这是这个系列的最后一篇了,实在没精力写了,本来还想写一下hbck的,这个东西很常用,当hbase的Meta表出现错误的时候,它能够帮助我们进行修复,无奈看到3000多行的代码时,退却了,原谅我这点自私的想法吧。
在讲《Get、Scan在服务端是如何处理?》当中的nextInternal流程,它的第一步从storeHeap当中取出当前kv,这块其实有点儿小复杂的,因为它存在异构的Scanner(一个MemStoreScanner和多个StoreFileScanner),那怎么保证从storeHeap里面拿出来的总是离上一个kv最接近的kv呢?
这里我们知道,在打开这些Scanner之后,就对他们进行了一下seek操作,它们就已经调整到最佳位置了。
我们看看KeyValueHeap的构造函数里面去看看吧。
public KeyValueHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator) throws IOException {
this.comparator = new KVScannerComparator(comparator);
if (!scanners.isEmpty()) {
this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
this.comparator);
//...
this.current = pollRealKV();
}
}
它内部有一个叫heap的PriorityQueue<KeyValueScanner>队列,它会对所有的Scanner进行排序,排序的比较器是KVScannerComparator,
然后current又调用了pollRealKV通过比较获得当前的Scanner,后面会讲。
那好,我们直接进去KVScannerComparator看看它的compare方法就能知道怎么回事了。
public int compare(KeyValueScanner left, KeyValueScanner right) {
// 先各取出来一个KeyValue进行比较
int comparison = compare(left.peek(), right.peek());
if (comparison != 0) {
return comparison;
} else {
// key相同,选择最新的那个
long leftSequenceID = left.getSequenceID();
long rightSequenceID = right.getSequenceID();
if (leftSequenceID > rightSequenceID) {
return -1;
} else if (leftSequenceID < rightSequenceID) {
return 1;
} else {
return 0;
}
}
}
额,从上面代码看得出来,把left和right各取出一个kv来进行比较,如果一样就比较SequenceID,SequenceID越大说明这个文件越新,返回-1,在升序的情况下,这个Scanner就跑到前面去了。
这样就实现了heap里面拿出来的第一个就是最小的kv的最新版。
在继续将之前,我们看一下在KeyValue是怎么被调用的,这样我们好理清思路。
//从storeHeap里面取出一个来
KeyValue current = this.storeHeap.peek();
//后面是一顿比较,比较通过,把结果保存到results当中
KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset, length);
接着看populateResult方法。
private KeyValue populateResult(List<Cell> results, KeyValueHeap heap, int limit,
byte[] currentRow, int offset, short length) throws IOException {
KeyValue nextKv;
do {
//从heap当中取出剩下的结果保存在results当中
heap.next(results, limit - results.size());
//如果够数了,就返回了
if (limit > 0 && results.size() == limit) {
return KV_LIMIT;
}
nextKv = heap.peek();
} while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
return nextKv;
}
我们对KeyValueHeap的使用,就是先peek,然后再next,我们接下来就按这个顺序看吧。
先从peek取出来一个,peek就是从heap队列取出来的current的scanner取出来的当前的KeyValue。
if (this.current == null) {
return null;
}
return this.current.peek();
然后我们看next方法。
public boolean next(List<Cell> result, int limit) throws IOException {
if (this.current == null) {
return false;
}
InternalScanner currentAsInternal = (InternalScanner)this.current;
boolean mayContainMoreRows = currentAsInternal.next(result, limit);
KeyValue pee = this.current.peek();
if (pee == null || !mayContainMoreRows) {
this.current.close();
} else {
this.heap.add(this.current);
}
this.current = pollRealKV();
return (this.current != null);
}
1、通过currentAsInternal.next继续获取kv,它是只针对通过通过检查的当前行的剩下的KeyValue,这个过程在之前那篇文章讲过了。
2、如果后面没有值了,就关闭这个Scanner。
3、然后还有,就把这个Scanner放回heap上,等待下一次调用。
4、使用pollRealKV再去一个新的Scanner出来。
private KeyValueScanner pollRealKV() throws IOException {
KeyValueScanner kvScanner = heap.poll();
if (kvScanner == null) {
return null;
}while (kvScanner != null && !kvScanner.realSeekDone()) {
if (kvScanner.peek() != null) {
//查询之前没有查的
kvScanner.enforceSeek();
//把之前的查到位置的kv拿出来
KeyValue curKV = kvScanner.peek();
if (curKV != null) {
//再选出来下一个的scanner
KeyValueScanner nextEarliestScanner = heap.peek();
if (nextEarliestScanner == null) {
// 后面没了,只能是它了
return kvScanner;
}// 那下一个Scanner的kv也出来比较比较
KeyValue nextKV = nextEarliestScanner.peek();
if (nextKV == null || comparator.compare(curKV, nextKV) < 0) {
// 它确实小,那么就把它放出去吧
return kvScanner;
}// 把它放回去,和别的kv进行竞争
heap.add(kvScanner);
} else {
// 它没东西了,关闭完事
kvScanner.close();
}
} else {
// 它没东西了,关闭完事
kvScanner.close();
}
kvScanner = heap.poll();
}return kvScanner;
}
尽管它已经排过序了,它还是要再不停的比较这些个Scanner的取出来的kv,可能是因为它不是每次放进去都排序,只是在初始化的时候排序?这个就不管了,反正给出我们要的结果就行了。
总结:
这就把如何查询出来下一个KeyValue的过程讲完了,它的peek方法、next方法、比较的方法,希望对大家有帮助,这个系列的文章到此也就结束了,下个目标是跟随超哥学习Spark源码,感谢广大读者的支持,觉得我写得好的,可以关注一下我的博客,谢谢!