HBase的put流程源码分析

hbase是一个nosql型数据库,本文我们会分析一下客户的数据是通过什么样的路径写入到hbase的。

HBase作为一种列族数据库,其将相关性较高的列聚合成一个列族单元,不同的列族单元物理上存储在不同的文件(HFile)内。一个表的数据会水平切割成不同的region分布在集群中不同的regionserver上。客户端访问集群时会首先得到该表的region在集群中的分布,之后的数据交换由客户端和regionserver间通过rpc通信实现,下面我们从hbase源码里探究客户端put数据的流程。本文参考的源码是1.1.2版本的hbase

1)客户端

put在客户端的操作主要分为三个步骤,下面分别从三个步骤展开解释:

(一)、客户端缓存用户提交的put请求

get/delete/put/append/increment等等等等客户可用的函数都在客户端的HTable.java文件中。

在HTable.java文件中有如下的两个变量:

private RpcRetryingCallerFactory rpcCallerFactory;

private RpcControllerFactory rpcControllerFactory;

protected AsyncProcess multiAp;

如上的几个变量分别定义了rpc调用的工厂和一个异步处理的进程

客户端的put请求调用getBufferedMutator().mutate(put),进入mutate这个函数可以看到它会把用户提交的此次put操作放入到列表writeAsyncBuffer中,当buffer中的数据超过规定值时,由后台进程进行提交。

(二)、将writeBuffer中的put操作根据region的不同进行分组,分别放入不同的Map集合

进程提交由函数backgroudFlushCommits完成,提交动作包含同步提交和异步提交两种情况,由传入的参数boolean控制。进入上述函数分析。

可见当传入backgroudFlushCommits的参数为false时执行的是异步提交,参数为true时执行的是同步提交。

与此同时,可以发现无论异步提交还是同步提交,实际的提交动作是由AsyncProcess ap执行的,调用的语句如下:

ap.submit(tableName,writeAsyncBuffer,true,null,false)

需要注意的是多数情况下执行的是异步提交,只有在异步提交出错的情况下执行同步提交。

进入submit函数,可以看到它循环遍历参数writeAsyncBuffer中的每一行,通过connection.locateRegion函数找到其在集群的位置loc,将该位置与操作action一起绑定在变量actionByServer中。

这里的region定位是由ClusterConnection类型的变量connection完成的,进入其locateRegion方法可以看出,如果客户端有缓存,则直接从缓存读取,否则从META表中读出了region所处的位置,并缓存此次的读取结果。返回的结果是RegionLocations类型的变量。

actionByServer是一个Map<ServerName,MulteAction<Row>>类型的变量,从该变量的类型定义可以看出,其将用户的一批写请求中,写入regionserver地址相同的动作归类到一起。

(三)、提交服务端RegionServer处理,在回调函数中与服务端交互。

最后调用sumitMultiActions函数将所有请求提交给服务端,它接受了上面的actionByServer作为参数,内部实例化一个AsyncRequestFutureImpl类执行异步的提交动作。

从sendMultiAction函数中一步步向里查看代码,其将用户的action请求通过getNewMultiActionRunnable、SingleServerRequestRunnable层层调用最终落到了hbase的RPC框架中,每个用户请求包装成包装MultiServerCallable对象,其是一个Runnable对象,在该对象中用户请求与服务端建立起RPC联系。所有的runnable对象最终交到AsyncProcess对象的内部线程池中处理执行。

2)服务端

客户端MultiServerCallable的call方法中调用了服务端的multi函数执行提交动作,进入服务端。

multi方法内部会根据请求是否是原子请求,执行不同的操作语句,这里我们以非原子性提交为例,其执行了doNonAtomicRegionMutation()函数,这个函数中先进行一些rpc请求的编码,将编码后的action相关信息组织到一个List<ClientProtos.Action>类型的变量mutations中,这里的编码采用的proto buffer的编码方案,然后调用doBatchOp()语句,其接受了mutations作为参数。

在doBatchOp函数中,可以看到其最终调用的batchMutate执行的批量操作,这里操作的结果会返回到OperationStatus类型的变量codes[]中,包括了以下几种状态:BAD_FAMILY;SANITY_CHECK_FAILURE;SUCCESS等状态。 这些状态记录了每个action的执行结果,包括成功啦、失败啦等等。

batchMutate最终落入到HRegion.java的2783行,这里先判断一下资源的状态,然后调用doMiniBatchMutation()执行最终的put操作,该操作返回的是写入数据的大小addedSize,根据addedSize计算此时memstore的size以决定是否flush,如果达到了flush的要求,执行requestFlush()。

服务端的put主要实现在HRegion.java的doMiniBatchMutation(),该函数主要利用了group commit技术,即多次修改一起写。

首先对于所有要修改的行,一次性拿住所有行锁,在2944行实现。

rowLock = getRowLockInternal(mutation.getRow(),shouldBlock) ,注意的是这里的锁是写锁。

put和delete在客户端都是由这个函数实现的,在2960行针对两者的不同第一次出现了不同的处理,分别将put和delete操作归类到putsCfSet和deletesCfSet两个不同的集合中,这两个集合分别代表了put/delete的列族集合,数据类型为Set<byte[]>。

第二步是修正keyvalue的时间戳,把action里面的所有kv时间戳修正为最新的时间。时间戳修正之后,在3009行

lock(this.updatesLock.readLock(),numReadyToWrite) 加入了读锁。

然后获得该批写入memstore数据的批次号mvccNum,mvccNum同时也是此次写事务的版本号,由this.sequenceId加一获得的

然后通过w=mvcc.beginMemstoreInsertWithSeqNum(mvccNum),进入函数beginMemstoreInsertWithSeqNum,可以看见,该函数通过传入的mvccNum new一个新的WriteEntry对象,然后将WriteEntry放入队列writeQueue中,这一步加队列的操作是被锁保护起来的。

writeQueue队列用于保存多个并发写事务的WriteEntry。

然后,就是将batch中的数据写入到各个store的memstore中,并根据batch中的数据构建WAL edit。

构造WAL edit之后,将该条数据对应的table name、region info、cluster id等等包装成一个HLogKey结构的对象,该对象即为walkey,将walKey和WAL edit共同组装成一个entry之后将之append到内存中的ringbuffer数据结构中。

注意的是这次的append操作产生一个HLog范围内的id,记作txid。txid用于标识这次写事务写入的HLog日志。

写入buffer后,即释放所有的行锁,两阶段锁过程结束。然后在3153行

syncOrDefer(txid,durability)

将这次事务的日志持久化到hfs中,一旦持久化完成便提交此次事务,代码在3170行,其调用了completeMemstoreInsertWithSeqNum(),走进这个函数会发现其在写入mvccnum之后,调用了waitForPreviousTransactoinsComplete()函数,这个函数实际是推进了mvcc的memstoreRead,推进的思路如下:

先锁上writeQueue队列,然后一个一个看,找连续的已完成的WriteEntry,最后一个WriteEntry的writeNumber即是最新的点,此时可以赋值给mvcc.memstoreRead,后续读事务一开始就去拿mvcc.memstoreRead,从而能够拿到本次写入的数据。

这里要补充一句,此时写入的数据存储在memstore中,并没有持久化到hdfs中,内存中的key-value是以skip list的数据结构存储的。

总结上面hbase的写路径可以发现在hbase的写入过程中应用到了如下的一些技术:

首先,客户端的rpc请求传递到服务端时,函数AsyncRequestFutureImpl()是一个Lazy优化,或者说是一个异步的优化,虽然函数声明了一个对服务端的rpc调用,但是它并没有马上呼叫服务端,而是在需要时才真正呼叫服务端。

第二,数据提交时采用了group
commit技术,理解group commit可以用挖煤做比喻,是一铲子一铲子挖比较快,还是一次挖出一车比较省力。

第三,MVCC即多版本并发控制

限于篇幅和本人的知识有限,以上所说的只是简单描述了hbase的写事务的主干路径,并简要指出了其中的关键技术点,此外还有幂等控制、回滚操作、错误处理以及写入线程模型等等等等,即便是提到的mvcc、group
commit也只是蜻蜓点水,如果展开还有很多很精彩的内容值得大家研究,如果你也对hbase感兴趣,欢迎与我一起讨论,共同提高。

参考资料:

http://www.cnblogs.com/foxmailed/p/3897884.html

时间: 2024-10-01 00:37:47

HBase的put流程源码分析的相关文章

Activity启动流程源码分析之Launcher启动(二)

1.前述 在前一篇文章中我们简要的介绍Activity的启动流程Activity启动流程源码分析之入门(一),当时只是简单的分析了一下流程,而且在上一篇博客中我们也说了Activity的两种启动方式,现在我们就来分析其中的第一种方式--Launcher启动,这种启动方式的特点是会创建一个新的进程来加载相应的Activity(基于Android5.1源码). 2.Activity启动流程时序图 好啦,接下来我们先看一下Launcher启动Activity的时序图: 好啦,接下来我们将上述时序图用代

A2dp初始化流程源码分析

蓝牙启动的时候,会涉及到各个profile 的启动.这篇文章分析一下,蓝牙中a2dp profile的初始化流程. 我们从AdapterState.java中对于USER_TURN_ON 消息的处理说起: switch(msg.what) { case USER_TURN_ON: notifyAdapterStateChange(BluetoothAdapter.STATE_TURNING_ON); mPendingCommandState.setTurningOn(true); transit

5.Spark Streaming流计算框架的运行流程源码分析2

1 spark streaming 程序代码实例 代码如下: [html] view plain copy object OnlineTheTop3ItemForEachCategory2DB { def main(args: Array[String]){ val conf = new SparkConf() //创建SparkConf对象 //设置应用程序的名称,在程序运行的监控界面可以看到名称 conf.setAppName("OnlineTheTop3ItemForEachCategor

SpringMVC(十七):Web.xml加载流程源码分析

之前章节讲解了web.xml如何使用编码的方式替换掉,但是一直没有写web.xml是如何被加载的相关细节,觉得十分有必要写一篇文章类梳理下. 待完成... 参考 <SpringMVC初始化流程> <Spring 4.x源码分析-BeanWrapper> <第三章 DispatcherServlet详解 ——跟开涛学SpringMVC> <SpringMvc之DispatcherServlet详解> <Spring MVC入口Servlet详解(Http

Android Touch事件派发流程源码分析

分native侧事件派发到java侧和Framework派发事件到UI,流程看源码即可,此处不赘叙, Native侧派发事件的干活类图如下: Framework侧派发事件的类图如下: 从Activity.dispatchTouchEvent开始,Action_Down事件派发的时序如下: 分析Android 5.0源码可知,ViewGroup的事件派发是一个后序遍历树的递归过程,在Action_Down事件的处理中做了两个事情: 1.递归查找touchTarget,并标记在ViewGroup的m

es lucene搜索及聚合流程源码分析

本文以TermQuery,GlobalOrdinalsStringTermsAggregator为例,通过代码,分析es,lucene搜索及聚合流程.1:协调节点收到请求后,将search任务发到相关的各个shard. 相关代码: TransportSearchAction.executeSearch TransportSearchAction.searchAsyncAction.start AbstractSearchAsyncAction.executePhase(SearchQueryTh

a2dp播放流程源码分析

之前分析了a2dp profile 的初始化的流程,这篇文章分析一下,音频流在bluedroid中的处理流程. 上层的音频接口是调用a2dp hal 里面的接口来进行命令以及数据的发送的. 关于控制通道的初始化以及建立的过程,这里就不分析了,我们主要看数据的流向和处理.我们从控制通道的最后一个命令start 开始分析流程. 我们直接看a2dp hal 中out_write的实现: static ssize_t out_write(struct audio_stream_out *stream,

SpringBoot自动装配流程源码分析

SpringBoot 传统方式的SSM框架因为需要配置大量文件而被开发人员诟病重复性工作,所以SpringBoot的出现在减少开发人员做大量重复性配置的工作,使得开发人员能够快速的开始项目开发.更加专注于业务代码的编写.但SpringBoot跟SSM有什么框架不同呢?为什么SpringBoot可以自动装配呢?SpringBoot自动装配是如何实现的呢? SpringBoot入口 写过SpringBoot应用的开发者都知道,SpringBoot应用的启动类是被@SpringBootApplicat

小记--------spark-job触发流程源码分析

job是串行执行的, 执行完上一个才执行下一个 eg:Wordcount案例 val lines = sc.textFile("本地URL or HDFS URL")//详解见代码1 val words = lines.flatMap(line => line.split(" "))//也会返回一个MapPartitionsRDD val pairs = words.map(word => (word , 1))//同样也是返回一个MapPartitio