tair源码分析——leveldb新增的CompactRangeSelfLevel过程

tair是一个分布式KV存储引擎,当新增机器或者有机器down掉的时候,tair的dataserver会根据ConfigServer生成的新的对照表进行数据的迁移和清理。在数据清理的过程中就用到了在tair中新增的Compaction方式——CompactRangeSelfLevel,顾名思义,这个CompactRangeSelfLevel就是对自己所在(指定)的Level进行一定Key范围的Compaction然后将生成的输出文件也写入到自己所在的Level而不是父层(L + 1)。下面我们来对这个CompactRangeSelfLevel进行分析。

// compact filenumber 小于limit的key 在 [begin, end)范围内的 sstable
// compact的时候只有level 0 SSTable会输出到 level 1, 其他的level都是输入输出在同一个level
Status DBImpl::CompactRangeSelfLevel(
  uint64_t limit_filenumber,
  const Slice* begin,
  const Slice* end) {
  // 初始化一个MannualCompaction对象
  manual.limit_filenumber = limit_filenumber;
  manual.bg_compaction_func = &DBImpl::BackgroundCompactionSelfLevel;
  // use TimedCond() 防止丢失唤醒信号
  // 每个层级逐次schedule manualcompaction
  for (int level = 0; level < config::kNumLevels && manual.compaction_status.ok(); ++level) {
    ManualCompaction each_manual = manual;
    each_manual.level = level;
    while (each_manual.compaction_status.ok() && !each_manual.done) {
      // still have compaction running 就等待
      while (bg_compaction_scheduled_) {
        bg_cv_.TimedWait(timed_us);
      }
      manual_compaction_ = &each_manual;
      MaybeScheduleCompaction();
      while (manual_compaction_ == &each_manual) {
        bg_cv_.TimedWait(timed_us);
      }
    }
    manual.compaction_status = each_manual.compaction_status;
  }
  return manual.compaction_status;
}

MaybeScheduleCompaction主要是判断是否新启动一个后台的Compaction线程,主要是以是否有Compaction的任务和是否已经有Compaction线程已经在运行为依据,这个函数已经在前面专门解释Compaction的文章中进行了分析,这里不再介绍。我们详细分析一下真正进行工作的BackgroundCompactionSelfLevel函数

void DBImpl::BackgroundCompactionSelfLevel() {
  do {
    // level-0 不对 filenumber 进行限制
   /* CompactRangeOneLevel故名思议将该level中的所有符合filenumber 小于limit
        key 在 [begin, end)范围内的 sstable找出来 */
    c = versions_-> CompactRangeOneLevel(m->level,
            m->level > 0 ? m->limit_filenumber : ~(static_cast<uint64_t>(0)),
            m->begin, m->end);  

    if (NULL == c) { // no compact for this level
      m->done = true; // done all.
      break;
    }
    // 记录下manual Compaction的结束key
    manual_end = c->input(0, c->num_input_files(0) - 1)->largest;

    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWorkSelfLevel(compact);     // 真正进行Compaction的函数
    CleanupCompaction(compact);
    c->ReleaseInputs();
    DeleteObsoleteFiles();
    delete c;
    if (shutting_down_.Acquire_Load()) {
      // Ignore compaction errors found during shutting down
    } else if (!status.ok()) {
      m->compaction_status = status; // save error
      if (bg_error_.ok()) { // no matter paranoid_checks
        bg_error_ = status;
      }
      break; // exit once fail.
    }
  } while (false);
  if (!m->done) {
    // We only compacted part of the requested range. Update *m
    // to the range that is left to be compacted.
    m->tmp_storage = manual_end;
    m->begin = &m->tmp_storage;
  }
  // Mark it as done
  manual_compaction_ = NULL;
}

这里DoCompactionWorkSelfLevel是真正进行KV读取和Compaction的地方,然而我们这里并不打算对其进行详细的分析,因为通过对比我们知道其主题过程和DoCompactionWork相同,只是在一些细微的判断方式和处理方式上稍微有所不同。具体的DoCompactionWork的过程请参考《leveldb源码分析--SSTable之Compaction》,我们下面通过对比 差异之处的方式让大家理解DoCompactionWorkSelfLevel的实际处理过程。

DoCompactionWorkSelfLevel 和 DoCompactionWork基本相同,只是流程上少了几个判断:

1. DoCompactionWorkSelfLevel 遍历到key以后不需要进行ShouldStopBefore的判断,因为这个是判断是否跟L + 2层有过多的重叠,这里selflevel是输出到当前层,所以必然不会影响跟L+ 2层的重叠情况;

Slice key = input->key();
    // if (compact->compaction->ShouldStopBefore(key) &&
    //     compact->builder != NULL) {
    //   status = FinishCompactionOutputFile(compact, input);
    //   if (!status.ok()) {
    //     break;
    //   }
    // }
    // 注释掉的地方即是CompactionWorkSelfLevel减少的部分

2. 是否drop的时候少了seq<= smallest_snapshot && (type == Deletion || ShouldDrop) && IsBaseLevelForKey(ikey)) 为drop,也是因为当前层的Compaction,而IsBaseLevelForKey是判断的L + 2层以上有无该key的相关值,这里如果要加上判断就应该是将 L+1 层也包含在判断范围内。

} else if (ikey.sequence <= compact->smallest_snapshot &&
                 (ikey.type == kTypeDeletion || // deleted or ..
                  user_comparator()->ShouldDropMaybe(ikey.user_key.data(),
                                                     ikey.sequence, expired_end_time)) &&
                 // .. user-defined should drop(maybe),
                 // based on some condition(eg. this key only has this update.).
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
      }

3. 在InstallCompactionResults 时第二个参数传入的是false,这样这个函数将新生成的SSTable放入当前层而不是L+ 1层

status = InstallCompactionResults(compact);
// 修改为:
status = InstallCompactionResults(compact, false); // output files is in current level, not level + 1

另外这里顺便提一下tair中的leveldb是对google开源的leveldb也有了一定的修改,比如添加expire等特性,在tair中comparator就添加了三个接口函数。而在tair中实现了两个这样的comparator分别是NumericalComparatorImpl和BitcmpLdbComparatorImpl,我们这里以BitcmpLdbComparatorImpl为例进行一下简单的介绍其功能。

// 判断这个key是否在需要回收的bucket中,如果是就返回true,那么Compaction的时候直接删除(即回收掉)
  virtual bool ShouldDrop(const char* key, int64_t sequence, uint32_t now = 0) const { return false;}
  // 根据expire_time判断这个key是否已经过期,如过期则返回true
  virtual bool ShouldDropMaybe(const char* key, int64_t sequence, uint32_t now = 0) const { return false;}
  // start_key和key是否依旧属于同一个bucket,是的放回false,否则返回true
  virtual bool ShouldStopBefore(const Slice& start_key, const Slice& key) const { return false;}

有了这三个函数以后tair的ldb引擎就可以在Compaction的时候对key进行回收和判断是否写入同一个SSTable中,比如最直接的Compaction的时候如果ShouldDrop返回true那么直接标记这个key为drop不写入到新的SSTable中;而ShouldStopBefore则被用在是否生成新的SSTable文件,如果返回true则结束当前文件的写入生成下一个SSTable,这样就可以将不同的bucket写入到不同的SSTable文件中了。

tair源码分析——leveldb新增的CompactRangeSelfLevel过程

时间: 2024-10-03 13:20:22

tair源码分析——leveldb新增的CompactRangeSelfLevel过程的相关文章

tair源码分析——leveldb存储引擎使用

分析完leveldb以后,接下来的时间准备队tair的源码进行阅读和分析.我们刚刚分析完了leveldb而在tair中leveldb是其几大存储引擎之一,所以我们这里首先从tair对leveldb的使用和修改来窥探在这个分布式的存储引擎中是如何将area和bucket持久化到存储,并且方便bucket和area的处理的. 我们首先来看tair中key的结构,我们以大致梳理存储和查询一个KV的流程来确认key的处理.tair_client_api::put直接调用tair_client_impl:

【Spring源码分析】原型Bean实例化过程、byName与byType及FactoryBean获取Bean源码实现

原型Bean加载过程 之前的文章,分析了非懒加载的单例Bean整个加载过程,除了非懒加载的单例Bean之外,Spring中还有一种Bean就是原型(Prototype)的Bean,看一下定义方式: 1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi=&qu

Tomcat源码分析--一次HTTP请求过程

前两篇我们分析了Tomcat服务的启动过程和Connector(连接器).现在让我们看看一次Tomcat服务器是怎么提供HTTP服务的. 上文我们说到endpoint是底层处理I/O具体实现类,那么一次HTTP首先也要从这个类中开始.还是以NIOEndPoint实现类为例子.在NIOEndPoint类中有一个名为Acceptor内部类.该内部类负责接收即将到来的TCP/IP连接,并将它们分配给合适的processor处理. HTTP底层是TCP协议,Java实现TCP协议的具体的方式就是Sock

ViewPager源码分析——滑动切换页面处理过程

上周客户反馈Contacts快速滑动界面切换tab有明显卡顿,让优化. 自己验证又没发现卡顿现象,但总得给客户一个技术性的回复,于是看了一下ViewPager源码中处理滑动切换tab的过程. ViewPager  源码位置: android\frameworks\support\v4\java\android\support\v4\view\ViewPager.java ViewPager其实就是一个重写的ViewGroup,使用ViewPager可以参考SDK中的demo:sdk\extras

f2fs源码分析(一)mount 过程

许多文章会介绍F2FS,对于入门者来说能够了解个F2FS全貌,但是真正了解这个年轻的文件系统还是要看源码的.网上F2fs源码导读的文章,我到现在还是没看过,所以就用这几篇博客来介绍下f2fs,以期对f2fs有更加深入的认识,甚至对整个IO路径的认知有所启发. 下面 文件系统的包括文件系统在磁盘上的布局,也包括在驻留在内存中的文件系统的“驱动” mount过程主要是新建段管理器(segment manager),节点管理器(node manager).其中,段管理器是为了垃圾回收,因为垃圾回收算法

Qt事件分发机制源码分析之QApplication对象构建过程

我们在新建一个Qt GUI项目时,main函数里会生成类似下面的代码: int main(int argc, char *argv[]) { QApplication application(argc, argv); CQDialog dialog(NULL); dialog.show(); return application.exec(); } 对应的步骤解释如下 1.构建QApplication对象 2.构建CQDialog主界面 3.主界面显示 4.QApplication对象进入事件循

Tair源码分析_对照表创建流程

Tair 中存放在主节点中的表的创建共分为五步: 1)根据当前m_hash_table表,统计出每一个alive节点上存储的bucket数量(包括master bucket和slave bucket).假设有节点A,B,C和D,则形成数据如下的map,同时在扫描主节点的过程当中判断主节点是否down,如果不可用,则需要执行第五步快速建表: 注意: 在检查可用节点列表的过程中如果为负载均衡优先策略,group会统计当前可用节点列表,将该列表与m_hash_table一起作为算法的输入.这里对可用节

Hadoop源码分析(1):HDFS读写过程解析

一.文件的打开 1.1.客户端 HDFS打开一个文件,需要在客户端调用DistributedFileSystem.open(Path f, int bufferSize),其实现为: public FSDataInputStream open(Path f, int bufferSize) throws IOException { return new DFSClient.DFSDataInputStream( dfs.open(getPathName(f), bufferSize, verif

hadoop源码分析(2):Map-Reduce的过程解析

一.客户端 Map-Reduce的过程首先是由客户端提交一个任务开始的. 提交任务主要是通过JobClient.runJob(JobConf)静态函数实现的: public static RunningJob runJob(JobConf job) throws IOException { //首先生成一个JobClient对象 JobClient jc = new JobClient(job); …… //调用submitJob来提交一个任务 running = jc.submitJob(jo