1、LockSupport
LockSupport类的核心方法其实就两个:park()和unark(),其中park()方法用来阻塞当前调用线程,unpark()方法用于唤醒指定线程
LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,可以把许可看成是一种(0,1)信号量(Semaphore),但与 Semaphore 不同的是,许可的累加上限是1。
初始时,permit为0,当调用unpark()方法时,线程的permit加1,当调用park()方法时,如果permit为0,则调用线程进入阻塞状态。
所以以下代码不会阻塞
// 初始信号量为0,调用unpark,信号量+1
LockSupport.unpark();
// 当前信号量为1,调用park,信号量-1
LockSupport.park();
// 以下代码可以继续执行
doSomething()
2、AQS
AbstractQueueSynchronizer是并发工具的核心,是一个抽象类,提供公平 / 非公平获取锁,获取可重入 / 不可重入锁,共享 / 排他等功能支持
AQS框架,分离了构建同步器时的一系列关注点,它的所有操作都围绕着资源——同步状态(synchronization state)来展开,并替用户解决了如下问题:
- 资源是可以被同时访问?还是在同一时间只能被一个线程访问?(共享/独占功能)
- 访问资源的线程如何进行并发管理?(等待队列)
- 如果线程等不及资源了,如何从等待队列退出?(超时/中断)
这其实是一种典型的模板方法设计模式:父类(AQS框架)定义好骨架和内部操作细节,具体规则由子类去实现。
什么是资源:
同步器 | 资源的定义 |
---|---|
ReentrantLock | 资源表示独占锁。State为0表示锁可用;为1表示被占用;为N表示重入的次数 |
CountDownLatch | 资源表示倒数计数器。State为0表示计数器归零,所有线程都可以访问资源;为N表示计数器未归零,所有线程都需要阻塞。 |
Semaphore | 资源表示信号量或者令牌。State≤0表示没有令牌可用,所有线程都需要阻塞;大于0表示由令牌可用,线程每获取一个令牌,State减1,线程没释放一个令牌,State加1。 |
ReentrantReadWriteLock | 资源表示共享的读锁和独占的写锁。state逻辑上被分成两个16位的unsigned short,分别记录读锁被多少线程使用和写锁被重入的次数。 |
AQS-API
共享和排他
钩子方法 | 描述 |
---|---|
tryAcquire | 排它获取(资源数) |
tryRelease | 排它释放(资源数) |
tryAcquireShared | 共享获取(资源数) |
tryReleaseShared | 共享获取(资源数) |
isHeldExclusively | 是否排它状态 |
- 支持中断超时
- 支持独占和共享
- 支持Condition条件等待
CAS操作方法
Java中CAS操作的实现都委托给一个名为UnSafe类
方法名 | 修饰符 | 描述 |
---|---|---|
compareAndSetState | protected final | CAS修改同步状态值 |
compareAndSetHead | private final | CAS修改等待队列的头指针 |
compareAndSetTail | private final | CAS修改等待队列的尾指针 |
compareAndSetWaitStatus | private static final | CAS修改结点的等待状态 |
compareAndSetNext | private static final | CAS修改结点的next指针 |
等待队列
方法名 | 修饰符 | 描述 |
---|---|---|
enq | private | 入队操作 |
addWaiter | private | 入队操作 |
setHead | private | 设置头结点 |
unparkSuccessor | private | 唤醒后继结点 |
doReleaseShared | private | 释放共享结点 |
setHeadAndPropagate | private | 设置头结点并传播唤醒 |
资源获取操作
方法名 | 修饰符 | 描述 |
---|---|---|
cancelAcquire | private | 取消获取资源 |
shouldParkAfterFailedAcquire | private static | 判断是否阻塞当前调用线程 |
acquireQueued | final | 尝试获取资源,获取失败尝试阻塞线程 |
doAcquireInterruptibly | private | 独占地获取资源(响应中断) |
doAcquireNanos | private | 独占地获取资源(限时等待) |
doAcquireShared | private | 共享地获取资源 |
doAcquireSharedInterruptibly | private | 共享地获取资源(响应中断) |
doAcquireSharedNanos | private | 共享地获取资源(限时等待) |
acquire | public final | 独占地获取资源 |
acquireInterruptibly | public final | 独占地获取资源(响应中断) |
acquireInterruptibly | public final | 独占地获取资源(限时等待) |
acquireShared | public final | 共享地获取资源 |
acquireSharedInterruptibly | public final | 共享地获取资源(响应中断) |
tryAcquireSharedNanos | public final | 共享地获取资源(限时等待) |
资源释放:
方法名 | 修饰符 | 描述 |
---|---|---|
release | public final | 释放独占资源 |
releaseShared | public final | 释放共享资源 |
3、等待队列
CLH队列中的结点是对线程的包装,结点一共有两种类型:独占(EXCLUSIVE)和共享(SHARED)。
每种类型的结点都有一些状态,其中独占结点使用其中的CANCELLED(1)、SIGNAL(-1)、CONDITION(-2),共享结点使用其中的CANCELLED(1)、SIGNAL(-1)、PROPAGATE(-3)。
结点状态 | 值 | 描述 |
---|---|---|
CANCELLED | 1 | 取消。表示后驱结点被中断或超时,需要移出队列 |
SIGNAL | -1 | 发信号。表示后驱结点被阻塞了(当前结点在入队后、阻塞前,应确保将其prev结点类型改为SIGNAL,以便prev结点取消或释放时将当前结点唤醒。) |
CONDITION | -2 | Condition专用。表示当前结点在Condition队列中,因为等待某个条件而被阻塞了 |
PROPAGATE | -3 | 传播。适用于共享模式(比如连续的读操作结点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。) |
INITIAL | 0 | 默认。新结点会处于这种状态 |
static final class Node {
// 共享模式结点
private static final Node SHARED = new Node();
// 独占模式结点
private?static final Node EXCLUSIVE = null;
private?static final int CANCELLED = 1;
private?static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 等待状态
volatile int waitStatus;
// 前驱指针
volatile Node prev;
// 后驱指针
volatile Node next;
// 结点所包装的线程
volatile Thread thread;
// Condition队列使用,存储condition队列中的后继节点
Node nextWaiter;
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
}
4、加锁
这里以ReentrantLock为例,看看 FairSync 和 NonfairSync 的源码
- FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897090466540L;
// 加锁
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 如果头和尾指向了同一个对象(null)或者头节点下一个为当前节点时,说明队列没有节点,或仅有一个当前节点
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
- NonfairSync
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
可以发现他们两个都继承自 Sync,Sync继承自AbstractQueuedSynchronizer,在ReentrantLock中的实现如下
- Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
public void lock() {
sync.lock();
}
ReentrantLock的加锁直接委托了Sync的 lock,在Sync中,lock是个抽象方法,依次查看NonfairSync 和 FairSync 的实现,如上文源码注释
非公平锁:
1、sync 修改 state (compareAndSetState),尝试直接获取锁,获取成功,则设置排他属性。获取失败,则执行AQS获取锁逻辑
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
2、执行获取锁,tryAcquire (NonfairSync 和 FairSync都有各自的实现),获取失败,获取等待队列,将线程放入等待队列中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 将当前线程包装为独占节点加入队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
3、NonfairSync 对 tryAcquire 的实现
- 获取state状态
- 如果当前线程持有锁,设置nextc, 添加acquires,并设置给state,可见Reentrant支持可重入锁
- 如果当前线程没有持有锁,CAS尝试获取锁,获取成功,设置排他性,否则获取锁失败,返回false
// 委托给父类
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// 父类实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取state状态
int c = getState();
// 如果当前线程持有锁,设置nextc, 添加acquires,并设置给state,可见Reentrant支持可重入锁
// 如果当前线程没有持有锁,CAS尝试获取锁,获取成功,设置排他性,否则获取锁失败,返回false
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
3、获取锁失败的处理:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
- 为当前线程创建排队节点(这是一个双向链表)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 将当前节点加入到队列尾部(算是优化吧)
node.prev = pred;
// 设置尾节点为当前节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果尾为null, 先初始化队列
enq(node);
return node;
}
// node构造函数
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 自旋将节点加入尾部,包含队列初始化
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// Must initialize
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
为已经在队列中的线程以独占不间断模式获取。 由条件等待方法使用以及获取
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果当前节点的前驱节点是头节点,当前线程再次获取锁,如果成功,进入if
if (p == head && tryAcquire(arg)) {
// 将当前节点设置为头节点,返回中断状态
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 尝试阻塞线程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 退出等待
if (failed)
cancelAcquire(node);
}
}
// 判断是否能阻塞当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 判断前驱节点状态
if (ws == Node.SIGNAL)
// SIGNAL:前驱节点释放锁时,会唤醒当前节点,可以阻塞
return true;
if (ws > 0) {
// CANCELED:前驱节点已中断/取消,需要从队列中移除
// 循环检查,剔除队列前面无效的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 再将当前节点放入队列
pred.next = node;
} else {
// 将前驱节点修改为 SIGNAL, 自旋再次执行此方法时,将走第一条分支
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
acquireQueued方法抛出异常时会执行 cancelAcquire
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
// 再次剔除无效的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 前驱节点的原后继节点,用于后续CAS操作
Node predNext = pred.next;
// 当前节点设置为打断
node.waitStatus = Node.CANCELLED;
// 如果当前节点为尾节点,设置最后一个有效节点为尾节点
if (node == tail && compareAndSetTail(node, pred)) {
// 有效的前驱节点属性next设置为null
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 唤醒后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
5、解锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 尝试解锁
if (tryRelease(arg)) {
Node h = head;
// 释放锁成功唤醒后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
释放锁,本质为维护state变量,此处支持可重入锁,如果state值为0,说明释放,取消线程的独占,并更新state
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
唤醒后继节点
private void unparkSuccessor(Node node) {
// 状态置为0,初始化
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 如果 s 为 null, 从后向前迭代,找到最前的未被CANCALLED的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后继节点
if (s != null)
LockSupport.unpark(s.thread);
}
后继节点被唤醒后,回到之前的逻辑,开始争夺锁,并将头节点设置为当前节点
6、其他特性
公平锁和非公平锁:
公平锁尝试加锁时,先判断队列中是否有等待线程,如果有,直接进队列
非公平锁直接获取锁,获取失败才进队列
中断特性:
如下代码中,使用了一个bool标记返回标识线程的中断状态,而中断锁会直接抛出异常
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);//以独占模式放入队列尾部
boolean failed = true;
try {
for (; ; ) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 中断问题,抛出异常;非中断方法中,返回中断的 bool 变量
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
限时等待:
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
具体实现:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);// 加入队列
boolean failed = true;
try {
for (; ; ) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 自旋时更新剩余等待时间
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
// 超时直接返回获取失败
return false;
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
// 阻塞指定时长,超时则线程自动被唤醒,自旋时,将在上一个if块退出
// 底层通过 unsafe 实现
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())// 当前线程中断状态
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
7、条件队列
当线程在指定Condition对象上等待的时候,是将线程包装成结点,加入了条件队列,然后阻塞。当线程被通知唤醒时,则是将条件队列中的结点转换成等待队列中的结点,之后的处理就和独占功能完全一样。
J.U.C包提供了Conditon接口,用以对原生的Object.wait()
、Object.notify()
进行增强。
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
在ReentrantLock中,通过内部ConditionObject实现了Condition接口,提供对条件队列的支持
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/**
* First node of condition queue.
*/
private transient Node firstWaiter;
/**
* Last node of condition queue.
*/
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() {
}
...
}
条件队列操作:
1、加入条件队列等待,条件队列入口
public final void await() throws InterruptedException {
// 如果当前线程被中断则直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 把当前节点加入条件队列
Node node = addConditionWaiter();
// 释放掉已经获取的独占锁资源
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果不在同步队列中则不断挂起
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 这里被唤醒可能是正常的signal操作也可能是中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
/**
* 走到这里说明节点已经条件满足被加入到了同步队列中或者中断了
* 和独占锁调用同样的获取锁方法,从这里可以看出条件队列只能用于独占锁
* 在处理中断之前首先要做的是从同步队列中成功获取锁资源
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 走到这里说明已经成功获取到了独占锁,接下来就做些收尾工作
// 删除条件队列中被取消的节点
// clean up if cancelled
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 根据不同模式处理中断
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter:将当前线程包装为节点加入条件队列
/**
* 1.与同步队列不同,条件队列头尾指针是firstWaiter跟lastWaiter
* 2.条件队列是在获取锁之后,也就是临界区进行操作,因此很多地方不用考虑并发
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
//如果最后一个节点被取消,则删除队列中被取消的节点
//至于为啥是最后一个节点后面会分析
if (t != null && t.waitStatus != Node.CONDITION) {
//删除所有被取消的节点
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建一个类型为CONDITION的节点并加入队列,由于在临界区,所以这里不用并发控制
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
删除条件队列当中被取消的节点
/**
* 删除条件队列当中被取消的节点
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
// 判断中间变量
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
} else
// 保存的是最靠前的有效条件节点
trail = t;
t = next;
}
}
释放所有资源
/**
* 入参就是新创建的节点,即当前节点
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
//这里这个取值要注意,获取当前的state并释放,这从另一个角度说明必须是独占锁
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
//如果这里释放失败,则抛出异常
throw new IllegalMonitorStateException();
}
} finally {
/**
* 如果释放锁失败,则把节点取消,由这里就能看出来上面添加节点的逻辑中
* 只需要判断最后一个节点是否被取消就可以了
*/
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
判断节点是否在同步队列中
/**
* 判断节点是否在同步队列中
*/
final boolean isOnSyncQueue(Node node) {
//快速判断1:节点状态或者节点没有前置节点
//注:同步队列是有头节点的,而条件队列没有
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//快速判断2:next字段只有同步队列才会使用,条件队列中使用的是nextWaiter字段
if (node.next != null) // If has successor, it must be on queue
return true;
//上面如果无法判断则进入复杂判断
return findNodeFromTail(node);
}
2、唤醒等待队列
/**
* 通知条件队列当中节点到同步队列当中去排队
*/
public final void signal() {
// 节点不能已经持有独占锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
/**
* 发信号通知条件队列的节点准备到同步队列当中去排队
*/
doSignal(first);
}
排队过程
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
transferForSignal方法会将CONDITON结点转换为初始结点,并插入【等待队列】
final boolean transferForSignal(Node node) {
// 尝试转化为初始节点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
8、共享锁
AQS的共享功能,通过钩子方法tryAcquireShared暴露,与独占功能最主要的区别就是:
共享功能的结点,一旦被唤醒,会向队列后部传播(Propagate)状态,以实现共享结点的连续唤醒。这也是共享的含义,当锁被释放时,所有持有该锁的共享线程都会被唤醒,并从等待队列移除。
以CountDownLatch为例:CountDownLatch内部继承了AQS,覆盖了共享获取和释放锁的方法
构造:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires)
// 只要 state == 0 就获取锁成功
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
await():
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取共享锁,取决于state变量是否为0(1:获取成功;-1:获取失败)
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 以共享节点形式加入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 如果前驱节点为头节点
if (p == head) {
// 再次获取锁
int r = tryAcquireShared(arg);
// 获取成功设置当前节点为头节点
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 判断阻塞条件,响应中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 移除当前节点
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
// 判断头节点等待状态
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
doReleaseShared:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 如果头节点等待状态为 SIGN, 设置头节点状态归0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 传递唤醒后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 退出循环的条件是head并未改变
if (h == head) // loop if head changed
break;
}
}
countDown():
public void countDown() {
// 释放资源
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 释放资源成功后执行
doReleaseShared();
return true;
}
return false;
}
原文地址:https://www.cnblogs.com/zuier/p/11388795.html