Introduction to SkipList
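1. A SkipList (skip list) completes search, insertion, and deletion in expected O(log(n)) time. It is an alternative to the red-black tree, and because it is much simpler than a red-black tree both in theory and in implementation, it has been widely adopted. A skip list is a probabilistic data structure, so a worst case is possible in which search and update take O(n) time, but the probability of that happening is extremely small. A skip-list-based structure also makes concurrent access to a collection easier to control, because linked lists localize modifications well: when several threads update the skip list (insert or delete), each individual operation touches only a few neighboring nodes and has little effect on the rest of the structure. With a red-black tree, a single update may trigger rebalancing that propagates through large parts of the tree, so its locality is poor. This makes the skip list better suited to concurrent access by multiple threads.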
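2. ConcurrentSkipListMap is implemented as a lock-free skip list. The non-concurrent TreeMap is a red-black tree, so why does the concurrent version not use a tree? According to the author, Doug Lea, there was no good lock-free algorithm for tree structures at the time: "The reason is that there are no known efficient lock-free insertion and deletion algorithms for search trees." Reportedly a paper on lock-free trees now exists, "Lock-Free Red-Black Trees Using CAS"; for related discussion, see "Skip List vs. Binary Tree".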
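3. Calling size() on a ConcurrentSkipListMap is not a constant-time operation. Because multiple threads may modify the map at the same time, it has to traverse the whole base-level linked list to count the elements, which takes O(n) time. The result may also be inaccurate if the map is modified while the traversal is running.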
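As a small illustration of the points above, the sketch below fills a ConcurrentSkipListMap from several threads and then reads it back. The class name SkipListMapUsage, the helper fill, and the thread and key counts are made up for this example; only the java.util.concurrent calls are real API.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SkipListMapUsage {
    // Hypothetical helper: each thread writes its own disjoint key range,
    // without any external locking around the map.
    static ConcurrentSkipListMap<Integer, String> fill(int threads, int perThread) {
        ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread;
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    map.put(base + i, "v" + (base + i));
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return map;
    }

    public static void main(String[] args) {
        ConcurrentSkipListMap<Integer, String> map = fill(4, 1000);
        System.out.println(map.size());     // counts by walking the base-level list
        System.out.println(map.firstKey()); // keys are kept in sorted order
        System.out.println(map.lastKey());
    }
}
```

Since size() walks the list on every call, code that needs the element count frequently may prefer to track it separately, for example in an AtomicLong.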
Source code analysis (based on jdk1.8.0_40)
Lock-free linked list
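ConcurrentSkipListMap can be lock-free because it uses a lock-free algorithm for inserting into and deleting from a linked list. Deletion is based on logical marking: first the node's value is set to null (step 1), then a marker node is appended as its successor (step 2; if this CAS fails, a concurrent insert got in first, and appending the marker is simply retried). The node is physically unlinked afterwards (step 3), either by the deleting thread itself or by a later traversal that runs into the deleted node. With this design, insert and delete must compete to CAS the next pointer of the same node, which prevents a concurrent insert from being silently lost. For example, when a plain singly linked list is modified concurrently, thread A inserts node x right after node n while thread B deletes n at the same moment; without a lock or an appended marker, the new node x would be removed together with n. The algorithm description below comes from the class's doc comment: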
 * Here's the sequence of events for a deletion of node n with
 * predecessor b and successor f, initially:
 *
 *        +------+       +------+      +------+
 *   ...  |   b  |------>|   n  |----->|   f  | ...
 *        +------+       +------+      +------+
 *
 * 1. CAS n's value field from non-null to null.
 *    From this point on, no public operations encountering
 *    the node consider this mapping to exist. However, other
 *    ongoing insertions and deletions might still modify
 *    n's next pointer.
 *
 * 2. CAS n's next pointer to point to a new marker node.
 *    From this point on, no other nodes can be appended to n.
 *    which avoids deletion errors in CAS-based linked lists.
 *
 *        +------+       +------+      +------+       +------+
 *   ...  |   b  |------>|   n  |----->|marker|------>|   f  | ...
 *        +------+       +------+      +------+       +------+
 *
 * 3. CAS b's next pointer over both n and its marker.
 *    From this point on, no new traversals will encounter n,
 *    and it can eventually be GCed.
 *        +------+                                    +------+
 *   ...  |   b  |----------------------------------->|   f  | ...
 *        +------+                                    +------+
 *
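The three steps can be tried out on a toy list. The following is only a sketch under simplifying assumptions: AtomicReference fields stand in for the volatile fields and Unsafe CAS of the real class, there is no retry or helping logic, and the names MarkerDeleteSketch, markDeleted, unlink, and insertAfter are invented for illustration. It shows why an insert that still expects f as n's successor fails once the marker is in place.

```java
import java.util.concurrent.atomic.AtomicReference;

public class MarkerDeleteSketch {
    // Simplified node; not the JDK's Node class.
    static final class Node {
        final String key;
        final AtomicReference<Object> value;
        final AtomicReference<Node> next;

        Node(String key, Object value, Node next) {
            this.key = key;
            this.value = new AtomicReference<Object>(value);
            this.next = new AtomicReference<Node>(next);
        }

        // Marker node: null key, value refers to the node itself.
        Node(Node next) {
            this.key = null;
            this.value = new AtomicReference<Object>(this);
            this.next = new AtomicReference<Node>(next);
        }
    }

    // Step 1: CAS n's value from non-null to null (logical deletion).
    static boolean markDeleted(Node n) {
        Object v = n.value.get();
        return v != null && n.value.compareAndSet(v, null);
    }

    // Step 2: CAS n's next pointer from f to a new marker that points to f.
    static boolean appendMarker(Node n, Node f) {
        return n.next.compareAndSet(f, new Node(f));
    }

    // Step 3: CAS b's next pointer over both n and its marker.
    static boolean unlink(Node b, Node n, Node f) {
        return b.next.compareAndSet(n, f);
    }

    // Insert a new node after pred, expecting expectedNext to be pred's successor.
    static boolean insertAfter(Node pred, Node expectedNext, String key, Object value) {
        return pred.next.compareAndSet(expectedNext, new Node(key, value, expectedNext));
    }

    public static void main(String[] args) {
        Node f = new Node("f", 3, null);
        Node n = new Node("n", 2, f);
        Node b = new Node("b", 1, n);

        System.out.println(markDeleted(n));               // step 1
        System.out.println(appendMarker(n, f));           // step 2
        System.out.println(insertAfter(n, f, "x", 9));    // fails: n.next is now the marker
        System.out.println(unlink(b, n, f));              // step 3
        System.out.println(b.next.get() == f);
    }
}
```

If the insert had won the race instead (running before step 2), appendMarker would have failed and the deleting thread would retry with the new successor, so x would stay reachable.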
Data structures
/*
 * This class implements a tree-like two-dimensionally linked skip
 * list in which the index levels are represented in separate
 * nodes from the base nodes holding data. There are two reasons
 * for taking this approach instead of the usual array-based
 * structure: 1) Array based implementations seem to encounter
 * more complexity and overhead 2) We can use cheaper algorithms
 * for the heavily-traversed index lists than can be used for the
 * base lists. Here's a picture of some of the basics for a
 * possible list with 2 levels of index:
 *
 * Head nodes          Index nodes
 * +-+    right        +-+                      +-+
 * |2|---------------->| |--------------------->| |->null
 * +-+                 +-+                      +-+
 *  | down              |                        |
 *  v                   v                        v
 * +-+            +-+  +-+       +-+            +-+       +-+
 * |1|----------->| |->| |------>| |----------->| |------>| |->null
 * +-+            +-+  +-+       +-+            +-+       +-+
 *  v              |    |         |              |         |
 * Nodes  next     v    v         v              v         v
 * +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
 * | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null
 * +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
 *
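This is a skip list whose level is 2; note that the level is 1 right after initialization. All nodes at level 1 and above are Index nodes, and the leftmost Index node of each level is a HeadIndex node. Level 0, also called the base level, is the singly linked list of Nodes, which is reached through Index.node from level 1.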
Node:
/**
 * Nodes hold keys and values, and are singly linked in sorted
 * order, possibly with some intervening marker nodes. The list is
 * headed by a dummy node accessible as head.node. The value field
 * is declared only as Object because it takes special non-V
 * values for marker and header nodes.
 */
static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile Node<K,V> next;

    /**
     * Creates a new regular node.
     */
    Node(K key, Object value, Node<K,V> next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }

    /**
     * Creates a new marker node. A marker is distinguished by
     * having its value field point to itself. Marker nodes also
     * have null keys, a fact that is exploited in a few places,
     * but this doesn't distinguish markers from the base-level
     * header node (head.node), which also has a null key.
     */
    Node(Node<K,V> next) {
        this.key = null;
        this.value = this;
        this.next = next;
    }

    /**
     * compareAndSet value field
     */
    boolean casValue(Object cmp, Object val) {
        return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
    }

    /**
     * compareAndSet next field
     */
    boolean casNext(Node<K,V> cmp, Node<K,V> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    /**
     * Returns true if this node is a marker. This method isn't
     * actually called in any current code checking for markers
     * because callers will have already read value field and need
     * to use that read (not another done here) and so directly
     * test if value points to node.
     *
     * @return true if this node is a marker node
     */
    boolean isMarker() {
        return value == this;
    }

    /**
     * Returns true if this node is the header of base-level list.
     * @return true if this node is header node
     */
    boolean isBaseHeader() {
        return value == BASE_HEADER;
    }

    /**
     * Tries to append a deletion marker to this node.
     * @param f the assumed current successor of this node
     * @return true if successful
     */
    boolean appendMarker(Node<K,V> f) {
        return casNext(f, new Node<K,V>(f));
    }

    /**
     * Helps out a deletion by appending marker or unlinking from
     * predecessor. This is called during traversals when value
     * field seen to be null.
     * @param b predecessor
     * @param f successor
     */
    void helpDelete(Node<K,V> b, Node<K,V> f) {
        /*
         * Rechecking links and then doing only one of the
         * help-out stages per call tends to minimize CAS
         * interference among helping threads.
         */
        if (f == next && this == b.next) {
            if (f == null || f.value != f) // not already marked
                casNext(f, new Node<K,V>(f));
            else
                b.casNext(this, f.next);
        }
    }

    ......
}
Index:
/**
 * Index nodes represent the levels of the skip list. Note that
 * even though both Nodes and Indexes have forward-pointing
 * fields, they have different types and are handled in different
 * ways, that can't nicely be captured by placing field in a
 * shared abstract class.
 */
static class Index<K,V> {
    final Node<K,V> node;
    final Index<K,V> down;
    volatile Index<K,V> right;

    /**
     * Creates index node with given values.
     */
    Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }

    /**
     * compareAndSet right field
     */
    final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
        return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
    }

    /**
     * Returns true if the node this indexes has been deleted.
     * @return true if indexed node is known to be deleted
     */
    final boolean indexesDeletedNode() {
        return node.value == null;
    }

    /**
     * Tries to CAS newSucc as successor. To minimize races with
     * unlink that may lose this index node, if the node being
     * indexed is known to be deleted, it doesn't try to link in.
     * @param succ the expected current successor
     * @param newSucc the new successor
     * @return true if successful
     */
    final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
        Node<K,V> n = node;
        newSucc.right = succ;
        return n.value != null && casRight(succ, newSucc);
    }

    /**
     * Tries to CAS right field to skip over apparent successor
     * succ. Fails (forcing a retraversal by caller) if this node
     * is known to be deleted.
     * @param succ the expected current successor
     * @return true if successful
     */
    final boolean unlink(Index<K,V> succ) {
        return node.value != null && casRight(succ, succ.right);
    }

    ......
}
HeadIndex:
/**
 * Nodes heading each level keep track of their level.
 */
static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}
Constructors and the initialization method:
public ConcurrentSkipListMap() {
    this.comparator = null;
    initialize();
}

/**
 * Constructs a new, empty map, sorted according to the specified
 * comparator.
 *
 * @param comparator the comparator that will be used to order this map.
 *        If {@code null}, the {@linkplain Comparable natural
 *        ordering} of the keys will be used.
 */
public ConcurrentSkipListMap(Comparator<? super K> comparator) {
    this.comparator = comparator;
    initialize();
}

public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {
    this.comparator = null;
    initialize();
    putAll(m);
}

public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) {
    this.comparator = m.comparator();
    initialize();
    buildFromSorted(m);
}
The initialize method; note that the initial HeadIndex level is 1:
/**
 * Initializes or resets state. Needed by constructors, clone,
 * clear, readObject. and ConcurrentSkipListSet.clone.
 * (Note that comparator must be separately initialized.)
 */
private void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                              null, null, 1);
}
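The doGet method:
In the code, Node b, n, and f correspond to the nodes in the singly linked list diagram at the start of this analysis. The flow is straightforward: findPredecessor first locates a Node on the base level, then the search continues sequentially along the singly linked list. If an inconsistent state is detected, control returns to the outer loop and the lookup restarts.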
/**
 * Gets value for key. Almost the same as findNode, but returns
 * the found value (to avoid retries during re-reads)
 *
 * @param key the key
 * @return the value, or null if absent
 */
private V doGet(Object key) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            if (n != b.next)                // inconsistent read
                break;
            if ((v = n.value) == null) {    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)  // b is deleted
                break;
            if ((c = cpr(cmp, key, n.key)) == 0) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            if (c < 0)
                break outer;
            b = n;
            n = f;
        }
    }
    return null;
}
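The findPredecessor method:
It uses the index levels to locate an Index node q on level 1 and then returns q.node (a Node on the base level). The method also has a side effect: when it finds that a Node has been deleted, it unlinks the Index nodes that point to it.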
/**
 * Returns a base-level node with key strictly less than given key,
 * or the base-level header if there is no such node. Also
 * unlinks indexes to deleted nodes found along the way. Callers
 * rely on this side-effect of clearing indices to deleted nodes.
 * @param key the key
 * @return a predecessor of key
 */
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    for (;;) {
        for (Index<K,V> q = head, r = q.right, d;;) {
            if (r != null) {
                Node<K,V> n = r.node;
                K k = n.key;
                // The node is marked as deleted: have its predecessor q
                // unlink the Index node r via q.unlink(r).
                if (n.value == null) {
                    if (!q.unlink(r))
                        break;           // restart
                    r = q.right;         // reread r
                    continue;
                }
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }
            if ((d = q.down) == null)
                return q.node;
            q = d;
            r = d.right;
        }
    }
}
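To make the right/down traversal easier to follow, here is a stripped-down, single-threaded sketch of the same walk over a hand-built structure. It omits CAS, deletion handling, and restarts entirely; the class FindPredecessorSketch, its plain-field Node and Index types, and the build helper are assumptions made for this illustration and are not the JDK classes.

```java
public class FindPredecessorSketch {
    static final class Node {
        final Integer key;   // null for the base-level header
        Node next;
        Node(Integer key, Node next) { this.key = key; this.next = next; }
    }

    static final class Index {
        final Node node;
        final Index down;
        Index right;
        Index(Node node, Index down, Index right) {
            this.node = node;
            this.down = down;
            this.right = right;
        }
    }

    // Move right while the next index key is strictly smaller than key,
    // otherwise move down; at the lowest index level return q.node.
    static Node findPredecessor(Index head, int key) {
        Index q = head;
        Index r = q.right;
        for (;;) {
            if (r != null && key > r.node.key) {
                q = r;
                r = r.right;
                continue;
            }
            if (q.down == null) {
                return q.node;
            }
            q = q.down;
            r = q.right;
        }
    }

    // Base list: header -> 10 -> 20 -> 30 -> 40 -> 50.
    // Level 1 indexes 20 and 40, level 2 indexes 40.
    static Index build() {
        Node n50 = new Node(50, null);
        Node n40 = new Node(40, n50);
        Node n30 = new Node(30, n40);
        Node n20 = new Node(20, n30);
        Node n10 = new Node(10, n20);
        Node header = new Node(null, n10);

        Index i40l1 = new Index(n40, null, null);
        Index i20l1 = new Index(n20, null, i40l1);
        Index head1 = new Index(header, null, i20l1);
        Index i40l2 = new Index(n40, i40l1, null);
        return new Index(header, head1, i40l2);
    }

    public static void main(String[] args) {
        Index head = build();
        System.out.println(findPredecessor(head, 35).key); // indexed node before 35
        System.out.println(findPredecessor(head, 45).key);
        System.out.println(findPredecessor(head, 5).key);  // header has a null key
    }
}
```

Note that the returned node is only some base-level node with a smaller key, not necessarily the immediate predecessor: for key 35 the walk stops at 20, and the caller still has to step along the base list past 30.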
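The doPut method:
Step 1, the outer loop: search the skip list. If a node with an equal key is found, update its value; otherwise create a new node and insert it into the base level.
Step 2: decide whether to build index nodes for the new node. If so, derive a level value from a random number and use it as the number of index levels to create. If level > the current head.level, the total number of levels is raised (by only 1 at a time) and the head field is updated with a CAS. Then a chain of Index nodes from level 1 up to level is built, linked through the Index.down pointers; the Index.right pointers are filled in afterwards.
Step 3, the splice loop: starting at level, fill in the Index.right pointers one level at a time.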
/**
 * Main insertion method. Adds element if not present, or
 * replaces value if present and onlyIfAbsent is false.
 * @param key the key
 * @param value the value that must be associated with key
 * @param onlyIfAbsent if should not insert if already present
 * @return the old value, or null if newly inserted
 */
private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            if (n != null) {
                Object v; int c;
                Node<K,V> f = n.next;
                if (n != b.next)               // inconsistent read
                    break;
                if ((v = n.value) == null) {   // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n) // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (c == 0) {
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    break; // restart if lost race to replace value
                }
                // else c < 0; fall through
            }

            z = new Node<K,V>(key, value, n);
            if (!b.casNext(n, z))
                break;         // restart if lost race to append to b
            break outer;
        }
    }

    int rnd = ThreadLocalRandom.nextSecondarySeed();
    if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
        int level = 1, max;
        while (((rnd >>>= 1) & 1) != 0)
            ++level;
        Index<K,V> idx = null;
        HeadIndex<K,V> h = head;
        if (level <= (max = h.level)) {
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
        }
        else { // try to grow by one level
            level = max + 1; // hold in array and later pick the one to use
            @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                (Index<K,V>[])new Index<?,?>[level+1];
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            for (;;) {
                h = head;
                int oldLevel = h.level;
                if (level <= oldLevel) // lost race to add level
                    break;
                HeadIndex<K,V> newh = h;
                Node<K,V> oldbase = h.node;
                for (int j = oldLevel+1; j <= level; ++j)
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        // find insertion points and splice in
        splice: for (int insertionLevel = level;;) {
            int j = h.level;
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
                if (q == null || t == null)
                    break splice;
                if (r != null) {
                    Node<K,V> n = r.node;
                    // compare before deletion check avoids needing recheck
                    int c = cpr(cmp, key, n.key);
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                if (j == insertionLevel) {
                    if (!q.link(r, t))
                        break; // restart
                    if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    if (--insertionLevel == 0)
                        break splice;
                }

                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}
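The level computation in step 2 can be isolated and sampled. The sketch below copies the two bit tests from doPut into a hypothetical helper levelFor (0 meaning "no index is built") and uses ThreadLocalRandom.current().nextInt() as a stand-in random source, since nextSecondarySeed() is not part of the public API. The cap at max + 1 that doPut applies afterwards is left out.

```java
import java.util.concurrent.ThreadLocalRandom;

public class IndexLevelSketch {
    // Returns 0 when the new node gets no index, otherwise the requested
    // number of index levels (before doPut caps it at head.level + 1).
    static int levelFor(int rnd) {
        if ((rnd & 0x80000001) != 0) {   // highest or lowest bit set: no index
            return 0;
        }
        int level = 1;
        while (((rnd >>>= 1) & 1) != 0) { // one more level per consecutive 1 bit
            ++level;
        }
        return level;
    }

    public static void main(String[] args) {
        int samples = 1_000_000;
        int[] counts = new int[32];       // the largest possible result is 31
        for (int i = 0; i < samples; i++) {
            counts[levelFor(ThreadLocalRandom.current().nextInt())]++;
        }
        for (int level = 0; level <= 5; level++) {
            System.out.printf("level %d: %.4f%n", level, counts[level] / (double) samples);
        }
    }
}
```

With uniformly random bits, both tested bits are zero a quarter of the time, so roughly three out of four insertions should get no index at all, and each additional level should be about half as likely as the previous one.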
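The doRemove method:
The flow is similar to doGet(): after locating the Node on the base level, it marks the node as deleted with n.casValue(v, null) and appends a marker node. Afterwards, findPredecessor is used to clear the Index nodes that are no longer valid.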
/**
 * Main deletion method. Locates node, nulls value, appends a
 * deletion marker, unlinks predecessor, removes associated index
 * nodes, and possibly reduces head index level.
 *
 * Index nodes are cleared out simply by calling findPredecessor.
 * which unlinks indexes to deleted nodes found along path to key,
 * which will include the indexes to this node. This is done
 * unconditionally. We can't check beforehand whether there are
 * index nodes because it might be the case that some or all
 * indexes hadn't been inserted yet for this node during initial
 * search for it, and we'd like to ensure lack of garbage
 * retention, so must call to be sure.
 *
 * @param key the key
 * @param value if non-null, the value that must be
 * associated with key
 * @return the node, or null if not found
 */
final V doRemove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            if (n != b.next)                    // inconsistent read
                break;
            if ((v = n.value) == null) {        // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)      // b is deleted
                break;
            if ((c = cpr(cmp, key, n.key)) < 0)
                break outer;
            if (c > 0) {
                b = n;
                n = f;
                continue;
            }
            if (value != null && !value.equals(v))
                break outer;
            if (!n.casValue(v, null))           // mark as deleted
                break;
            // append the marker node, then unlink n from b
            if (!n.appendMarker(f) || !b.casNext(n, f))
                findNode(key);                  // retry via findNode
            else {
                findPredecessor(key, cmp);      // clean index
                if (head.right == null)
                    tryReduceLevel();
            }
            @SuppressWarnings("unchecked") V vv = (V)v;
            return vv;
        }
    }
    return null;
}
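From the caller's side, the value parameter of doRemove corresponds to the two public remove variants: remove(key) removes unconditionally, while remove(key, value) removes only when the current value equals the given one. A short usage sketch follows; the class name RemoveUsage and the sample keys are made up for illustration.

```java
import java.util.concurrent.ConcurrentSkipListMap;

public class RemoveUsage {
    static String run() {
        ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();
        map.put("a", 1);
        map.put("b", 2);

        boolean mismatch = map.remove("a", 99); // value differs: nothing is removed
        Integer removed = map.remove("a");      // unconditional: returns the old value
        Integer again = map.remove("a");        // key already gone: returns null

        return mismatch + "," + removed + "," + again + "," + map.size();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "false,1,null,1"
    }
}
```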
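Related links
SkipList (skip list) (how skip lists work)
Collections framework, Map part (5): ConcurrentSkipListMap (explains the principle and walks through the jdk1.6 source; not recommended, because much of the implementation has changed by jdk8)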