1.概述
使用Condition必须以Lock为前提,请先参见《Java_并发线程_Lock、ReadWriteLock》一文。在synchronized同步代码块中,以obj作为锁对象,再通过obj.wait()和obj.notify()相互配合来处理多线程之间的协作问题。Lock与Condition配合使用可以完成同样的功能:Condition只有配合创建它的Lock使用才有意义,只不过这种方式更加灵活(例如同一个Lock可以创建多个Condition)。使用的格式如下。
//lock 与 Condition private static ReentrantLock lock = new ReentrantLock(); private static Condition condition1 = lock.newCondition(); { lock.lock(); try{ // condition1.await();//condition1应在与其对应的lock区间被调用,等待其它线程调用 condition1.signal(); // }finally{ lock.unlock(); } }
2.await()与signal()原理分析
static final class Node { volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; }
public class ConditionObject implements Condition, java.io.Serializable { /** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter; //... }
Condition内部维护了一个由Node组成的单向链表(条件队列):节点之间通过nextWaiter连接,firstWaiter指向队首,lastWaiter指向队尾;Node中的prev、next则用于锁的同步队列。调用condition.await()时,会新创建一个Node节点并追加到链表尾部,lastWaiter指向这个新创建的节点对象;每次调用signal()时,会通过firstWaiter从链表头部拿到Node对象,并将firstWaiter指向该节点的nextWaiter,然后把取出的Node从条件队列转移到锁的同步队列中,等待重新获取锁。
(1).await()
作用:当前线程休眠停止调度;是否锁;置于队列等待signal()
/**
 * Waits on this condition.
 *
 * <p>Throws {@code InterruptedException} immediately if the current thread is
 * already interrupted. Otherwise the thread is enqueued as a condition waiter
 * and parks until its node is reported to be on the sync queue (i.e. after a
 * signal) or an interrupt is detected while waiting.
 *
 * @throws InterruptedException if the current thread is interrupted on entry;
 *         an interrupt seen while waiting is handed to
 *         {@code reportInterruptAfterWait} (presumably rethrown or re-asserted
 *         depending on the mode; helper not shown here)
 */
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // Create a new node and append it to the tail of the condition list.
    Node node = addConditionWaiter();
    // Going by its name, fullyRelease gives up the lock held by this thread;
    // the returned state is kept so it can be passed back to acquireQueued below.
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // Keep parking until the node is on the sync queue.
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // A non-zero mode means an interrupt was observed while waiting: stop parking.
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // Re-acquire with the saved state; a true result is recorded as REINTERRUPT
    // unless the mode is already THROW_IE.
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
(2).signal()
/**
 * Transfers the longest-waiting thread, when there is one, out of this
 * condition's wait queue and into the wait queue of the owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    // Only the thread that holds the lock exclusively may signal.
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    final Node head = firstWaiter;
    if (head == null) {
        // Nobody is waiting on this condition: nothing to do.
        return;
    }
    doSignal(head);
}
3.自定义阻塞队列
/**
 * Fixed-capacity FIFO buffer built on one {@link Lock} and two {@link Condition}s.
 *
 * <p>{@link #put(Object)} blocks while the buffer is full and {@link #take()}
 * blocks while it is empty. All reads and writes of the buffer state happen
 * while {@code lock} is held.
 */
class BoundedBuffer {
    /** Capacity used by the no-argument constructor. */
    private static final int DEFAULT_CAPACITY = 100;

    final Lock lock = new ReentrantLock();
    /** Signalled by {@link #take()} after a slot has been freed. */
    final Condition notFull = lock.newCondition();
    /** Signalled by {@link #put(Object)} after an element has been stored. */
    final Condition notEmpty = lock.newCondition();

    /** Circular storage; {@code putptr}/{@code takeptr} wrap around to 0 at the end. */
    final Object[] items;
    int putptr, takeptr, count;

    /** Creates a buffer holding up to 100 elements. */
    BoundedBuffer() {
        this(DEFAULT_CAPACITY);
    }

    /**
     * Creates a buffer with the given capacity.
     *
     * @param capacity maximum number of elements held at once; must be positive
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    BoundedBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        items = new Object[capacity];
    }

    /**
     * Appends an element, waiting while the buffer is full.
     *
     * @param x the element to store
     * @throws InterruptedException if interrupted while waiting for free space
     */
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            // Wait in a loop: the condition must be re-checked after every wakeup.
            while (count == items.length) {
                notFull.await();
            }
            items[putptr] = x;
            if (++putptr == items.length) {
                putptr = 0;
            }
            ++count;
            // The buffer is no longer empty: wake one waiting consumer.
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting while the buffer is empty.
     *
     * @return the element that has been in the buffer the longest
     * @throws InterruptedException if interrupted while waiting for an element
     */
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            // Wait in a loop: the condition must be re-checked after every wakeup.
            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[takeptr];
            // Clear the slot so the buffer does not keep the element reachable
            // after it has been handed to the consumer.
            items[takeptr] = null;
            if (++takeptr == items.length) {
                takeptr = 0;
            }
            --count;
            // The buffer is no longer full: wake one waiting producer.
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}
4.应用实例
private static ReentrantLock lock = new ReentrantLock(); private static Condition condition1 = lock.newCondition(); public static void main(String[] args) { new Thread() { @Override public void run() { lock.lock(); try { // System.out.println(Thread.currentThread().getName() + ", locked"); try { condition1.await(); System.out.println(Thread.currentThread().getName() + ", awaited"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ", will finally"); } finally { System.out.println(Thread.currentThread().getName() + ", finally"); lock.unlock(); System.out.println(Thread.currentThread().getName() + ", unlocked"); } } }.start(); new Thread() { @Override public void run() { lock.lock(); try { System.out.println(Thread.currentThread().getName() + ", locked"); condition1.signal();//但是没有释放锁,等待lock.unlock()后,condition1对应的线程才被唤醒,和synchronized一样 System.out.println(Thread.currentThread().getName() + ", will finally"); } finally { System.out.println(Thread.currentThread().getName() + ", finally"); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ", unlocked"); lock.unlock(); } } }.start(); } /* * Thread-1, locked Thread-2, locked Thread-2, will finally Thread-2, finally Thread-2, unlocked Thread-1, awaited Thread-1, will finally Thread-1, finally Thread-1, unlocked */
时间: 2024-10-10 00:46:35