创建状态依赖类的最简单方法通常是在类库中现有状态依赖类的基础上进行构造。如果类库中没有提供你需要的功能,可以使用java语言和类库提供的底层机制来构造自己的同步机制,包括内置的条件队列、显示地Condition对象以及AbstractQueuedSynchronizer框架。
在单线程程序中调用方法时,如果基于某个状态的前提条件未得到满足,那么这个条件永远无法成真。而在并发程序中,基于状态的条件可能会由于其他线程的操作而改变。
可阻塞的状态依赖操作
acquire lock on object state while(precondition does not hold){ release lock wait until precondition hold operation fail if interrupted or timeout expires reacquire lock } perform action reacquire lock
这种加锁模式有些不寻常,因为锁是在操作的执行过程中被释放与重新获取的。
构成前提条件的状态变量必须由对象的锁来保护,从而使它们在测试前提条件地同时保持不变。如果前提条件尚未满足,就必须释放锁,以便其他线程可以修改对象的状态,否则前提条件永远无法成真。而再次测试前提条件之前,必须重新获得锁。
- 提问:有界缓存的实现
/** * BaseBoundedBuffer实现了一个基于数组的循环缓存,其缓存变量buf,tail,head,count均由内置锁来保护。 * 提供同步的doput和dotake方法,在其子类中来实现put和take方法。底层状态对子类隐藏。 * * @param <V> */ public abstract class BaseBoundedBuffer<V> { private final V[] buf; private int tail; private int head; private int count; protected BaseBoundedBuffer(int capacity){ this.buf = (V[]) new Object[capacity]; } protected synchronized final void doput(V v){ buf[tail] = v; if(++tail == buf.length){ tail = 0; } ++count; } protected synchronized final V dotake(){ V v = buf[head]; buf[head] = null; if(++head == buf.length){ head = 0; } --count; return v; } public synchronized final boolean isFull(){ return count == buf.length; } public synchronized final boolean isEmpty(){ return count == 0; } }
将前提条件的失败传递给调用者:
/** * 尽管实现很简单,但是实现缓存时得到的简化并不能抵消在使用时的复杂性。调用者必须做好捕获异常的准备,并且每次缓存操作时都需要重试。 * * @param <V> */ public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V>{ protected GrumpyBoundedBuffer(int capacity) { super(capacity); } public synchronized void put(V v) throws BufferFullException{ if(isFull()){ throw new BufferFullException(); } doput(v); } public synchronized V take() throws BufferFullException{ if(isEmpty()){ throw new BufferFullException(); } return dotake(); } }
/** * 客户端代码: * 调用者可以不进入休眠状态,而直接调用take方法,这种方法被称为忙等待或自旋等待。 * 如果缓存状态在很长一段时间内都不会发生变化,那么这种方式将会消耗大量的CPU时间。 * 调用者也可以进入休眠状态来避免消耗过多的CPU时间,但如果缓存状态在刚调完sleep就立即发生变化,那么将不必要地休眠一段时间。 * 因此客户端代码必须在二者之间进行选择要么容忍自旋导致的CPU时钟浪费,要么容忍由于休眠而导致的低响应性。 * 另一种选择是Thread.yield,可能使整体的执行过程更快。 */ while(true){ try{ V item = buffer.take(); //... break; }catch(BufferEmptyException e){ Thread.sleep(SLEEP_TIME); } }
- 改进:通过轮询与休眠来实现简单的阻塞
/** * 调用者无须在每次调用时都实现重试逻辑,简化了对缓存的使用。 * 如果线程在休眠或者被阻塞时持有一个锁,通常是不好的做法,因为只要线程不释放这个锁,有些条件(缓存为满/空)就永远无法成真。 * SleepyBoundedBuffer要求调用者处理InterruptedException。当一个方法由于等待某个条件为真而阻塞时,需要提供一种取消机制。 * * @param <V> */ public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V>{ protected SleepyBoundedBuffer(int capacity) { super(capacity); } public void put(V v) throws InterruptedException{ while(true){ synchronized(this){ if(!isFull()){ doput(v); return; } Thread.sleep(SLEEP_TIME); } } } public V take() throws InterruptedException{ while(true){ synchronized(this){ if(!isEmpty()){ return dotake(); } Thread.sleep(SLEEP_TIME); } } } }
条件队列
它使得一组线程能够通过某种方法来等待特定的条件变成真。队列中的元素是一个个正在等待相关条件地线程。
正如每个java对象都可以作为一个锁,每个对象同样可以作为一个条件队列,并且Object中的wait,notify,notifyAll方法就构成了内部条件队列的API。
Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并修改对象的状态。当挂起的线程醒来时,他将在返回前重新获得锁。
- 改进:条件队列实现
/** * 简单易用,且实现了明晰的状态依赖性管理 * * @param <V> */ public class BoundedBuffer<V> extends BaseBoundedBuffer<V> { protected BoundedBuffer(int capacity) { super(capacity); } public synchronized void put(V v) throws InterruptedException{ while(isFull()){ wait(); } doput(v); notifyAll(); } public synchronized V take() throws InterruptedException{ while(isEmpty()){ wait(); } V v = dotake(); notifyAll(); return v; } }
使用条件队列
在条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词。
在条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。
锁对象与条件队列对象(即调用wait和notify等方法所在的对象)必须是同一个对象。
synchronized(lock){ while(!condition){ lock.wait(); } dosomething(); }
- 改进:优化条件队列实现
/** * 上面BoundedBuffer的put和take采用的通知机制是保守的,每当将一个对象放入缓存或者从缓存中移走一个对象时,就执行一次通知。 * 可以进行优化,仅当从空变为非空,或从满变为非满时才发出通知 * * @param v * @throws InterruptedException */ public synchronized void put(V v) throws InterruptedException{ while(isFull()){ wait(); } boolean isEmpty = isEmpty(); doput(v); if(isEmpty){ notifyAll(); } }
阀门类
/** * ThreadGate可以打开和关闭阀门,并提供一个await方法,该方法能一直阻塞直到阀门打开。 * 在await中使用的条件谓词比测试isOpen复杂得多,这是必需的。 * 因为如果当阀门打开时有N个线程正在等待它,那么这些线程都应该允许被执行。 * 然而,如果阀门在打开后又非常快速的关闭了,并且await方法只检查isOpen,那么所有线程都可能无法释放。 * 因此,在ThreadGate中使用了一个更复杂的条件谓词:每次阀门打开时,递增一个generation计数器, * 如果阀门现在是打开的(isOpen=true),或者阀门自从该线程到达后就一直是打开的,那么线程都可以通过await * 即使打开后又快速关闭的关闭了(arrivalGeneration != generation),在打开之前被阻塞的线程也能通过。 */ public class ThreadGate { //条件谓词 isOpen || generation > n private boolean isOpen; private int generation; public synchronized void close(){ isOpen = false; } public synchronized void open(){ ++generation; isOpen = true; notifyAll(); } public synchronized void await() throws InterruptedException{ int arrivalGeneration = generation; while(!isOpen && arrivalGeneration == generation){ wait(); } } }
显示地Condition对象
public interface Condition{ void await() throws InterruptedException; boolean await(long time,TimeUnit unit) throws InterruptedException; long awaitNanos(long nanosTimeOut) throws InterruptedException; void awaitUninterruptibly(); boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
内置条件队列存在一些缺陷。每个内置锁都只能有一个想关联的条件队列,因而在像BoundedBuffer这种类中,多个线程可能在同一个条件队列上等待不同的条件谓词,并且在最常见的加锁模式下公开条件队列对象。这些因素都使得无法满足在使用notifyAll时所有等待线程为同一类型的需求。如果想编写一个带有多个条件谓词的并发对象,或者想获得除了条件队列可见性之外的更多控制权,就可以使用显示地Lock和Condition而不是内置锁和条件队列。
一个Condition和一个Lock关联在一起,就像一个条件队列和一个内置锁相关联一样。
要创建一个Condition,可以再相关联的Lock上调用Lock.newCondition方法。正如Lock比内置锁提供了更为丰富的功能,Condition同样比内置条件队列提供了更丰富的功能:在每个锁上可存在多个等待、条件等待可以是可中断或不可中断、基于时限的等待,以及公平的或非公平的队列操作。每个Lock可以有任意数量的Condition对象,Condition对象继承了Lock对象的公平性。
- 改进:使用显示条件变量的有界缓存
/** * 在分析使用多个Condition的类时,比分析一个使用单一内部队列加多个条件谓词的类简单得多。 * 通过将两个条件谓词分开并放到两个等待线程集中,Condition使其更容易满足单词通知的需求。 * signal比signalAll更高效,它能极大地减少在每次缓存操作中发生的上下文切换与锁请求的次数。 * 与内置锁和条件队列一样,当使用显示的Lock和Condition时,也必须满足锁、条件谓词和条件变量之间的三元关系。 * 在条件谓词中包含的变量必须由Lock来保护,并且在检查条件谓词以及调用await和signal时,必须持有Lock对象。 * * @param <T> */ public class ConditionBoundedBuffer<T> { protected final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); private final T[] items = (T[])new Object[10]; private int tail; private int head; private int count; //阻塞直到notFull public void put(T x) throws InterruptedException{ lock.lock(); try{ while(count == items.length){ notFull.wait(); } items[tail] = x; if(++tail == items.length){ tail = 0; } ++count; notEmpty.signal(); }finally{ lock.unlock(); } } //阻塞直到notEmpty public T take() throws InterruptedException{ lock.lock(); try{ while(count == 0){ notEmpty.await(); } T t = items[head]; items[head] = null; if(++head == items.length){ head = 0; } --count; notFull.signal(); return t; }finally{ lock.unlock(); } } }
Synchronizer
在ReentrantLock和Semaphore这两个接口之间存在许多共同点,这两个类都可以用做一个阀门,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过或等待或取消。或许会认为Semaphore是基于ReentrantLock实现的,或者认为ReentrantLock实际上是带有一个许可的Semaphore。这些实现方式都是可行的。
- 示例:通过锁来实现计数信号量
public class SemaphoreOnLock { private final Lock lock = new ReentrantLock(); private final Condition permitAvailable = lock.newCondition(); private int permits; SemaphoreOnLock(int permits){ lock.lock(); try{ this.permits = permits; }finally{ lock.unlock(); } } public void acquire() throws InterruptedException{ lock.lock(); try{ while(permits <= 0){ permitAvailable.await(); } --permits; }finally{ lock.unlock(); } } public void release(){ lock.lock(); try{ ++permits; permitAvailable.signal(); }finally{ lock.unlock(); } } }
事实上,它们的实现都使用了一个共同的基类,AbstractQueuedSynchronizer,这个类也是其他许多同步类的基类。AQS是一个用于构建锁和同步器的框架,如ReentrantLock、Semaphore、CountDownLatch、ReentrantReadWriteLock、SynchronousQueue和FutureTask。
在SemaphoreOnLock中,获取许可的操作可能在两个时刻阻塞---当锁保护信号量状态时,以及当许可不可用时。在基于AQS构建的同步器中,只可能在一个时刻发生阻塞,从而降低上下文切换的开销,并提高吞吐量。AQS在设计时充分考虑了可伸缩性。
AbstractQueuedSynchronizer
在基于AQS构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作。获取操作时一种依赖状态的操作,并且通常会阻塞。
当使用锁或信号量时 ,获取操作的含义就很直观,即获取的是锁或者许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。
在使用CountDownLatch时,获取操作意味着等待并直到闭锁到达结束状态,在使用FutureTask时,意味着等待并直到任务已经完成。释放并不是一个可阻塞的操作,当执行释放操作时,所有在请求时被阻塞的线程都会开始执行。
如果一个类想成为状态依赖的类,那么它必须拥有一些状态。AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState,setState以及compareAndSetState等protected类型方法来进行操作。这个整数可以用于表示任意状态。
在ReentrantLock中,它用来表示所有者线程已经重复获取该锁的次数,Semaphore用它来表示剩余的许可数量,FutureTask用它来表示任务的状态。在同步器类中还可以自行管理一些额外的状态变量,如ReentrantLock保存了锁的当前所有者信息,这样就能区分某个获取操作是重入还是竞争的。
事实上,java.util.concurrent中的所有同步器类都没有直接扩展AQS,而是都将它们的相应功能委托给私有的AQS子类来实现。
AQS中获取操作和释放操作的标准形式
boolean acquire() throws InterruptedException{ while(当前状态不允许获取操作){ if(需要阻塞获取请求){ 如果当前线程不在队列中,则将其插入队列 阻塞当前线程 }else{ 返回失败 } } 可能更新同步器的状态 如果线程位于队列中,则将其移出队列 返回成功 } void release(){ 更新同步器的状态 if(新的状态允许某个被阻塞的线程获取成功){ 解除队列中一个或多个线程的阻塞状态 } }
- AQS示例:一个简单的闭锁
/** * 在OneShotLatch中,AQS状态用来表示闭锁状态---关闭(0)或者打开(1)。 * await方法调用AQS的acquireSharedInterruptibly,然后接着调用OneShotLatch中的tryAcquireShared。 * 在tryAcquireShared的实现中必须返回一个值来表示该获取操作能否执行。 * 如果之前已经打开了闭锁,那么tryAcquireShared将返回成功并允许线程通过,否则就会返回一个表示获取操作失败的值。 * acquireSharedInterruptibly处理失败的方式,是把这个线程放入等待线程队列中。 * 类似的,signal将调用releaseShared,接下来又会调用tryReleaseShared。 * 在tryReleaseShared中将无条件地把闭锁的状态设置为打开,表示该同步类处于完全释放的状态。 */ public class OneShotLatch { private final Sync sync = new Sync(); public void signal(){ sync.release(0); } public void await() throws InterruptedException{ sync.acquireSharedInterruptibly(0); } private class Sync extends AbstractQueuedSynchronizer{ protected int tryAcquireShared(int ignored){ return (getState() == 1) ? 1 : -1; } protected boolean tryReleaseShared(int ignored){ setState(1); return true; } } }
- AQS示例:java.util.concurrent中的ReentrantLock
/** * ReentrantLock只支持独占方式的获取操作,因此它实现了tryAcquire、tryRelease和isHeldExclusively。 * * ReentrantLock将同步状态用于保存锁获取操作的次数,并且维护一个owner变量来保存当前所有者线程的标识符, * 只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量。 * 在tryRelease中检查owner域,从而确保当前线程在执行unlock操作之前已经获取了锁 * 在tryAcquire中将使用这个域来区分获取操作是重入还是竞争的。 * * 当一个线程尝试获取锁时,tryAcquire将首先检查锁的状态。如果锁未被持有,那么它将尝试更新锁的状态以表示锁已经被持有。 * 由于状态可能在检查后被立即修改,因此tryAcquire使用compareAndSetState来更新状态。 * 如果锁状态表明它已经被持有,并且如果当前线程是锁的持有者,那么获取计数会递增 * 如果当前线程不是锁的拥有者,那么获取操作将失败。 * @param ignored * @return */ protected boolean tryAcquire(int ignored){ final Thread current = Thread.currentThread(); int c = getState(); if(c == 0){ if(compareAndSetState(0,1)){ owner = current; return true; } }else if(current == owner){ setState(c+1); return true; } return false; }
- AQS示例:java.util.concurrent中的Semaphore与CountDownLatch
/** * Semaphore将AQS的同步状态用于保存当前可用许可的数量。 * * tryAcquireShared方法首先计算剩余许可的数量,如果没有足够的许可,那么会返回一个值表示获取操作失败。 * 如果还有剩余的许可,那么tryAcquireShared会通过compareAndSetState方式来降低许可的计数。 * * 如果这个操作成功,那么将返回一个值表示获取操作的成功。 * 在返回值中还包含了表示其他共享获取操作能否成功的信息,如果成功,那么其他等待的线程同样会解除阻塞。 * 当没有足够的许可,或者当tryAcquireShared可以通过原子方式来更新许可计数以响应获取操作时,while循环将终止。 * * 虽然compareAndSetState的调用可能由于与另一个线程发生竞争而失败,并使其重新尝试, * 但在经过了一定次数的重试操作以后,在这两个结束条件中有一个会变为真。 * 同样,tryReleaseShared将增加许可计数,这可能会解除等待中线程的阻塞状态,并且不断地重试直到更新操作成功。 * tryReleaseShared的返回值表示在这次释放操作中解除了其他线程的阻塞。 * * CountDownLatch使用AQS的方式很相似,在同步状态中保存的是当前的计数值。 * countDown方法调用release,从而导致计数值递减,并且当计数值为零时,解除所有等待线程的阻塞。 * await调用acquire,当计数器为零时,acquire将立即返回,否则将阻塞。 * * @param acquires * @return */ protected int tryAcquireShared(int acquires){ while(true){ int available = getState(); int remaining = available - acquires; if(remaining < 0 || compareAndSetState(available,remaining)){ return remaining; } } } protected boolean tryReleaseShared(int release){ while(true){ int p = getState(); if(compareAndSetState(p,p + release)){ return true; } } }
FutureTask
Future.get()的语义非常类似于闭锁的语义-----如果发生了某个事件,那么线程就可以恢复执行,否则这些线程将停留在队列中并直到该事件发生。
在FutureTask中,AQS同步状态被用来保存任务的状态,如:正在执行、已完成或已取消。FutureTask还维护一些额外的状态变量,用来保存计算结果或者抛出异常。
此外,它还维护了一个引用,指向正在执行计算任务的线程,因为如果任务取消,该线程就会中断。
ReentrantReadWriteLock
ReadWritelock接口表示存在两个锁:一个读取锁和一个写入锁,但在基于AQS实现的ReentrantReadWriteLock中,单个AQS子类将同时管理读取加锁和写入加锁。
ReentrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数。在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法。
AQS在内部维护一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。在ReentrantReadWriteLock中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程就会得到这个锁,如果位于队列头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁。
#笔记内容参考 《java并发编程实战》
原文地址:https://www.cnblogs.com/shanhm1991/p/9900155.html