读写锁简介
对共享资源有读和写的操作,且写操作没有读操作那么频繁。在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程读取共享资源;但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了。
读写锁ReentrantReadWriteLock,它表示两个锁,一个是读操作相关的锁,称为共享锁;一个是写相关的锁,称为排他锁,描述如下:
读锁的条件:
1. 没有其他线程的写锁;
2. 对写锁请求的线程必须是同一个。
写锁的条件:
1. 没有其他线程的读锁;
2. 没有其他线程的写锁。
读写锁的三个重要特性:
①. 公平选择权:支持非公平(默认)和公平的锁获取方式,非公平锁吞吐量由于公平锁。
②. 重进入:读锁和写锁都支持线程重进入。
③. 锁降级:遵循获取写锁、获取读锁、释放写锁的次序,写锁能够降级成为读锁。
源码解读
ReentrantReadWriteLock类的整体结构:
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { // 读锁 private final ReentrantReadWriteLock.ReadLock readerLock; // 写锁 private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; // 使用默认(非公平)的排序属性创建一个新的 ReentrantReadWriteLock public ReentrantReadWriteLock() { this(false); } // 使用给定的公平策略创建一个新的 ReentrantReadWriteLock public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } // 返回用于写入操作的锁 public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } // 返回用于读取操作的锁 public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } // 继承AQS abstract static class Sync extends AbstractQueuedSynchronizer {} // 非公平锁 static final class NonfairSync extends Sync {} // 公平锁 static final class FairSync extends Sync {} // 读锁 public static class ReadLock implements Lock, java.io.Serializable {} // 写锁 public static class WriteLock implements Lock, java.io.Serializable {} }
类的继承关系
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}
ReentrantReadWriteLock实现了ReadWriteLock接口,ReadWriteLock接口定义了获取读锁和写锁的规范,具体需要实现类去实现;同时其还实现了Serializable接口,表示可以进行序列化。
类的内部类
ReentrantReadWriteLock有五个内部类,五个内部类之间也是相互关联的。
说明:如上图所示,Sync继承AQS、NonfairSync和FairSync继承自Sync类;ReadLock和WriteLock实现了Lock接口。
Sync类
(1)类的继承关系
abstract static class Sync extends AbstractQueuedSynchronizer {}
Sync抽象类继承自AQS抽象类,Sync类提供了对ReentrantReadWriteLock的支持
(2)类的构造器
Sync() { // 本地线程计数器 readHolds = new ThreadLocalHoldCounter(); // 设置AQS的状态 setState(getState()); // 确保readhold的可见性 }
(3)类的属性
abstract static class Sync extends AbstractQueuedSynchronizer { // 版本序列号 private static final long serialVersionUID = 6317671515068378041L; // 高16位为读锁,低16位为写锁 static final int SHARED_SHIFT = 16; // 读锁单位。SHARED_SHIFT * 2 static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 读锁最大数量 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 写锁最大数量 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 本地线程计数器 private transient ThreadLocalHoldCounter readHolds; // 缓存的计数器 private transient HoldCounter cachedHoldCounter; // 第一个读线程 private transient Thread firstReader = null; // 第一个读线程的计数 private transient int firstReaderHoldCount; }
(4)内部类
// 计数器static final class HoldCounter { // 计数 int count = 0; // Use id, not reference, to avoid garbage retention // 获取当前线程的TID属性的值 final long tid = getThreadId(Thread.currentThread()); }// 本地线程计数器 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { // 重写初始化方法,在没有进行set的情况,获取的都是该HoldCounter值 public HoldCounter initialValue() { return new HoldCounter(); } }
HoldCounter主要有两个属性,count和tid,其中count表示某个读线程重入的次数,tid表示该线程的tid字段的值,该字段可以用来唯一标识一个线程。
ThreadLocalHoldCounter重写了ThreadLocal的initialValue方法,ThreadLocal类可以将线程与对象相关联。在没有进行set的情况下,get到的均是initialValue方法里面生成的那个HolderCounter对象。
(5)类的方法
// 返回读锁线程数量static int sharedCount(int c) { return c >>> SHARED_SHIFT; }// 返回写锁线程数量static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }说明:直接将state右移16位,就可以得到读锁的线程数量,因为state的高16位表示读锁,对应的第十六位表示写锁数量。
写锁的获取
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); //当前线程 int c = getState(); //获取状态 int w = exclusiveCount(c); //写线程数量 // 当同步状态state != 0,则已有线程获取读锁或写锁 if (c != 0) { // 如果写锁状态为0,说明读锁此时被占用 则返回false;如果写锁状态不为0,且写锁没有被当前线程持有 则返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) //判断同一线程获取写锁是否超过最大次数(65535),也算可重入 throw new Error("Maximum lock count exceeded"); // 更新状态 setState(c + acquires); return true; } // 到这里说明c=0,读/写锁都没有被获取。 判断是否正在阻塞 或 CAS更新状态失败 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); //设置锁为当前线程所有 return true; }
获取写锁的步骤如下:
(1)首先获取c、w。c表示当前锁状态;w表示写线程数量。然后判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行(2);否则执行(5)。
(2)如果锁状态不为零(c != 0),而写锁的状态为0(w = 0),说明读锁此时被其他线程占用,所以当前线程不能获取写锁,自然返回false。或者锁状态不为零,而写锁的状态也不为0,但是获取写锁的线程不是当前线程,则当前线程也不能获取写锁。
(3)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程已获取写锁,更新是线程安全的),返回true。
(4)如果state为0,此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,失败则说明锁被别的线程抢去了,返回false。如果需要阻塞则也返回false。
(5)成功获取写锁后,将当前线程设置为占有写锁的线程,返回true。
方法流程图:
写锁的释放
protected final boolean tryRelease(int releases) { // 锁不是当前线程持有者 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 写锁的新线程数。如果重入了几次,就要执行几次释放 int nextc = getState() - releases; // 如果写(独占)模式重入数为0了,说明独占模式被释放 boolean free = exclusiveCount(nextc) == 0; if (free) setExclusiveOwnerThread(null); //写锁释放完成,设置锁的持有者为null setState(nextc); //更新重入数 return free; }
写锁释放过程:
1. 首先查看当前线程是否为写锁的持有者,如果不是抛出异常。
2. 然后检查释放后写锁的线程数是否为0,如果为0则表示写锁空闲了,释放锁资源将锁的持有线程设置为null,否则释放仅仅只是一次重入锁而已,并不能将写锁的线程清空。
说明:此方法用于释放写锁资源,首先会判断该线程是否为独占线程,若不为独占线程,则抛出异常,否则,计算释放资源后的写锁的数量,若为0,表示成功释放,资源不将被占用,否则,表示资源还被占用。
其方法流程图如下。
读锁的获取
// 从名称可见,读锁为共享锁,可被多个线程持有 protected final int tryAcquireShared(int unused) { // 当前线程 Thread current = Thread.currentThread(); // 获取状态 int c = getState(); // 如果写锁线程数 !=0,且独占锁不是当前线程 返回false。 因为存在锁降级 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 读锁数量 int r = sharedCount(c); // 读锁是否被阻塞 && 线程数小于最大值 && CAS设置成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // r == 0, 表示第一个读锁线程,首个读锁firstRead不会加入到readHolds中 if (r == 0) { //读锁数量0 firstReader = current; //设置第一个线程 firstReaderHoldCount = 1; //读锁占用资源数为1 } else if (firstReader == current) { //当前线程为第一个读线程,即线程重入 firstReaderHoldCount++; //占用资源数加1 } else { //读锁数量不为0,且不是当前线程 // 获取计数器 HoldCounter rh = cachedHoldCounter; //计数器为空 || 计数器tid不是当前线程的tid if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); //获取当前线程的计数器 else if (rh.count == 0) //计数为0 readHolds.set(rh); //加入readHolds中 rh.count++; } return 1; } //三个条件不满足(读线程是否应该被阻塞、小于最大值、比较设置成功)则会进行fullTryAcquireShared函数中,它用来保证相关操作可以成功。 return fullTryAcquireShared(current); }
fullTryAcquireShared()方法
final int fullTryAcquireShared(Thread current) { // HoldCounter rh = null; for (;;) { int c = getState(); //获取状态 //写线程数不为0 if (exclusiveCount(c) != 0) { // 不为当前线程 if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { // 写线程数量为0,且读线程被阻塞 // 当前线程是第一个度线程 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { //当前线程不是第一个读线程 if (rh == null) { //计数器为空 rh = cachedHoldCounter; // 计数器为空 或者 计数器的tid不为当前正在运行的线程的tid if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) // 读锁数为最大值,异常 throw new Error("Maximum lock count exceeded"); // CAS成功 if (compareAndSetState(c, c + SHARED_UNIT)) { // 读数量为0 if (sharedCount(c) == 0) { firstReader = current; // 设置第一个读线程 firstReaderHoldCount = 1; //读线程占用资源数 } else if (firstReader == current) { // 读线程重入 firstReaderHoldCount++; } else { //读锁数量不为0,并且不为当前线程 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) // 计数器为空 || 计数器的tid不为当前线程的tid rh = readHolds.get(); //获取当前线程的计数器 else if (rh.count == 0) //计数为0 readHolds.set(rh); //加入到readHolds中 rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
读写锁取锁过程:
1. 首先判断写锁是否为0,,且当前线程不占有独占锁(写锁),之间返回;
2. 否则,判断读线程是否被阻塞 && 读锁数小于最大值 && CAS成功,若当前没有读锁,则设置第一个读线程firstReader和firstReaderHoldCount;
3. 若当前线程线程为第一个读线程,则增加firstReaderHoldCount;
4. 否则,将设置当前线程对应的HoldCounter对象的值。
流程图:
注意:更新成功后会在firstReaderHoldCount中或readHolds(ThreadLocal类型的)的本线程副本中记录当前线程重入数(23行至43行代码),这是为了实现jdk1.6中加入的getReadHoldCount()方法的,这个方法能获取当前线程重入共享锁的次数(state中记录的是多个线程的总重入次数),加入了这个方法让代码复杂了不少,但是其原理还是很简单的:如果当前只有一个线程的话,还不需要动用ThreadLocal,直接往firstReaderHoldCount这个成员变量里存重入数,当有第二个线程来的时候,就要动用ThreadLocal变量readHolds了,每个线程拥有自己的副本,用来保存自己的重入数。
读锁的释放
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); //当前线程 if (firstReader == current) { // 当前线程是否为第一个读线程 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) //读线程占用资源数为1 firstReader = null; else //减少占用的资源 firstReaderHoldCount--; } else { // 当前线程不是第一个读线程 // 获取缓存的计数器 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) //计数器为空 || 计数器的tid不为当前正在运行的线程的tid rh = readHolds.get(); // 获取当前线程的计数器 int count = rh.count; // 获取计数 if (count <= 1) { // 计数小于等于1 readHolds.remove(); //移除 if (count <= 0) //计数小于等于0,异常 throw unmatchedUnlockException(); } --rh.count; // 减少计数 } for (;;) { int c = getState(); // 获取状态 int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } }
读锁释放过程:
1. 首先判断当前线程是否为第一个读线程firstReader,若是,则判断第一个读线程占有的资源数firstReaderHoldCount是否为1,若是,则设置第一个读线程firstReader为空。
否则,将第一个读线程占有的资源数firstReaderHoldCount减1;
2. 若当前线程不是第一个读线程,那么首先会获取缓存计数器(上一个读锁线程对应的计数器 ),若计数器为空或者tid不等于当前线程的tid值,则获取当前线程的计数器.
如果计数器的计数count小于等于1,则移除当前线程对应的计数器;如果计数器的计数count小于等于0,则抛出异常,之后再减少计数即可。
流程图
在读锁的获取、释放过程中,总是会有一个对象存在着,同时该对象在获取线程获取读锁是+1,释放读锁时-1,该对象就是HoldCounter。
要明白HoldCounter就要先明白读锁。前面提过读锁的内在实现机制就是共享锁,对于共享锁其实我们可以稍微的认为它不是一个锁的概念,它更加像一个计数器的概念。
一次共享锁操作就相当于一次计数器的操作,获取共享锁计数器+1,释放共享锁计数器-1。只有当线程获取共享锁后才能对共享锁进行释放、重入操作。
所以HoldCounter的作用就是当前线程持有共享锁的数量,这个数量必须要与线程绑定在一起,否则操作其他线程锁就会抛出异常。
对于非公平/公平内部类、读/写锁内部类的方法,大多都会转到调用Sync内部类的方法。
除内部类的方法外的其他方法,都是一些基本信息:读线程数、写线程数、是否被加读/写锁等等,不是很难 结合源码自行查看。
图解重要函数及对象关系
AQS图解
读写锁的加锁解锁操作
从图中可见操作最终都是调用ReentrantReadWriteLock
类的内部类Sync
提供的方法。
AQS无锁状态
AQS写锁无等待状态
AQS写锁重入状态
AQS写锁等待状态
AQS读锁无等待状态(首节点)
AQS读锁重入状态(首节点)
AQS读锁无等待状态(非首节点)
AQS读锁等待状态(非首节点)
读锁获取添加等待队列 写锁获取添加等待队列
参考:https://segmentfault.com/a/1190000015768003
原文地址:https://www.cnblogs.com/FondWang/p/12112237.html