Semaphore是一个计数的信号量。从概念上来说,信号量维持一组许可(permits)。acquire方法在必须的时候都会阻塞直到有一个许可可用,然后就会拿走这个许可。release方法添加一个许可,会有可能释放一个阻塞中的获取者(acquirer)。然而,Semaphore没有使用真实的许可对象,只是保持一个可用计数并且采取相应的行为。
信号量一般用于限制可以访问一些(物理上或者逻辑上)的资源的并发线程数。
信号量初始化为1的时候,意味着它最多只有一个允许可用,这样就能作为互斥独占锁使用。这种更多地被称为二进制信号量(binary semaphore),因为它只有两个状态:一个许可可用,或者0个许可可用。当使用这种方式的时候,二进制信号量就有这样的属性(不像大部分锁的实现):锁可以被拥有者(就如信号量没有拥有者的概念)之外的另外线程释放。这种属性在某些特殊的上下文中很有用,例如死锁恢复。
类的构造函数可选地接受一个fairness参数。当设为false的时候,该类就不会保证线程获取许可的顺序。特别地,插队是允许的,也就是说,线程调用acquire方法可以在另外的等待线程之前分配许可————逻辑上新线程会把自己放在等待线程队列头。当fairness设为true,信号量就保证调用acquire方法的线程会以它们调用方法的处理顺序来获取许可(FIFO)。注意FIFO的顺序决定特指在这些方法的内部执行点。因此,有可能一条线程在另外一条线程之前调用了acquire方法,但实际顺序会在另外一条线程之后,同样也适用于函数返回的先后顺序。同样要注意非超时版本tryAcquire方法不会遵循fairness设置,会马上获取任何可用的许可。
一般来说,信号量用来控制资源访问的话,就应该被初始化为公平(fairness设为true),这样可以确保没有线程会在访问资源的时候饿坏。当使用在其他同步控制的情况下使用信号量,非公平的吞吐量优势一般优于公平的考虑。
其实总体上来看,fairness参数以及状态量的概念很接近AQS(AbstractQueuedSynchronizer)提供的功能,因此大家也应该猜到Semaphore的内部实现也是通过一个继承AQS的内部类实现接口功能。接下来我们仔细看看内部的实现。
具体实现
先来看看Semaphore的构造函数:
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
可以看到构造函数与ReentrantLock实现类似,都是按照fair参数分配创建不同的锁类,再来看看Semaphore的acquire和release的接口实现
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public void release() { sync.releaseShared(1); }
可以看到acquire和release的实现都是调用内部类Sync的方法实现,当然了,这些方法也就是AQS提供出来的获取和释放共享锁接口。接下来看看整个实现里最主要的内部类Sync的相关实现:
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } //省略一些次要方法 } /** * 非公平版本 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } /** * 公平版本 */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
为了方便了解主要逻辑,Sync类省略掉一些次要的方法。非公平版本NonfairSync类和公平版本FairSync类都继承于Sync类,Sync类继承于AQS类,NonfairSync和FairSync类都有同样的tryReleaseShared实现,只不过在tryAcquireShared实现上有稍微不同。
先来看看非公平类NonfairSync实现。tryAcquireShared调用的是Sync类的nonfairTryAcquireShared方法,方法的实现相当简单,只是在循环内判断当前锁状态值减去请求值acquires后,如果remaining < 0(则表示此次acquire失败,直接返回负值remaining即可)或者remaining >=0 时,compareAndSetState成功(表示此次acquire成功,直接返回大于等于0的remaining即可),如果CAS失败,则继续循环重试,直到其中一种情况发生。
再来看看公平类FairSync的实现。tryAcquireShared直接被重写,与非公平类版本对比,增加了hasQueuedPredecessors的判断,该方法在AQS中表示是否有结点在当前的等待队列前排在自己前面,如果返回true,则表示当前线程需要进入等待队列,直接返回-1表示acquire失败。
tryReleaseShared的实现也很简单,也是一个循环里不断CAS把锁状态增加请求的releases即可。
Semaphore还有其它一些辅助方法,其实现也都是简单地调用内部类Sync的方法,这里便不再赘述。
总结
总体来看,AQS的锁状态值就等于Semaphore的许可量,acquire的实现就是把当前锁状态值,也就是许可量减去对应值,release的实现就是把锁状态值增加对应值即可。整个实现结构和ReentrantLock类似,但没有了重入的逻辑,而且实现更是相对简单,理解起来应该没有难度。
Semaphore实现Andoird版源码剖析