ReentrantReadWriteLock源码分析(一)

此处源码分析,主要是基于读锁,非公平机制,JDK1.8。

问题:

1、ReentrantReadWriteLock是如何创建读锁与写锁?

2、读锁与写锁的区别是什么?

3、锁的重入次数与获取锁的线程数分别是用哪种方式记录的?

4、当队列中出现多个共享模式的线程节点连续排列时,那么当第一个共享模式的线程拿到锁之后,后面的共享线程节点怎么获取锁?

一、创建ReadLock。

ReentrantReadWriteLock rrw = new ReentrantReadWriteLock();
public ReentrantReadWriteLock(boolean fair) {
	sync = fair ? new FairSync() : new NonfairSync();
	readerLock = new ReadLock(this);
	writerLock = new WriteLock(this);
}
rrw.readLock().lock();

1、当fair的值为false时,非公平的方式创建锁,当fair的值为true时,公平的方式创建锁。

2、初始化readerLock与writerLock,这两个变量是ReentrantReadWriteLock的内部变量。

3、sync执行非公平的锁。

二、lock()源码分析

2.1、sync.acquireShared(1)

public void lock() {
	sync.acquireShared(1);
}
public final void acquireShared(int arg) {
	if (tryAcquireShared(arg) < 0)
		doAcquireShared(arg);
}

(1)tryAcquireShared的作用是当前线程获取读锁,当返回1时,表示获取成功,-1表示获取失败。

(2)doAcquireShared,表示获取失败的时候调用。将获取失败的线程加入到等待队列中,并调用LockSupport.park方法阻塞住,等待线程释放permit。

2.2、tryAcquireShared(arg)

protected final int tryAcquireShared(int unused) {

	Thread current = Thread.currentThread();
	// 获取到占有锁的线程数
	int c = getState();
	// 如果写锁被占领了且不是当前线程占领,那么直接返回 -1
	if (exclusiveCount(c) != 0 &&
		getExclusiveOwnerThread() != current)
		return -1;
	int r = sharedCount(c); // 占有共享锁的线程数
	if (!readerShouldBlock() && // 如果队列的头节点的next节点是独享模式的线程节点即获取写锁的线程节点,返回true
		r < MAX_COUNT && 	// 共享的数据不能超过65535
		compareAndSetState(c, c + SHARED_UNIT)) {  // cas设置state
		if (r == 0) {   // 线程来拿读锁,读锁和写锁没有被任何线程拥有,那么r==0
			firstReader = current; //
			firstReaderHoldCount = 1;
		} else if (firstReader == current) { // 如果线程重复获取读锁,那么从这里开始重入
			firstReaderHoldCount++;
		} else { // 如果读锁被线程x占领,线程y也要来申请读锁,那么分支就走到这里了

		// HoldCounter类中存储了两个属性,一个是count,用于记录线程的重入次数,一个是tid,记录当前线程的id
			HoldCounter rh = cachedHoldCounter;

			// 线程x拥有读锁之后,线程y第一次申请的时候会走到这里
			//cachedHoldCounter 是一个缓存,保存当前操作线程的上一个线程的操作结果。线程y操作完之后,就会保存线程y的信息
			// 如果另外一个线程z来获取到读锁的时候,虽然rh!=null,但是rh.tid != getThreadId(current),
			//那么会创建一个默认的HoldCounter,并保存到cachedHoldCounter,并且默认的count=0
			if (rh == null || rh.tid != getThreadId(current))
			// readHolds.get(),查看源码可以知道,在这个方法中包含了数据初始化的过程,会调用ReentrantReadWriteLock.java
			// 下面的方法
			/**
			* public HoldCounter initialValue() {
               *             return new HoldCounter();
                 *       }
                 */
				cachedHoldCounter = rh = readHolds.get();
			else if (rh.count == 0) // 这个分支也会来到,当线程释放锁,但是没有关闭,当再次调用线程时,readHolds中会存在HoldCounter,count=0
				readHolds.set(rh);
			rh.count++; // 计算重入的次数
		}
		return 1;
	}
	return fullTryAcquireShared(current);
}

请注意:

(1)ReentrantReadWriteLock中维持了一个类ThreadLocalHoldCounter,这个类会生成一个map,key是线程的id,value是HoldCounter对象,HoldCounter对象如下:

   static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }

其中count就是线程的重入次数,tid就是当前线程的id。这个是与ReentrantLock区别的地方。

(2)ReentrantReadWriteLock使用32位int类型来表示占有锁的线程数,其中高16位是获取到读锁的线程数,低16位是获取到写锁的线程数,提供了计算线程数的方法。

static final int SHARED_SHIFT   = 16;(1)
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);(2)
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;(3)
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;(4)

/** Returns the number of shared holds represented in count  */
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }(5)
/** Returns the number of exclusive holds represented in count  */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }(6)
c + SHARED_UNIT

其中(1)是共享移动常量;(2)是共享添加的常量;(3)是最大线程数65535(也就是11111111 11111111);(4)跟(3)一样;(5)计算共享线程数,把c的值向右移16为,并且高位补0; >>> 无符号右移,高位补0;(6)计算独享的线程数,把c的值与11111111 11111111 按位与,这样其实就是取到了写锁的线程数;(7)是共享线程+1。

源码分析:

2.2.1、readerShouldBlock()

这个方法的作用是把判断当前获取读锁的线程是否需要阻塞,条件是:在等待队列中头节点的下一个节点是独享模式的线程。

// 读锁应该被阻塞
final boolean readerShouldBlock() {
	return apparentlyFirstQueuedIsExclusive();
}

/**
 * Returns {@code true} if the apparent first queued thread, if one
 * exists, is waiting in exclusive mode.  If this method returns
 * {@code true}, and the current thread is attempting to acquire in
 * shared mode (that is, this method is invoked from {@link
 * #tryAcquireShared}) then it is guaranteed that the current thread
 * is not the first queued thread.  Used only as a heuristic in
 * ReentrantReadWriteLock.
 *如果第一个入队列的线程节点存在,并且工作在独享模式下,那么返回true;
 *如果这个方法返回true,并且当前线程以共享的模式获取锁,这个方法保证了它不是第一个入队列的
 *(读锁与读锁都是共存的,所以不会入队,只有当队列中有独享模式的线程节点的时候,获取共享模式的线程才会加入到队列中。)
 */
final boolean apparentlyFirstQueuedIsExclusive() {
	Node h, s;
	// 头节点存在,并且存在下一个节点,下一个节点是独享模式,下一个节点的thread不是空,则返回true
	return (h = head) != null &&
		(s = h.next)  != null &&
		!s.isShared()         &&
		s.thread != null;
}

2.2.2、fullTryAcquireShared(current)

这个方法的作用与tryAcquireShared的作用很类似。

// 进入这个方法的条件,
/**条件1:!readerShouldBlock() && // 如果第一个入队列的线程节点存在,并且工作在独享模式下,那么返回true;
	*	条件2:r < MAX_COUNT && 	// 共享的数据不能超过65535,读锁的线程数已经超过了65535
	*	条件3:compareAndSetState(c, c + SHARED_UNIT) // 两个竞争读锁的线程都运行到这里,第一个竞争成功,那么第二个就会竞争失败,返回false
    *  其实这个方法分别对这三种状态进行处理
   */
   /**
         * Full version of acquire for reads, that handles CAS misses
         * and reentrant reads not dealt with in tryAcquireShared.
         */
        final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                // 如果排他锁被别的线程拿了,直接返回-1
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {  // 这里是对条件1的处理
                // 如果队列的头的下一个节点是请求的排他锁的线程在等待,那么就返回true
                    // Make sure we‘re not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                // 如果当前线程的count==0,也就是说当前线程才进来,没有获取到锁,那么直接把它从readHolds中移除
                                if (rh.count == 0)
                                // 移除当前线程的HoldCounter
                                    readHolds.remove();
                            }
                        }
                        // 移除之后,返回-1
                        if (rh.count == 0)
                            return -1;
                    }
                }
                // 这里是对条件2的处理,直接抛出错误!
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                    // 这里是对条件3的处理,竞争设置state,如果竞争还是失败,那么就要再循环一次,直到死循环能够跳出去
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                // 如果共享锁的数量为0
                    if (sharedCount(c) == 0) {
                    // 设置第一个线程为当前的线程
                        firstReader = current;
                        // 设置HoldCount =1
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

  

 

  

原文地址:https://www.cnblogs.com/boywwj/p/8649837.html

时间: 2024-10-05 09:57:01

ReentrantReadWriteLock源码分析(一)的相关文章

Java显式锁学习总结之五:ReentrantReadWriteLock源码分析

概述 我们在介绍AbstractQueuedSynchronizer的时候介绍过,AQS支持独占式同步状态获取/释放.共享式同步状态获取/释放两种模式,对应的典型应用分别是ReentrantLock和Semaphore,AQS还可以混合两种模式使用,读写锁ReentrantReadWriteLock就是如此. 设想以下情景:我们在系统中有一个多线程访问的缓存,多个线程都可以对缓存进行读或写操作,但是读操作远远多于写操作,要求写操作要线程安全,且写操作执行完成要求对当前的所有读操作马上可见. 分析

ReentrantReadWriteLock源码分析

概述 ReentrantReadWriteLock维护了一对相关的锁,它们分别是共享readLock和独占writeLock.关于共享读锁和排他写锁的概念其实很好理解.所谓共享读锁就是一个线程读的时候,其它线程也可以来读(共享),但是不能来写.排他写锁是指一个线程在写的时候,其它线程不能来写或读(排他).除了这个特点之外,ReentrantReadWriteLock还有一个特点就是可重入的.它和ReentrantLock一样都是支持Condition的.而且ReentrantReadWerite

多线程之美7一ReentrantReadWriteLock源码分析

目录 前言 在多线程环境下,为了保证线程安全, 我们通常会对共享资源加锁操作,我们常用Synchronized关键字或者ReentrantLock 来实现,这两者加锁方式都是排他锁,即同一时刻最多允许一个线程操作,然而大多数场景中对共享资源读多于写,那么存在线程安全问题的是写操作(修改,添加,删除),我们是否应该考虑将读和写两个分开,只要运用合理,并发性能是不是可以提高,吞吐量增大呢? ReentrantReadWriteLock已经为我们实现了这种机制,我们一起来看它是怎样实现的吧! 1.读写

java并发锁ReentrantReadWriteLock读写锁源码分析

1.ReentrantReadWriterLock基础 所谓读写锁,是对访问资源共享锁和排斥锁,一般的重入性语义为 如果对资源加了写锁,其他线程无法再获得写锁与读锁,但是持有写锁的线程,可以对资源加读锁(锁降级):如果一个线程对资源加了读锁,其他线程可以继续加读锁. java.util.concurrent.locks中关于多写锁的接口:ReadWriteLock public interface ReadWriteLock { /** * Returns the lock used for r

[源码分析]读写锁ReentrantReadWriteLock

一.简介 读写锁. 读锁之间是共享的. 写锁是独占的. 首先声明一点: 我在分析源码的时候, 把jdk源码复制出来进行中文的注释, 有时还进行编译调试什么的, 为了避免和jdk原生的类混淆, 我在类前面加了"My". 比如把ReentrantLock改名为了MyReentrantLock, 在源码分析的章节里, 我基本不会对源码进行修改, 所以请忽视这个"My"即可. 1. ReentrantReadWriteLock类里的字段 unsafe在这里是用来给TID_O

supervisor启动worker源码分析-worker.clj

supervisor通过调用sync-processes函数来启动worker,关于sync-processes函数的详细分析请参见"storm启动supervisor源码分析-supervisor.clj".sync-processes函数代码片段如下: sync-processes函数代码片段 ;; sync-processes函数用于管理workers, 比如处理不正常的worker或dead worker, 并创建新的workers;; supervisor标识supervis

深入理解读写锁—ReadWriteLock源码分析

ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁.读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的. 所有读写锁的实现必须确保写操作对读操作的内存影响.换句话说,一个获得了读锁的线程必须能看到前一个释放的写锁所更新的内容. 读写锁比互斥锁允许对于共享数据更大程度的并发.每次只能有一个写线程,但是同时可以有多个线程并发地读数据.ReadWriteLock适用于读多写少的并发情况. Java并发包中ReadWriteLock是一个接口,主要有两个方法,如下: public i

jdk源码分析总览

今天看到了一个源码分析按照重要性排序的例子, 这里拿过来用了,之后按照这个顺序不断的完善源码的内容. 引用的出处忘记了(对作者说声抱歉) 很多java开发的小伙伴都会阅读jdk源码,然而确不知道应该从哪读起.以下为小编整理的通常所需阅读的源码范围. 标题为包名,后面序号为优先级1-4,优先级递减 1.java.lang 1) Object 12) String 13) AbstractStringBuilder 14) StringBuffer 15) StringBuilder 16) Boo

死磕 java同步系列之ReentrantReadWriteLock源码解析

问题 (1)读写锁是什么? (2)读写锁具有哪些特性? (3)ReentrantReadWriteLock是怎么实现读写锁的? (4)如何使用ReentrantReadWriteLock实现高效安全的TreeMap? 简介 读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量. 特性 读写锁具有以下特性: 是否互斥 读 写 读 否 是 写 是 是 可以看到,读写锁除了读读