[Java并发] AQS抽象队列同步器源码解析--锁获取过程

要深入了解java并发知识,AbstractQueuedSynchronizer(AQS)是必须要拿出来深入学习的,AQS可以说是贯穿了整个JUC并发包,例如ReentrantLock,CountDownLatch,CyclicBarrier等并发类都涉及到了AQS。接下来就对AQS的实现原理进行分析。

在开始分析之前,势必先将CLH同步队列了解一下

CLH同步队列

CLH自旋锁: CLH(Craig, Landin, and Hagersten locks): 是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。CLH自旋锁是一种基于隐式链表(节点里面没有next指针)的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

AQS中的CLH同步队列:AQS中CLH同步队列是对CLH自旋锁进行了优化,其主要从两方面进行了改造:节点的结构与节点等待机制。

1.在结构上引入了头结点和尾节点,他们分别指向队列的头和尾,尝试获取锁、入队列、释放锁等实现都与头尾节点相关,并且每个节点都引入前驱节点和后后续节点的引用;

2.在等待机制上由原来的自旋改成阻塞唤醒。

源码中CLH的简单表示

*      +------+  prev +-----+       +-----+
* head |      | <---- |     | <---- |     |  tail
*      +------+       +-----+       +-----+

Node就是实现CLH同步队列的数据结构,计算下就了解下该类的相关字段属性

AQS中重要的内部类Node

static final class Node {
    // 共享模式
    static final Node SHARED = new Node();
    // 独占模式
    static final Node EXCLUSIVE = null;

    // 如果属性waitStatus == Node.CANCELLED,则表明该节点已经被取消
    static final int CANCELLED =  1;
    // 如果属性waitStatus == Node.SIGNAL,则表明后继节点等待被唤醒
    static final int SIGNAL    = -1;
    // 如果属性waitStatus == Node.CONDITION,则表明是Condition模式中的节点等待条件唤醒
    static final int CONDITION = -2;
    // 如果属性waitStatus == Node.PROPAGATE,在共享模式下,传播式唤醒后继节点
    static final int PROPAGATE = -3;
    // 用于标记当前节点的状态,取值为1,-1,-2,-3,分别对应以上4个取值
    volatile int waitStatus;
    // 前驱节点
    volatile Node prev;
    // 后继节点
    volatile Node next;
    // 当前节点对应的线程,阻塞与唤醒的线程
    volatile Thread thread;
    // 使用Condtion时(共享模式下)的后继节点,在独占模式中不会使用
    Node nextWaiter;
    final boolean isShared() {
            return nextWaiter == SHARED;
    }
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

下面就开始着重对AQS中的重要方法进行分析说明

获取锁

1.acquire 开始获取锁

public final void acquire(int arg) {
    //如果tryAcquire返回true,即获取到锁就停止执行,否则继续向下执行向同步队列尾部添加节点
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire是用于获取锁的方法,在AQS中默认没有实现具体逻辑,由子类自定义实现。

如果返回true则说明获取到锁,否则需要将当前线程封装为Node节点添加到同步队列尾部。

2.当前节点入队列

将当前执行的线程封装为Node节点并加入到队尾

private Node addWaiter(Node mode) {// 首先尝试快速添加到队尾,失败再正常执行添加到队尾
    Node node = new Node(Thread.currentThread(), mode);
    // 快速方式尝试直接添加到队尾
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果快速添加到队尾失败则执行enq(node)添加到队尾
    enq(node);
    return node;
}

enq方法循环遍历添加到队尾

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 添加到队列尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter(Node mode)方法执行完后,接下来执行acquireQueued方法, 返回的是该线程是否需要中断,该方法也是不停地循环获取锁,如果前节点是头节点,则尝试获取锁,获取锁成功则返回是否需要中断标志,如果获取锁失败,则判断是否需要阻塞并阻塞线程

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 标记是否需要被中断
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果前驱节点是头节点,并且获取锁成功,则返回
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 判断获取锁失败后是否需要阻塞当前线程,如果阻塞线程后再判断是否需要被中断线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire(p, node)方法判断是否需要阻塞当前线程

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 如果ws == Node.SIGNAL,则说明当前线程已经准备好被唤醒,因此现在可以被阻塞,之后等待被唤醒
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // 如果ws > 0,说明当前节点已经被取消,因此循环剔除ws>0的前驱节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //如果ws<=0,则将标志位设置为Node.SIGNAL,当还不可被阻塞,需要的等待下次执行shouldParkAfterFailedAcquire判断是否需要阻塞
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

如果shouldParkAfterFailedAcquire(p, node)方法返回true,说明需要阻塞当前线程,则执行parkAndCheckInterrupt方法阻塞线程,并返回阻塞过程中线程是否被中断

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 阻塞线程,等待unpark()或interrupt()唤醒自己
    // 线程被唤醒后查看是否被中断过。
    return Thread.interrupted();
}

那么重新回到获取锁的方法acquire方法,如果acquireQueued(final Node node, int arg)返回true,也即是阻塞过程中线程被中断,则执行中断线程操作selfInterrupt()

public final void acquire(int arg) {
    //如果tryAcquire返回true,即获取到锁就停止执行,否则继续向下执行向同步队列尾部添加节点,然后判断是否被中断过,是则执行中断
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

中断当前线程

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

小结

AQS获取锁的过程:

1.执行tryAcquire方法获取锁,如果获取锁成功则直接返回,否则执行步骤2

2.执行addWaiter方法将当前线程封装位Node节点并添加到同步队列尾部,执行步骤3

3.执行acquireQueued循环尝试获取锁,,如果获取锁成功,则判断返回中断标志位,如果获取锁失败则调用shouldParkAfterFailedAcquire方法判断是否需要阻塞当前线程,如果需要阻塞线程则调用parkAndCheckInterrupt阻塞线程,并在被唤醒后再判断再阻塞过程中是否被中断过。

4.如果acquireQueued返回true,说明在阻塞过程中线程被中断过,则执行selfInterrupt中断线程

好了,以上就是AQS的锁获取过程,关于锁释放的分析会在后续继续输出。

原文地址:https://www.cnblogs.com/zyg-coding/p/12045194.html

时间: 2024-10-11 03:08:48

[Java并发] AQS抽象队列同步器源码解析--锁获取过程的相关文章

软件开发工程师(JAVA)中级考试大纲--spring源码解析

spring源码解析(1)----IOC 一.IOC容器 在Spring中,IOC容器的重要地位我们就不多说了,对于Spring的使用者而言,IOC容器实际上是什么呢?我们可以说BeanFactory就是我们看到的IoC容器,当然了Spring为我们准备了许多种IoC容器来使用,这样可以方便我们从不同的层面,不同的资源位置,不同的形式的定义信息来建立我们需要的IoC容器. 在Spring中,最基本的IOC容器接口是BeanFactory - 这个接口为具体的IOC容器的实现作了最基本的功能规定 

秋招之路8:JAVA锁体系和AQS抽象队列同步器

整个的体系图 悲观锁,乐观锁 是一个广义概念:体现的是看待线程同步的不同角度. 悲观锁 认为在自己使用数据的时候一定有别的线程来修改数据,在获取数据的时候会先加锁,确保数据不被别的线程修改. 实现:关键字synchronized,接口Lock的实现类 适用场景:写操作多,先加锁可以保证写操作时的数据正确. 乐观锁 认为自己在使用数据的时候不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据. 实现:CAS算法,例如AtomicInteger类的原子自

Java并发包下锁学习第二篇Java并发基础框架-队列同步器介绍

Java并发包下锁学习第二篇队列同步器 还记得在第一篇文章中,讲到的locks包下的类结果图吗?如下图: ? 从图中,我们可以看到AbstractQueuedSynchronizer这个类很重要(在本文中,凯哥就用AQS来代替这个类).我们先来了解这个类.对这个类了解之后,学习后面的会更容易了. 本篇是<凯哥(凯哥Java:kagejava)并发编程学习>系列之<Lock系列>教程的第一篇:<Java并发包下锁学习第二篇:队列同步器>. 本文主要内容:同步器介绍:同步器

Java并发编程中线程池源码分析及使用

当Java处理高并发的时候,线程数量特别的多的时候,而且每个线程都是执行很短的时间就结束了,频繁创建线程和销毁线程需要占用很多系统的资源和时间,会降低系统的工作效率. 参考http://www.cnblogs.com/dolphin0520/p/3932921.html 由于原文作者使用的API 是1.6 版本的,参考他的文章,做了一些修改成 jdk 1.8版本的方法,涉及到的内容比较多,可能有少许错误. API : jdk1.8.0_144 ThreadPoolExecutor类 Java中线

Java并发编程之CAS二源码追根溯源

在上一篇文章中,我们知道了什么是CAS以及CAS的执行流程,在本篇文章中,我们将跟着源码一步一步的查看CAS最底层实现原理. 本篇是<凯哥(凯哥Java:kagejava)并发编程学习>系列之<CAS系列>教程的第二篇:从源码追根溯源查看CAS最底层是怎么实现的. 本文主要内容:CAS追根溯源,彻底找到CAS的根在哪里. 一:查看AtomicInteger.compareAndSet源码 通过上一篇文章学习,我们知道了AtomicInteger.compareAndSet方法不加锁

【原创】JAVA并发编程——Callable和Future源码初探

JAVA多线程实现方式主要有三种:继承Thread类.实现Runnable接口.使用ExecutorService.Callable.Future实现有返回结果的多线程.其中前两种方式线程执行完后都没有返回值,只有最后一种是带返回值的. thread和runnable不讨论了. 太多地方可以找到他们的探究了 重点看callable和future, 先上一段代码: package com.future.chenjun.test; import java.util.concurrent.Callab

Netty 解码器抽象父类 ByteToMessageDecoder 源码解析

前言 Netty 的解码器有很多种,比如基于长度的,基于分割符的,私有协议的.但是,总体的思路都是一致的. 拆包思路:当数据满足了 解码条件时,将其拆开.放到数组.然后发送到业务 handler 处理. 半包思路: 当读取的数据不够时,先存起来,直到满足解码条件后,放进数组.送到业务 handler 处理. 而实现这个逻辑的就是我们今天的主角:ByteToMessageDecoder. 看名字的意思是:将字节转换成消息的解码器.人如其名.而他本身也是一个入站 handler,所以,我们还是从他的

java常用关键词关键字,方法源码解析

transient volatile native final Integer String Class &&Object newInstance Class.forName,ClassLoader.loadClass ClassLoader .getResources(), ClassLoader.getSystemResources() ClassLoader .getResources(), ClassLoader.getSystemResources() public Enumer

pinpoint web报警机制源码解析

背景:(简述) Pinpoint 是一套APM (Application Performance Management)工具,主要用于帮助分析系统的总体结构和组件如何相互调用,也可用于追踪线上性能问题,方便定位出现问题的点. Pinpoint主要有如下几个组成部分: Pinpoint Agent :通过字节码增强技术,附加到 用户的java 应用来做采样,程序启动时指定javaagent以及agentId,pplicationName. HBase :用于存储agent采样的数据. Pinpoi