源代码解析
Re‘entrantLock lock = new ReentrantLock(fair);
Condition
notEmpty = lock.newCondition(); //返回内部类 AbstractQueuedSyncronizer.ConditionObject
各自维护了两个队列.一个是阻塞同步队列 syncQueue 双向队列,一个是条件等待队列.
Condition.await两个作用.1.放入同步队列 park 2.realse锁,3等待别人获取锁acquire(),并且.signal .unlock()之后调用acquiredQueue()从阻塞同步队列里复活出来.
Condition..signal 1.在父类Locker.lock()获取锁之后,从条件队列迁移到阻塞同步队列.2.等待之后的unlcok
释放锁,并唤醒next线程.
能够park .signal 完成线程加入到阻塞队列中( 因为signal必须在对应的lock()后操作. 所以从条件队列中迁移出不可能获得锁,只能加入到线程队列中.)
之前的误区: 何时await的线程被唤醒?和正在syncQueue中的线程优先级哪个高?
我理解为signal之后唤醒await线程.
正确理解: signal只是转移线程,并不是唤醒await队列的地方.真正唤醒await线程的地方在持有Locker.unlock的时候.(见LinkedBlockingQueue中的signalNotFull()方法.)
await线程被转移到syncQueue时,已经有线程在排队,那么只好放在队尾.
下面有LinkedBlockingQueue的先take,await, 然后被put.signal的时序图.
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal(); // phil:必须在lock()后面,锁已经被线程获取了.
} finally {
takeLock.unlock();
}
}
ReentrantLock.unlock完成下一个线程(可能刚好是signal加入的)的unpark.
所以总结: signal后不一定是之前的那个await的线程. 获得锁执行..
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException * <li> Save lock state returned by {@link #getState} * <li> Invoke {@link #release} with * saved state as argument, throwing * IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw exception * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { condition.signal后,迁移线程到父类的阻塞线程队列,如果父类ReentrantLocker执行release()操作,唤醒队列头线程,但线程还在队列里. 所以不用死循环. 所以说: Lock的孩子的条件队列和Lock自己的阻塞队列是互斥的. LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } |
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; //隔断老的队列头和原队列的连接. } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal). */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } |
何时unpark()呢?
ArrayBlockingQueue. 内部只有一个Locker.
所以所有的.await都在这个locker.lock()和unlock()之间.
在.signal()和父类locker的unlocker() 两个动作先后执行之后 unpark.
LinkedBlockingQueue 中
由于 take的Condition的signal是在put()内执行的.
所以特意在.signal()的外面封装了 takeLocker.lock()和 unLock();
这两个操作(signal和unlock() )才能成为一个整体 ,实现线程从条件队列迁移到阻塞队列,然后线程的唤醒,然后从阻塞队列的剔除这个线程.
LinkedBlockingQueue的先take,await, 然后被put.signal的时序图.高清版右键下载
用https://www.gliffy.com [email protected]画出来
[源码]Condition的原理,简单案例(ArrayBlockingQueue),复杂案例(LinkedBlockingQueue).