MPSC lock free queue



PS: 代码中注释对链头链尾判定的标准是添加的元素所在的位置为链尾,这和代码中的命名相冲突了

PPS: single customer 就不太需要考虑消费者的同时取的竞争状态

package akka.dispatch;

import akka.util.Unsafe;

import java.util.concurrent.atomic.AtomicReference;

 * Lock-free MPSC linked queue implementation based on Dmitriy Vyukov‘s non-intrusive MPSC queue:
public abstract class AbstractNodeQueue<T> extends AtomicReference<AbstractNodeQueue.Node<T>> {
    // Extends AtomicReference for the "head" slot (which is the one that is appended to) since Unsafe does not expose XCHG operation intrinsically
    private volatile Node<T> _tailDoNotCallMeDirectly;

    protected AbstractNodeQueue() {
       final Node<T> n = new Node<T>();
       _tailDoNotCallMeDirectly = n;//初始化根节点,value=null,链头!

     * !!! There is a copy of this code in pollNode() !!!
    protected final Node<T> peekNode() {//链头
        for(;;) {
          final Node<T> tail = ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset));
          final Node<T> next =;
          if (next != null || get() == tail)//next!=null表明它不是根节点且在链表中;get()==tail 表明整个链表中还没有Node,只能返回根节点了
            return next;

    public final T peek() {
        final Node<T> n = peekNode();
        return (n != null) ? n.value : null;

    public final void add(final T value) {
        final Node<T> n = new Node<T>(value);
    public final void addNode(final Node<T> n) {

    public final boolean isEmpty() {
        return peek() == null;

    public final int count() {//count 是不准确的
        int count = 0;
        for(Node<T> n = peekNode();n != null; n =
        return count;

     * !!! There is a copy of this code in pollNode() !!!
    public final T poll() {
        final Node<T> next = peekNode();
        if (next == null) return null;
        else {
            final T ret = next.value;
            next.value = null;
            Unsafe.instance.putOrderedObject(this, tailOffset, next);//将链头替换掉
            return ret;
    public final Node<T> pollNode() {
      Node<T> tail;
      Node<T> next;
      for(;;) {
        tail = ((Node<T>)Unsafe.instance.getObjectVolatile(this, tailOffset));
        next =;
        if (next != null || get() == tail)
      if (next == null) return null;
      else {
        tail.value = next.value;
        next.value = null;
        Unsafe.instance.putOrderedObject(this, tailOffset, next);
        return tail;

    private final static long tailOffset;

    static {
        try {
          tailOffset = Unsafe.instance.objectFieldOffset(AbstractNodeQueue.class.getDeclaredField("_tailDoNotCallMeDirectly"));
        } catch(Throwable t){
            throw new ExceptionInInitializerError(t);

    public static class Node<T> {
        public T value;
        private volatile Node<T> _nextDoNotCallMeDirectly;//下一个节点。next,直接用Unsafe来进行操作

        public Node() {

        public Node(final T value) {
            this.value = value;

        public final Node<T> next() {
            return (Node<T>)Unsafe.instance.getObjectVolatile(this, nextOffset);

        protected final void setNext(final Node<T> newNext) {
          Unsafe.instance.putOrderedObject(this, nextOffset, newNext);
        private final static long nextOffset;
        static {
            try {
                nextOffset = Unsafe.instance.objectFieldOffset(Node.class.getDeclaredField("_nextDoNotCallMeDirectly"));
            } catch(Throwable t){
                throw new ExceptionInInitializerError(t);
