再谈AbstractQueuedSynchronizer:基于AbstractQueuedSynchronizer的并发类实现

公平模式ReentrantLock实现原理

前面的文章研究了AbstractQueuedSynchronizer的独占锁和共享锁,有了前两篇文章的基础,就可以乘胜追击,看一下基于AbstractQueuedSynchronizer的并发类是如何实现的。

ReentrantLock显然是一种独占锁,首先是公平模式的ReentrantLock,Sync是ReentractLock中的基础类,继承自AbstractQueuedSynchronizer,看一下代码实现:

 1 abstract static class Sync extends AbstractQueuedSynchronizer {
 2     private static final long serialVersionUID = -5179523762034025860L;
 3
 4     /**
 5      * Performs {@link Lock#lock}. The main reason for subclassing
 6      * is to allow fast path for nonfair version.
 7      */
 8     abstract void lock();
 9
10     /**
11      * Performs non-fair tryLock.  tryAcquire is
12      * implemented in subclasses, but both need nonfair
13      * try for trylock method.
14      */
15     final boolean nonfairTryAcquire(int acquires) {
16         final Thread current = Thread.currentThread();
17         int c = getState();
18         if (c == 0) {
19             if (compareAndSetState(0, acquires)) {
20                 setExclusiveOwnerThread(current);
21                 return true;
22             }
23         }
24         else if (current == getExclusiveOwnerThread()) {
25             int nextc = c + acquires;
26             if (nextc < 0) // overflow
27                 throw new Error("Maximum lock count exceeded");
28             setState(nextc);
29             return true;
30         }
31         return false;
32     }
33
34     protected final boolean tryRelease(int releases) {
35         int c = getState() - releases;
36         if (Thread.currentThread() != getExclusiveOwnerThread())
37             throw new IllegalMonitorStateException();
38         boolean free = false;
39         if (c == 0) {
40             free = true;
41             setExclusiveOwnerThread(null);
42         }
43         setState(c);
44         return free;
45     }
46
47     protected final boolean isHeldExclusively() {
48         // While we must in general read state before owner,
49         // we don‘t need to do so to check if current thread is owner
50         return getExclusiveOwnerThread() == Thread.currentThread();
51     }
52
53     final ConditionObject newCondition() {
54         return new ConditionObject();
55     }
56
57     // Methods relayed from outer class
58
59     final Thread getOwner() {
60         return getState() == 0 ? null : getExclusiveOwnerThread();
61     }
62
63     final int getHoldCount() {
64         return isHeldExclusively() ? getState() : 0;
65     }
66
67     final boolean isLocked() {
68         return getState() != 0;
69     }
70
71     /**
72      * Reconstitutes this lock instance from a stream.
73      * @param s the stream
74      */
75     private void readObject(java.io.ObjectInputStream s)
76         throws java.io.IOException, ClassNotFoundException {
77         s.defaultReadObject();
78         setState(0); // reset to unlocked state
79     }
80 }

Sync属于一个公共类,它是抽象的说明Sync会被继承,简单整理一下Sync主要做了哪些事(因为Sync不是ReentrantLock公平锁的关键):

  1. 定义了一个lock方法让子类去实现,我们平时之所以能调用ReentrantLock的lock()方法,就是因为Sync定义了它
  2. 实现了非公平锁tryAcquira的方法
  3. 实现了tryRelease方法,比较简单,状态-1,独占锁的线程置空
  4. 实现了isHeldExclusively方法
  5. 定义了newCondition方法,让开发者可以利用Condition实现通知/等待

接着,看一下公平锁的实现,FairSync类,它继承自Sync:

 1 static final class FairSync extends Sync {
 2     private static final long serialVersionUID = -3000897897090466540L;
 3
 4     final void lock() {
 5         acquire(1);
 6     }
 7
 8     /**
 9      * Fair version of tryAcquire.  Don‘t grant access unless
10      * recursive call or no waiters or is first.
11      */
12     protected final boolean tryAcquire(int acquires) {
13         final Thread current = Thread.currentThread();
14         int c = getState();
15         if (c == 0) {
16             if (!hasQueuedPredecessors() &&
17                 compareAndSetState(0, acquires)) {
18                 setExclusiveOwnerThread(current);
19                 return true;
20             }
21         }
22         else if (current == getExclusiveOwnerThread()) {
23             int nextc = c + acquires;
24             if (nextc < 0)
25                 throw new Error("Maximum lock count exceeded");
26             setState(nextc);
27             return true;
28         }
29         return false;
30     }
31 }

整理一下要点:

  1. 每次acquire的时候,state+1,如果当前线程lock()之后又lock()了,state不断+1,相应的unlock()的时候state-1,直到将state减到0为之,说明当前线程释放完所有的状态,其它线程可以竞争
  2. state=0的时候,通过hasQueuedPredecessors方法做一次判断,hasQueuedPredecessors的实现为"h != t && ((s = h.next) == null || s.thread != Thread.currentThread());",其中h是head、t是tail,由于代码中对结果取反,因此取反之后的判断为"h == t || ((s = h.next) != null && s.thread == Thread.currentThread());",总结起来有两种情况可以通过!hasQueuedPredecessors()这个判断:
    1. h==t,h==t的情况为要么当前FIFO队列中没有任何数据,要么只构建出了一个head还没往后面连过任何一个Node,因此head就是tail
    2. (s = h.next) != null && s.thread == Thread.currentThread(),当前线程为正在等待的第一个Node中的线程  
  3. 如果没有线程比当前线程等待更久去执行acquire操作,那么通过CAS操作将state从0变为1的线程tryAcquire成功
  4. 没有tryAcquire成功的线程,按照tryAcquire的先后顺序,构建为一个FIFO队列,即第一个tryAcquire失败的排在head的后一位,第二个tryAcquire失败的排在head的后二位
  5. 当tryAcquire成功的线程release完毕,第一个tryAcquire失败的线程第一个尝试tryAcquire,这就是先到先得,典型的公平锁

非公平模式ReentrantLock实现原理

看完了公平模式ReentrantLock,接着我们看一下非公平模式ReentrantLock是如何实现的。NonfairSync类,同样是继承自Sync类,实现为:

 1 static final class NonfairSync extends Sync {
 2     private static final long serialVersionUID = 7316153563782823691L;
 3
 4     /**
 5      * Performs lock.  Try immediate barge, backing up to normal
 6      * acquire on failure.
 7      */
 8     final void lock() {
 9         if (compareAndSetState(0, 1))
10             setExclusiveOwnerThread(Thread.currentThread());
11         else
12             acquire(1);
13     }
14
15     protected final boolean tryAcquire(int acquires) {
16         return nonfairTryAcquire(acquires);
17     }
18 }

结合nonfairTryAcquire方法一起讲解,nonfairTryAcquire方法的实现为:

 1 final boolean nonfairTryAcquire(int acquires) {
 2     final Thread current = Thread.currentThread();
 3     int c = getState();
 4     if (c == 0) {
 5         if (compareAndSetState(0, acquires)) {
 6             setExclusiveOwnerThread(current);
 7             return true;
 8         }
 9     }
10     else if (current == getExclusiveOwnerThread()) {
11         int nextc = c + acquires;
12         if (nextc < 0) // overflow
13             throw new Error("Maximum lock count exceeded");
14         setState(nextc);
15         return true;
16     }
17     return false;
18 }

看到差别就在于非公平锁lock()的时候会先尝试通过CAS看看能不能把state从0变为1(即获取锁),如果可以的话,直接获取锁而不需要排队。举个实际例子就很好理解了:

  1. 线程1、线程2、线程3竞争锁,线程1竞争成功获取锁,线程2、线程3依次排队
  2. 线程1执行完毕,释放锁,state变为0,唤醒了第一个排队的线程2
  3. 此时线程4来尝试获取锁了,由于线程2被唤醒了,因此线程2与线程4竞争锁
  4. 线程4成功将state从0变为1,线程2竞争锁失败,继续park

看到整个过程中,后来的线程4反而比先来的线程2先获取锁,相当于是一种非公平的模式,

那为什么非公平锁效率会比公平锁效率高?上面第(3)步如果线程2和线程4不竞争锁就是答案。为什么这么说,后面的解释很重要,希望大家可以理解:

线程1是先将state设为0,再去唤醒线程2,这两个过程之间是有时间差的。

那么如果线程1将state设置为0的时候,线程4就通过CAS算法获取到了锁,且在线程1唤醒线程2之前就已经使用完毕锁,那么相当于线程2获取锁的时间并没有推迟,在线程1将state设置为0到线程1唤醒线程2的这段时间里,反而有线程4获取了锁执行了任务,这就增加了系统的吞吐量,相当于单位时间处理了更多的任务。

从这段解释我们也应该能看出来了,非公平锁比较适合加锁时间比较短的任务。这是因为加锁时间长,相当于线程2将state设为0并去唤醒线程2的这段时间,线程4无法完成释放锁,那么线程2被唤醒由于没法获取到锁,又被阻塞了,这种唤醒-阻塞的操作会引起线程的上下文切换,继而影响系统的性能。

Semaphore实现原理

Semaphore即信号量,用于控制代码块的并发数,将Semaphore的permits设置为1相当于就是synchronized或者ReentrantLock,Semaphore具体用法可见Java多线程19:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger信号量允许多条线程获取锁,显然它的锁是一种共享锁,信号量也有公平模式与非公平模式,相信看懂了上面ReentrantLock的公平模式与非公平模式的朋友应该对Semaphore的公平模式与非公平模式理解起来会更快,这里就放在一起写了。

首先还是看一下Semaphore的基础设施,它和ReentrantLock一样,也有一个Sync:

 1 abstract static class Sync extends AbstractQueuedSynchronizer {
 2     private static final long serialVersionUID = 1192457210091910933L;
 3
 4     Sync(int permits) {
 5         setState(permits);
 6     }
 7
 8     final int getPermits() {
 9         return getState();
10     }
11
12     final int nonfairTryAcquireShared(int acquires) {
13         for (;;) {
14             int available = getState();
15             int remaining = available - acquires;
16             if (remaining < 0 ||
17                 compareAndSetState(available, remaining))
18                 return remaining;
19         }
20     }
21
22     protected final boolean tryReleaseShared(int releases) {
23         for (;;) {
24             int current = getState();
25             int next = current + releases;
26             if (next < current) // overflow
27                 throw new Error("Maximum permit count exceeded");
28             if (compareAndSetState(current, next))
29                 return true;
30         }
31     }
32
33     final void reducePermits(int reductions) {
34         for (;;) {
35             int current = getState();
36             int next = current - reductions;
37             if (next > current) // underflow
38                 throw new Error("Permit count underflow");
39             if (compareAndSetState(current, next))
40                 return;
41         }
42     }
43
44     final int drainPermits() {
45         for (;;) {
46             int current = getState();
47             if (current == 0 || compareAndSetState(current, 0))
48                 return current;
49         }
50     }
51 }

和ReentrantLock的Sync差不多,Semaphore的Sync定义了以下的一些主要内容:

  1. getPermits方法获取当前的许可剩余量还剩多少,即还有多少线程可以同时获得信号量
  2. 定义了非公平信号量获取共享锁的逻辑nonfairTryAcquireShared
  3. 定义了公平模式释放信号量的逻辑tryReleaseShared,相当于释放一次信号量,state就向上+1(信号量每次的获取与释放都是以1为单位的)

再看下公平信号量的实现,同样的FairSync,继承自Sync,代码为:

 1 static final class FairSync extends Sync {
 2     private static final long serialVersionUID = 2014338818796000944L;
 3
 4     FairSync(int permits) {
 5         super(permits);
 6     }
 7
 8     protected int tryAcquireShared(int acquires) {
 9         for (;;) {
10             if (hasQueuedPredecessors())
11                 return -1;
12             int available = getState();
13             int remaining = available - acquires;
14             if (remaining < 0 ||
15                 compareAndSetState(available, remaining))
16                 return remaining;
17         }
18     }
19 }

首先第10行的hasQueuedPredecessors方法,前面已经说过了,如果已经有了FIFO队列或者当前线程不是FIFO队列中在等待的第一条线程,返回-1,表示无法获取共享锁成功。

接着获取available,available就是state,用volatile修饰,所以线程中可以看到最新的state,信号量的acquires是1,每次获取信号量都对state-1,两种情况直接返回:

  1. remaining减完<0
  2. 通过cas设置成功

之后就是和之前说过的共享锁的逻辑了,如果返回的是一个<0的数字,那么构建FIFO队列,线程阻塞,直到前面的执行完才能唤醒后面的。

接着看一下非公平信号量的实现,NonfairSync继承Sync:

 1 static final class NonfairSync extends Sync {
 2     private static final long serialVersionUID = -2694183684443567898L;
 3
 4     NonfairSync(int permits) {
 5         super(permits);
 6     }
 7
 8     protected int tryAcquireShared(int acquires) {
 9         return nonfairTryAcquireShared(acquires);
10     }
11 }

nonfairTryAcquireShared在父类已经实现了,再贴一下代码:

1 final int nonfairTryAcquireShared(int acquires) {
2     for (;;) {
3         int available = getState();
4         int remaining = available - acquires;
5         if (remaining < 0 ||
6             compareAndSetState(available, remaining))
7             return remaining;
8     }
9 }

看到这里和公平Semaphore只有一点差别:不会前置进行一次hasQueuedPredecessors()判断。即当前有没有构建为一个FIFO队列,队列里面第一个等待的线程是不是自身都无所谓,对于非公平Semaphore都一样,反正线程调用Semaphore的acquire方法就将当前state-1,如果得到的remaining设置成功或者CAS操作成功就返回,这种操作没有遵循先到先得的原则,即非公平信号量。

至于非公平信号量对比公平信号量的优点,和ReentrantLock的非公平锁对比ReentrantLock的公平锁一样,就不说了。

CountDownLatch实现原理

CountDownLatch即计数器自减的一种闭锁,某线程阻塞,对一个计数器自减到0,此线程被唤醒,CountDownLatch具体用法可见Java多线程19:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger

CountDownLatch是一种共享锁,通过await()方法与countDown()两个方法实现自身的功能,首先看一下await()方法的实现:

 1 public void await() throws InterruptedException {
 2     sync.acquireSharedInterruptibly(1);
 3 }

acquireSharedInterruptibly最终又回到tryAcquireShared方法上,直接贴整个Sync的代码实现:

 1 private static final class Sync extends AbstractQueuedSynchronizer {
 2     private static final long serialVersionUID = 4982264981922014374L;
 3
 4     Sync(int count) {
 5         setState(count);
 6     }
 7
 8     int getCount() {
 9         return getState();
10     }
11
12     protected int tryAcquireShared(int acquires) {
13         return (getState() == 0) ? 1 : -1;
14     }
15
16     protected boolean tryReleaseShared(int releases) {
17         // Decrement count; signal when transition to zero
18         for (;;) {
19             int c = getState();
20             if (c == 0)
21                 return false;
22             int nextc = c-1;
23             if (compareAndSetState(c, nextc))
24                 return nextc == 0;
25         }
26     }
27 }

其实看到tryAcquireShared方法,理解AbstractQueuedSynchronizer共享锁原理的,不用看countDown方法应该都能猜countDown方法是如何实现的。我这里总结一下:

  1. 传入一个count,state就等于count,await的时候判断是不是0,是0返回1表示成功,不是0返回-1表示失败,构建FIFO队列,head头只连接一个Node,Node中的线程就是调用CountDownLatch的await()方法的线程
  2. 每次countDown的时候对state-1,直到state减到0的时候才算tryReleaseShared成功,tryReleaseShared成功,唤醒被挂起的线程

为了验证(2),看一下上面Sync的tryReleaseShared方法就可以了,确实是这么实现的。

时间: 2024-09-30 16:14:39

再谈AbstractQueuedSynchronizer:基于AbstractQueuedSynchronizer的并发类实现的相关文章

再谈缓存穿透、缓存并发、热点缓存之最佳招式

一.前言 在之前的一篇缓存穿透.缓存并发.缓存失效之思路变迁文章中介绍了关于缓存穿透.并发的一些常用思路,但是个人感觉文章中没有明确一些思路的使用场景,本文继续将继续深化与大家共同探讨,同时也非常感谢这段时间给我提宝贵建议的朋友们. 说明:本文中提到的缓存可以理解为Redis. 二.缓存穿透与并发方案 相信不少朋友之前看过很多类似的文章,但是归根结底就是二个问题: 如何解决穿透 如何解决并发 当并发较高的时候,其实我是不建议使用缓存过期这个策略的,我更希望缓存一直存在,通过后台系统来更新缓存系统

再谈AbstractQueuedSynchronizer:独占模式

关于AbstractQueuedSynchronizer JDK1.5之后引入了并发包java.util.concurrent,大大提高了Java程序的并发性能.关于java.util.concurrent包我总结如下: AbstractQueuedSynchronizer是并发类诸如ReentrantLock.CountDownLatch.Semphore的核心 CAS算法是AbstractQueuedSynchronizer的核心 可以说AbstractQueuedSynchronizer是

第七章定制并发类

Java 7 并发编程实战手册目录 代码下载(https://github.com/Wang-Jun-Chao/java-concurrency) 第七章定制并发类 7.1简介 Java并发API提供了大量接口和类来实现并发应用程序.这些接口和类既包含了底层机制,如Thread类.Runnable接口或Callable接口.synchronized关键字,也包含了高层机制,如在Java 7中增加的Executor框架和Fork/Join框架.尽管如此,在开发应用程序时,仍会发现己有的Java类无

再谈消息队列技术

上周,我们举办了第二届技术沙龙,我这边主要演讲了消息队列技术的议题,现分享给大家: 在我们团队内部,随着消息应用中心(任务中心)的广泛应用,有时候我们感觉不到消息队列的存在,但这不影响消息队列在高可用.分布式.高并发架构下的核心地位. 消息队列都应用到了哪些实际的应用场景中? 一.再谈消息队列的应用场景 异步处理:例如短信通知.终端状态推送.App推送.用户注册等 数据同步:业务数据推送同步 重试补偿:记账失败重试 系统解耦:通讯上下行.终端异常监控.分布式事件中心 流量消峰:秒杀场景下的下单处

再谈 Go 语言在前端的应用前景

12 月 23 日,七牛云 CEO & ECUG 社区发起人许式伟先生在 ECUG Con 2018 现场为大家带来了主题为<再谈 Go 语言在前端的应用前景>的内容分享. 本文是对演讲内容的实录整理. 今年是举办 ECUG Con 的第 11 年,之前我谈的基本都是服务端的开发实践.从去年起我开始不谈后端而是谈前端.当然,去年我没有说为什么我会关注前端.今天再谈 Go 语言在前端的应用之前,我先简单聊一下思路脉络,为什么我今天会关注前端. 前端的演进 最早的 PC 时期,常见的设备主

再谈ORACLE CPROCD进程

罗列一下有关oprocd的知识点 oprocd是oracle在rac中引入用来fencing io的 在unix系统下,如果我们没有采用oracle之外的第三方集群软件,才会存在oprocd进程 在linux系统下,只有在10.2.0.4版本后,才会具有oprocd进程 在window下,不会存在oprocd 进程,但是会存在一个oraFenceService服务,用来实现相同的功能,该服务采用的技术是基于windows的,与oprocd不同 oprocd进程可以运行在两者模式下:fatal和n

Another Look at Events(再谈Events)

转载:http://www.qtcn.org/bbs/simple/?t31383.html Another Look at Events(再谈Events) 最近在学习Qt事件处理的时候发现一篇很不错的文章,是2004年季刊的一篇文章,网上有这篇文章的翻译版,但是感觉部分地方翻译的比较粗糙,不是很明确.索性重新翻译了一遍,并引用了原翻译版的一段译注.以下都是用自己能理解的方式来翻译的,由于水平有限,有很多不足的地方,希望大家指正. Another Look at Events (再谈Event

线程学习--(七)单例和多线程、同步类容器和并发类容器

一.同步类容器 同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作.复合类操作如:迭代(反复访问元素,遍历完容器中的所有元素).跳转(根据指定的顺序找到当前元素的下一个元素).以及条件运算.这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题. 同步类容器:如古老的Vector/HashTable.

基于Actor的并发方案

共享可变状态的问题 Actor模型 Actor系统 定义Actor 消息处理 副作用 非类型化 异步和非阻塞 创建一个Actor 发送消息 消息应答 问询Ask机制 有状态的Actor 小结 译者注: 本文原文标题:<The Neophyte's Guide to Scala Part 14: The Actor Approach to Concurrency>,作者:Daniel Westheide, 原文链接:http://danielwestheide.com/blog/2013/02/