synchronized VS Lock, wait-notify VS Condition

最近在看Java Threads第三版,收获颇多。全英文阅读,感觉真的是爽歪歪。推荐大家都看看。

这一篇想系统的讲一讲,线程之间通信的2种模式,wait-notify 和 Condition。

先上一个生产者和消费者的例子

package waitnotify;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class Data {

    private List<Integer> data = new ArrayList<Integer>(5);

    private Random random = new Random();

    public void put() {
        synchronized (data) {
            if (data.size() >= 5) {
                try {
                    data.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                data.add(random.nextInt(100));
                data.notify();
            }
        }
    }

    public void get() {
        synchronized (data) {
            if (data.size() < 1) {
                try {
                    data.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                data.remove(0);
                data.notify();
            }
        }
    }
}
package waitnotify;

public class Producer extends Thread{

    private Data data;

    public Producer(Data data, String name) {
        this.data = data;
        this.setName(name);
    }

    @Override
    public void run() {
        for (int i=0; i<3; i++) {
            data.put();
        }
    }
}
package waitnotify;

public class Consumer extends Thread {

    private Data data;

    public Consumer(Data data, String name) {
        this.data = data;
        this.setName(name);
    }

    @Override
    public void run() {
        for (int i=0; i<3; i++) {
            data.get();
        }
    }
}
package waitnotify;

public class Test {

    public static void main (String args[]) {
        Data data = new Data();
        new Producer(data, "put---1").start();
        new Consumer(data, "get---1").start();
        new Consumer(data, "get---2").start();
        new Producer(data, "put---2").start();
    }
}

这个例子简单、易懂、易读。首先基于synchronized锁定共享资源(data),然后生产者和消费者通过wait和notify相互通信,实现互斥的访问共享资源。这里的关键是互斥,只要用了synchronized,所有的线程都只能互斥的执行。

1、那我们思考一个有关数据库的问题,写操作和读操作、写操作和写操作都是需要互斥的,读操作和读操作需要互斥吗?不需要,因为两个读操作同时进行,不会产生脏数据。这种情况,synchronized就无从下手。

2、还有,当多个线程访问synchronized代码,只有一个线程在执行,其它的线程只能死等。这里也是可以增加一些灵活性的,比如说,其它的线程可以等到一定时间后,中断自己,去干别的事情。

3、notify的作用是唤醒除了自己以外的其它线程,至于是哪一个,它不管。试想一个场景,有多个生产者线程和消费者线程。如果一个生产者线程发现data被装满,自己就等待,然后唤醒一个线程,那唤醒的那个线程是生产者线程,还是消费者线程呢?如果又唤醒了一个生产者线程,它还是等待,虽然不会产生脏数据,但是浪费了性能和时间啊。此处也可以优化。

下面就一起看看,如何解决上面的问题。

先来看看Lock这个接口,定义了哪些行为

public interface Lock {

    void lock();

    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();

    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    void unlock();

    Condition newCondition();
}

其中2个抛出中断异常的方法可以解决第二个问题;newCondition方法生成多个Condition可以解决第三个问题;而另一个锁接口ReadWriteLock解决了第一个问题,实现类里面用到了共享锁。

再来一个Condition版本的生产者和消费者(主要是修改了Data类)

package condition;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Data {

    private List<Integer> data = new ArrayList<Integer>(5);

    private Random random = new Random();

    Lock lock = new ReentrantLock();
    Condition full = lock.newCondition();
    Condition empty = lock.newCondition();

    public void put() {
        lock.lock();
        try {
            if (data.size() >= 5) {
                full.await();
            } else {
                data.add(random.nextInt(100));
                empty.signal();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void get() {
        lock.lock();
        try {
            if (data.size() < 1) {
                empty.await();
            } else {
                data.remove(0);
                full.signal();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

这里用一个lock生成了2个Condition,然后分别进行await和signal。假设一个生产者线程发现data装满,然后full.await,然后之前的empty.signal会唤醒一个线程,此线程必定是消费者线程。其实讲到这里,读者应该能猜到,full和empty分别维护了各自的等待队列,full的队列里全是生产者线程,empty队列里全是消费者线程。另外发现,代码似乎变复杂了,必须在finally里面释放锁。因为synchronized是JVM实现的锁语义,JVM会自动的帮你释放锁。而Lock是Java代码层面的语义,如果有异常,需要自己释放。

讲了半天,好像只是描述了一个表面现象,而没有接触到实质。没办法,源码走起。

从最重要的一个类ReentrantLock的实现看起,发现几乎所有的操作,兜兜转转都是调用了另一个核心类AbstractQueuedSynchronizer。这个类是Java并发大师的杰作,是Lock机制的灵魂所在,我至今还有些地方没能够完全领悟。哎,大师就是大师。

当然,大师也是站在巨人的肩膀上。AbstractQueuedSynchronizer的核心思想是CLH队列,说白了,就是将同一个锁上等待的所有线程看成一个个节点,每一个节点都在自旋,等待前驱节点的信号并且尝试获取锁。而且,AbstractQueuedSynchronizer将同一个Condition上等待的所有线程也封装成了节点队列。

那就先看看这个节点是个什么数据结构

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor‘s thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn‘t need to
         * signal. So, most code doesn‘t need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev‘s from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

这个Node是AbstractQueuedSynchronizer的一个内部类,大部分的英文注释说的很明白了。Node就是对线程的包装,Node有两种模式,独占(EXCLUSIVE)和共享(SHARED),前面提到的读-读操作就是利用了共享锁。对于一个节点的等待状态,CANCELLED表示当前节点被取消,比如说线程中断导致的;SIGNAL表示后继节点可以执行;CONDITION表示此节点在等待一个Condition,在Condition等待队列里面。

再看看Condition等待队列的结构

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        /**
         * Creates a new {@code ConditionObject} instance.
         */
        public ConditionObject() { }

        // Internal methods

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

        /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

        /**
         * Unlinks cancelled waiter nodes from condition queue.
         * Called only while holding lock. This is called when
         * cancellation occurred during condition wait, and upon
         * insertion of a new waiter when lastWaiter is seen to have
         * been cancelled. This method is needed to avoid garbage
         * retention in the absence of signals. So even though it may
         * require a full traversal, it comes into play only when
         * timeouts or cancellations occur in the absence of
         * signals. It traverses all nodes rather than stopping at a
         * particular target to unlink all pointers to garbage nodes
         * without requiring many re-traversals during cancellation
         * storms.
         */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

        // public methods

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        /**
         * Moves all threads from the wait queue for this condition to
         * the wait queue for the owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

        /**
         * Implements uninterruptible condition wait.
         * <ol>
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * </ol>
         */
        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

        /*
         * For interruptible waits, we need to track whether to throw
         * InterruptedException, if interrupted while blocked on
         * condition, versus reinterrupt current thread, if
         * interrupted while blocked waiting to re-acquire.
         */

        /** Mode meaning to reinterrupt on exit from wait */
        private static final int REINTERRUPT =  1;
        /** Mode meaning to throw InterruptedException on exit from wait */
        private static final int THROW_IE    = -1;

        /**
         * Checks for interrupt, returning THROW_IE if interrupted
         * before signalled, REINTERRUPT if after signalled, or
         * 0 if not interrupted.
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

        /**
         * Throws InterruptedException, reinterrupts current thread, or
         * does nothing, depending on mode.
         */
        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();
        }

        /**
         * Implements absolute timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * <li> If timed out while blocked in step 4, return false, else true.
         * </ol>
         */
        public final boolean awaitUntil(Date deadline)
                throws InterruptedException {
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() > abstime) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        /**
         * Implements timed condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled, interrupted, or timed out.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * <li> If timed out while blocked in step 4, return false, else true.
         * </ol>
         */
        public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            final long deadline = System.nanoTime() + nanosTimeout;
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }

        //  support for instrumentation

        /**
         * Returns true if this condition was created by the given
         * synchronization object.
         *
         * @return {@code true} if owned
         */
        final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
            return sync == AbstractQueuedSynchronizer.this;
        }

        /**
         * Queries whether any threads are waiting on this condition.
         * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
         *
         * @return {@code true} if there are any waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final boolean hasWaiters() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    return true;
            }
            return false;
        }

        /**
         * Returns an estimate of the number of threads waiting on
         * this condition.
         * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
         *
         * @return the estimated number of waiting threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final int getWaitQueueLength() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int n = 0;
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION)
                    ++n;
            }
            return n;
        }

        /**
         * Returns a collection containing those threads that may be
         * waiting on this Condition.
         * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
         *
         * @return the collection of threads
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        protected final Collection<Thread> getWaitingThreads() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            ArrayList<Thread> list = new ArrayList<Thread>();
            for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
                if (w.waitStatus == Node.CONDITION) {
                    Thread t = w.thread;
                    if (t != null)
                        list.add(t);
                }
            }
            return list;
        }
    }

ConditionObject

ConditionObject也是AbstractQueuedSynchronizer的一个内部类,主要是处理线程通信的一个数据结构,里面维护一个等待被唤醒的线程的节点队列。

AbstractQueuedSynchronizer的成员变量,和几个CAS操作

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;

-----------------------------CAS操作----------------------------------------------
    /**
     * Setup to support compareAndSet. We need to natively implement
     * this here: For the sake of permitting future enhancements, we
     * cannot explicitly subclass AtomicInteger, which would be
     * efficient and useful otherwise. So, as the lesser of evils, we
     * natively implement using hotspot intrinsics API. And while we
     * are at it, we do the same for other CASable fields (which could
     * otherwise be done with atomic field updaters).
     */
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

    /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * CAS waitStatus field of a node.
     */
    private static final boolean compareAndSetWaitStatus(Node node,
                                                         int expect,
                                                         int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                        expect, update);
    }

    /**
     * CAS next field of a node.
     */
    private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

AbstractQueuedSynchronizer通过head和tail连接一个逻辑上的队列,我称之为sync队列,与Condition队列区别开来。后面的CAS操作,以后专门写一遍文章来阐述

现在再重新审视一遍Lock版本的生产者-消费者

lock.lock()调用的是非公平锁的lock方法

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

第一个进来的线程如愿的获得了锁,其它的线程调用acquire(1),这个方法在AbstractQueuedSynchronizer

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

这几行代码真是短小精悍!!!tryAcquire方法本来就是留给子类实现自己的逻辑的,又回到非公平锁,然后调用

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 第一个线程已经将state改为1了
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 这里处理可重入,每次重入+1
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

然后就会调用addWaiter方法,将线程封装成Node添加到sync队列

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

这个enq方法是个入队列操作,似乎在死循环,其实循环两次也就返回了,请读者开动脑筋。

成功添加sync队列之后,开始执行acquireQueued方法

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 此处才是真正的死循环
            for (;;) {
                final Node p = node.predecessor();
                // 这里要先判断node的前驱节点是否为头节点,然后再尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    // 当前节点获取锁失败后,是否要将线程park
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 前驱节点的等待状态为SIGNAL,可以将当前线程park
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        // 前驱节点的等待状态为CANCELLED,则遍历所有前驱,将所有CANCELLED前驱跳过
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don‘t park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 将前驱节点的等待状态设置为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    private final boolean parkAndCheckInterrupt() {
        // 将线程park
        LockSupport.park(this);
        // 是否中断
        return Thread.interrupted();
    }

park方法最终会借助操作系统将当前线程阻塞,与之对应的unpark方法会唤醒线程。

以上就是lock.lock()获取锁时的大体逻辑,lock.unlock()释放锁时的逻辑不再赘述,望有心人仔细阅读。

回过头来再说说Condition队列。

full.await()是ConditionObject的方法

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 将当前阶段加入到Condition等待队列
            Node node = addConditionWaiter();
            // 释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 判断节点是否已经转移到了sync队列,也是一直循环
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 被唤醒之后,再度尝试获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

addConditionWaiter方法也比较简单,这里不再分析了。

然后就到了full.signal(),signal方法做的事情不多,真正干活的是doSignal方法

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

signal会将Condition等待队列的头节点,通过transferForSignal转移到sync队列,让节点中的线程去竞争锁以获得执行的机会。有一点值得注意,调用signal方法之后,头节点中的线程并没有马上被唤醒,至于什么时候被唤醒,就得看sync队列里的节点的执行情况了。这和wait-notify是一样的,调用notify方法后,没有马上释放锁,只有执行完synchronized代码后,才会释放锁,让被唤醒的线程获取。

时间: 2024-11-13 20:35:06

synchronized VS Lock, wait-notify VS Condition的相关文章

【Java并发系列04】线程锁synchronized和Lock和volatile和Condition

img { border: solid 1px } 一.前言 多线程怎么防止竞争资源,即防止对同一资源进行并发操作,那就是使用加锁机制.这是Java并发编程中必须要理解的一个知识点.其实使用起来还是比较简单,但是一定要理解. 有几个概念一定要牢记: 加锁必须要有锁 执行完后必须要释放锁 同一时间.同一个锁,只能有一个线程执行 二.synchronized synchronized的特点是自动释放锁,作用在方法时自动获取锁,任意对象都可做为锁,它是最常用的加锁机制,锁定几行代码,如下: //---

synchronized与lock 对象锁、互斥锁、共享锁以及公平锁和非公平锁

synchronized与lock  都是用来实现线程同步的锁,synchronized对象锁,lock是一个接口,她的实现有reentrantlock互斥锁以及ReentrantReadWriteLock共享锁. 这里说明一下ReentrantReadWriteLock共享锁,所谓共享就是该锁提供读读锁共享,即可以多个线程共享一个读取锁,但是读写锁以及读读锁是互斥的. 看到网上有好多介绍介绍锁的种类的,有对象锁.互斥锁.共享锁以及公平锁和非公平锁,但是说明都不够详细了然,在这里用直白的说法记录

synchronized和lock的区别

原文摘自:https://www.eyesmoons.com/article/75 1,原始构成 synchronized是关键字,属于JVM层面,通过wait,notify和notifyAll来调度线程. Lock是具体类,是api层面的锁. 2,使用方法 synchronized不需要用户手动去释放锁, 当synchronized代码执行完后,系统会自动释放锁. Lock需要用户手动释放锁,否则会出现死锁现象.需要lock和unlock配合try/finally语句块来完成. 3,等待是否中

java - synchronized与lock的区别

synchronized与lock的区别 原始构成 synchronized是关键字属于JVM层面 monitorenter(底层是通过monitor对象来完成,其实wait/notify等对象也依赖于monitor独享只有在同步块或方法中才能调wait/notify等方法) monitorexit Lock是具体类(java.utl.concurrent.locks.Lock)是api层面的锁 使用方法 synchronized不需要用户去手动释放锁,当synchronized代码执行完后系统

synchronized 与 Lock 的那点事

最近在做一个监控系统,该系统主要包括对数据实时分析和存储两个部分,由于并发量比较高,所以不可避免的使用到了一些并发的知识.为了实现这些要求,后台使用一个队列作为缓存,对于请求只管往缓存里写数据.同时启动一个线程监听该队列,检测到数据,立即请求调度线程,对数据进行处理. 具体的使用方案就是使用同步保证数据的正常,使用线程池提高效率. 同步的实现当然是采用锁了,java中使用锁的两个基本工具是 synchronized 和 Lock. 一直很喜欢synchronized,因为使用它很方便.比如,需要

Java中synchronized和Lock的区别

synchronized和Lock的区别synchronize锁对象可以是任意对象,由于监视器方法必须要拥有锁对象那么任意对象都可以调用的方法所以将其抽取到Object类中去定义监视器方法这样锁对象和监视器对象是同一个,只要创建了锁对象它既是锁对象同时也是监视器对象这样不能实现在一个锁对象上绑定多个监视器对象jdk1.5中Lock对象仅仅是一个锁对象监视器方法被封装到了Condition对象中这样实现了锁对象和监视器对象进行了分离更加的面向对象这样可以实现在一个锁对象上绑定多个监视器对象 在一个

java多线程并发(一)——Semaphore,volatile,synchronized ,Lock

在并发编程中,我们通常会遇到以下三个问题:原子性问题,可见性问题,有序性问题.我们先看具体看一下这三个概念: 1.原子性 原子性:即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行. 一个很经典的例子就是银行账户转账问题 2.可见性 可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值. 3.有序性 有序性:即程序执行的顺序按照代码的先后顺序执行. Semaphore 简介 信号量(Semaphore),有时被称为信号

【java多线程】(3)---synchronized、Lock

synchronized.Lock 一.概述 1.出现线程不安全的原因是什么? 如果我们创建的多个线程,存在着共享数据,那么就有可能出现线程的安全问题:当其中一个线程操作共享数据时,还未操作完成,另外的线程就参与进来,导致对共享数据的操作出现问题. 2.线程不安全解决办法 要求一个线程操作共享数据时,只有当其完成操作完成共享数据,其它线程才有机会执行共享数据.java提供了两种方式来实现同步互斥访问:synchronized和Lock. 二.synchronized synchronized可以

java中synchronized与lock的理解与应用

总结来说,Lock和synchronized有以下几点不同: 1)Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现: 2)synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生:而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁: 3)Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchr

多线程编程之synchronized和Lock

前言 在高并发多线程应用场景中对于synchronized和Lock的使用是很普遍的,这篇文章我们就来进行这些知识点的学习,比如说:公平锁与非公平锁.乐观锁与悲观锁.线程间通信.读写锁.数据脏读等知识内容.目录:1.同步问题的产生与案例代码2.synchronized解决同步问题3.Lock解决同步代码问题4.公平锁与非公平锁5.乐观锁与悲观锁6.synchronized与Lock比较 同步问题案例 这个问题在我们日常生活中非常常见,比如说:秒杀物品的库存数据.火车票剩余票等就是有同步问题,下面