简单看看读写锁ReentantReadWriteLock

  前面我们看了可重入锁ReentrantLock,其实这个锁只适用于写多读少的情况,就是多个线程去修改一个数据的时候,适合用这个锁,但是如果多个线程都去读一个数据,还用这个锁的话会降低效率,因为同一时刻只能是一个线程去读取!

  本次我们看看读写锁ReentantReadWriteLock,这个锁采用了读写分离的策略,分成了读锁和写锁,多个线程可以同时获取读锁;

一.简单使用读写锁

  啥也别问,问就是先会用了再说,还记得前面用ReentrantLock实现了一个线程安全的List吗?我们可以使用读写锁稍微改造一下就好了;

package com.example.demo.study;

import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Study0204 {
    // 线程不安全的List
    private ArrayList<String> list = new ArrayList<String>();
    // 读写锁
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // 获取读锁
    private final Lock readLock = lock.readLock();
    // 获取写锁
    private final Lock writeLock = lock.writeLock();

    // 往集合中添加元素,写锁
    public void add(String str) {
        writeLock.lock();
        try {
            list.add(str);
        } finally {
            writeLock.unlock();
        }
    }

    // 删除集合中的元素,写锁
    public void remove(String str) {
        writeLock.lock();
        try {
            list.remove(str);
        } finally {
            writeLock.unlock();
        }
    }

    // 根据索引获取集合中某个元素,读锁
    public String get(int index) {
        readLock.lock();
        try {
            return list.get(index);
        } finally {
            readLock.unlock();
        }
    }

}

二.读写锁的结构

  这里最核心的还是用了AQS,可以看到里面还是有一个Sync这个内部工具类,然后还有两个内部工具类,一个是读锁ReadLock,一个是写锁WriteLock

  我们还能看到Sync这个类就是继承AQS,然后有NonfairSync和FairSync这两个类去继承Sync,到这里结构还是和ReentrantLock是一样的;

  我们再看看读锁和写锁,可以看出来就是实现了Lock这个接口,然后通过传进去的sync对象去实现Lock中的所有方法

  大概的结构就是这样的,我们可以使用下面这个图显示出来,其实ReentrantReadWriteLock最重要的就是三个类:

    一个是Sync工具类用于操作AQS阻塞队列和state的值,而且有基于Sync实现的公平策略和非公平策略;

    一个是写锁,实现了Lock接口,内部有个Sync字段,在Lock的实现方法中就是调用Sync对象的方法去实现的

    另外一个是读锁,和写锁一样,实现了Lock接口,内部有个Sync字段,在Lock的实现方法也都是调用Sync对象的方法实现

 二.分析Sync

  上篇博客中我们知道了在ReentrantLock中的state表示的是锁的可重入次数,而且state是AQS中定义的int类型,那么在读写锁这里有两个状态是怎么表示呢?

  总有一些人会想到一些花里胡哨的东西,还别说,挺管用的,由于state是int类型,共有32位,我们可以一分为二,前面的16位叫做高16位,表示获取读锁的次数,后面的叫做的低16位,表示写锁的可重入次数,具体的,我们可以看看Sync类的属性,主要是涉及到基本的二进制运算,有兴趣的可以研究一下;

abstract static class Sync extends AbstractQueuedSynchronizer {

    //这个可以说是读锁(共享锁)移动的位数
    static final int SHARED_SHIFT   = 16;
    //读锁状态单位值,这里就是将1有符号左移16位,1用二进制表示为:00000000 00000000 00000000 00000001
    //左移16位之后:00000000 00000001 00000000 00000000,也就是2的16次方,就是65536
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    //读锁的线程最大数65535个
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;

    //写锁(排它锁)掩码,这里用二进制表示 00000000 00000000 11111111 11111111
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    //返回读锁的个数,这里也就是将state无符号右移16位,那么有效数字肯定就是高16位,转成十进制后就是获取读锁的次数
    static int sharedCount(int c)    {
        return c >>> SHARED_SHIFT;
    }

    //返回写锁的个数,这里就是将state和上面的写锁掩码做按位与运算,高16位被置为0,有效数字位第16位,转成十进制就是写锁的可重入次数
    static int exclusiveCount(int c) {
        return c & EXCLUSIVE_MASK;
    }

    //省略很多代码
}

  其中,Sync中还有几个比较重要的属性如下,不懂不要紧,后面用到了再回头看看就好;

 //记录第一个获取读锁的线程
 private transient Thread firstReader = null;

 //记录第一个获取读锁的线程继续获取读锁的可重入次数
 private transient int firstReaderHoldCount;

 //记录最后一个获取读锁的线程获取读锁的可重入次数,HoldCounter类如下
 private transient HoldCounter cachedHoldCounter;

static final class HoldCounter {
    int count = 0;
    final long tid = getThreadId(Thread.currentThread());
}

//记录除去第一个获取读锁的线程外,获取的读锁的可重入次数,ThreadLocalHoldCounter类如下
private transient ThreadLocalHoldCounter readHolds;

static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

三.写锁的获取和释放

  写锁在获取的时候有一个前提:没有其他线程持有写锁或者读锁,当前线程才能获取写锁,否则就把当前线程丢到阻塞队列里去了,记住,不能一边读还一边写

  1.lock方法:

  写锁主要是ReentantReadWriteLock中的内部类WriteLock类实现的,这是一个独占锁,同一时刻只能有一个线程可以获取该锁,时刻重入的;

public void lock() {
    sync.acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {

    Thread current = Thread.currentThread();
    int c = getState();
    //这个方法的是实现static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; },就是将state的低十六位转为十进制,
    //也就是写锁的可重入次数
    int w = exclusiveCount(c);
    //state不为0,说明读锁或者写锁被占用了
    if (c != 0) {
        //如果w==0,而c!=0,说明c的高16为不为0,即有线程获取了读锁,此时写锁是不能获取的,注意,别人在读的时候是不能写入的呀!返回false
        //如果w!=0表示有线程获取了写锁,但是占用锁的线程不是当前线程,那么线程获取写锁失败,返回false
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        //到这里说明写锁可以获取成功,那么就要判断写锁的可重入次数是否大于65535
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //将state加一
        setState(c + acquires);
        return true;
    }
    //能到这里来,说明c==0,也就是说读锁和写锁都在空闲着,下面我们要看看公平策略下和非公平下的writerShouldBlock实现
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

  我们要看看最后的那里的if语句,其中writerShouldBlock的实现,能到这里说明读锁和写锁都在空闲这,可以随时去获取;

  非公平策略下始终返回的是false,于是会走到compareAndSetState(c, c + acquires),这里是用CAS尝试获取写锁,获取失败的话就返回发了;获取成功的话就走到setExclusiveOwnerThread(current);设置占用读锁的线程是当前线程;

  公平策略下的话,这个方法前面好像说过,就是判断当前线程节点前面有没有前驱节点,如果有的话那就肯定获取失败啊,要让前驱节点先获取,于是在上面最后的if那里直接返回false;如果这里判断没有前驱节点,这里返回true,那么上面就会走到最后setExclusiveOwnerThread(current)设置当前线程占有写锁

  2.tryLock方法

  这个方法和上面的lock方法一样,注意,这里最后那里默认使用的是非公平模式;

public boolean tryLock( ) {
    return sync.tryWriteLock();
}

final boolean tryWriteLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    //这里默认使用的是非公平模式
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

  3.unlock方法

public void unlock() {
    sync.release(1);
}
//这个是AQS中的方法,说过tryRelease留给具体子类去实现的,重点看看怎么实现的
public final boolean release(int arg) {
if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
        unparkSuccessor(h);
    return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
    //isHeldExclusively方法在下面,因为是当前线程调用的release方法,要判断当前线程是不是持有写锁的线程,不是的话就抛错了
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //state减一
    int nextc = getState() - releases;
    //获取低十六位,看是不是等于0,如果等于0,说明此时没有线程占用写锁,于是就调用setExclusiveOwnerThread(null)
    //将占用写锁的线程设置为null,最后更新state就行了
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

四.读锁的获取和释放

  结合前面的写锁一起说一下:

    (1).没有其他线程持有写锁或者读锁,当前线程才能获取写锁,否则就把当前线程丢到阻塞队列里去了;当前线程获取了写锁之后,其他线程不能获取写锁和读锁;

    (2)没有其他线程获取写锁时,当前线程才能获取读锁,否则就丢到阻塞队列里去了,不能 一边读一边写;当前线程获取了读锁之后,其他线程只能获取读锁,不能获取写锁;

    (3)当前线程获取的写锁之后,还能继续获取写锁,这叫做可重入;也可以继续获取读锁,这叫做锁降级;

    (4)当前线程获取的读锁之后,还能继续获取读锁;

  1.lock方法

public void lock() {
    //acquireShared方法在AQS中
    sync.acquireShared(1);
}

public final void acquireShared(int arg) {
    //tryAcquireShared实现在ReentrantReadWriteLock中的Sync中
    if (tryAcquireShared(arg) < 0)
        //这个方法在AQS中,主要是将当前线程放到阻塞队列中
        doAcquireShared(arg);
    }
protected final int tryAcquireShared(int unused) {

    Thread current = Thread.currentThread();
    int c = getState();
    //这里就是判断:如果其他线程获取了写锁,那么就返回-1
    //先判断写锁的可重入次数不为0,表示有线程占用写锁,而且还不是当前线程,那么直接返回-1
    //这里注意一下:一个线程在获取写锁之后,还可以再获取读锁,释放的时候两个所都要释放啊!!!
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1;
    //这个方法获取读锁的次数,读锁的次数最多只能是65535个
    int r = sharedCount(c);
    //readerShouldBlock方法分为公平策略和非公平策略,这个方法的意思就是:当前线程获取已经获取读锁,再读锁被阻塞了,那么说明还有其他线程正在获取写锁
    //如果返回false,说明此时没有线程获取写锁,而且这个方法分为公平策略和非公平策略
    //公平策略的话,如果当前线程节点有前驱节点就返回true,没有前驱节点返回false;
    //非公平策略的话,判断阻塞队列中哨兵节点后面的那个节点是不是正在获取写锁,是的话返回true,不是的话返回false
    //compareAndSetState(c, c + SHARED_UNIT)方法中,SHARED_UNIT表示65536,这个CAS表示对高16为增加1,对于整个32位来说,就是加2的16次方
    if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) {
        //r==0表示读锁空闲,于是记录第一个读锁,和第一个获取读锁的线程获取读锁的可重入次数
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //如果当前线程就是第一个获取读锁的线程,再获取读锁,这里就将可重入次数加一即可
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            //能到这里就说明读锁已经被其他线程占用,当前线程是最后一个是最后获取读锁的线程,我们更新一下cacheHoldCounter和readHolds就行了
            //cacheHoldCounter表示最后一个获取读锁的线程获取读锁的可重入次数
            //readHolds记录了除了第一个获取读锁的线程外,其他线程获取读锁的可重入次数
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    //能到这里说明readerShouldBlock方法返回的是true,而且当前线程在之前已经获取了写锁,再获取读锁,就是锁降级!!!
    return fullTryAcquireShared(current);
}

//锁降级操作
final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        //写锁被其他线程占用,就返回-1
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
        //获取读锁被阻塞,此时还有其他线程在获取写锁,
        } else if (readerShouldBlock()) {
            if (firstReader == current) {
            }
            else {
                //有其他线程在尝试获取写锁,结束当前线程获取读锁,就更新一下readHolds就行了
                //就从readHolds中移除当前线程的持有数,然后返回-1,结束尝试获取锁步骤
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        //读锁数量达到了最大数量就抛错
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //CAS更新读锁的数量,然后更新一些变量
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            //读锁数量为0,就让当前线程作为第一个获取读锁的线程
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            //当前线程已经获取过读锁了,就把第一个获取读锁的可重入次数加一
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
            //这里前面说过了
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

  2.tryLock方法

public boolean tryLock() {
    return sync.tryReadLock();
}

final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        //如果当前写锁已经被占用,获取读锁失败
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        //读锁不能超过最大数量
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        //更新state,将高16位加一,更新成功,如果读锁没有线程占有,就把当前线程更新为第一个获取读锁的线程和更新第一个获取读锁的可重入次数
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            //当前线程就是第一个获取读锁的线程,就将可重入次数加一
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
            //到这里说明当前线程获取读锁成功,虽然不是第一个获取读锁的线程,于是更新一下cachedHoldCounter和readHolds
            //cachedHoldCounter:最后一个线程获取读锁的可重入次数
            //readHolds:除去第一个线程,其他线程获取读锁的可重入次数
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

  3.unlock方法

public void unlock() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    //实现在下面,就是尝试释放读锁,并判断还有没有线程占用读锁,没有线程占用读锁,就会进入到if里面doReleaseShared方法
    if (tryReleaseShared(arg)) {
        //前面可能有一些线程在获取写锁的时候,由于当前线程读锁没有释放,所以那些线程就被阻塞了
        //当前方法就是把那些线程释放一个
        doReleaseShared();
        return true;
    }
    return false;
}
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    //如果当前线程是第一个获取读锁的线程
    if (firstReader == current) {
        //第一个线程获取读锁的可重入次数为1,就释放,否则,可重入次数减一
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    //当前线程不是第一个获取读锁的线程就更新cachedHoldCounter和readHolds
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    //这里一个无限循环,获取state,将高十六位减一,用CAS更新,更新成功的话,就判断读锁有没有被占用
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

  

五.总结

  我们用下面这个图来总结一下ReentrantReadWriteLock,这个锁就是利用state是32位的,高16位用于表示读锁的个数,低16位表示写锁的可重入次数,通过CAS对其进行了读写分离,适用于读多写少的场景;

原文地址:https://www.cnblogs.com/wyq1995/p/12267517.html

时间: 2024-07-28 21:27:04

简单看看读写锁ReentantReadWriteLock的相关文章

聊聊高并发(十五)实现一个简单的读-写锁(共享-排他锁)

读写锁是数据库中很常见的锁,又叫共享-排他锁,S锁和X锁.读写锁在大量读少量写的情况下有很高的效率优势. 读写锁是基于普通的互斥锁构建出来的更复杂的锁,它有两个基本特点: 1. 当任一线程持有读锁或写锁时,不允许其他线程再持有写锁 2. 当任一线程持有写锁时,不允许其他线程再持有读锁 也就是说,写锁是排他的,只要有一个线程持有写锁,就不允许其他线程再上锁.读锁是共享的,可以有多个线程持有读锁,但不允许同时持有写锁. 读锁和写锁还存在一个锁升级的问题,比如一个线程先持有了读锁,想升级成写锁,这时候

简单的读写锁。写优先

1.非常简单的,一线程写,一线程读. 读线程阻塞,直到写线程通知读线程.简单,重要是基本不会用错. #include <stdio.h> #include <string> #include <iostream> #include <memory> #include <mutex> #include <condition_variable> #include <vector> #include <thread>

一步一步实现读写锁

多线程编程中,需要对共享变量进行加锁.但是频繁地加锁,会对程序效率有很大影响.在某些读多写少的场景下,多个线程进行读数据时,如果都加互斥锁,这显然是不必须的.于是读写锁便应运而生. 读写锁的加锁规则: 1 如果没有加写锁时,那么多个线程可以同时加读锁:如果有加写锁时,不可以加读锁 2 不管是加了读锁还是写锁,都不能继续加写锁. 满足这两个条件,便可以初步实现一个读写锁.我们用两个锁,一个变量,实现一个简单的读写锁,代码如下 class rwlock { public: rwlock(): rea

读写锁学习(3)——微软源码实现的阅读

以下是微软源码,主要看了申请读锁EnterReadLock().申请写锁EnterWriteLock().释放读锁ExitReadLock().释放写锁ExitWriteLock()这四个方法,在下研究了两天,里面有很多细节问题还是了解的不够透彻,表述如有错误希望各位大神能指点一下,主要是通过myLock置0或置1来判断是否可以申请锁资源,通过threadRWCount这个数据结构来统计线程中存在读写锁的数量,这个数据结构通过LockId这个值来标识区分.用readercount和writerc

C# 防止同时调用=========使用读写锁三行代码简单解决多线程并发的问题

http://www.jb51.net/article/99718.htm 本文主要介绍了C#使用读写锁三行代码简单解决多线程并发写入文件时提示"文件正在由另一进程使用,因此该进程无法访问此文件"的问题.需要的朋友可以参考借鉴 在开发程序的过程中,难免少不了写入错误日志这个关键功能.实现这个功能,可以选择使用第三方日志插件,也可以选择使用数据库,还可以自己写个简单的方法把错误信息记录到日志文件. 选择最后一种方法实现的时候,若对文件操作与线程同步不熟悉,问题就有可能出现了,因为同一个文

C#使用读写锁三句代码简单解决多线程并发写入文件时提示“文件正在由另一进程使用,因此该进程无法访问此文件”的问题

在开发程序的过程中,难免少不了写入错误日志这个关键功能.实现这个功能,可以选择使用第三方日志插件,也可以选择使用数据库,还可以自己写个简单的方法把错误信息记录到日志文件. 选择最后一种方法实现的时候,若对文件操作与线程同步不熟悉,问题就有可能出现了,因为同一个文件并不允许多个线程同时写入,否则会提示“文件正在由另一进程使用,因此该进程无法访问此文件”. 这是文件的并发写入问题,就需要用到线程同步.而微软也给进程同步提供了一些相关的类可以达到这样的目的,本文使用到的 System.Threadin

ReentrantReadWriteLock读写锁的使用2

本文可作为传智播客<张孝祥-Java多线程与并发库高级应用>的学习笔记. 这一节我们做一个缓存系统. 在读本节前 请先阅读 ReentrantReadWriteLock读写锁的使用1 第一版 public class CacheDemo { private Map<String, Object> cache = new HashMap<String, Object>(); public static void main(String[] args) { CacheDem

深入浅出 Java Concurrency (14): 锁机制 part 9 读写锁 (ReentrantReadWriteLock) (2)[转]

这一节主要是谈谈读写锁的实现. 上一节中提到,ReadWriteLock看起来有两个锁:readLock/writeLock.如果真的是两个锁的话,它们之间又是如何相互影响的呢? 事实上在ReentrantReadWriteLock里锁的实现是靠java.util.concurrent.locks.ReentrantReadWriteLock.Sync完成的.这个类看起来比较眼熟,实际上它是AQS的一个子类,这中类似的结构在CountDownLatch.ReentrantLock.Semapho

linux线程同步(3)-读写锁

一.概述                                                    读写锁与互斥量的功能类似,对临界区的共享资源进行保护!互斥量一次只让一个线程进入临界区,读写锁比它有更高的并行性.读写锁有以下特点: 1.如果一个线程用读锁锁定了临界区,那么其他线程也可以用读锁来进入临界区,这样就可以多个线程并行操作.但这个时候,如果再进行写锁加锁就会发生阻塞,写锁请求阻塞后,后面如果继续有读锁来请求,这些后来的读锁都会被阻塞!这样避免了读锁长期占用资源,防止写锁饥饿