[c实现的队列](http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue)
下面是akka实现的一个MPSC队列。
PS: 代码中注释对链头链尾判定的标准是添加的元素所在的位置为链尾,这和代码中的命名相冲突了
PPS: single consumer(单消费者)场景下,不需要考虑多个消费者同时取元素时的竞争状态
/**
 * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.dispatch;

import akka.util.Unsafe;

import java.util.concurrent.atomic.AtomicReference;

/**
 * Lock-free MPSC linked queue implementation based on Dmitriy Vyukov's non-intrusive MPSC queue:
 * http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
 *
 * <p>Terminology used in this class: the "head" is the end that producers append to and is
 * stored in the inherited {@link AtomicReference} value; the "tail" is the end that elements are
 * taken from and is stored in {@code _tailDoNotCallMeDirectly}. The tail always refers to a stub
 * node whose value is not part of the queue; the first real element is {@code tail.next()}.
 *
 * <p>{@link #add} and {@link #addNode} publish through an atomic {@code getAndSet}. The methods
 * that read or advance the tail ({@link #peek}, {@link #poll}, {@link #pollNode}) update it with
 * a plain ordered store and no compare-and-swap, so, in line with the MPSC design, they are
 * intended to be called from one consumer thread only.
 *
 * @param <T> type of the elements held in the queue
 */
@SuppressWarnings("serial")
public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQueue.Node<T>> {
    // Extends AtomicReference for the "head" slot (which is the one that is appended to)
    // since Unsafe does not expose XCHG operation intrinsically.

    /** Consumer-side stub node; only accessed through Unsafe via {@code tailOffset}. */
    @SuppressWarnings("unused")
    private volatile Node<T> _tailDoNotCallMeDirectly;

    /** Creates an empty queue in which head and tail both refer to the same valueless stub node. */
    protected AbstractNodeQueue() {
        final Node<T> n = new Node<T>();
        _tailDoNotCallMeDirectly = n; // initial stub node (value == null) at the consumer end
        set(n);                       // producer end starts at the same node
    }

    /*
     * !!! There is a copy of this code in pollNode() !!!
     */
    /**
     * Returns the node holding the first element without removing it.
     *
     * @return the first element's node, or {@code null} if the queue is empty
     */
    @SuppressWarnings("unchecked")
    protected final Node<T> peekNode() {
        for (;;) {
            final Node<T> tail = ((Node<T>) Unsafe.instance.getObjectVolatile(this, tailOffset));
            final Node<T> next = tail.next();
            // next != null: an element is linked after the stub.
            // get() == tail: head still equals the stub, so the queue is empty (next is null).
            // Otherwise a producer has swapped the head but has not linked its node yet
            // (see add()), so retry until the link becomes visible.
            if (next != null || get() == tail) {
                return next;
            }
        }
    }

    /**
     * Returns the first element without removing it.
     *
     * @return the first element, or {@code null} if the queue is empty
     */
    public final T peek() {
        final Node<T> n = peekNode();
        return (n != null) ? n.value : null;
    }

    /**
     * Appends an element wrapped in a newly allocated node.
     *
     * @param value the element to append
     */
    public final void add(final T value) {
        final Node<T> n = new Node<T>(value);
        // Two steps: atomically make n the new head, then link the previous head to n.
        getAndSet(n).setNext(n);
    }

    /**
     * Appends a caller-supplied node; its link to any former successor is cleared first.
     *
     * @param n the node to append
     */
    public final void addNode(final Node<T> n) {
        n.setNext(null);
        getAndSet(n).setNext(n);
    }

    /**
     * Tells whether {@link #peek()} currently finds no element.
     *
     * @return {@code true} if {@code peek()} returns {@code null}
     */
    public final boolean isEmpty() {
        return peek() == null;
    }

    /**
     * Counts the nodes reachable from the first element by following {@code next()} links.
     * The result is only a snapshot and may be inaccurate while the queue is being modified.
     *
     * @return the number of nodes traversed
     */
    public final int count() {
        int count = 0;
        for (Node<T> n = peekNode(); n != null; n = n.next()) {
            ++count;
        }
        return count;
    }

    /*
     * !!! There is a copy of this code in pollNode() !!!
     */
    /**
     * Removes and returns the first element.
     *
     * @return the removed element, or {@code null} if the queue is empty
     */
    public final T poll() {
        final Node<T> next = peekNode();
        if (next == null) {
            return null;
        } else {
            final T ret = next.value;
            next.value = null; // next becomes the new stub, so it must not keep the element
            Unsafe.instance.putOrderedObject(this, tailOffset, next); // advance the tail
            return ret;
        }
    }

    /**
     * Removes the first element and returns it inside a queue node. The returned node is the
     * previous stub node, refilled with the removed element; it is detached from the queue.
     *
     * @return a node carrying the removed element, or {@code null} if the queue is empty
     */
    @SuppressWarnings("unchecked")
    public final Node<T> pollNode() {
        Node<T> tail;
        Node<T> next;
        // Copy of the loop in peekNode(), kept inline because the old tail is needed below.
        for (;;) {
            tail = ((Node<T>) Unsafe.instance.getObjectVolatile(this, tailOffset));
            next = tail.next();
            if (next != null || get() == tail) {
                break;
            }
        }
        if (next == null) {
            return null;
        } else {
            tail.value = next.value; // move the element into the node handed to the caller
            next.value = null;       // next becomes the new stub
            Unsafe.instance.putOrderedObject(this, tailOffset, next);
            // Unlink the returned node so that holding on to it does not keep its successor,
            // and through it every later node, reachable. A non-null successor was already
            // observed, so the producer that linked this node has finished writing to it.
            tail.setNext(null);
            return tail;
        }
    }

    /** Unsafe field offset of {@code _tailDoNotCallMeDirectly}. */
    private final static long tailOffset;

    static {
        try {
            tailOffset = Unsafe.instance.objectFieldOffset(
                AbstractNodeQueue.class.getDeclaredField("_tailDoNotCallMeDirectly"));
        } catch (Throwable t) {
            throw new ExceptionInInitializerError(t);
        }
    }

    /**
     * Singly linked queue node holding one element and a link to the following node.
     *
     * @param <T> type of the element held by the node
     */
    public static class Node<T> {
        /** The element carried by this node; {@code null} for a stub node. */
        public T value;

        /** Link to the following node; only accessed through Unsafe via {@code nextOffset}. */
        @SuppressWarnings("unused")
        private volatile Node<T> _nextDoNotCallMeDirectly;

        /** Creates a node without a value. */
        public Node() {
            this(null);
        }

        /**
         * Creates a node carrying the given value.
         *
         * @param value the element to store in the node
         */
        public Node(final T value) {
            this.value = value;
        }

        /**
         * Reads the link to the following node with volatile semantics.
         *
         * @return the following node, or {@code null} if none is linked
         */
        @SuppressWarnings("unchecked")
        public final Node<T> next() {
            return (Node<T>) Unsafe.instance.getObjectVolatile(this, nextOffset);
        }

        /**
         * Writes the link to the following node using an ordered store.
         *
         * @param newNext the node to link after this one, or {@code null} to clear the link
         */
        protected final void setNext(final Node<T> newNext) {
            Unsafe.instance.putOrderedObject(this, nextOffset, newNext);
        }

        /** Unsafe field offset of {@code _nextDoNotCallMeDirectly}. */
        private final static long nextOffset;

        static {
            try {
                nextOffset = Unsafe.instance.objectFieldOffset(
                    Node.class.getDeclaredField("_nextDoNotCallMeDirectly"));
            } catch (Throwable t) {
                throw new ExceptionInInitializerError(t);
            }
        }
    }
}
时间: 2024-10-14 00:49:29