folly::AtomicHashmap源码分析

本文为原创,转载请注明:http://www.cnblogs.com/gistao/

Atomic的两点背景

看下这个场景,老张去厕所,发现门是锁着的,他就在门口等着里边人出来,此时小王也来了,他想了想,决定去楼上的厕所碰碰运气。

如果把门类比为一种竞争资源的话,老张就像mutex,而小王更像atomic,注意是像而已。

atomic跟传统的通过临界区加锁来避免竞争的多线程处理方式来说,它更像是一种状态机编程,根据当前的状态做出相应的逻辑。

而至于是小王还是老张谁先解决内急,无从得知,同样,mutex vs atomic 的性能对比,实测见真知

atomic支持的数据类型有限,其他信息可以查看之前我的一篇blog:躲不开的多线程

Atomic是否适合hashmap

hashmap的数据存储一般是array,每个元素按照自己的index(下标)存放,数据结构天然决定了非常适合lock-free(atomic)。

但hashmap有两个‘讨厌‘的技术点:rehash和probe。

这里的rehash是说当‘空间不够时‘,需要重新申请一块大的内存,并对之前所有的元素重新hash计算,然后确认其新的index,最后

再一次插入的过程。rehash的同时其他操作不能并行,比如查找,试想一下,在高度并发情况下,百万级别元素的rehash操作是多么的糟糕,

当然这与选择atomic无关,并发情况下,最好不要支持rehash功能。

这里probe是说当多个key求数组下标时,冲突不可能完全避免,自然就有了解决冲突算法:probe,probe的效率相应的决定了查找的效率。

在高度并发情况下,开链比线性探查更适合,因为一个桶的冲突不会影响其他桶。当然开链没有线性探查的局部性好,代码也比线性探查复杂,

使用atomic来编写lock-free数据结构显示不是一件容易的事情,所以显然用后者更可控一些。

AtomicHashmap的关键指标

hashmap的关键指标可能是平均查询时间,也可能是较充分的内存利用。但在AtomicHashmap的设计里,我觉得并发执行效率是第一考虑因素,

所以‘锁‘一定发生在同一个元素操作中。

其他指标应该等同于一般的hashmap。

AtomicHashmap的使用例子

   class Counters {
    private:
     AtomicHashMap<int64_t,int64_t> ahm;

    public:
     explicit Counters(size_t numCounters) : ahm(numCounters) {}

     void increment(int64_t obj_id) {
       auto ret = ahm.insert(make_pair(obj_id, 1));
       if (!ret.first) {
         // obj_id already exists, increment
         NoBarrier_AtomicIncrement(&ret.first->second, 1);
       }
     }

     int64_t getValue(int64_t obj_id) {
       auto ret = ahm.find(obj_id);
       return ret != ahm.end() ? ret->second : 0;
     }

     // Serialize the counters without blocking increments
     string toString() {
       string ret = "{\n";
       ret.reserve(ahm.size() * 32);
       for (const auto& e : ahm) {
         ret += folly::to<string>(
           "  [", e.first, ":", NoBarrier_Load(&e.second), "]\n");
       }
       ret += "}\n";
       return ret;
     }
   };

AtomicHashmap的实现细节

数据组织结构

数据为二层组织结构,一级数组存储的是二级数组指针,二级数组为元素的真实存储空间,元素是[key,value]的pair。

类组织结构

AtomicHashMap类主要负责对AtomicHashArray对象的创建和管理,以及接口的封装。

AtomicHashArray是hashmap的实现类:插入、查找、删除

空间初始化

当首次插入时

  //容量为size和加载因子的除数
  size_t capacity = size_t(maxSize / maxLoadFactor);
  size_t sz = sizeof(AtomicHashArray) + sizeof(value_type) * capacity;

  auto const mem = Allocator().allocate(sz);
  //将对象指针绑在mem上
  new (mem) AtomicHashArray(capacity, c.emptyKey, c.lockedKey, c.erasedKey,
                              c.maxLoadFactor, c.entryCountThreadCacheSize);

  //key全部初始化为empty
  FOR_EACH_RANGE(i, 0, map->capacity_) {
    cellKeyPtr(map->cells_[i])->store(map->kEmptyKey_,
      std::memory_order_relaxed);
  }

当插满时

    //根据增长因子确认要分配的大小,要分配的大小<*2
    size_t numCellsAllocated = (size_t)
      (primarySubMap->capacity_ *
       std::pow(1.0 + kGrowthFrac_, nextMapIdx - 1));
    size_t newSize = (int) (numCellsAllocated * kGrowthFrac_);
//初始化一个新submap
    Config config;
    config.emptyKey = primarySubMap->kEmptyKey_;
    config.lockedKey = primarySubMap->kLockedKey_;
    config.erasedKey = primarySubMap->kErasedKey_;
    config.maxLoadFactor = primarySubMap->maxLoadFactor();
    config.entryCountThreadCacheSize =
      primarySubMap->getEntryCountThreadCacheSize();
    subMaps_[nextMapIdx].store(SubMap::create(newSize, config).release(),
      std::memory_order_relaxed);

插入算法

AtomicHashArray的插入算法

insertInternal(KeyT key_in, T&& value) {
  //hash + %size ,获取下标
  size_t idx = keyToAnchorIdx(key_in);
  size_t numProbes = 0;
  for (;;) {
    value_type* cell = &cells_[idx];
    //判断cell是否被使用
    if (relaxedLoadKey(*cell) == kEmptyKey_) {

      if (isFull_.load(std::memory_order_acquire)) {
        //已经满了,就不能插入了
        //返回capacity_,告诉AtomicHashMap类:我这个submap不能再插入了
        return SimpleRetT(capacity_, false);
      } else {
        //还没有满
        if (tryLockCell(cell)) {
          //tryLockCell其实是compare_exchange_strong,即当这个cell的key为empty时
          //将empty状态修改为lock状态
          new (&cell->second) ValueT(std::forward<T>(value));
          //将cell的key字段标记为插入的key
          unlockCell(cell, key_in); // Sets the new key
          //已经插入的元素>=最大元素时,标记isFull_为true
          //最大元素maxEntries_ == 初始化时的参数size
          if (numEntries_.readFast() >= maxEntries_) {
            isFull_.store(NO_NEW_INSERTS, std::memory_order_relaxed);
          }
          //插入成功
          return SimpleRetT(idx, true);
        }
        //注意:线程走到这里,说明之前没有成功trylockcell
        //线程继续往下走
      }
    }
    if (kLockedKey_ == acquireLoadKey(*cell)) {
      //cell还在被锁定,说明其他线程还在插入这个cell
      //等待其他线程插入完成
      //为什么要等待,因为其他线程插入可能会失败,也可能其他线程和本身线程插入的key一模一样,
      //需要特定的逻辑
      FOLLY_SPIN_WAIT(
        kLockedKey_ == acquireLoadKey(*cell)
      );
    }

    const KeyT thisKey = acquireLoadKey(*cell);
    if (EqualFcn()(thisKey, key_in)) {
      //比较两个key一样,那本次插入失败,因为之前已经有一个成功插入了
      return SimpleRetT(idx, false);
    } else if (thisKey == kEmptyKey_ || thisKey == kLockedKey_) {
      //两个key不一样,状态又不是插入成功的状态,那么continue了
      continue;
    }

    ++numProbes;
    if (UNLIKELY(numProbes >= capacity_)) {
      //所有的元素空间都遍历一遍了,还是没成功插入,只有失败了
      return SimpleRetT(capacity_, false);
    }
    //线性探测
    idx = probeNext(idx, numProbes);
  }
}

当AtomicHashArray插入失败,AtomicHashmap会创建一个新的submap,继续插入,周而复始,直至失败。

查找算法

AtomicHashArray的查找算法

findInternal(const KeyT key_in) {
  //查找失败,那么遍历查找
  for (size_t idx = keyToAnchorIdx(key_in), numProbes = 0;
       ;
       idx = probeNext(idx, numProbes)) {
    //根据下标,获取key,如果一致,那么查找成功
    const KeyT key = acquireLoadKey(cells_[idx]);
    if (LIKELY(EqualFcn()(key, key_in))) {
      return SimpleRetT(idx, true);
    }
    //key是个空,说明之前并没有插入过
    if (UNLIKELY(key == kEmptyKey_)) {
      // if we hit an empty element, this key does not exist
      return SimpleRetT(capacity_, false);
    }
    ++numProbes;
    //最坏的情况,全部遍历了一遍,还是没查找,所以失败了
    if (UNLIKELY(numProbes >= capacity_)) {
      // probed every cell...fail
      return SimpleRetT(capacity_, false);
    }
  }
}

这里查找有三个关键点

  • 要查的key发生过冲突,那么查找最好的情况是往后遍历一个或者几个找到元素或者发现空元素,然后结束查找,而最坏的情况是遍历查找一遍,当然这种可能性很小,取决于填充因子
  • 要查的key并没有插入过,那么最好的情况是遍历到空元素结束,最坏的情况是遍历查找一遍
  • 要查的key已经被删除了,这种情况同上

当AtomicHashArray查找失败,AtomicHashmap会寻找下一个被初始化过的submap进行查找,周而复始,直至失败。

删除算法

AtomicHashArray的删除算法

erase(KeyT key_in) {
  //遍历删除
  for (size_t idx = keyToAnchorIdx(key_in), numProbes = 0;
       ;
       idx = probeNext(idx, numProbes)) {
    //获取下标的key
    value_type* cell = &cells_[idx];
    KeyT currentKey = acquireLoadKey(*cell);
    //如果key是空的,说明不存在
    //如果key正在被插入,那么也是不存在的
    if (currentKey == kEmptyKey_ || currentKey == kLockedKey_) {
      // If we hit an empty (or locked) element, this key does not exist. This
      // is similar to how it‘s handled in find().
      return 0;
    }

    if (EqualFcn()(currentKey, key_in)) {
      //找到了元素
      KeyT expect = currentKey;
      if (cellKeyPtr(*cell)->compare_exchange_strong(expect, kErasedKey_)) {
        numErases_.fetch_add(1, std::memory_order_relaxed);

        //将key标记为erase
        //为什么没有释放内存,为什么不再把key标记为empty
        //因为查找的时候,是把value的指针给的用户,没有时机做那些
        return 1;
      }
      //别的线程已经搞定了删除,那么本线程返回就可以了
      //返回0?应该返回1
      return 0;
    }
    //key不一样,说明之前冲突过
    //那么遍历往下找吧
    ++numProbes;
    if (UNLIKELY(numProbes >= capacity_)) {
      return 0;
    }
  }
}

当AtomicHashArray删除失败,AtomicHashmap会寻找下一个被初始化过的submap进行删除,周而复始,直至失败

AtomicHashmap的限制

  • key只支持32和64位
  • 当插入元素数量超过初始化的大小后,性能呈线性下降
  • 元素被删除后,空间并没有释放,并且不能再次使用
时间: 2024-12-29 11:46:39

folly::AtomicHashmap源码分析的相关文章

folly::AtomicHashmap源码分析(二)

本文为原创,转载请注明:http://www.cnblogs.com/gistao/ 背景 上一篇只是细致的把源码分析了一遍,而源码背后的设计思想并没有写,设计思想往往是最重要的,没有它,基本无法做整体性的优化或正确的使用, 但是根据结果反推原因是困难的,也极容易不到位,这里‘磕磕绊绊’写下自己的理解,另外对源码里的‘问题’也写出来. 简单 调试一个多线程程序是比较头疼的,而使用atomic来编写一个正确的多线程数据结构更是困难的,出了问题一般都不是随机问题,且等着复现看log吧, 所以简单这个

TeamTalk源码分析之login_server

login_server是TeamTalk的登录服务器,负责分配一个负载较小的MsgServer给客户端使用,按照新版TeamTalk完整部署教程来配置的话,login_server的服务端口就是8080,客户端登录服务器地址配置如下(这里是win版本客户端): 1.login_server启动流程 login_server的启动是从login_server.cpp中的main函数开始的,login_server.cpp所在工程路径为server\src\login_server.下表是logi

Android触摸屏事件派发机制详解与源码分析二(ViewGroup篇)

1 背景 还记得前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>中关于透过源码继续进阶实例验证模块中存在的点击Button却触发了LinearLayout的事件疑惑吗?当时说了,在那一篇咱们只讨论View的触摸事件派发机制,这个疑惑留在了这一篇解释,也就是ViewGroup的事件派发机制. PS:阅读本篇前建议先查看前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>,这一篇承接上一篇. 关于View与ViewGroup的区别在前一篇的A

HashMap与TreeMap源码分析

1. 引言     在红黑树--算法导论(15)中学习了红黑树的原理.本来打算自己来试着实现一下,然而在看了JDK(1.8.0)TreeMap的源码后恍然发现原来它就是利用红黑树实现的(很惭愧学了Java这么久,也写过一些小项目,也使用过TreeMap无数次,但到现在才明白它的实现原理).因此本着"不要重复造轮子"的思想,就用这篇博客来记录分析TreeMap源码的过程,也顺便瞅一瞅HashMap. 2. 继承结构 (1) 继承结构 下面是HashMap与TreeMap的继承结构: pu

Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7)【转】

原文地址:Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.chinaunix.net/uid-25909619-id-4938395.html 前面粗略分析start_kernel函数,此函数中基本上是对内存管理和各子系统的数据结构初始化.在内核初始化函数start_kernel执行到最后,就是调用rest_init函数,这个函数的主要使命就是创建并启动内核线

Spark的Master和Worker集群启动的源码分析

基于spark1.3.1的源码进行分析 spark master启动源码分析 1.在start-master.sh调用master的main方法,main方法调用 def main(argStrings: Array[String]) { SignalLogger.register(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) val (actorSystem, _, _, _) =

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三)

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三) 本文是SolrCloud的Recovery策略系列的第三篇文章,前面两篇主要介绍了Recovery的总体流程,以及PeerSync策略.本文以及后续的文章将重点介绍Replication策略.Replication策略不但可以在SolrCloud中起到leader到replica的数据同步,也可以在用多个单独的Solr来实现主从同步.本文先介绍在SolrCloud的leader到replica的数据同步,下一篇

zg手册 之 python2.7.7源码分析(4)-- pyc字节码文件

什么是字节码 python解释器在执行python脚本文件时,对文件中的python源代码进行编译,编译的结果就是byte code(字节码) python虚拟机执行编译好的字节码,完成程序的运行 python会为导入的模块创建字节码文件 字节码文件的创建过程 当a.py依赖b.py时,如在a.py中import b python先检查是否有b.pyc文件(字节码文件),如果有,并且修改时间比b.py晚,就直接调用b.pyc 否则编译b.py生成b.pyc,然后加载新生成的字节码文件 字节码对象

LevelDB源码分析--Iterator

我们先来参考来至使用Iterator简化代码2-TwoLevelIterator的例子,略微修改希望能帮助更加容易立即,如果有不理解请各位看客阅读原文. 下面我们再来看一个例子,我们为一个书店写程序,书店里有许多书Book,每个书架(BookShelf)上有多本书. 类结构如下所示 class Book { private: string book_name_; }; class Shelf { private: vector<Book> books_; }; 如何遍历书架上所有的书呢?一种实