AQS系列(三)- ReentrantReadWriteLock读写锁的加锁

前言

前两篇我们讲述了ReentrantLock的加锁释放锁过程,相对而言比较简单,本篇进入深水区,看看ReentrantReadWriteLock-读写锁的加锁过程是如何实现的,继续拜读老Lea凌厉的代码风。

一、读写锁的类图

读锁就是共享锁,而写锁是独占锁。读锁与写锁之间的互斥关系为:读读可同时执行(有条件的);读写与写写均互斥执行。注意此处读读可并行我用了有条件的并行,后文会对此做介绍。

继续奉上一张丑陋的类图:

可以看到ReentrantReadWriteLock维护了五个内部类,ReentrantReadWriteLock中存放了Sync、ReadLock、WriteLock三个成员变量,如下截图所示:

而ReadLock和WriteLock中又存放了Sync变量,截图如下所示,这样一组合,有了四种锁,公平读锁、公平写锁、非公平读锁、非公平写锁。对于公平与非公平的实现区别,我们上一篇已经做过讲解,本文将着重关注读锁和写锁的实现区别。

二、加锁源码

在前文中我们知道,ReentrantLock中用state来判断当前锁是否被占用,而读写锁ReentrantReadWriteLock中由于同时存在两种锁,所以老Lea用state的高16位来存放读锁的占用状态以及重入次数,低16位存放写锁的占用状态和重入次数。

1、读锁加锁,即共享锁加锁

1 public void lock() {
2             sync.acquireShared(1); // 获取共享锁方法
3         }

上述lock方法中调用的获取共享锁方法是在AbstractQueuedSynchronizer中实现的,代码如下:

1 public final void acquireShared(int arg) {
2         if (tryAcquireShared(arg) < 0)
3             doAcquireShared(arg);
4     }

可以看到获取共享锁分成了两步,第一步是尝试获取,如果获取不到再进入if里面执行doAcquireShared方法,下面分别追踪。

1)、tryAcquireShared方法

 1 protected final int tryAcquireShared(int unused) {
 2             Thread current = Thread.currentThread();
 3             int c = getState();
 4             // 1.有写锁占用并且不是当前线程,则直接返回获取失败
 5             if (exclusiveCount(c) != 0 &&
 6                 getExclusiveOwnerThread() != current)
 7                 return -1;
 8             // 执行到这里,有两种情况 没有写锁占用或者是当前线程
 9             int r = sharedCount(c); // 获取读锁次数
10             // 2、不应该阻塞则获取锁  @此方法有点意思,需着重讲解,作用:判断读锁是否需要阻塞
11             if (!readerShouldBlock() &&
12                 r < MAX_COUNT &&
13                 compareAndSetState(c, c + SHARED_UNIT)) {
14                 // 如果CAS成功,则将当前线程对应的计数+1
15                 if (r == 0) { // 如果读锁持有数为0,则说明当前线程是第一个reader,分别给firstReader和firstReaderHoldCount初始化
16                     firstReader = current;
17                     firstReaderHoldCount = 1;
18                 } else if (firstReader == current) { // 如果读锁持有数不为0且当前线程就是firstReader,那么直接给firstReaderHoldCount+1,表示读锁重入
19                     firstReaderHoldCount++;
20                 } else { // 其他情况,即当前线程不是firstReader且还有其他线程持有读锁,则要获取到当前线程对应的HoldCounter,然后给里面的计数+1
21                     HoldCounter rh = cachedHoldCounter;
22                     if (rh == null || rh.tid != getThreadId(current))
23                         cachedHoldCounter = rh = readHolds.get();
24                     else if (rh.count == 0)
25                         readHolds.set(rh);
26                     rh.count++;
27                 }
28                 return 1;
29             }
30             // 3、应该阻塞或者CAS失败则进入此方法获取锁
31             return fullTryAcquireShared(current);
32         }

结合上述代码中的注释,将逻辑分三部分,我们一步步分析此方法的逻辑。

首先第一步,判断如果有写锁并且当前线程不是写锁的线程,则直接退出获取读锁的尝试,因为读写是互斥的,退出此方法后就会进入doAcquireShared方法,后续逻辑见下面的2)。但此处还是要看一下写锁状态统计方法exclusiveCount和读锁状态统计方法sharedCount,方法源码如下截图所示:

可以看到,exclusiveCount方法是将c和独占掩码进行与操作,独占掩码EXCLUSIVE_MASK高16位均为0,低16位均为1,按位与计算之后就剩下c的低16位,这就是第二部分一开始说的低16位存放写锁重入次数;同理看sharedCount方法,将c有符号右移16位,这样移位之后低16位就是原来的高16位,即读锁的加锁次数。老Lea通过这两个方法实现了用一个int类型的state存放写锁读锁两个加锁次数的结果,是不是看起来就很高端!

然后看第二步,判断读不应该阻塞(即readerShouldBlock方法返回false)且读锁持有次数小于最大值且CAS成功,则进入方法中尝试获取读锁。先看看重点方法readerShouldBlock什么时候会返回false(不阻塞)什么时候返回true(阻塞)。此方法在非公平模式和公平模式中有不同的实现,公平模式代码:

1 final boolean readerShouldBlock() {
2             return hasQueuedPredecessors();
3         }

看到了一个熟悉的身影,hashQueuedPredecessors方法,这不就是在ReentrantLock中公平锁加锁时的方法么?详细可看我的AQS系列(一)中的讲解,总结一下就是该方法判断队列前面是否有在排队的非当前线程,意思就是按排队顺序获取锁,不要争抢。

非公平模式代码:

1 final boolean readerShouldBlock() {
2             return apparentlyFirstQueuedIsExclusive();
3         }
1 final boolean apparentlyFirstQueuedIsExclusive() {
2         Node h, s;
3         return (h = head) != null &&
4             (s = h.next)  != null &&
5             !s.isShared()         &&
6             s.thread != null;
7     }

在后面的方法中,返回了一个四个条件组成的布尔值,逻辑为头节点不为空并且头节点后的第一个节点不为空并且这个节点是独占的并且线程不为空,此时返回true即当前这个读操作应该阻塞,不让它获取到锁。那么问题来了,为什么要有这个逻辑?此处是为了避免一种异常情况的发生,如果后面有一个排队的写锁在等待获取锁,而这时有一个读锁正在执行中,若在读锁执行完之前又来了一个读锁,因为读锁与读锁不阻塞所以后来的的读锁又获取到了锁,这时在队列第一个位置排队的写锁仍然在傻傻的等着,没办法,谁让你没关系。就这样,如果一直有读锁在当前正在执行的读锁执行完之前进来获取读锁,那么后面的写锁就会一直傻等在那,永远都没法获取锁。所以Lea就设计了这个方法来避免这种情况的发生,即如果判断队列第一位排队的是写锁,那么后面的读锁就先等一等,等这个写锁执行完了你们再执行。这也就是我在文章的开始讲的-读读同时执行是有条件的,这个条件就是指这里。

看第二步之前要先说说读锁的处理逻辑,因为是可重入的读锁,所以需要记录每个获取读锁线程的重入次数,即每个读的线程都有一个与其对应的重入次数。然后继续看第二步中读锁获取锁成功(即CAS成功)之后的逻辑:如果读锁持有数为0,则说明当前线程是第一个reader,分别给firstReader和firstReaderHoldCount初始化;如果读锁持有数不为0且当前线程就是firstReader,那么直接给firstReaderHoldCount+1,表示读锁重入;否则,即当前线程不是firstReader且还有其他线程持有读锁,则要获取到当前线程对应的HoldCounter,然后给里面的计数+1。

下面再一起看看【否则】中的逻辑,粘贴一下Sync中的部分代码

 1 abstract static class Sync extends AbstractQueuedSynchronizer {
 2        // ...
 3        static final class HoldCounter {
 4             int count = 0;
 5             // Use id, not reference, to avoid garbage retention
 6             final long tid = getThreadId(Thread.currentThread());
 7         }
 8
 9         static final class ThreadLocalHoldCounter
10             extends ThreadLocal<HoldCounter> {
11             public HoldCounter initialValue() {
12                 return new HoldCounter();
13             }
14         }
15
16         private transient ThreadLocalHoldCounter readHolds;
17
18         private transient HoldCounter cachedHoldCounter;
19
20         private transient Thread firstReader = null;
21         private transient int firstReaderHoldCount;
22
23         Sync() {
24             readHolds = new ThreadLocalHoldCounter();
25             setState(getState()); // ensures visibility of readHolds
26         }
27         // ...
28 }

可以看到,Sync中缓存了一个HoldCounter,存放的是最近一次读锁记录。而如果当前线程不是最近一次记录的HoldCounter,则去readHolds中取,readHolds是ThreadLocalHoldCounter类型,在Sync的无参构造器中初始化,它与HoldCounter都是Sync的内部类,ThreadLocalHoldCounter就是一个ThreadLocal,内部维护了一个线程与HoldCounter的键值对map,一个线程对应一个HoldCounter。所以【否则】中的逻辑加注释如下所示:

1                     HoldCounter rh = cachedHoldCounter; // 获取最近一次记录的HoldCounter,此缓存是为了提高效率,不用每次都去ThreadLocal中取
2                     if (rh == null || rh.tid != getThreadId(current)) // 判断当前线程是不是最近一次记录的HoldCounter
3                         cachedHoldCounter = rh = readHolds.get(); // 如果不是,则去Sync中的ThreadLocal中获取,然后再放在缓存中
4                     else if (rh.count == 0) // 如果count计数为0,说明是第一次重入,则将HoldCounter加入ThreadLocal中
5                         readHolds.set(rh);
6                     rh.count++; // 当前线程重入次数+1

下面进入第三步,fullTryAcquireShared方法,进入此方法的前提条件是没有写锁且 (读应该阻塞或者读锁CAS失败)。看这个full方法的逻辑:

 1 final int fullTryAcquireShared(Thread current) {
 2
 3             HoldCounter rh = null;
 4             for (;;) { // 无限循环直到有确定的结果返回
 5                 int c = getState();
 6                 if (exclusiveCount(c) != 0) { // 1、有独占锁且不是当前线程,直接返回读锁加锁失败
 7                     if (getExclusiveOwnerThread() != current)
 8                         return -1;
 9                     // else we hold the exclusive lock; blocking here
10                     // would cause deadlock.
11                 } else if (readerShouldBlock()) { // 2、判断读是否应该阻塞
12                     // Make sure we‘re not acquiring read lock reentrantly
13                     if (firstReader == current) { // 判断如果当前线程就是firstReader,那么什么都不做,进入3中尝试获取锁,why? 因为这说明当前线程之前就持有了锁还没释放,所以可以继续获取
14                         // assert firstReaderHoldCount > 0;
15                     } else { // 2.5 此处逻辑需要仔细研读,乍看时看的一头雾水
16                         if (rh == null) { // 第一次进来时rh肯定==null
17                             rh = cachedHoldCounter;
18                             if (rh == null || rh.tid != getThreadId(current)) {
19                                 rh = readHolds.get();
20                                 if (rh.count == 0) // 如果当前线程没获取到过读锁,则从本地线程变量中移除HoldCounter,因为下一步就要判定它获取锁失败先不让它获取了
21                                     readHolds.remove();
22                             }
23                         }// 能走到这里,说明当前读锁应该阻塞且不是firstReader
24                         if (rh.count == 0) // 再加上当前线程没获取到过读锁,则先不让它尝试获取锁了,直接返回获取失败
25                             return -1;
26                     }
27                 }
28                 if (sharedCount(c) == MAX_COUNT)
29                     throw new Error("Maximum lock count exceeded");
30                 // 3、再次尝试获取锁
31                 if (compareAndSetState(c, c + SHARED_UNIT)) {
32                     if (sharedCount(c) == 0) {
33                         firstReader = current;
34                         firstReaderHoldCount = 1;
35                     } else if (firstReader == current) {
36                         firstReaderHoldCount++;
37                     } else {
38                         if (rh == null)
39                             rh = cachedHoldCounter;
40                         if (rh == null || rh.tid != getThreadId(current))
41                             rh = readHolds.get();
42                         else if (rh.count == 0)
43                             readHolds.set(rh);
44                         rh.count++;
45                         cachedHoldCounter = rh; // cache for release
46                     }
47                     return 1;
48                 }
49             }
50         }

详细看看注解以及源代码注释、代码逻辑,相信能理解这个过程。

 2)、doAcquireShared方法

 1 private void doAcquireShared(int arg) {
 2         // 将当前读锁加到队列后面
 3         final Node node = addWaiter(Node.SHARED);
 4         boolean failed = true;
 5         try {
 6             boolean interrupted = false;
 7             for (;;) {
 8                 // 得到前一个节点
 9                 final Node p = node.predecessor();
10                 if (p == head) { // 如果前一个节点是头节点,则尝试获取锁
11                     int r = tryAcquireShared(arg);
12                     if (r >= 0) { // 设置头节点并且激活后续的节点
13                         setHeadAndPropagate(node, r);
14                         p.next = null; // help GC
15                         if (interrupted)
16                             selfInterrupt();
17                         failed = false;
18                         return;
19                     }
20                 }// 判断应该挂起则挂起线程
21                 if (shouldParkAfterFailedAcquire(p, node) &&
22                     parkAndCheckInterrupt())
23                     interrupted = true;
24             }
25         } finally {
26             if (failed)
27                 cancelAcquire(node);
28         }
29     }

该方法跟之前系列中ReentrantLock的加锁过程类似,在此就不做过多的解释了,总之还是通过park来挂起。

 2、写锁加锁,即独占锁加锁

进入lock方法:

1 public void lock() {
2             sync.acquire(1);
3         }

熟悉的样子,继续 点进去:

1 public final void acquire(int arg) {
2         if (!tryAcquire(arg) &&
3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4             selfInterrupt();
5     }

还是原先的方法,但是各个方法的实现有区别了。先看第一个tryAcquire:

 1 protected final boolean tryAcquire(int acquires) {
 2             Thread current = Thread.currentThread();
 3             int c = getState();
 4             int w = exclusiveCount(c);
 5             if (c != 0) { // 如果排它锁存在,则判断是不是当前线程,如果也不是当前线程,则直接返回获取失败
 6                 // (Note: if c != 0 and w == 0 then shared count != 0)
 7                 if (w == 0 || current != getExclusiveOwnerThread())
 8                     return false;
 9                 if (w + exclusiveCount(acquires) > MAX_COUNT)
10                     throw new Error("Maximum lock count exceeded");
11                 // Reentrant acquire
12                 setState(c + acquires);
13                 return true;
14             } // 判断读锁要不要阻塞,此处针对公平锁和非公平锁有不同的实现,对于非公平锁统一返回false表示不要阻塞,而公平锁则会查看前面还有没有锁来判断要不要阻塞
15             if (writerShouldBlock() ||
16                 !compareAndSetState(c, c + acquires))
17                 return false;
18             setExclusiveOwnerThread(current);
19             return true;
20         }

然后是addWaiter在队列末尾添加node节点排队,这个方法在AbstractQueuedSynchronizer中,同样是熟悉的方法了,此处略过不提。

最后是acquireQueued方法,如下所示,又是熟悉的代码,跟ReentrantLock中的加锁方法一毛一样,唯一的不同点是第7行调用的tryAcquire方法的实现,此处调的是ReentrantReadWriteLock类中Sync的方法,也就是上面的第一个方法。

 1 final boolean acquireQueued(final Node node, int arg) {
 2         boolean failed = true;
 3         try {
 4             boolean interrupted = false;
 5             for (;;) {
 6                 final Node p = node.predecessor();
 7                 if (p == head && tryAcquire(arg)) {
 8                     setHead(node);
 9                     p.next = null; // help GC
10                     failed = false;
11                     return interrupted;
12                 }
13                 if (shouldParkAfterFailedAcquire(p, node) &&
14                     parkAndCheckInterrupt())
15                     interrupted = true;
16             }
17         } finally {
18             if (failed)
19                 cancelAcquire(node);
20         }
21     }

写锁的加锁过程基本就这些了,相对来说比读锁加锁容易了很多,因为大多都跟ReentrantLock中的实现相仿。

后记

读写锁的加锁过程到此为止,最近每晚下班回来读一会,断断续续的四晚上才搞定,难受 ><

原文地址:https://www.cnblogs.com/zzq6032010/p/12037854.html

时间: 2024-10-26 09:02:40

AQS系列(三)- ReentrantReadWriteLock读写锁的加锁的相关文章

JUC之ReadWriteLock、ReentrantReadWriteLock读写锁

读写锁简介 对共享资源有读和写的操作,且写操作没有读操作那么频繁.在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程读取共享资源:但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了. 读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁:一个是写相关的锁,称为排他锁,描述如下: 读锁的条件: 1. 没有其他线程的写锁: 2. 对写锁请求的线程必须是同一个. 写锁的条件: 1. 没有其他线程的读锁

ReentrantReadWriteLock读写锁的使用2

本文可作为传智播客<张孝祥-Java多线程与并发库高级应用>的学习笔记. 这一节我们做一个缓存系统. 在读本节前 请先阅读 ReentrantReadWriteLock读写锁的使用1 第一版 public class CacheDemo { private Map<String, Object> cache = new HashMap<String, Object>(); public static void main(String[] args) { CacheDem

Linux环境编程之同步(三):读写锁

概述 互斥锁把试图进入我们称之为临界区的所有其他线程都阻塞住.该临界区通常涉及对由这些线程共享一个或多个数据的访问或更新.读写锁在获取读写锁用于读某个数据和获取读写锁用于写直接作区别.读写锁的分配规则如下: 1.只要没有线程持有某个给定的读写锁用于写,那么任意数目的线程可以持有该读写锁用于读. 2.仅当没有线程持有某个给定的读写锁用于读或用于写时,才能分配该读写锁用于写. 即只要没有线程在修改某个给定的数据,那么任意数目的线程都可以拥有该数据的读访问权.仅当没有其他线程在读或修改某个给定的数据时

java中ReentrantReadWriteLock读写锁的使用

ReentrantReadWriteLock读写锁的使用 Lock比传统线程模型中的synchronized方式更加面向对象,与生活中的锁类似,锁本身也应该是一个对象.两个线程执行的代码片段要实现同步互斥的效果,它们必须用同一个Lock对象. 读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由jvm自己控制的,你只要上好相应的锁即可.如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁:如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁.总之,读的时候上读

ReentrantReadWriteLock读写锁详解

一.读写锁简介 现实中有这样一种场景:对共享资源有读和写的操作,且写操作没有读操作那么频繁.在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源:但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写的操作了. 针对这种场景,JAVA的并发包提供了读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁:一个是写相关的锁,称为排他锁,描述如下: 线程进入读锁的前提条件: 没有其他线程的写锁, 没

java并发锁ReentrantReadWriteLock读写锁源码分析

1.ReentrantReadWriterLock基础 所谓读写锁,是对访问资源共享锁和排斥锁,一般的重入性语义为 如果对资源加了写锁,其他线程无法再获得写锁与读锁,但是持有写锁的线程,可以对资源加读锁(锁降级):如果一个线程对资源加了读锁,其他线程可以继续加读锁. java.util.concurrent.locks中关于多写锁的接口:ReadWriteLock public interface ReadWriteLock { /** * Returns the lock used for r

AQS系列(一)- ReentrantLock的加锁

前言 AQS即AbstractQueuedSynchronizer,是JUC包中的一个核心抽象类,JUC包中的绝大多数功能都是直接或间接通过它来实现的.本文是AQS系列的第一篇,后面会持续更新多篇,争取将JUC包中AQS相关的常用功能讲清楚,一方面巩固自己的知识体系,一方面亦可与各位园友互相学习.寒冷的冬天,要用技术来温暖自己. 一.AQS与ReentrantLock的关系 先奉上一张自制的丑陋类图 从下往上看,ReentrantLock类内部有两个静态内部类FairSync和NonfairSy

ReentrantReadWriteLock读写锁

ReentrantLock实现了标准的互斥锁:一次最多只有一个线程能够持有相同ReentrantLock.但是互斥通常做为保护数据一致性的很强的加锁约束,因此,过分的限制了并发性.互斥是保守的加锁策略,避免了 "写/写"和"写/读"的重读,但是同样避开了"读/读"的重叠.在很多情况下,数据结构是"频繁被读取"的--它们是可变的,有时候会被改变,但多数访问只进行读操作.此时,如果能够放宽,允许多个读者同时访问数据结构就 非常好了

ReentrantReadWriteLock读写锁的使用

Lock比传统线程模型中的synchronized方式更加面向对象,与生活中的锁类似,锁本身也应该是一个对象.两个线程执行的代码片段要实现同步互斥的效果,它们必须用同一个Lock对象. 读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由jvm自己控制的,你只要上好相应的锁即可.如果你的代码只读数据,可以很多 人同时读,但不能同时写,那就上读锁:如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁.总之,读的时候上读锁,写的时候上写锁! ReentrantReadWrit