读写锁 ReentrantReadWriteLock

一、读写锁 ReadWriteLock概念特点
读写锁维护了一对相关的锁,一个用于只读操作,一个用于写入操作。只要没有writer,读取锁可以由多个reader线程同时保持。写入锁是独占的。

互斥锁【ReetrantLock】一次只允许一个线程访问共享数据,哪怕进行的是只读操作;读写锁【ReadWriteLock】允许对共享数据进行更高级别的并发访问:对于写操作,一次只有一个线程(write线程)可以修改共享数据,对于读操作,允许任意数量的线程同时进行读取。writer可以获取读取锁,但reader不能获取写入锁。写入锁降级为读取锁,实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。
读写锁的读取锁和写入锁都支持锁获取期间的中断。并且写入锁提供了一个 Condition 实现,对于写入锁来说,该实现的行为与 ReentrantLock.newCondition() 提供的 Condition 实现对 ReentrantLock 所做的行为相同。当然,此 Condition 只能用于写入锁。读取锁不支持 Condition,readLock().newCondition() 会抛出 UnsupportedOperationException。

二、实现原理及核心代码

读写锁也是基于AQS实现。

AQS以单个 int 类型的原子变量来表示其状态,定义了4个抽象方法( tryAcquire(int)、tryRelease(int)、tryAcquireShared(int)、tryReleaseShared(int),前两个方法用于独占/排他模式,后两个用于共享模式 )留给子类实现,用于自定义同步器的行为以实现特定的功能。

对于 ReentrantLock,它是可重入的独占锁,内部的 Sync 类实现了 tryAcquire(int)、tryRelease(int) 方法,并用状态的值来表示重入次数,加锁或重入锁时状态加 1,释放锁时状态减 1,状态值等于 0 表示锁空闲。

对于 CountDownLatch,它是一个关卡,在条件满足前阻塞所有等待线程,条件满足后允许所有线程通过。内部类 Sync 把状态初始化为大于 0 的某个值,当状态大于 0 时所有wait线程阻塞,每调用一次 countDown 方法就把状态值减 1,减为 0 时允许所有线程通过。利用了AQS的共享模式。

【AQS一个状态如何表示多个读锁与单个写锁呢,】一个状态是没法既表示读锁,又表示写锁的,不够用啊,那就辦成两份用了,客家话说一个饭粒咬成两半吃,状态的高位部分表示读锁,低位表示写锁,由于写锁只有一个,所以写锁的重入计数也解决了,这也会导致写锁可重入的次数减小。

【如何表示每个读锁、写锁的重入次数呢】由于读锁可以同时有多个,肯定不能再用辦成两份用的方法来处理了,但我们有 ThreadLocal,可以把线程重入读锁的次数作为值存在 ThreadLocal 里。AQS 的状态是32位(int 类型)的,辦成两份,读锁用高16位,表示持有读锁的线程数(sharedCount),写锁低16位,表示写锁的重入次数(exclusiveCount)。状态值为 0 表示锁空闲,sharedCount不为 0 表示分配了读锁,exclusiveCount 不为 0 表示分配了写锁,sharedCount和exclusiveCount 肯定不会同时不为 0。
【读、写锁的公平性如何实现】对于公平性的实现,可以通过AQS的等待队列和它的抽象方法来控制,在状态值的另一半里存储当前持有读锁的线程数。如果读线程申请读锁,当前写锁重入次数不为 0 时,则等待,否则可以马上分配;如果是写线程申请写锁,当前状态为 0 则可以马上分配,否则等待。

abstract static class Sync extends AbstractQueuedSynchronizer {

    //
     //
       static final int SHARED_SHIFT   = 16;

       // 由于读锁用高位部分,所以读锁个数加1,其实是状态值加 2^16
       static final int SHARED_UNIT    = (1 << SHARED_SHIFT);

       // 写锁的可重入的最大次数、读锁允许的最大数量
       static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

       // 写锁的掩码,用于状态的低16位有效值
       static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

       // 读锁计数,当前持有读锁的线程数
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    // 写锁的计数,也就是它的重入次数
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
/**
     * 每个线程特定的 read 持有计数。存放在ThreadLocal,不需要是线程安全的。
     */
    static final class HoldCounter {
        int count = 0;

        // 使用id而不是引用是为了避免保留垃圾。注意这是个常量。
        final long tid = Thread.currentThread().getId();
    }

    /**
     * 采用继承是为了重写 initialValue 方法,这样就不用进行这样的处理:
     * 如果ThreadLocal没有当前线程的计数,则new一个,再放进ThreadLocal里。
     * 可以直接调用 get。
     * */
    static final class ThreadLocalHoldCounter
        extends ThreadLocal<HoldCounter> {
        public HoldCounter initialValue() {
            return new HoldCounter();
        }
    }

    /**
     * 保存当前线程重入读锁的次数的容器。在读锁重入次数为 0 时移除。
     */
    private transient ThreadLocalHoldCounter readHolds;

    /**
     * 最近一个成功获取读锁的线程的计数。这省却了ThreadLocal查找,
     * 通常情况下,下一个释放线程是最后一个获取线程。这不是 volatile 的,
     * 因为它仅用于试探的,线程进行缓存也是可以的
     * (因为判断是否是当前线程是通过线程id来比较的)。
     */
    private transient HoldCounter cachedHoldCounter;

    /**
     * firstReader是这样一个特殊线程:它是最后一个把 共享计数 从 0 改为 1 的
     * (在锁空闲的时候),而且从那之后还没有释放读锁的。如果不存在则为null。
     * firstReaderHoldCount 是 firstReader 的重入计数。
     *
     * firstReader 不能导致保留垃圾,因此在 tryReleaseShared 里设置为null,
     * 除非线程异常终止,没有释放读锁。
     *
     * 作用是在跟踪无竞争的读锁计数时非常便宜。
     *
     * firstReader及其计数firstReaderHoldCount是不会放入 readHolds 的。
     */
    private transient Thread firstReader = null;
    private transient int firstReaderHoldCount;

    Sync() {
        readHolds = new ThreadLocalHoldCounter();
        setState(getState()); // 确保 readHolds 的内存可见性,利用 volatile 写的内存语义。
    }
}

//写锁的获取与释放
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) { // 状态不为0,表示锁被分配出去了。

        // (Note: if c != 0 and w == 0 then shared count != 0)
      // c != 0 and w == 0 表示分配了读锁
      // w != 0 && current != getExclusiveOwnerThread() 表示其他线程获取了写锁。
        if (w == 0 || current != getExclusiveOwnerThread())
            return false ;

        // 写锁重入
        // 检测是否超过最大重入次数。
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");

        // 更新写锁重入次数,写锁在低位,直接加上 acquire 即可。
        // Reentrant acquire
        setState(c + acquires);
        return true ;
    }

    // writerShouldBlock 留给子类实现,用于实现公平性策略。
    // 如果允许获取写锁,则用 CAS 更新状态。
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false ; // 不允许获取锁 或 CAS 失败。

    // 获取写锁超过,设置独占线程。
    setExclusiveOwnerThread(current);
    return true;
}

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively()) // 是否是当前线程持有写锁
        throw new IllegalMonitorStateException();

    // 这里不考虑高16位是因为高16位肯定是 0。
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread( null); // 写锁完全释放,设置独占线程为null。
    setState(nextc);
    return free;
}

//读锁的获取与释放
// 参数变为 unused 是因为读锁的重入计数是内部维护的。
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();

    // 这个if语句是说:持有写锁的线程可以获取读锁。
    if (exclusiveCount(c) != 0 && // 已分配了写锁
        getExclusiveOwnerThread() != current) // 且当前线程不是持有写锁的线程
        return -1;

    int r = sharedCount(c); // 取读锁计数
    if (!readerShouldBlock() && // 由子类根据其公平策略决定是否允许获取读锁
        r < MAX_COUNT &&           // 读锁数量还没达到最大值

        // 尝试获取读锁。注意读线程计数的单位是  2^16
        compareAndSetState(c, c + SHARED_UNIT)) {
         // 成功获取读锁

     // 注意下面对firstReader的处理:firstReader是不会放到readHolds里的
     // 这样,在读锁只有一个的情况下,就避免了查找readHolds。
        if (r == 0) { // 是 firstReader,计数不会放入  readHolds。
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { // firstReader 重入
            firstReaderHoldCount++;
        } else {
             // 非 firstReader 读锁重入计数更新
            HoldCounter rh = cachedHoldCounter; // 首先访问缓存
            if (rh == null || rh.tid != current.getId())
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    // 获取读锁失败,放到循环里重试。
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
           // 写锁被分配,非写锁线程获取读锁,失败
                return -1;
            // 否则,当前线程持有写锁,在这里阻塞将导致死锁。

        } else if (readerShouldBlock()) {
            // 写锁空闲  且  公平策略决定 线程应当被阻塞
            // 下面的处理是说,如果是已获取读锁的线程重入读锁时,
            // 即使公平策略指示应当阻塞也不会阻塞。
            // 否则,这也会导致死锁的。
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != current.getId()) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                // 需要阻塞且是非重入(还未获取读锁的),获取失败。
                if (rh.count == 0)
                    return -1;
            }
        }

        // 写锁空闲  且  公平策略决定线程可以获取读锁
        if (sharedCount(c) == MAX_COUNT) // 读锁数量达到最多
            throw new Error( "Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            // 申请读锁成功,下面的处理跟tryAcquireShared是类似的。

            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
           // 设定最后一次获取读锁的缓存
                if (rh == null)
                    rh = cachedHoldCounter;

                if (rh == null || rh.tid != current.getId())
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;

                cachedHoldCounter = rh; // 缓存起来用于释放
            }
            return 1;
        }
    }
}

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    // 清理firstReader缓存 或 readHolds里的重入计数
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != current.getId())
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            // 完全释放读锁
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count; // 主要用于重入退出
    }
    // 循环在CAS更新状态值,主要是把读锁数量减 1
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 释放读锁对其他读线程没有任何影响,
            // 但可以允许等待的写线程继续,如果读锁、写锁都空闲。
            return nextc == 0;
    }
}

https://coderbee.net/index.php/concurrent/20140214/792/comment-page-1#comment-5314

Java 读写锁 ReentrantReadWriteLock 源码分析

原文地址:https://www.cnblogs.com/doit8791/p/9164151.html

时间: 2024-11-02 14:40:16

读写锁 ReentrantReadWriteLock的相关文章

[图解Java]读写锁ReentrantReadWriteLock

图解ReentrantReadWriteLock 如果之前使用过读写锁, 那么可以直接看本篇文章. 如果之前未使用过, 那么请配合我的另一篇文章一起看:[源码分析]读写锁ReentrantReadWriteLock 0. demo 我先给出一个demo, 这样大家就可以根据我给的这段代码, 边调试边看源码了. 还是那句话: 注意"My" , 我把ReentrantReadWriteLock类 改名为了 "MyReentrantReadWriteLock"类 , &q

被面试官吊打系列之JUC之 可重入读写锁ReentrantReadWriteLock 之 源码详尽分析

可重入读写锁 ReentrantReadWriteLock 其实基本上模拟了文件的读写锁操作.ReentrantReadWriteLock 和ReentrantLock 的差别还是蛮大的: 但是也有很多的相似之处: ReentrantReadWriteLock 的 writerLock 其实就是相当于ReentrantLock,但是它提供更多的细腻的控制:理解什么是读锁.写锁非常重要,虽然实际工作中区分读写锁这样的细分使用场景比较少. ReentrantReadWriteLock 把锁进行了细化

深入浅出 Java Concurrency (13): 锁机制 part 8 读写锁 (ReentrantReadWriteLock) (1)[转]

从这一节开始介绍锁里面的最后一个工具:读写锁(ReadWriteLock). ReentrantLock 实现了标准的互斥操作,也就是一次只能有一个线程持有锁,也即所谓独占锁的概念.前面的章节中一直在强调这个特点.显然这个特点在一定程度上面减低了吞吐量,实际上独占锁是一种保守的锁策略,在这种情况下任何“读/读”,“写/读”,“写/写”操作都不能同时发生.但是同样需要强调的一个概念是,锁是有一定的开销的,当并发比较大的时候,锁的开销就比较客观了.所以如果可能的话就尽量少用锁,非要用锁的话就尝试看能

深入浅出 Java Concurrency (14): 锁机制 part 9 读写锁 (ReentrantReadWriteLock) (2)[转]

这一节主要是谈谈读写锁的实现. 上一节中提到,ReadWriteLock看起来有两个锁:readLock/writeLock.如果真的是两个锁的话,它们之间又是如何相互影响的呢? 事实上在ReentrantReadWriteLock里锁的实现是靠java.util.concurrent.locks.ReentrantReadWriteLock.Sync完成的.这个类看起来比较眼熟,实际上它是AQS的一个子类,这中类似的结构在CountDownLatch.ReentrantLock.Semapho

多线程之使用读写锁ReentrantReadWriteLock实现缓存系统

简单地缓存系统:当有线程来取数据时,如果该数据存在我的内存中,我就返回数据:如果不存在我的缓存系统中,那么就去查数据库,返回数据的同时保存在我的缓存中. 其中涉及到读写问题:当多个线程执行读操作时(都加读锁),如果有数据返回:如果没有数据时,则让第一个读的线程,进行获取数据,然后进行写操作,这时需要第一个线程先释放掉读锁然后加写锁.第一个写完后,在家读锁,其他线程使用时判断,如果存在该数据,在直接过去读取不用加写锁. API上缓存例子如下: class CachedData { Object d

Java读写锁(ReentrantReadWriteLock)学习

什么是读写锁 平时,我们常见的synchronized和Reentrantlock基本上都是排他锁,这些锁在同一时刻只允许一个线程进行访问,哪怕是读操作.而读写锁是维护了一对锁(一个读锁和一个写锁),通过分离读锁和写锁,使得同一时刻可以允许多个读线程访问,但是在写线程进行访问时,所有的读线程和其他写线程均被阻塞. 读写锁的优点 1. 简化了读写交互场景编程的复杂度: 在常见的开发中,我们经常会定义一个共享的用作缓存的数据结构:比如一个大Map,缓存全部的城市Id和城市name对应关系.这个大Ma

深入浅出 Java Concurrency (13): 锁机制 part 8 读写锁 (ReentrantReadWriteLock) (1)

从这一节开始介绍锁里面的最后一个工具:读写锁(ReadWriteLock). ReentrantLock 实现了标准的互斥操作,也就是一次只能有一个线程持有锁,也即所谓独占锁的概念.前面的章节中一直在强调这个特点.显然这个特点在一定程度上面减低了吞吐量,实际上独占锁是一种保守的锁策略,在这种情况下任何"读/读","写/读","写/写"操作都不能同时发生.但是同样需要强调的一个概念是,锁是有一定的开销的,当并发比较大的时候,锁的开销就比较客观了.所

深入浅出 Java Concurrency (14): 锁机制 part 9 读写锁 (ReentrantReadWriteLock) (2)

这一节主要是谈谈读写锁的实现. 上一节中提到,ReadWriteLock看起来有两个锁:readLock/writeLock.如果真的是两个锁的话,它们之间又是如何相互影响的呢? 事实上在ReentrantReadWriteLock里锁的实现是靠java.util.concurrent.locks.ReentrantReadWriteLock.Sync完成的.这个类看起来比较眼熟,实际上它是AQS的一个子类,这中类似的结构在CountDownLatch.ReentrantLock.Semapho

[源码分析]读写锁ReentrantReadWriteLock

一.简介 读写锁. 读锁之间是共享的. 写锁是独占的. 首先声明一点: 我在分析源码的时候, 把jdk源码复制出来进行中文的注释, 有时还进行编译调试什么的, 为了避免和jdk原生的类混淆, 我在类前面加了"My". 比如把ReentrantLock改名为了MyReentrantLock, 在源码分析的章节里, 我基本不会对源码进行修改, 所以请忽视这个"My"即可. 1. ReentrantReadWriteLock类里的字段 unsafe在这里是用来给TID_O