最近在看Java Threads第三版,收获颇多。全英文阅读,感觉真的是爽歪歪。推荐大家都看看。
这一篇想系统的讲一讲,线程之间通信的2种模式,wait-notify 和 Condition。
先上一个生产者和消费者的例子
package waitnotify; import java.util.ArrayList; import java.util.List; import java.util.Random; public class Data { private List<Integer> data = new ArrayList<Integer>(5); private Random random = new Random(); public void put() { synchronized (data) { if (data.size() >= 5) { try { data.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else { data.add(random.nextInt(100)); data.notify(); } } } public void get() { synchronized (data) { if (data.size() < 1) { try { data.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else { data.remove(0); data.notify(); } } } }
package waitnotify; public class Producer extends Thread{ private Data data; public Producer(Data data, String name) { this.data = data; this.setName(name); } @Override public void run() { for (int i=0; i<3; i++) { data.put(); } } }
package waitnotify; public class Consumer extends Thread { private Data data; public Consumer(Data data, String name) { this.data = data; this.setName(name); } @Override public void run() { for (int i=0; i<3; i++) { data.get(); } } }
package waitnotify; public class Test { public static void main (String args[]) { Data data = new Data(); new Producer(data, "put---1").start(); new Consumer(data, "get---1").start(); new Consumer(data, "get---2").start(); new Producer(data, "put---2").start(); } }
这个例子简单、易懂、易读。首先基于synchronized锁定共享资源(data),然后生产者和消费者通过wait和notify相互通信,实现互斥的访问共享资源。这里的关键是互斥,只要用了synchronized,所有的线程都只能互斥的执行。
1、那我们思考一个有关数据库的问题,写操作和读操作、写操作和写操作都是需要互斥的,读操作和读操作需要互斥吗?不需要,因为两个读操作同时进行,不会产生脏数据。这种情况,synchronized就无从下手。
2、还有,当多个线程访问synchronized代码,只有一个线程在执行,其它的线程只能死等。这里也是可以增加一些灵活性的,比如说,其它的线程可以等到一定时间后,中断自己,去干别的事情。
3、notify的作用是唤醒除了自己以外的其它线程,至于是哪一个,它不管。试想一个场景,有多个生产者线程和消费者线程。如果一个生产者线程发现data被装满,自己就等待,然后唤醒一个线程,那唤醒的那个线程是生产者线程,还是消费者线程呢?如果又唤醒了一个生产者线程,它还是等待,虽然不会产生脏数据,但是浪费了性能和时间啊。此处也可以优化。
下面就一起看看,如何解决上面的问题。
先来看看Lock这个接口,定义了哪些行为
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
其中2个抛出中断异常的方法可以解决第二个问题;newCondition方法生成多个Condition可以解决第三个问题;而另一个锁接口ReadWriteLock解决了第一个问题,实现类里面用到了共享锁。
再来一个Condition版本的生产者和消费者(主要是修改了Data类)
package condition; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class Data { private List<Integer> data = new ArrayList<Integer>(5); private Random random = new Random(); Lock lock = new ReentrantLock(); Condition full = lock.newCondition(); Condition empty = lock.newCondition(); public void put() { lock.lock(); try { if (data.size() >= 5) { full.await(); } else { data.add(random.nextInt(100)); empty.signal(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void get() { lock.lock(); try { if (data.size() < 1) { empty.await(); } else { data.remove(0); full.signal(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }
这里用一个lock生成了2个Condition,然后分别进行await和signal。假设一个生产者线程发现data装满,然后full.await,然后之前的empty.signal会唤醒一个线程,此线程必定是消费者线程。其实讲到这里,读者应该能猜到,full和empty分别维护了各自的等待队列,full的队列里全是生产者线程,empty队列里全是消费者线程。另外发现,代码似乎变复杂了,必须在finally里面释放锁。因为synchronized是JVM实现的锁语义,JVM会自动的帮你释放锁。而Lock是Java代码层面的语义,如果有异常,需要自己释放。
讲了半天,好像只是描述了一个表面现象,而没有接触到实质。没办法,源码走起。
从最重要的一个类ReentrantLock的实现看起,发现几乎所有的操作,兜兜转转都是调用了另一个核心类AbstractQueuedSynchronizer。这个类是Java并发大师的杰作,是Lock机制的灵魂所在,我至今还有些地方没能够完全领悟。哎,大师就是大师。
当然,大师也是站在巨人的肩膀上。AbstractQueuedSynchronizer的核心思想是CLH队列,说白了,就是将同一个锁上等待的所有线程看成一个个节点,每一个节点都在自旋,等待前驱节点的信号并且尝试获取锁。而且,AbstractQueuedSynchronizer将同一个Condition上等待的所有线程也封装成了节点队列。
那就先看看这个节点是个什么数据结构
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor‘s thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn‘t need to * signal. So, most code doesn‘t need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev‘s from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next; /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
这个Node是AbstractQueuedSynchronizer的一个内部类,大部分的英文注释说的很明白了。Node就是对线程的包装,Node有两种模式,独占(EXCLUSIVE)和共享(SHARED),前面提到的读-读操作就是利用了共享锁。对于一个节点的等待状态,CANCELLED表示当前节点被取消,比如说线程中断导致的;SIGNAL表示后继节点可以执行;CONDITION表示此节点在等待一个Condition,在Condition等待队列里面。
再看看Condition等待队列的结构
public class ConditionObject implements Condition, java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L; /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; /** * Creates a new {@code ConditionObject} instance. */ public ConditionObject() { } // Internal methods /** * Adds a new waiter to wait queue. * @return its new wait node */ private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * Removes and transfers all nodes. * @param first (non-null) the first node on condition queue */ private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } /** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } } // public methods /** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } /** * Moves all threads from the wait queue for this condition to * the wait queue for the owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } /** * Implements uninterruptible condition wait. * <ol> * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * </ol> */ public final void awaitUninterruptibly() { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false; while (!isOnSyncQueue(node)) { LockSupport.park(this); if (Thread.interrupted()) interrupted = true; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } /* * For interruptible waits, we need to track whether to throw * InterruptedException, if interrupted while blocked on * condition, versus reinterrupt current thread, if * interrupted while blocked waiting to re-acquire. */ /** Mode meaning to reinterrupt on exit from wait */ private static final int REINTERRUPT = 1; /** Mode meaning to throw InterruptedException on exit from wait */ private static final int THROW_IE = -1; /** * Checks for interrupt, returning THROW_IE if interrupted * before signalled, REINTERRUPT if after signalled, or * 0 if not interrupted. */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * Throws InterruptedException, reinterrupts current thread, or * does nothing, depending on mode. */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } /** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } /** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } /** * Implements absolute timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ public final boolean awaitUntil(Date deadline) throws InterruptedException { long abstime = deadline.getTime(); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (System.currentTimeMillis() > abstime) { timedout = transferAfterCancelledWait(node); break; } LockSupport.parkUntil(this, abstime); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } /** * Implements timed condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled, interrupted, or timed out. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * <li> If timed out while blocked in step 4, return false, else true. * </ol> */ public final boolean await(long time, TimeUnit unit) throws InterruptedException { long nanosTimeout = unit.toNanos(time); if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; boolean timedout = false; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { timedout = transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; } // support for instrumentation /** * Returns true if this condition was created by the given * synchronization object. * * @return {@code true} if owned */ final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { return sync == AbstractQueuedSynchronizer.this; } /** * Queries whether any threads are waiting on this condition. * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. * * @return {@code true} if there are any waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protected final boolean hasWaiters() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) return true; } return false; } /** * Returns an estimate of the number of threads waiting on * this condition. * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. * * @return the estimated number of waiting threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protected final int getWaitQueueLength() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int n = 0; for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) ++n; } return n; } /** * Returns a collection containing those threads that may be * waiting on this Condition. * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. * * @return the collection of threads * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ protected final Collection<Thread> getWaitingThreads() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); ArrayList<Thread> list = new ArrayList<Thread>(); for (Node w = firstWaiter; w != null; w = w.nextWaiter) { if (w.waitStatus == Node.CONDITION) { Thread t = w.thread; if (t != null) list.add(t); } } return list; } }
ConditionObject
ConditionObject也是AbstractQueuedSynchronizer的一个内部类,主要是处理线程通信的一个数据结构,里面维护一个等待被唤醒的线程的节点队列。
AbstractQueuedSynchronizer的成员变量,和几个CAS操作
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; /** * The synchronization state. */ private volatile int state; -----------------------------CAS操作---------------------------------------------- /** * Setup to support compareAndSet. We need to natively implement * this here: For the sake of permitting future enhancements, we * cannot explicitly subclass AtomicInteger, which would be * efficient and useful otherwise. So, as the lesser of evils, we * natively implement using hotspot intrinsics API. And while we * are at it, we do the same for other CASable fields (which could * otherwise be done with atomic field updaters). */ private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } /** * CAS head field. Used only by enq. */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * CAS tail field. Used only by enq. */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } /** * CAS waitStatus field of a node. */ private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) { return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } /** * CAS next field of a node. */ private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); }
AbstractQueuedSynchronizer通过head和tail连接一个逻辑上的队列,我称之为sync队列,与Condition队列区别开来。后面的CAS操作,以后专门写一遍文章来阐述
现在再重新审视一遍Lock版本的生产者-消费者
lock.lock()调用的是非公平锁的lock方法
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
第一个进来的线程如愿的获得了锁,其它的线程调用acquire(1),这个方法在AbstractQueuedSynchronizer
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
这几行代码真是短小精悍!!!tryAcquire方法本来就是留给子类实现自己的逻辑的,又回到非公平锁,然后调用
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 第一个线程已经将state改为1了 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 这里处理可重入,每次重入+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
然后就会调用addWaiter方法,将线程封装成Node添加到sync队列
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
这个enq方法是个入队列操作,似乎在死循环,其实循环两次也就返回了,请读者开动脑筋。
成功添加sync队列之后,开始执行acquireQueued方法
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 此处才是真正的死循环 for (;;) { final Node p = node.predecessor(); // 这里要先判断node的前驱节点是否为头节点,然后再尝试获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
// 当前节点获取锁失败后,是否要将线程park private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前驱节点的等待状态为SIGNAL,可以将当前线程park if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; // 前驱节点的等待状态为CANCELLED,则遍历所有前驱,将所有CANCELLED前驱跳过 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don‘t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 将前驱节点的等待状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() { // 将线程park LockSupport.park(this); // 是否中断 return Thread.interrupted(); }
park方法最终会借助操作系统将当前线程阻塞,与之对应的unpark方法会唤醒线程。
以上就是lock.lock()获取锁时的大体逻辑,lock.unlock()释放锁时的逻辑不再赘述,望有心人仔细阅读。
回过头来再说说Condition队列。
full.await()是ConditionObject的方法
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 将当前阶段加入到Condition等待队列 Node node = addConditionWaiter(); // 释放锁 int savedState = fullyRelease(node); int interruptMode = 0; // 判断节点是否已经转移到了sync队列,也是一直循环 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被唤醒之后,再度尝试获取锁 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
addConditionWaiter方法也比较简单,这里不再分析了。
然后就到了full.signal(),signal方法做的事情不多,真正干活的是doSignal方法
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
signal会将Condition等待队列的头节点,通过transferForSignal转移到sync队列,让节点中的线程去竞争锁以获得执行的机会。有一点值得注意,调用signal方法之后,头节点中的线程并没有马上被唤醒,至于什么时候被唤醒,就得看sync队列里的节点的执行情况了。这和wait-notify是一样的,调用notify方法后,没有马上释放锁,只有执行完synchronized代码后,才会释放锁,让被唤醒的线程获取。