描述
ConcurrentLinkedQueue是一个基于单链表的无界线程安全队列,该队列是FIFO的。ConcurrentLinkedQueue/ConcurrentLinkedDeue和LinkedBlockingQueue/LinkedBlockingDeue
相比,不同点在于它们不提供阻塞功能,并且是Lock-Free的,而后者则是利用ReentrantLock实现的,所以他们具有更高的吞吐量。
源码解析(基于jdk1.7.0_76)
数据结构
类图:
head,tail结点定义和描述:
/** * A node from which the first live (non-deleted) node (if any) * can be reached in O(1) time. * Invariants: * - all live nodes are reachable from head via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * Non-invariants: * - head.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */ private transient volatile Node<E> head; /** * A node from which the last node on list (that is, the unique * node with node.next == null) can be reached in O(1) time. * Invariants: * - the last node is always reachable from tail via succ() * - tail != null * Non-invariants: * - tail.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */ private transient volatile Node<E> tail;
链表结构
构造函数,初始化时让head和tail同时指向一个dummy结点(否则入队时需要同时修改head,tail 2个指针,需要加锁。而有了dummy结点,入队/出队只需各自竞争一个资源即可--入队CAS竞争修改头结点item属性:p.casItem(item, null),出队CAS竞争修改尾结点next指针:p.casNext(null, newNode)):
/** * Creates a {@code ConcurrentLinkedQueue} that is initially empty. */ public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
ConcurrentLinkedQueue中单链表结构示意图:
* * head tail * | | * v v * * +------+ +------+ +------+ * |dummy |------>| a | ... ----->| n |----->null * +------+ +------+ +------+ *
注意head和tail指针不是一定要每次更新的,事实上它们的更新是迟钝的,有滞后的(这样有助于减少CAS操作,jdk7中,head/tail指针在滞后实际的头/尾结点2步以上时才会更新,甚至会发生tail指针滞后于head指针)。真正的尾结点可以通过tail指针向后找,直到node.next==null。同样真正的队列头结点可以通过head指针向后找,直到node.item!=null。下面的描述来自ConcurrentLinkedQueue Node类的doc文档:
* Both head and tail are permitted to lag. In fact, failing to * update them every time one could is a significant optimization * (fewer CASes). As with LinkedTransferQueue (see the internal * documentation for that class), we use a slack threshold of two; * that is, we update head/tail when the current pointer appears * to be two or more steps away from the first/last node. * * Since head and tail are updated concurrently and independently, * it is possible for tail to lag behind head (why not)?
入队offer方法
从队尾入队,找到队尾结点p后,cas竞争更新p.next指针,选择性的更新tail指针(前面提到的滞后更新)。
/** * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never return {@code false}. * * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { checkNotNull(e); final Node<E> newNode = new Node<E>(e); for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; if (q == null) { // p is last node if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; } }
出队poll方法
从队列头部出队,找到头部结点p后,cas竞争把p.item值置null,选择性的更新head指针(前面提到的滞后更新)。注意head,tail指针始终不会为null,即使出队后队列空了,head和tail也会指向dummy结点。
public E poll() { restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }
updateHead方法,cas更新head指针,并通过h.lazySetNext(h),把出队的结点h的next指针指向h自己,标示出已经出队(虽然已经通过设置node.item为null标示,但这里修改next指针是为了帮助GC):
/** * Try to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. */ final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
size方法
size(), contains(), toArray()方法都需要遍历整个链表。
/** * Returns the number of elements in this queue. If this queue * contains more than {@code Integer.MAX_VALUE} elements, returns * {@code Integer.MAX_VALUE}. * * <p>Beware that, unlike in most collections, this method is * <em>NOT</em> a constant-time operation. Because of the * asynchronous nature of these queues, determining the current * number of elements requires an O(n) traversal. * Additionally, if elements are added or removed during execution * of this method, the returned result may be inaccurate. Thus, * this method is typically not very useful in concurrent * applications. * * @return the number of elements in this queue */ public int size() { int count = 0; for (Node<E> p = first(); p != null; p = succ(p)) if (p.item != null) // Collection.size() spec says to max out if (++count == Integer.MAX_VALUE) break; return count; }
succ方法,寻找后继结点(通过判断p==p.next来跳过已经出队的结点):
/** * Returns the successor of p, or the head node if p.next has been * linked to self, which will only be true if traversing with a * stale pointer that is now off the list. */ final Node<E> succ(Node<E> p) { Node<E> next = p.next; return (p == next) ? head : next; }
相关内容链接
聊聊并发(六)——ConcurrentLinkedQueue的实现原理分析 (基于jdk1.6的源码解析)