AQS系列(一)- ReentrantLock的加锁

前言

AQS即AbstractQueuedSynchronizer,是JUC包中的一个核心抽象类,JUC包中的绝大多数功能都是直接或间接通过它来实现的。本文是AQS系列的第一篇,后面会持续更新多篇,争取将JUC包中AQS相关的常用功能讲清楚,一方面巩固自己的知识体系,一方面亦可与各位园友互相学习。寒冷的冬天,要用技术来温暖自己。

一、AQS与ReentrantLock的关系

先奉上一张自制的丑陋类图

从下往上看,ReentrantLock类内部有两个静态内部类FairSync和NonfairSync,分别代表了公平锁和非公平锁(注意ReentrantLock实现的锁是可重入排它锁)。这两个静态内部类又共同继承了ReentrantLock的一个内部静态抽象类Sync,此抽象类继承AQS。

类的关系搞清楚了,我们下面一起看一下源码。

二、源码解读

ReentrantLock的默认构造方法创建的是非公平锁,也可以通过传入true来指定生成公平锁。下面我们以公平锁的加锁过程为例,进行解读源码。在解读源码之前需要先明确一下AQS中的state属性,它是int类型,state=0表示当前lock没有被占用,state=1表示被占用,如果是重入状态,则重入了几次state就是几。

 1 public class JucLockDemo1 {
 2     public static void main(String[] args){
 3         ReentrantLock lock = new ReentrantLock(true);
 4         Thread t1 = new Thread(() -> {
 5             lock.lock();
 6             // 业务逻辑
 7             lock.unlock();
 8         });
 9         t1.start();
10         System.out.println("main end");
11     }
12 }

其中第5行lock方法点进去的代码:

1 public void lock() {
2         sync.lock();
3     }

直接调了sync的lock方法,sync下面的lock方法是抽象方法,方法逻辑取决于具体的实现类,因为我们这里创建的是公平锁,所以进FairSync看它的lock方法实现:

1 final void lock() {
2             acquire(1);
3         }

FairSync中的lock方法很简单,直接调用了acquire方法,参数是1,继续跟踪:

1 public final void acquire(int arg) {
2         if (!tryAcquire(arg) &&
3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4             selfInterrupt();
5     }

acquire方法位于AQS中,很重要,虽然只有短短的三行,但是里面的内容非常多。下面对里面的方法分别进行解读。

方法1:tryAcquire(arg)

此方法在FairSync中进行了实现,代码如下所示:

 1 protected final boolean tryAcquire(int acquires) {
 2             final Thread current = Thread.currentThread();
 3             int c = getState();
 4             // 判断state状态,如果是0表示锁空闲,可以去尝试获取
 5             if (c == 0) {
 6                 if (!hasQueuedPredecessors() &&
 7                     compareAndSetState(0, acquires)) {
 8                     setExclusiveOwnerThread(current);
 9                     return true;
10                 }
11             }// exclusiceOwnerThread存放的是当前运行的独占线程,如果此处判断为true,说明是当前线程第二次加锁,可以重入,只是要将state+1
12             else if (current == getExclusiveOwnerThread()) {
13                 int nextc = c + acquires;
14                 if (nextc < 0)
15                     throw new Error("Maximum lock count exceeded");
16                 setState(nextc);
17                 return true;
18             }
19             return false;
20         }

第二个if判断很好理解,是ReentrantLock对重入和排他的支持(所以说它是可重入排他锁),但是判断c==0之后的逻辑就比较麻烦了。

首先理解一下当前的逻辑:如果state=0说明lock空闲,又因为是公平锁,所以要先判断当前AQS队列中还有没有排队的任务,如果没有的话,就走一个CAS将state改成1,然后设置排他的执行线程,获取执行权;如果队列中有任务,那么acquire方法只能先返回false了。那么可以推断出,hasQueuedPredecessors方法就是用来判断队列中是否有排队的

点进去看看Lea大神的实现逻辑吧。

 1 public final boolean hasQueuedPredecessors() {
 2         // The correctness of this depends on head being initialized
 3         // before tail and on head.next being accurate if the current
 4         // thread is first in queue.
 5         Node t = tail; // Read fields in reverse initialization order
 6         Node h = head;
 7         Node s;
 8         return h != t &&
 9             ((s = h.next) == null || s.thread != Thread.currentThread());
10     }

代码不多,但表达的意思比较晦涩。第一个判断h!=t,如果h=t,说明队列是空的,这时这个判断条件是false,方法直接就返回了,这时外面的if取反是true,会继续走CAS抢占state和排他线程,获取锁,这种情况的路就走完了。如果h!=t为true,说明现在队列中有任务,这时进入后面的大括号 ((s = h.next) == null || s.thread != Thread.currentThread()) ,在队列中有任务的情况下,还有两种可能,一种是队列中的第一个任务就是当前线程,另一种是第一个任务不是当前线程。因为是公平锁,如果第一个任务时当前线程的话,那么它有权再去申请一下获取锁,如果第一个任务不是当前线程,那么当前线程就乖乖排队吧,等前面的执行完了才能轮到你。后面的大括号就是对这两种情况进行了区分,我们用反向逻辑来分析。方法hasQueuedPredecessors表示如果当前线程可以去竞争锁则返回false,不能竞争锁则返回true后面大括号结果为false的话当前线程才会去抢占锁,一个或运算怎样才能是false?或的两边都是false,就是说要(s = h.next) != null && s.thread == Thread.currentThread(),意思就是队列中第一个任务不为空且第一个任务就是当前线程,而这个&&的非与上述源码中的||在逻辑上是等价的,所以到这里意思就清楚了,return的&&连接的两个条件意思是:判断是否队列不为空且(第一个任务为空或者不是当前线程)。

hasQueuedPredecessors方法讲完,tryAcquire方法就没有什么难点了,这时我们回到上面开始的acquire(int arg)方法。如果tryAcquire返回的是true,说明获取到了锁,那么就不会再走后面的流程了;如果返回的是false,则进入acquireQueue。但我们先看里面的addWaiter方法。

方法2:  addWaiter(Node.EXCLUSIVE), arg)

此方法用于生成当前线程的node节点并把它放在队尾,方法源码:

 1 private Node addWaiter(Node mode) {
 2         Node node = new Node(Thread.currentThread(), mode);// 创建当前线程的node节点
 3         // Try the fast path of enq; backup to full enq on failure
 4         Node pred = tail;
 5         if (pred != null) { // 判断队尾是否为空,如果不为空则将node节点拼接在后面
 6             node.prev = pred; // 将node节点连接到队尾节点
 7             if (compareAndSetTail(pred, node)) { // 通过CAS将node节点放到队尾
 8                 pred.next = node; // 如果CAS操作成功了,那么将原队尾节点的next连接到node节点,组成双向队列
 9                 return node;
10             }
11         }
12         enq(node); // 能到这里的话分两种情况:1、队尾是空的;2、队尾不是空的,但是进行CAS操作时由于被其他线程抢占导致失败;
13         return node;
14     }

通过注解大家应该能梳理清楚逻辑,下面着重说一下enq(node)方法的实现:

 1 private Node enq(final Node node) {
 2         for (;;) {
 3             Node t = tail;
 4             if (t == null) { // Must initialize 队尾是null,符合前面说的第一种情况
 5                 if (compareAndSetHead(new Node())) // 设置队首
 6                     tail = head; // 队首队尾都初始化成空node
 7             } else { // 队尾不为空,是前面说的第二种情况,此种情况的处理逻辑同上面对pred != null的处理
 8                 node.prev = t;
 9                 if (compareAndSetTail(t, node)) {
10                     t.next = node;
11                     return t;
12                 }
13             }
14         }
15     }

可以看到此方法无限循环,直到执行完else中的逻辑。此处需要注意的一点是,如果刚开始时队列是空的,即tail是null,会触发队首队尾的初始化,初始化之后再一次循环会进入else中,将node放到原队尾的后面,返回t。注意返回的t没有用到,是在其他场景的方法中用的。

 方法3:acquireQueued(final Node node, int arg)

该方法用于获取锁,返回值表示当前获取到锁的线程在获取锁的过程中是否中断过,下面先看源码:

 1 final boolean acquireQueued(final Node node, int arg) {
 2         boolean failed = true;
 3         try {
 4             boolean interrupted = false;
 5             for (;;) {
 6                 final Node p = node.predecessor(); // 获取当前节点的前一个节点
 7                 if (p == head && tryAcquire(arg)) { // 如果p==head说明node是第一个任务,那么就可以通过tryAcquire去获取锁
 8                     setHead(node); // 获取锁成功,则将node放到队首位置,并将thread和prev置为null
 9                     p.next = null; // help GC 再将p的next置为null,切断与外界的一切联系
10                     failed = false;
11                     return interrupted;
12                 }// 下面if中的两个方法很重要,着重讲解
13                 if (shouldParkAfterFailedAcquire(p, node) &&
14                     parkAndCheckInterrupt())
15                     interrupted = true;
16             }
17         } finally {
18             if (failed)
19                 cancelAcquire(node);
20         }
21     }

通过注解,相信对第一个if中的逻辑能理解清楚,下我们着重讲解第二个if中的两个方法。

第一个是 shouldParkAfterFailedAcquire(p, node) 方法,此方法的逻辑为:

 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2         int ws = pred.waitStatus; // 1、对于新建的Node节点,此状态都为0(只有addConditionWaiter新建node节点时才不是0)
 3         if (ws == Node.SIGNAL)
 4             // 3、在2中将ws置为-1后,该方法返回false,外层for循环再走一圈,第二次进入此方法时会进入这里,直接返回true。 -1的状态表示可以将当前线程park
 5             return true;
 6         if (ws > 0) {
 7
 8             do {
 9                 node.prev = pred = pred.prev;
10             } while (pred.waitStatus > 0);
11             pred.next = node;
12         } else {
13             // 2、是ws=0的话会进入这里,将ws置为-1,0的状态表示还不能park
14             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
15         }
16         return false;
17     }

如果返回的是true,则进入第二个方法将当前线程暂停:

1 private final boolean parkAndCheckInterrupt() {
2         LockSupport.park(this);
3         return Thread.interrupted();
4     }

当前面的线程执行完毕,唤醒这个线程的时候,就会从第三行开始继续执行for循环中获取锁的逻辑,直到获取锁。

到这里,ReentrantLock的lock方法便结束了,整体流程就是这样。看JUC包中的源码,可以看到写的很简洁,有时一两个简单的判断条件却代表了非常多的意思,充分显示了编程者缜密又举重若轻的实力,读这样的源码,有一种看本格推理小说般的思维上的愉悦感。

下一节我们将介绍unlock方法的原理,与本节最后一个方法就能接上了,下期再会!

原文地址:https://www.cnblogs.com/zzq6032010/p/12002803.html

时间: 2024-10-11 07:47:52

AQS系列(一)- ReentrantLock的加锁的相关文章

AQS系列(三)- ReentrantReadWriteLock读写锁的加锁

前言 前两篇我们讲述了ReentrantLock的加锁释放锁过程,相对而言比较简单,本篇进入深水区,看看ReentrantReadWriteLock-读写锁的加锁过程是如何实现的,继续拜读老Lea凌厉的代码风. 一.读写锁的类图 读锁就是共享锁,而写锁是独占锁.读锁与写锁之间的互斥关系为:读读可同时执行(有条件的):读写与写写均互斥执行.注意此处读读可并行我用了有条件的并行,后文会对此做介绍. 继续奉上一张丑陋的类图: 可以看到ReentrantReadWriteLock维护了五个内部类,Ree

Java并发系列[5]----ReentrantLock源码分析

在Java5.0之前,协调对共享对象的访问可以使用的机制只有synchronized和volatile.我们知道synchronized关键字实现了内置锁,而volatile关键字保证了多线程的内存可见性.在大多数情况下,这些机制都能很好地完成工作,但却无法实现一些更高级的功能,例如,无法中断一个正在等待获取锁的线程,无法实现限定时间的获取锁机制,无法实现非阻塞结构的加锁规则等.而这些更灵活的加锁机制通常都能够提供更好的活跃性或性能.因此,在Java5.0中增加了一种新的机制:Reentrant

AQS系列(五)- CountDownLatch的使用及原理

前言 前面四节学完了AQS最难的两种重入锁应用,下面两节进入实战学习,看看JUC包中其他的工具类是如何运用AQS实现特定功能的.今天一起看一下CountDownLatch. CountDownLatch可以用来实现多个线程执行完一个功能后让另一个线程继续执行的功能.常见的场景比如大文件的处理,我们需要对一个或多个文件进行处理,处理完之后再统一入库,这时我们就可以用到CountDownLatch了. 一.使用样例 1 public static void main(String[] args) {

ReentrantLock 与 AQS 源码分析

ReentrantLock 与 AQS 源码分析 1. 基本结构 ?? 重入锁 ReetrantLock,JDK 1.5新增的类,作用与synchronized关键字相当,但比synchronized更加灵活.ReetrantLock本身也是一种支持重进入的锁,即该锁可以支持一个线程对资源重复加锁,但是加锁多少次,就必须解锁多少次,这样才可以成功释放锁. 1. 继承 没有继承任何类,因为很多操作都使用了组合完成. 2. 实现 Lock, java.io.Serializable ??这里着重介绍

死磕 java同步系列之AQS终篇(面试)

问题 (1)AQS的定位? (2)AQS的重要组成部分? (3)AQS运用的设计模式? (4)AQS的总体流程? 简介 AQS的全称是AbstractQueuedSynchronizer,它的定位是为Java中几乎所有的锁和同步器提供一个基础框架. 在之前的章节中,我们一起学习了ReentrantLock.ReentrantReadWriteLock.Semaphore.CountDownLatch的源码,今天我们一起来对AQS做个总结. 状态变量state AQS中定义了一个状态变量state

Java并发系列(5)ReentrantLock源码分析

在Java5.0之前,协调对共享对象的访问可以使用的机制只有synchronized和volatile.我们知道synchronized关键字实现了内置锁,而volatile关键字保证了多线程的内存可见性. 在大多数情况下,这些机制都能很好地完成工作,但却无法实现一些更高级的功能,例如,无法中断一个正在等待获取锁的线程,无法实现限定时间的获取锁机制,无法实现非阻塞结构的加锁规则等.而这些更灵活的加锁机制通常都能够提供更好的活跃性或性能. 因此,在Java5.0中增加了一种新的机制:Reentra

死磕 java同步系列之ReentrantReadWriteLock源码解析

问题 (1)读写锁是什么? (2)读写锁具有哪些特性? (3)ReentrantReadWriteLock是怎么实现读写锁的? (4)如何使用ReentrantReadWriteLock实现高效安全的TreeMap? 简介 读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量. 特性 读写锁具有以下特性: 是否互斥 读 写 读 否 是 写 是 是 可以看到,读写锁除了读读

死磕 java同步系列之StampedLock源码解析

问题 (1)StampedLock是什么? (2)StampedLock具有什么特性? (3)StampedLock是否支持可重入? (4)StampedLock与ReentrantReadWriteLock的对比? 简介 StampedLock是java8中新增的类,它是一个更加高效的读写锁的实现,而且它不是基于AQS来实现的,它的内部自成一片逻辑,让我们一起来学习吧. StampedLock具有三种模式:写模式.读模式.乐观读模式. ReentrantReadWriteLock中的读和写都是

死磕 java同步系列之CyclicBarrier源码解析——有图有真相

问题 (1)CyclicBarrier是什么? (2)CyclicBarrier具有什么特性? (3)CyclicBarrier与CountDownLatch的对比? 简介 CyclicBarrier,回环栅栏,它会阻塞一组线程直到这些线程同时达到某个条件才继续执行.它与CountDownLatch很类似,但又不同,CountDownLatch需要调用countDown()方法触发事件,而CyclicBarrier不需要,它就像一个栅栏一样,当一组线程都到达了栅栏处才继续往下走. 使用方法 pu