JAVA并发之ReentrantLock源码(二)

  上一篇我们讲到了ReentrantLock通过使用AQS框架实现了tryAcquire、tryRelease方法,从ReentrantLock层面分析源码,本次我们将进一步深入AQS类,查看AQS底层是如何实现线程同步的。

1.acquire()

  首先自然从加锁开始看起,从lock.lock调用AQS中的acquire方法,我们已经进入了AQS源码层面,一个看起来很简洁的acquire如下:

 1     /**
 2      * Acquires in exclusive mode, ignoring interrupts.  Implemented
 3      * by invoking at least once {@link #tryAcquire},
 4      * returning on success.  Otherwise the thread is queued, possibly
 5      * repeatedly blocking and unblocking, invoking {@link
 6      * #tryAcquire} until success.  This method can be used
 7      * to implement method {@link Lock#lock}.
 8      *
 9      * @param arg the acquire argument.  This value is conveyed to
10      *        {@link #tryAcquire} but is otherwise uninterpreted and
11      *        can represent anything you like.
12      */
13     public final void acquire(int arg) {
14         if (!tryAcquire(arg) &&
15             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
16             selfInterrupt();
17     }

  acquire进行了如下的操作:

  1、tryAcquire(arg):尝试获取需要的资源,如果成功返回true,这个方法就完成啦,如果失败,将会进入下一个步骤

在这个方法是需要使用者自己实现的语意,可以查看上一篇博客中reentrantlock中的实现。

  2、addWaiter(Node.EXCLUSIVE), arg):新建一个当前线程的节点(独占),将其放入CLH队列中等待:

 1     private Node addWaiter(Node mode) {
 2         Node node = new Node(Thread.currentThread(), mode);
 3         // Try the fast path of enq; backup to full enq on failure
 4         Node pred = tail;
 5         if (pred != null) {//如果原来有头有尾
 6             node.prev = pred;//设置当前节点的前置节点
 7             if (compareAndSetTail(pred, node)) {//CAS尝试将其放入末尾
 8                 pred.next = node;
 9                 return node;
10             }
11         }
12         enq(node);//当原来没头没尾的时候要同时CAS设置头尾节点为当前节点
13         return node;
14     }
15
16     private Node enq(final Node node) {
17         for (;;) {//自旋不断的用cas设置头尾节点
18             Node t = tail;
19             if (t == null) { // Must initialize
20                 if (compareAndSetHead(new Node()))
21                     tail = head;
22             } else {
23                 node.prev = t;
24                 if (compareAndSetTail(t, node)) {
25                     t.next = node;
26                     return t;
27                 }
28             }
29         }
30     }

  3、acquireQueued:会获取中断,如果中断了,就通过返回值true使acquire方法执行selfInterrupt()再将线程变成中断状态。让我们来看看acquireQueued内部的实现:

 1     final boolean acquireQueued(final Node node, int arg) {
 2         boolean failed = true;
 3         try {
 4             boolean interrupted = false;
 5             for (;;) {//不断自旋
 6                 final Node p = node.predecessor();
 7                 if (p == head && tryAcquire(arg)) {//如果当前节点的前一个节点排队排到了头的位置并且可以获取到资源
 8                     setHead(node);
 9                     p.next = null; // help GC
10                     failed = false;
11                     return interrupted;
12                 }
13                 if (shouldParkAfterFailedAcquire(p, node) &&
14                     parkAndCheckInterrupt())//判断是否需要去休息(如果是的话),就执行parkAndCheckInterrupt()方法去休息
15                     interrupted = true;
16             }
17         } finally {
18             if (failed)
19                 cancelAcquire(node);
20         }
21     }
22
23     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
24         int ws = pred.waitStatus;
25         if (ws == Node.SIGNAL)
26             /*
27              * This node has already set status asking a release
28              * to signal it, so it can safely park.
29              * 如果前一个节点已经变成 SIGNAL(会提醒下一个节点)状态,当前节点的线程就可以去“睡觉”了
30              */
31             return true;
32         if (ws > 0) {
33             /*
34              * Predecessor was cancelled. Skip over predecessors and
35              * indicate retry.
36              * 如果前一个线程已经被取消了,那么将他们跳过不断重复
37              */
38             do {
39                 node.prev = pred = pred.prev;
40             } while (pred.waitStatus > 0);
41             pred.next = node;
42         } else {
43             /*
44              * waitStatus must be 0 or PROPAGATE.  Indicate that we
45              * need a signal, but don‘t park yet.  Caller will need to
46              * retry to make sure it cannot acquire before parking.
47              * 如果前一个节点没有变成SIGNAL状态,就把它置为signal状态
48              */
49             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
50         }
51         return false;
52     }
53
54     private final boolean parkAndCheckInterrupt() {
55         LockSupport.park(this);//通过park方法让当前线程阻塞
56         return Thread.interrupted();//返回当前线程的中断情况
57     }

  到这里,acquire方法已经看完了,通过AQS简介以及ReentranLock的tryAcquire方法,我们基本理解了ReentranLock加锁的整个流程:

  尝试获取资源->获取失败则加一个独占的节点到CLH队列排队->让前一个节点变为signal状态->把自己阻塞等待前一个节点唤醒(当然唤醒操作会在release中实现)。

  相信看到这里,都会迫不及待的点开release方法查看当前线程是怎么被前一个线程唤醒的吧,那我们就一起来看一看!

 1     public final boolean release(int arg) {
 2         if (tryRelease(arg)) {//1
 3             Node h = head;
 4             if (h != null && h.waitStatus != 0)
 5                 unparkSuccessor(h);
 6             return true;
 7         }
 8         return false;
 9     }
10     private void unparkSuccessor(Node node) {
11         /*
12          * If status is negative (i.e., possibly needing signal) try
13          * to clear in anticipation of signalling.  It is OK if this
14          * fails or if status is changed by waiting thread.
15          */
16         int ws = node.waitStatus;
17         if (ws < 0)
18             compareAndSetWaitStatus(node, ws, 0);
19         /*
20          * Thread to unpark is held in successor, which is normally
21          * just the next node.  But if cancelled or apparently null,
22          * traverse backwards from tail to find the actual
23          * non-cancelled successor.
24          */
25         Node s = node.next;
26         if (s == null || s.waitStatus > 0) {//找到下一个不是取消状态的节点
27             s = null;
28             for (Node t = tail; t != null && t != node; t = t.prev)
29                 if (t.waitStatus <= 0)
30                     s = t;
31         }
32         if (s != null)//节点不为空就把下一个节点唤醒
33             LockSupport.unpark(s.thread);
34     }

  1、尝试释放资源(一般来说都是会成功的,除非被中断了)

  2、找到头节点,如果头节点不为空,并且当前节点的等待状态不为0(有下一个节点改变过当前节点的等待状态)

  3、unparkSuccessor方法找到下一个不是取消状态的节点,如果不是空,就把它唤醒

  看完这个,结合三篇博客,我们对ReentranLock的流程,以及AQS在其中做了哪些重要的事情都弄明白了,也对使用AQS构造同步变量需要实现哪些方法有了一定的理解,虽然不一定能立刻上手实现一个,但是给你一个实现的同步器比如CountdownLatch等,查看它的内部实现一定不会出现翻源码绕晕的情况了。

  那么本篇ReentranLock基于AQS层面的实现就到此结束啦,也算是填了之前挖的一个坑。

原文地址:https://www.cnblogs.com/zzzdp/p/9326366.html

时间: 2024-08-04 17:03:33

JAVA并发之ReentrantLock源码(二)的相关文章

Java并发之AQS源码分析(二)

我在Java并发之AQS源码分析(一)这篇文章中,从源码的角度深度剖析了 AQS 独占锁模式下的获取锁与释放锁的逻辑,如果你把这部分搞明白了,再看共享锁的实现原理,思路就会清晰很多.下面我们继续从源码中窥探共享锁的实现原理. 共享锁 获取锁 public final void acquireShared(int arg) { // 尝试获取共享锁,小于0表示获取失败 if (tryAcquireShared(arg) < 0) // 执行获取锁失败的逻辑 doAcquireShared(arg)

Java并发编程 ReentrantLock 源码分析

ReentrantLock 一个可重入的互斥锁 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁相同的一些基本行为和语义,但功能更强大. 这个类主要基于AQS(AbstractOwnableSynchronizer)封装的 公平与非公平锁. 所谓公平锁就是指 在多个线程的争用下,这些锁倾向于将访问权授予等待时间最长的线程,换句话说也就是先被锁定的线程首先获得锁. 非公平锁正好相反,解锁时没有固定顺序. 让我们边分析源代码边学习如何使用该类 先来看一下构造参数,默认

java并发之hashmap源码

在上篇博客中分析了hashmap的用法,详情查看java并发之hashmap 本篇博客重点分析下hashmap的源码(基于JDK1.8) 一.成员变量 HashMap有以下主要的成员变量 /** * The default initial capacity - MUST be a power of two. 默认初始容量 */ static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16 /** * The maximum capa

深入java并发包源码(二)AQS的介绍与使用

深入java并发包源码(一)简介 深入java并发包源码(二)AQS的介绍与使用 深入java并发包源码(三)AQS独占方法源码分析 AQS 本文章会讲解 AQS 的使用方法,然后通过 DEBUG 跟踪 AQS 执行的一系列操作来分析源码,读者跟着文章 DEBUG 跟踪源码能更容易理解. AQS 是什么? AbstractQueuedSynchronizer 队列同步器(AQS)是一个抽象类,作为并发工具的基础组件,为真正的实现类提供基础设施.并发工具是面向使用者的,AQS 面向的是并发工具的实

Java并发系列[5]----ReentrantLock源码分析

在Java5.0之前,协调对共享对象的访问可以使用的机制只有synchronized和volatile.我们知道synchronized关键字实现了内置锁,而volatile关键字保证了多线程的内存可见性.在大多数情况下,这些机制都能很好地完成工作,但却无法实现一些更高级的功能,例如,无法中断一个正在等待获取锁的线程,无法实现限定时间的获取锁机制,无法实现非阻塞结构的加锁规则等.而这些更灵活的加锁机制通常都能够提供更好的活跃性或性能.因此,在Java5.0中增加了一种新的机制:Reentrant

Java并发编程之ReentrantLock源码分析

ReentrantLock介绍 从JDK1.5之前,我们都是使用synchronized关键字来对代码块加锁,在JDK1.5引入了ReentrantLock锁.synchronized关键字性能比ReentrantLock锁要差,而且ReentrantLock锁功能要比synchronized关键字功能强大. 特点 synchronized关键字和ReentrantLock锁都是重入锁,可重入锁是指当一个线程获取到锁后,此线程还可继续获得这把锁,在此线程释放这把锁前其他线程则不可获得这边锁.相比

java线程池ThreadPoolExector源码分析

java线程池ThreadPoolExector源码分析 今天研究了下ThreadPoolExector源码,大致上总结了以下几点跟大家分享下: 一.ThreadPoolExector几个主要变量 先了解下ThreadPoolExector中比较重要的几个变量.  corePoolSize:核心线程数量     maximumPoolSize:最大线程数量 allowCoreThreadTimeOut:是否允许线程超时(设置为true时与keepAliveTime,TimeUnit一起起作用)

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

Java IO 之 OutputStream源码

Writer      :BYSocket(泥沙砖瓦浆木匠) 微         博:BYSocket 豆         瓣:BYSocket FaceBook:BYSocket Twitter    :BYSocket 一.前言 上一篇<Java IO 之 InputStream源码>,说了InputStream.JDK1.0中就有了这传统的IO字节流,也就是 InputStream 和 OutputStream.梳理下两者的核心: InputStream中有几个 read() 方法和 O