leveldb学习:dbimpl(1)

leveldb将数据库的有关操作都定义在了DB类,它负责整个系统功能组件的连接和调用,是整个系统的脊柱。

level::DB是一个接口类,真正的实现在DBimpl类。

作者在文档impl.html中描述了leveldb的实现,其中包括文件组织、compaction和recovery等等。

DBimpl的成员变量包括:字符比较器internal_comparator_、配置类options_、bool型状态量、string型DB库名、cache对象、memtable对象、versionset对象等等前面所说的组件。

前面的讲解组件部分时,分散地介绍过leveldb的文件系统。这里下面来统一说明下创建一个DB,会在硬盘里生成一些什么样的文件,以下翻译自impl.html:

1 dbname/[0-9]+.log:

log文件包含了最新的db更新,每个entry更新都以append的方式追加到文件结尾。

2 dbname/[0-9]+.sst:db的sstable文件

Leveldb把sstable文件通过level的方式组织起来,从log文件中生成的sstable被放在level 0。当level 0的sstable文件个数超过设置时,leveldb就把所有的level 0文件,以及有重合的level 1文件merge起来,组织成一个新的level 1文件。

3 dbname/MANIFEST-[0-9]+:DB元信息文件

它记录的是leveldb的元信息,比如DB使用的Comparator名,以及各SSTable文件的管理信息:如Level层数、文件名、最小key和最大key等等。

4 dbname/CURRENT:记录当前正在使用的Manifest文件

它的内容就是当前的manifest文件名;因为在LevleDb的运行过程中,随着Compaction的进行,新的SSTable文件被产生,老的文件被废弃。并生成新的Manifest文件来记载sstable的变动,而CURRENT则用来记录我们关心的Manifest文件。

5 dbname/log:系统的运行日志,和options_.info_log有关,记录系统的运行信息或者错误日志。

主要函数:

Options SanitizeOptions(const std::string& dbname,
                        const InternalKeyComparator* icmp,
                        const InternalFilterPolicy* ipolicy,
                        const Options& src)

option修正函数,将用户定义的option做一定的检查和修正,返回规范的option对象。主要就是设置字符比较器,检查一些参数的设置(比如最大文件大小、写缓冲区的大小,sstable的block大小是否在规定值范围内)、建立log文件等等。

Status DBImpl::NewDB() {
  VersionEdit new_db;
  new_db.SetComparatorName(user_comparator()->Name());
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);
  const std::string manifest = DescriptorFileName(dbname_, 1);
  WritableFile* file;
  Status s = env_->NewWritableFile(manifest, &file);
  if (!s.ok()) {
    return s;
  }
  {
    log::Writer log(file);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      s = file->Close();
    }
  }
  delete file;
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(env_, dbname_, 1);
  } else {
    env_->DeleteFile(manifest);
  }
  return s;
}

初始化一个新的DB对象,主要创建一个manfest文件,并调用versionedit::encodeto写入新db的信息(如comparator,lognumder,nextfilenumber,sstable信息),此函数在open()操作中被调用,完成创建DB的一步。

void DBImpl::DeleteObsoleteFiles()

根据i节点删除db中的文件,会对文件的类型和内容做一个判断,首先,正在compact的sstable不删,versionset中各个版本下的sstable文件不删,当前的log和manfest文件不删。调用env_->DeleteFile删除文件。

Status DBImpl::Recover(VersionEdit* edit) 

DB恢复函数,基于前面介绍的文件系统

1.recover首先找到当前数据库dbname_路径下的current文件,参考函数CurrentFileName(dbname_),文件错误或者不存在,恢复都无法继续进行),2.然后调用versionset::recover(),读取manfest文件,通过一个versionedit对象中间过渡,恢复出新的version。

3.遍历dbname_文件下的文件,对比当前版本集合versions_中记录的sstable,如果缺失,输出缺失的文件i节点,recover失败,否则

恢复log文件(参考RecoverLogFile函数)

Status DBImpl::RecoverLogFile(uint64_t log_number,
                              VersionEdit* edit,
                              SequenceNumber* max_sequence)

从log文件中逐条恢复entry,并写入新建立的memtable。并在合适的条件下(memtable大小大于写缓存下限:mem->ApproximateMemoryUsage() > options_.write_buffer_size),写入level_0的sstable中(参考函数WriteLevel0Table)

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
                                Version* base)

将memtable dump到磁盘,也就是level-0的sstable中。

1.首先产生一个新文件,并记录在文件描述结构FileMetaData中

2.利用memtable的迭代器Iterator遍历memtable中的KV数据,构造sstable(参考函数BuildTable,还记得前面介绍table和block么,要对memtable的kv做进一步的打包,才能形成kv的磁盘形式)

3.把新的文件变化信息存储进versionedit,并记录这次compact的信息,主要是耗时和写入的sstable大小。

注:PickLevelForMemTableOutput函数,新的sstable定级,不能和同级的sstable有overlap,也不能和上级的sstable overlap太多(> kMaxGrandParentOverlapBytes)

WriteLevel0Table是函数CompactMemTable的核心。

leveldb中有且只有一个进程单独做compact,当主线程触发compact,调用void DBImpl::MaybeScheduleCompaction(),如果compact正在运行或者DB正在退出,直接返回;检查version中是否存在需要compact,有则触发后台调度env_->schedele(…)

void DBImpl::MaybeScheduleCompaction() {
  mutex_.AssertHeld();
  if (bg_compaction_scheduled_) {
    // Already scheduled
  } else if (shutting_down_.Acquire_Load()) {
    // DB is being deleted; no more background compactions
  } else if (!bg_error_.ok()) {
    // Already got an error; no more changes
  } else if (imm_ == NULL &&
             manual_compaction_ == NULL &&
             !versions_->NeedsCompaction()) {
    // No work to be done
  } else {
    bg_compaction_scheduled_ = true;
    env_->Schedule(&DBImpl::BGWork, this);
  }
}

schedele把compact处理程序函数指针和db对象指针传入后台任务队列,BGWork 是compact处理函数,Schedule函数如下:

void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  PthreadCall("lock", pthread_mutex_lock(&mu_));
  // Start background thread if necessary
  if (!started_bgthread_) {
    started_bgthread_ = true;
    PthreadCall(
        "create thread",
        pthread_create(&bgthread_, NULL,  &PosixEnv::BGThreadWrapper, this));
  }
  // If the queue is currently empty, the background thread may currently be
  // waiting.
  if (queue_.empty()) {
    PthreadCall("signal", pthread_cond_signal(&bgsignal_));
  }
  // Add to priority queue
  queue_.push_back(BGItem());
  queue_.back().function = function;
  queue_.back().arg = arg;
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}

将处理函数放入任务队列中,后台进程就可以不断地从queue_中取出任务函数,并执行。

实际compact处理进程是BackgroundCall和BackgroundCompaction。BackgroundCall完成一些判断,条件符合则调用BackgroundCompaction,compact完成后再次触发compact,重复上述过程。

void DBImpl::BackgroundCall() {
  MutexLock l(&mutex_);
  assert(bg_compaction_scheduled_);
  if (shutting_down_.Acquire_Load()) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();
  }
  bg_compaction_scheduled_ = false;
  // Previous compaction may have produced too many files in a level,
  // so reschedule another compaction if needed.
  MaybeScheduleCompaction();
  bg_cv_.SignalAll();
}

实际compact流程:

void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();
  if (imm_ != NULL) {
    CompactMemTable();
    return;
  }
  Compaction* c;
  bool is_manual = (manual_compaction_ != NULL);
  InternalKey manual_end;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
    m->done = (c == NULL);
    if (c != NULL) {
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
    }
    Log(options_.info_log,
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
        m->level,
        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
        (m->end ? m->end->DebugString().c_str() : "(end)"),
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  } else {
    c = versions_->PickCompaction();
  }
  Status status;
  if (c == NULL) {
    // Nothing to do
  } else if (!is_manual && c->IsTrivialMove()) {
    // Move file to next level
    assert(c->num_input_files(0) == 1);
    FileMetaData* f = c->input(0, 0);
    c->edit()->DeleteFile(c->level(), f->number);
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
                       f->smallest, f->largest);
    status = versions_->LogAndApply(c->edit(), &mutex_);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    VersionSet::LevelSummaryStorage tmp;
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
        static_cast<unsigned long long>(f->number),
        c->level() + 1,
        static_cast<unsigned long long>(f->file_size),
        status.ToString().c_str(),
        versions_->LevelSummary(&tmp));
  } else {
    CompactionState* compact = new CompactionState(c);
    status = DoCompactionWork(compact);
    if (!status.ok()) {
      RecordBackgroundError(status);
    }
    CleanupCompaction(compact);
    c->ReleaseInputs();
    DeleteObsoleteFiles();
  }
  delete c;
  if (status.ok()) {
    // Done
  } else if (shutting_down_.Acquire_Load()) {
    // Ignore compaction errors found during shutting down
  } else {
    Log(options_.info_log,
        "Compaction error: %s", status.ToString().c_str());
  }
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    if (!status.ok()) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      m->tmp_storage = manual_end;
      m->begin = &m->tmp_storage;
    }
    manual_compaction_ = NULL;
  }
}

1.如果存在immutable memtable,将其dump成sstable,完成返回。

2.如果是外部触发的compact,根据manual_compaction指定的level/start_key/end_key,选出compaction(VersionSet::CompactRange())

3.如果不是manual compact,则根据db当前状态,选出compaction(VersionSet::PickCompaction()),考虑到level sstable的均衡性,提高查找效率。class compaction用于记录compact信息,包括compact的level和输入sstable文件等等,参见version_set.h。

4.对于manual compact并且选出的sstable都处于level-n且不会造成过多的GrandparentOverrlap(Compaction::IsTrivialMove()),简单处理,将这些sstable推到level-n+1,更新db元信息即可(VersionSet::LogAndApply())。

5.其他情况,则一律根据确定出的Compaction,做具体的compact处理(DBImpl::DoCompactionWork()),最后做异常情况的清理(DBImpl::CleanupCompaction())。

未完待续!

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-28 13:12:31

leveldb学习:dbimpl(1)的相关文章

LevelDb学习资料

LevelDb学习资料 标签(空格分隔): db,k/v_db 以下是leveldb的介绍资料 初识LevelDb 整体架构 log文件 SSTable文件 MemTable 写入与删除记录 如何根据Key读取记录? Compaction levelDb中的Cache LevelDB性能分析和表现 levelDB tutorial LevelDB 网址:http://leveldb.org/ 基于levelDB做优化的RocksDB,由facebook维护开发,仍然在更新,其网址:http://

leveldb 学习。

1)大概浏览了leveldb文档的介绍.本想逐步看代码,想想还是自己先实现一个看看如何改进. 2)完成了一个非常丑陋的初版,但是还是比初初版有进步. 3)key value的数据库,不允许有key重复,所以必须检测key. 1,插入检测key重复,太耗时间,不可能去检查数据文件.明显必须加入一个索引文件.形式key,offset. 2,  key,offset的索引形式,数据到达5w,简直不可忍受.插入数据时,必须对索引文件排序,之后可以二分法查找key. 3,排序,二分查找法,又必须要求可以对

leveldb学习:skiplist

leveldb中的memtable仅仅是一个封装类,它的底层实现是一个跳表. 跳表是一种基于随机数的平衡数据结构.其它的平衡数据结构还有红黑树.AVL树.但跳表的原理比它们简单非常多.跳表有点像链表,仅仅只是每一个节点是多层结构,通过在每一个节点中添加向前的指针提高查找效率.例如以下图: 在/leveldb/db目录下有跳表的实现skiplist.h和跳表的測试程序skiplist_test.cc. template<typename Key, class Comparator> class

leveldb学习之version

到此为止,基本上leveldb的主要功能组件都已经分析完了,下面如何把它们组合在一起形成一个高效稳定的数据库,这就是DBimpl类和compact进程的工作. campact进程 为了均衡读写的效率,sstable文件分层次(level)管理,db预定义了最大的level值.compact负责将memtable持久化成sstable,以及均衡整个db中各level的sstable. 版本控制 当执行一次compaction后,Leveldb将在当前版本基础上创建一个新版本,当前版本就变成了历史版

leveldb学习笔记

LevelDB由 Jeff Dean和Sanjay Ghemawat开发. LevelDb是能够处理十亿级别规模Key-Value型数据持久性存储的C++ 程序库. 特别如下: 1.LevelDb是一个持久化存储的KV系统,将大部分数据存储到磁盘上. 2.LevleDb在存储数据时,是根据记录的key值有序存储的,应用可以自定义key大小比较函数. 3.LevelDb的操作接口包括写记录,读记录以及删除记录.针对多条操作的原子批量操作. 4.LevelDb支持数据快照(snapshot)功能,使

leveldb学习:Cache

leveldb自己实现了cache缓冲区替代算法,参见代码cache.h和cache.c文件.leveldb中table_cache等都是以class cache作为底层实现. cache.h中,我们看到cache类是一个抽象类,声明了lookup;insert;release;value;erase等函数,同时声明了一个全局函数 extern Cache* NewLRUCache(size_t capacity); 用来构造cache派生类对象,并返回派生类指针.那么cache的派生类究竟是什

leveldb 学习笔记之VarInt

在leveldb在查找比较时的key里面保存key长度用的是VarInt,何为VarInt呢,就是变长的整数,每7bit代表一个数,第8bit代表是否还有下一个字节, 比如小于128(一个字节以内)的值生成方式如下: 将该值与二进制1000 0000值进行比较,如果小于,则直接将该值作为unsigned char也就是整数值写入. 一个大于128也就是超过1个字节的变长整数的生成方式如下: 将该值与二进制1000 000 000 0000 (32768=2<<14)进行比较,如果小于,则将该值

leveldb学习:versionedit和versionset

VersionEdit: compact过程中会有一系列改变当前Version的操作(FileNumber增加,删除input的sstable,增加输出的sstable),为了缩小version切换的时间点,将这些操作封装成versionedit,compact完成时,将versionedit中的操作一次应用到当前version即可得到最新状态的version. versionedit的成员变量: private: friend class VersionSet; typedef std::se

leveldb学习之sstable(2)

block写入:block_builder block.h和.cc里定义了block的entry存储格式和restart,提供了entry的查找接口以及迭代器.那么如何往写block里写entry呢?leveldb遵循面向对象的设计理念在block_builder类里提供了相关接口. BlockBuilder相关函数: Add( )将entry顺序写入现有block数据块的末端,排序工作在上层的函数完成. Finish( ),当block写满,完成写入重启点数组和重启点个数的写入 Reset(