转载请注明源出处:http://www.cnblogs.com/lighten/p/7542578.html
1.前言
一个可伸缩的并发实现,这个map实现了排序功能,默认使用的是对象自身的compareTo方法,如果提供了比较器,使用比较器的比较方法。简单来说ConcurrentSkipListMap是TreeMap的并发实现,但是为什么没有称之为ConcurrentTreeMap呢?这和其自身的实现有关。该类是SkipLists的变种实现,提供了log(n)的时间开销:containsKey、get、put、remove。Insertion, removal, update, and access等操作都是线程安全的。迭代器是弱一致性的,升序迭代器比降序的快。该map的size方法不是常量时间开销,需要遍历,所以这个值在并发的时候可能不准。该map也不允许空键或值。
2.ConcurrentSkipListMap
2.1 实现原理
此类实现了一个二维树状跳跃链表,index level由持有数据的基本节点的独立节点表示。有两个原因采取这种方法,而不是使用数组:1.数组实现复杂性更高,开销更大,2.我们可以使用开销更小的算法来完成大量遍历的索引列表而不是用于基本链表。下图是一个基础的两级索引的list结构:
基础的算法使用的是the HM linked ordered set algorithm的变种。这些算法的基本原理是在删除节点时标记删除节点的下一个节点指针,以避免并发插入冲突,当遍历跟踪三元组(前置节点,当前结点,后继节点)时,决定何时以及如何将这些已删除的结点断开。
节点直接使用CAS标记next指针,而不是使用标记位来标记列表删除。在删除时,不是标记一个指针,而是在另一个被认为是标记指针的结点中进行拼接。此外使用删除标记,链表中也有空的元素被看成是删除,这类似于懒删除模式。如果一个节点的值为null,就被认为是逻辑删除了,并忽略。下面是删除一个节点的示意图,删除n节点,初始状态如下:
1.CAS设置n的值从非null变成null。从这个时刻开始,没有public操作会认为这个节点存在,然而其它正在进行的insert和delete可能依旧在改变其next指针。
2.CAS设置n的next指针为一个新的标记节点,当这个节点存在时,没有其它的结点能添加在n后面,这是为了防止删除错误。
3.CAS设置b的next指针,越过n和marker节点,这个时刻开始,没有遍历方法能够访问到n,其可以被垃圾回收了。
第一步失败会导致简单的重试,2,3步失败了也不要紧,因为其他操作会忽视null节点,并且会帮助逻辑删除节点从链表移除。
2.2 数据结构
上图是一个数据结构,链表的头结点,比较器,和一个头结点的对象。
Node节点也比较标准,键值和下一个节点,内部方法如下:
casValue:CAS设置结点的value
casNext:CAS设置结点的next
isMarker:判断该节点是否是Marker节点,依据就是Marker节点的value就是其本身
isBaseHeader:判断该节点是否是头结点,依据就是head节点的value是类的BASE_HEADER的对象
appendMarker:CAS设置结点的next节点为marker节点,参数是该节点原来的next结点
helpDelete:帮助删除节点(当该节点是value为null的时候)
getValidValue:返回当前结点的值
createSnapshot:创建该节点的键值对快照,是一个不可改变的集合
Index就是该类的数据结构了,其对Node继续了封装,多了down和right节点,这是一个跳跃表的基本结构。里面的方法如下:
casRight:CAS设置该节点的right指针
indexesDeletedNode:判断该节点是否被逻辑删除了
link:CAS设置新的后继节点,参数是原后继节点和新的后继节点
unlink:CAS设置该节点的后继节点的后继节点为该节点的后继节点,就是将该节点的后继节点移除
HeadIndex继承自Index,补充了Index缺少了level字段。
2.3 基本操作
获取一个键值对:
步骤如下:
1.通过key找到跳跃表key的前一个节点b,该key的键值就是在这个节点b的后面。
2.如果b的next节点n为null,意味着b为最后节点,没有元素可找,跳出循环
3.再次检测b的next节点是否是n,不是意味着被put抢先插入了,重新找前置节点进行循环
4.n节点值为null,已逻辑删除,helpDelete帮助移除该节点,跳出循环,重新找前置节点
5.b节点value为null,或n==n.value(marked节点)b被移除,跳出循环,重新找前置节点
6.比较n的key和获取的key,相等就返回值,<0就跳出循环。没找到就继续判断n.next.
查找指定key开始遍历的跳跃表前置节点方法如上图,步骤如下:
1.从头结点开始遍历,当前结点的右节点为r
2.r不为null,但是值为null,尝试移除,移除失败重新从head开始遍历,成功继续找下一个跳跃点,继续遍历。如果r值不为null,但是比较出来key的值要大,意味着还可以跳跃这段,继续找下一个跳跃点。
3.找到合适的跳跃点,就去找该跳跃点的起始节点,down存在就是要当前跳跃点的结点,存在就在down中查找合适的跳跃点。
如果对这个结构有疑惑的,可以参考:这里。来理解一下什么是跳跃表。
放入一个元素:
private V doPut(K key, V value, boolean onlyIfAbsent) { Node<K,V> z; // added node if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { if (n != null) { Object v; int c; Node<K,V> f = n.next; if (n != b.next) // inconsistent read break; if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } if (b.value == null || v == n) // b is deleted break; if ((c = cpr(cmp, key, n.key)) > 0) { b = n; n = f; continue; } if (c == 0) { if (onlyIfAbsent || n.casValue(v, value)) { @SuppressWarnings("unchecked") V vv = (V)v; return vv; } break; // restart if lost race to replace value } // else c < 0; fall through } z = new Node<K,V>(key, value, n); if (!b.casNext(n, z)) break; // restart if lost race to append to b break outer; } } int rnd = ThreadLocalRandom.nextSecondarySeed(); if ((rnd & 0x80000001) == 0) { // test highest and lowest bits int level = 1, max; while (((rnd >>>= 1) & 1) != 0) ++level; Index<K,V> idx = null; HeadIndex<K,V> h = head; if (level <= (max = h.level)) { for (int i = 1; i <= level; ++i) idx = new Index<K,V>(z, idx, null); } else { // try to grow by one level level = max + 1; // hold in array and later pick the one to use @SuppressWarnings("unchecked")Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1]; for (int i = 1; i <= level; ++i) idxs[i] = idx = new Index<K,V>(z, idx, null); for (;;) { h = head; int oldLevel = h.level; if (level <= oldLevel) // lost race to add level break; HeadIndex<K,V> newh = h; Node<K,V> oldbase = h.node; for (int j = oldLevel+1; j <= level; ++j) newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); if (casHead(h, newh)) { h = newh; idx = idxs[level = oldLevel]; break; } } } // find insertion points and splice in splice: for (int insertionLevel = level;;) { int j = h.level; for (Index<K,V> q = h, r = q.right, t = idx;;) { if (q == null || t == null) break splice; if (r != null) { Node<K,V> n = r.node; // compare before deletion check avoids needing recheck int c = cpr(cmp, key, n.key); if (n.value == null) { if (!q.unlink(r)) break; r = q.right; continue; } if (c > 0) { q = r; r = r.right; continue; } } if (j == insertionLevel) { if (!q.link(r, t)) break; // restart if (t.node.value == null) { findNode(key); break splice; } if (--insertionLevel == 0) break splice; } if (--j >= insertionLevel && j < level) t = t.down; q = q.down; r = q.right; } } } return null; }
put方法主要经历了2个步骤:
1:查找合适的位置,插入该节点。
1)查找前置跳跃点b,其next节点为n,
2)遍历查找合适的插入点,n为null就创建节点,添加在b的next节点,添加成功跳出第一步,失败重新进行1)
3)n不为null,就查找其应该插入的节点,先要判断n是否还是b的next节点,防止被抢先在中间插入了,再判断n节点是否是有效节点,逻辑删除了就回到1)再重来。最后判断b节点是否被删除了。后面如果key的大小大于n节点的k,意味着还要往后找,如果等于就替换掉该节点的值,跳出第一步。最后找到了合适的插入点就尝试插入,失败重来,成功结束第一步。整个过程的逻辑和get的类似。
2:构建跳跃表的结点,调整跳表。完成第一步仅仅是将节点插入了链表中,还需要完成跳表的构成。(级别就意味着跳表的间隔,级别越大同一级别的结点越少,间隔越大,这种方式在查找的时候可以提升查找速度,从最大的级别开始,逐级定位结点)
1)随机级别,偶数且大于0。随机方法不说明。
2)如果该级别比头结点要小,生成一系列头结点的down节点(Index结点包含的node,自然是步骤1插入的结点),从级别1开始
3)该级别比头结点级别高,加大一个级别,生成从1开始的所有级别结点(node为插入节点)构成down链。
4)再次判断头结点级别,如果head级别比该级别高,证明被抢先调整了,重来。没有抢先,重新构建头结点索引headIndex,node是头结点的node,补充缺失的级别就可以了。替换头结点HeadIndex成功跳出循环,失败重来。
上面都是构建down方向的结点,确保head的down方向包含了所有索引级别。后面的方法就是构建right方法的连接了。这里要注意,h变成了新的头结点,level却是旧的级别。
5)h结点或h的right结点r为null,没必要进行,结束该环节
6)r不为null,比较key和r的结点n的key,n结点被逻辑删除,就帮助其移除,移除后找下一个r结点。当前r结点要小于key,则key还在右边,继续找r。直到找到key应该在的位置,即r结点>=key,key的right就是r。
7)不断降级,直到找到当前的插入级别,直到到指定级别,构建连接,连接失败重来,成功如果构建的结点被逻辑删除了,通过findNode方法,删除它。
整个过程有些抽象,结合二维图看会比较清楚,首先是一维的有序链表,这个就是Node结点,但是跳表为了加快搜索速度,使用了检索级别indexlevel构成了二维图。之前也提过,indexlevel级别越高,间隔越大,结点越少。一个新加结点,首先要确定其属于几级,1级就不需要构建IndexNode,一系列判断出其所属级别后,就先构建down方向的一系列结点,再通过头结点,将整个right方向结点联通,这个就是一个基本的思路。由于从头结点开始遍历,所以头结点必须有最高的级别。所以新节点基本超过头结点的时候,要提升头结点级别。大体逻辑就是这样。
其它的方法不再进行介绍,上面基本能了解ConcurrentSkipListMap的基本原理。