ReentrantLock 的公平锁源码分析

ReentrantLock 源码分析   以公平锁源码解析为例:

1:数据结构:

维护Sync 对象的引用:   private final Sync sync;

Sync对象继承 AQS,  Sync  分为两个类:处理公平锁锁和非公平锁:

FairSync   NonfairSync

具体的类图如下:

2:接下来重点分析AQS这个类:AbstractQueuedSynchronizer:

AQS中的成员变量:

private transient volatile Node head;   //AQS维护队列的头结点

private transient volatile Node tail;     // AQS维护队列的尾结点

private volatile int state;                            // AQS 锁的状态  数量标识锁被获取的次数

下面看看NODE 结点的成员变量:

volatile int waitStatus;   //等待状态

volatile Node prev;      //前继节点

volatile Node next;      //后继节点

volatile Thread thread;   //线程对象

Node nextWaiter;       //下一个等待节点

从NODE的数据结构可以看出来,AQS里面维护的队列的数据结构是双链表的形式;

 
   

3:接下来分析 ReentrantLock  的构造方法:

ReentrantLock lock = new ReentrantLock(true);   //传入true,说明是构造公平锁

具体的构造方法如下,返回FairSync 对象:

public ReentrantLock(boolean fair) {

sync = fair ? new FairSync() : new NonfairSync();

}

4:lock方法的分析:因为是公平锁,所以调用 FairSync 下的lock方法:

final void lock() {

acquire(1);

}

Acquire 的方法如下:

public final void acquire(int arg) {    // arg=1

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

接下来分两种场景来分析tryAcquire(arg)  1:同一线程第一次或N次获取锁(其他线程没有获取到锁)   2:其他线程已经获取到锁,当前线程尝试去获取锁;

//场景 1:

protected final boolean tryAcquire(int acquires) {   // acquires=1

final Thread current = Thread.currentThread();   //当前线程:main-thread

int c = getState();    //如果为第一次获取锁c=0  如果main线程已经获取过锁,则c为加锁的次数

if (c == 0) {     //当前线程第一次获取锁

if (!hasQueuedPredecessors() &&  // hasQueuedPredecessor的分析如下单独分析:

compareAndSetState(0, acquires)) {  //cas原子操作 state=1

setExclusiveOwnerThread(current);  //独占线程设置为当前线程

return true;  //返回true表明加锁成功

}

}

else if (current == getExclusiveOwnerThread()) {  // c=n 的情况下

int nextc = c + acquires;    // nextc=n+1

if (nextc < 0)

throw new Error("Maximum lock count exceeded");

setState(nextc);    //设置 state=n+1

return true;  //获取到锁,返回true

}

return false;

}

说明:hasQueuedPredecessors主要是判断当前线程所在的节点是不是CLH队列的首个位置,这个判断的目的是公平锁的公平获取锁的机制

hasQueuedPredecessors的源码如下:以该线程是第一次获取锁为例分析:

public final boolean hasQueuedPredecessors() {

Node t = tail;      // tail=null

Node h = head;    // head=null

Node s;

return h != t &&    //返回false

((s = h.next) == null || s.thread != Thread.currentThread());

}

//场景2 分析:有其他线程未释放锁(main-thread 持有锁,thread-1尝试去获取锁)

protected final boolean tryAcquire(int acquires) {   // acquires=1

final Thread current = Thread.currentThread();   //当前线程:thread-1

int c = getState();    // 由于其他持有锁 state 至少为1

if (c == 0) {

if(!hasQueuedPredecessors()&&  compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

else if (current == getExclusiveOwnerThread()) {  // current=thread1  // getExclusiveOwnerThread()=main

int nextc = c + acquires;

if (nextc < 0)

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;   //此时返回false表明尝试获取锁失败

}

5:上面的场景1 获取到锁后返回true,则lock 方法执行结束。下面分析场景2:

public final void acquire(int arg) {

if (!tryAcquire(arg) &&     //尝试获取锁失败,返回false

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

!tryAcquire(arg)  返回为true;接下来进入

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  这个逻辑;

这个方法有两层处理:

1:addWaiter(Node.EXCLUSIVE)   将当前线程调价到CLH队列中

2:acquireQueued();逐步执行CLH队列中的线程,如果当前线程获取到锁则返回,否则,当前线程进行休眠,直到唤醒并重新获取到锁才返回;

以下分析这两个方法:场景:main线程获取到锁但未释放,这是线程 thread-1去获取锁:(假设此时没有其他线程在CLH队列中,即CLH队列为null)

addWaiter(Node.EXCLUSIVE)方法:入参 Node.EXCLUSIVE 为null;

private Node addWaiter(Node mode) {   //mode=null  EXCLUSIVE 标识节点为独占锁模型

Node node = new Node(Thread.currentThread(), mode); //创建新节点:新节点中线程为当前线程,节点模型为独占锁:thread= thread-1   nextWaiter= mode

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;   // 此时tail=null

if (pred != null) {

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node);  //进入enq方法

return node;

}

Enq() 方法如下:

private Node enq(final Node node) {   //入参为上一步新建的节点

for (;;) {

Node t = tail;    // 第一次遍历逻辑 tail=null

if (t == null) { // Must initialize

if (compareAndSetHead(new Node()))  //CAS 创建表头 Head

tail = head;

} else {

node.prev = t;  //第二次遍历逻辑:node为上面新建的节点:thread= thread-1   nextWaiter= mode, node.prev指向表头 t 为表头

if (compareAndSetTail(t, node)) {  //CAS设置队列尾节点为当前node

t.next = node;   // 表头后继节点指向当前节点

return t;

}

}

}

}

该场景经过上面的处理之后 CLH队列的数据结构如下:

第一次遍历:

Head,tail节点

第二次遍历:

Head               当前线程node:设置为tail

 
   

接下来分析

acquireQueued这个方法

final boolean acquireQueued(final Node node, int arg) {  //node为当前线程节点 arg=1

boolean failed = true;

try {

boolean interrupted = false; //当前线程在休眠时,有没有被中断过

for (;;) {

final Node p = node.predecessor(); //获取前继节点,这里为head节点

if (p == head && tryAcquire(arg)) { //这里p== head为true,接下来进入

// tryAcquire(arg)这个方法,tryAcquire(arg)方法前面分析过了,这里返回false;

setHead(node);

p.next = null; // help GC

failed = false;

return interrupted;

}

//   接下来会进入以下的逻辑:下面会分析这两个方法

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} finally {

if (failed)

cancelAcquire(node);

}

}

这里再次回顾下tryAcquire(arg) 方法:返回fasle

protected final boolean tryAcquire(int acquires) {    // acquires=1

final Thread current = Thread.currentThread(); //当前线程 thread-1

int c = getState();   //state=1

if (c == 0) {

if (!hasQueuedPredecessors() &&

compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

else if (current == getExclusiveOwnerThread()) {  // getExclusiveOwnerThread

获取到的线程是tryAcquire(int acquires)中设置的值 这里是 main; current=thread-1

int nextc = c + acquires;

if (nextc < 0)

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

}

1): acquireQueued(final Node node, int arg) 方法中for循环第一次执行shouldParkAfterFailedAcquire(p, node) 方法分析:源码如下:

入参:pred为前继节点,这里是head  node为当前节点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;  // ws=0

if (ws == Node.SIGNAL)

return true;

if (ws > 0) {

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  //设置前继节点waitStatus= Node.SIGNAL,

}

return false;  //返回false;

}

For循环第二次执行 shouldParkAfterFailedAcquire(p, node)方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int ws = pred.waitStatus;  // ws=-1   在第一次已经设置为-1

if (ws == Node.SIGNAL)   //返回true

return true;

if (ws > 0) {

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

} else {

compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  //设置前继节点waitStatus= Node.SIGNAL,

}

return false;  //返回false;

}

返回true后,接下进入parkAndCheckInterrupt()这个方法:

源码分析如下:

private final boolean parkAndCheckInterrupt() {

LockSupport.park(this);  //阻塞当前线程

return Thread.interrupted(); // 返回线程的中断状态

}

LockSupport.park(this); //作用:前继线程节点的状态是 Node.SIGNAL;挂起当前线程;

Thread.interrupted(); //当前被挂起的线程被前继线程中断,返回线程的中断状态;

下面解释一个线程的行为:LockSupport.park(this)  线程被挂起:

当线程被挂起的时候唤醒的方式有两种:

1:unpark 的方式唤醒,前继节点线程使用完锁后,通过unpark方式唤醒当前线程

2:中断唤醒,其他线程通过 interrupt 中断当前线程

接下来继续分析:acquire(int arg)

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

这个时候的重点放在分析 selfInterrupt(); 这个方法上;

进入这个方法的条件是 当前线程被中断过,并且获取锁成功了;

static void selfInterrupt() {

Thread.currentThread().interrupt();  //当前线程产生一个中断,真正被唤醒

}

到此为止,ReentrantLock 的公平锁源码分析结束。

原文地址:https://www.cnblogs.com/beppezhang/p/11122200.html

时间: 2024-10-08 21:55:25

ReentrantLock 的公平锁源码分析的相关文章

ReentrantLock实现原理及源码分析

ReentrantLock是Java并发包中提供的一个可重入的互斥锁.ReentrantLock和synchronized在基本用法,行为语义上都是类似的,同样都具有可重入性.只不过相比原生的Synchronized,ReentrantLock增加了一些高级的扩展功能,比如它可以实现公平锁,同时也可以绑定多个Conditon. 可重入性/公平锁/非公平锁 可重入性 所谓的可重入性,就是可以支持一个线程对锁的重复获取,原生的synchronized就具有可重入性,一个用synchronized修饰

公平锁与非公平锁源码对比

语义: 1.公平锁:每个线程在获取锁的时候,会先检查该锁维护的等待队列,如果该队列是空或者当前线程是第一个,则占有锁,否则按照FIFO的原则,进入等待队列,等待获取锁: 2.非公平锁:当前线程在获取锁的时候,不管该锁的维护队列种是否有其它等待线程,直接CAS,如果cas失败,则再执行公平锁的那一套: 源码比较如下:公平锁: final void lock() { acquire(1); } 非公平锁: final void lock() { if (compareAndSetState(0, 1

同步锁源码分析(一)AbstractQueuedSynchronizer原理

文章转载自 AbstractQueuedSynchronizer的介绍和原理分析 建议去看一下原文的评论,会有不少收获. 简介 AbstractQueuedSynchronizer 提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架.该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础.使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态.然而多线程环

ReentrantLock (重入锁) 源码浅析

一.ReentrantLock简介ReentrantLock重入锁,顾名思义,就是支持重入的锁,它表示能够支持一个线程对资源的重复加锁:我们之前学习过Synchronized锁,它也是支持重入的一种锁,参考我的另一篇Synchronized 锁的实现原理与应用,Synchronized支持隐式的重入锁,比如递归方法,在方法运行时,执行线程在获取到了锁之后仍能连续多次地获取锁:ReentrantLock虽然不能隐式重入,但是获取到锁的线程多次调用lock方法,不会阻塞进入同步队列:除此之外在获取锁

【分布式锁】06-Zookeeper实现分布式锁:可重入锁源码分析

前言 前面已经讲解了Redis的客户端Redission是怎么实现分布式锁的,大多都深入到源码级别. 在分布式系统中,常见的分布式锁实现方案还有Zookeeper,接下来会深入研究Zookeeper是如何来实现分布式锁的. Zookeeper初识 文件系统 Zookeeper维护一个类似文件系统的数据结构 image.png 每个子目录项如NameService都被称为znoed,和文件系统一样,我们能够自由的增加.删除znode,在znode下增加.删除子znode,唯一不同的在于znode是

不得不知道的golang之sync.Mutex互斥锁源码分析

针对Golang 1.9的sync.Mutex进行分析,与Golang 1.10基本一样除了将panic改为了throw之外其他的都一样.源代码位置:sync\mutex.go.可以看到注释如下: Mutex can be in 2 modes of operations: normal and starvation. In normal mode waiters are queued in FIFO order, but a woken up waiter does not own the m

【JDK源码分析】通过源码彻底理解ReentrantLock显示锁

前言ReentrantLock和synchronized一样是一个可重入的互斥锁,但ReentrantLock功能更强大,它提供了非公平和公平两种锁争用策略供使用者选择,而synchronized只有非公平一种.ReentrantLock提供了可中断的锁等待机制以及可用于多组线程需要分组唤醒的条件. 类图下面是ReentrantLock的类图,内部抽象类Sync继承了AbstractQueuedSynchronizer(以下简称AQS),公平锁FairSync.非公平锁NonfairSync继承

可重入的独占锁——ReentrantLock源码分析

ReentrantLock面试题分析 1.ReentrantLock是怎么实现的? 2.ReentrantLock的公平锁和非公平锁是如何实现的? 1.ReentrantLock类图结构 从类图我们可以直观地了解到,ReentrantLock最终还是使用AQS来实现地,并且根据参数来决定其内部是一个公平??还是非公平锁??,默认是非公平锁??. public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(bo

【JUC】JDK1.8源码分析之ReentrantLock(三)

一.前言 在分析了AbstractQueuedSynchronier源码后,接着分析ReentrantLock源码,其实在AbstractQueuedSynchronizer的分析中,已经提到过ReentrantLock,ReentrantLock表示下面具体分析ReentrantLock源码. 二.ReentrantLock数据结构 ReentrantLock的底层是借助AbstractQueuedSynchronizer实现,所以其数据结构依附于AbstractQueuedSynchroni