经过这段时间对淘宝开源K/V缓存系统tair基础接口put/get/get_range等源码的详细剖析后,按自己的理解简单总结下tair中我较关心的get和get_range这两个接口的基本实现流程。由于get_range接口是最新几个版本才加入的功能,因此以前的tair Java客户端及最新版本安装后的命令行测试中都没有加入get_range测试接口,这无疑给需要专门优化get_range接口的我带来了不便,因此本文最后给出了在tair中实现get_range命令行测试的方法和示例。
1 tair基础接口Get
Get基础接口的实现主要分为两个部分:tair client端和tair server端,这里的server端指的是存储引擎之一leveldb。
1.1 client端的实现(tair_client_api_impl.cpp::get)
在分布式通信中,client端是封装给用户使用的接口,本质上并不实现真正的存储功能,而是将请求打包发送给server端,让server端实现具体的功能,然后将返回结果也打包发送给client端解析处理完成用户所要求的功能。
client端get接口的大致实现流程如下:
int get(int area, const data_entry &key, data_entry* &data);
(1)首先检查key和area参数的有效性;
if(!key_entry_check(key)){ return TAIR_RETURN_ITEMSIZE_ERROR; } if(area < 0 || area >= TAIR_MAX_AREA_COUNT){ return TAIR_RETURN_INVALID_ARGUMENT; }
(2)利用MurmurHash2哈希算法取得key所在的server list;
vector<uint64_t> server_list; if (!get_server_id(key, server_list)) { TBSYS_LOG(DEBUG, "can not find serverId, return false"); return -1; } TBSYS_LOG(DEBUG,"get from server:%s",tbsys::CNetUtil::addrToString(server_list[0]).c_str());
在分布式存储系统中,数据都是存储在多台服务器上,如何将数据均匀的分布在各个服务器上即如何实现负载均衡也是分布式存储的一大难题,最多采用的一般是hash做法,比如有10台服务器,那么存储key所用的server id可以取hash(key) % 10。当然这是最粗糙的做法,有很多缺点,比如key1和key2就很有可能不在同一台服务器上,这样当我们get_range key为前缀的数据时就需要到所有服务器上找一遍,十分低效。为此,tair采取了一种更好的hash做法,使用prefix
key而非整个key作为hash的参数,并采用著名的MurmurHash2哈希算法,一方面使得相同前缀的key都能存储在同一台或同几台服务器上方便查找,另一方面与其它流行的哈希函数相比,MurmurHash对于规律性较强的key的随机分布特征表现更良好,使得负载相对更均衡。
(3)将所有参数封装成一个packet,然后调用tbnet库函数向服务器端发送get请求,底层通过socket通信和RPC远程过程调用机制。注意,在send_request之前创建wait_object用于异步通信。
while (loop_count < s_size) { request_get *packet = new request_get(); packet->area = area; packet->add_key(const_cast<data_entry*>(&key), true); cwo = this_wait_object_manager->create_wait_object(); if (send_request(server_list[index],packet,cwo->get_id()) < 0) { ... } ++loop_count; }
这里s_size是server list的大小,向所有可能包含该key的server发送get请求。
(4)获取server端的反馈结果,解析返回数据包根据返回值判断是否请求成功,并释放wait_object。
resp = (response_get*)tpacket; ret = resp->get_code(); if (ret != TAIR_RETURN_SUCCESS) { goto FAIL; } if (resp->data) { data = resp->data; resp->data = 0; ret = TAIR_RETURN_SUCCESS; } else { ret = TAIR_RETURN_PROXYED_ERROR; TBSYS_LOG(ERROR, "proxyed get error: %d.", ret); goto FAIL; } this_wait_object_manager->destroy_wait_object(cwo);
从上可以看出,client端其实就是一个发包收包解析包的过程,真正的请求实现还是在server端。
1.2 server端的实现(ldb_instance.cpp::get)
client端和server端连接的接口在ldb_manager.cpp中,真正的实现在ldb_instance.cpp中,get接口的大致实现流程如下:
(1)首先查询cache,如果cache命中则结束查找。tair中使用了LRU缓存机制。
int rc = do_cache_get(ldb_key, db_value, true/* update stat */);
(2)如果cache未命中或者过期了,则从db中一步步查询,并将此次查询结果添加到cache中。
rc = do_get(ldb_key, db_value, false/* not get from cache */, true/* fill cache */);
这一步才是真正的db查询,涉及到leveldb的存储机制。下面简单描述其查找过程(在db_impl.cc中):
memtable —> immutable memtable —> current sstable
首先在memtable中查找(MemTable::Get());如果memtable中未找到,并且存在immutable memtable,则在immutable memtable中查找(Memtable::Get());若 仍未找到,在sstable中查找(VersionSet::Get())。在sstable中查找就涉及到了磁盘的随机访问,也是最耗时的地方,其查找的过程也较复杂:
a. 首先找到可能包含key的sst files。
for (int level = 0; level < config::kNumLevels; level++) { size_t num_files = files_[level].size(); if (num_files == 0) continue; // Get the list of files to search in this level FileMetaData* const* files = &files_[level][0]; if (level == 0) { PROFILER_BEGIN("db l0"); // Level-0 files may overlap each other. Find all files that // overlap user_key and process them in order from newest to oldest. tmp.reserve(num_files); for (uint32_t i = 0; i < num_files; i++) { FileMetaData* f = files[i]; if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 && ucmp->Compare(user_key, f->largest.user_key()) <= 0) { tmp.push_back(f); } } if (tmp.empty()) continue; std::sort(tmp.begin(), tmp.end(), NewestFirst); files = &tmp[0]; num_files = tmp.size(); } else { PROFILER_BEGIN("db lN"); // Binary search to find earliest index whose largest key >= ikey. uint32_t index = FindFile(vset_->icmp_, files_[level], ikey); if (index >= num_files) { files = NULL; num_files = 0; } else { tmp2 = files[index]; if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) { // All of "tmp2" is past any data for user_key files = NULL; num_files = 0; } else { files = &tmp2; num_files = 1; } } }
由于leveldb采用了分级存储机制,其中level 0的keys可能互相重叠,而其它levels的keys不可能重叠,因此在level 0中只能根据file的[smallest key, largest key]范围顺序遍历每一个file,找到可能包含key的files,而在其它level就可以通过二分查找找到可能包含key的files。
b. 找到sst files后,就需要逐一遍历每一个file,由于在遍历file之前需要获取该file所对应的table的handle以便操作该file,这里又使用了cache机制,先在cache中查找handle以避免查找table所消耗的随机磁盘访问,如果没找到就需要随机访问file找到handle然后打开这个file,并将查找的结果插入cache中。该过程的实现在table_cache.cc::FindTable中。
c. 找到table后,根据该table的index block对key进行二分查找定位可能包含该key的blocks。
d. 由于blocks的查找需要真正的磁盘IO,比较耗时,因此在遍历每一个block之前使用了Filter_policy(Table.cc:InternalGet()中),
FilterPolicy有3个接口:
virtual constchar* Name() const = 0; // 返回filter的名字 virtual voidCreateFilter(const Slice* keys, int n, std::string* dst)const = 0; virtual boolKeyMayMatch(const Slice& key, const Slice& bloom_filter)const = 0;
其中CreateFilter接口根据指定的参数创建过滤器,并将结果append到dst中。参数keys[0,n-1]包含依据用户提供的comparator排序的key列表,并把根据这些key创建的filter追加到*dst中。
KeyMayMatch接口用于查找匹配,参数bloom_filter包含了调用CreateFilter函数append的数据,如果key在传递函数CreateFilter的key列表中,则返回true。
在leveldb中,有一个默认的Bloomfilter实现,使用了double hashing来模拟多个hash函数。
如果filter能直接过滤该key就避免了block的读过程,降低了读磁盘的消耗。
if (filter != NULL && handle.DecodeFrom(&handle_value).ok() && !filter->KeyMayMatch(handle.offset(), k)) { // Not found } else { // 在block中查找 }
这里bloomfilter的使用就是优化get接口的措施之一,也是我们优化get_range接口的主要参考点。
2 tair基础接口GetRange
int get_range(int area, const data_entry &pkey, const data_entry &start_key, const data_entry &end_key, int offset, int limit, vector<data_entry *> &values,short type=CMD_RANGE_ALL);
有了get接口的分析后,对get_range接口的理解就简单的多,下面简单总结这个过程:
(1)首先客户端调用的是tair_client_api_impl.cpp中的get_range()接口实现,所做的主要工作就是解析参数,检查参数的合法性,然后将参数封装成request_get_range包通过send_request()发送给服务器端,服务器端收到request后解析数据包,得到get_range请求及请求所需要的参数,然后在服务器端本地执行该功能请求,并将执行得到的结果封装成response_get_range包发送给客户端,客户端通过get_response()得到该数据包并解析出所需要的get_range执行结果,从而完成客户端和服务器间的socket通信(通过RPC机制实现)。
(2)服务端通过leveldb真正实现get_range接口,入口在ldb_manager.cpp文件里,该文件里get_range的具体实现在ldb_instance.cpp里。由于ldb通过sstable文件格式实现了持久化,而sstable的数据由一个个的block组成。当持久化数据时,多份KV聚合成block一次写入;当读取时,也是以block为单位做IO。数据在存储之前都进行了前缀压缩,降低存储空间,block将读出来的数据封装成Block结构,然后对key的查找和遍历都是在Block上进行,上层封装成Block::Iter处理,具体的实现在Block.cc中。主要是Seek()函数的实现,简单总结下是:先使用二分查找找到key属于的前缀压缩区间的开始偏移,然后从开始点线性遍历找到不小于key的entry。
而get_range接口就是通过调用Seek()一个个遍历[key_start, key_end]查找其对应的value。
那么get接口和get_range接口具体在实现流程上有什么区别呢?我的理解如下:
GetRange接口的实现是通过遍历封装的DBIter,DBIter内部包括:整个db内部的Iterator、memtable的iterator、immutable memtable的iterator以及所有sstable的iterators,将所有这些获取的iterators作为children iterator构造出一个MergingIterator,再加上snapshot构造出DBIter。即对DBIter->Seek()时相当于逐一对内部的所有Iterator进行了对应的Seek操作,对Range范围内的所有key进行该操作(范围不能超出kLimitGetRange)就实现了GetRange接口。其单个key的查找流程和Get一致(都是mem—>imm—>sstable),只是GetRange将这些过程封装成一个DBIter的Seek操作,而Get接口将这些过程分开了,中间加入了一些优化机制,比如:
a. 加入了cache机制
b. 在block 查找之前加入了Bloomfilter机制
GetRange过程没有这些优化,直接调用各Iterator的Seek按序查找,而所有的Seek操作都没有Bloomfilter实现。
因此我们可以在GetRange接口中也加入bloomfilter机制实现block的过滤,这是优化GetRange接口的初步思想。
3. tair中添加get_range命令行测试接口
由于公开的Tair Java客户端及安装测试命令行中都没有加入GetRange接口,而我们的工作主要是为了优化GetRange接口,对GetRange接口的测试必不可少,因此初期我在命令行测试中加入了GetRange接口。具体实现也很简单:在tair_client.cpp中加入了get_range / get_range_only_key / get_range_only_value三个range接口,通过调用tair_client_api_impl.cpp中的接口实现,如下(事先存储了k1/v1到k8/v8这8对数据):
TAIR>get_range ------------------------------------------------ SYNOPSIS : get_range pkey start_key end_key offsetlimit [area] DESCRIPTION:range query for [start_key, end_key] with the same prefix key pkey TAIR>get_range k 1 9 0 0 Find 8key/value entry KEY:k1, LEN: 3 raw data: v1, \76\31\00 KEY:k2, LEN: 3 raw data: v2, \76\32\00 KEY:k3, LEN: 3 raw data: v3, \76\33\00 KEY:k4, LEN: 3 raw data: v4, \76\34\00 KEY:k5, LEN: 3 raw data: v5, \76\35\00 KEY:k6, LEN: 3 raw data: v6, \76\36\00 KEY:k7, LEN: 3 raw data: v7, \76\37\00 KEY:k8, LEN: 3 raw data: v8, \76\38\00 TAIR>
4. 总结
首先按自己的理解总结了tair中两个基础接口get和get_range的实现流程,然后在tair中增加了get_range的命令行测试接口,后面会逐步介绍tair的性能测试方案和对get_range接口实现bloomfilter的具体方案。
tair中对get/get_range接口的理解及为get_range添加命令行测试接口