AQS源码解析(1)-CLH

目录

  • AQS解析
  • 一、简介
  • 二、同步的状态和基本属性
  • 三、入队 addWaiter
    • 3.1 基本步骤介绍
    • 3.2 addWaiter()
    • 3.3 enq(Node node)
  • 四、出队
  • 参考

AQS解析

一、简介

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState(), setState(int) and compareAndSetState(int, int) is tracked with respect to synchronization.

AQS使用一个先进先出的同步队列实现,为相关的锁和同步锁提供基本框架。它使用整数进行状态的表示以此来为同步器提供可用的基本骨架。子类通过重写AQS中受保护的方法进而实现锁的释放和获取,通过这种形式子类就能具有完整的入队和锁机制。

不得不提出的AQS中使用的先进先出的同步队列实现的理论来自于CLH,也就是说将AQS基本就是在讲CLH的实现方式

The wait queue is a variant of a "CLH" (Craig, Landin, and* Hagersten) lock queue. CLH locks are normally used for* spinlocks.

二、同步的状态和基本属性

static final class Node {
    /** 共享模式 */
    static final Node SHARED = new Node();
    /** 独占模式 */
    static final Node EXCLUSIVE = null;

    /** 标明当前线程已经被取消 */
    static final int CANCELLED =  1;
    /** 线程的下一个等待线程需要被唤醒 */
    static final int SIGNAL    = -1;
    /** 当前线程正在等待中 */
    static final int CONDITION = -2;
    /** 下一次的acquire方法应该被无条件的传播*/
    static final int PROPAGATE = -3;

    /** 当前等待状态*/
    volatile int waitStatus;

    /** 前驱节点*/
    volatile Node prev;

    /** 与上面类似         */
    volatile Node next;

    /** 当前node持有的线程,在构造器中初始化,在退出队列后被置为null*/
    volatile Thread thread;

    /** 指向当前节点的后面第一个处于CODITION状态的节点,或者为SHARED,只有对于独占式才会有CODTION节点的存         *在,对于共享式的其nextWaiter为SHARED(变量)
         */
    Node nextWaiter;

    /**
         * Returns true if node is waiting in shared mode.
         */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /** 返回前驱节点,添加一层封装*/
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // 用于创建出事头结点和SHARED标志的构造器
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

同步的状态分为**5种,0为INITIAL,1为CANCELLED,-1为SINGAL,-2为CONDITION,-3为PROPAGETE**,这五种状态的作用和状态信息如下

  • SIGNAL:该节点的后继结点已经通过LockSupport.part()方法阻塞,当前节点在被释放或者被删除后需要唤醒它的后继节点,为避免线程之间的竞争,获取资源acquire的所有方法都应该设置SIGNAL标志,然后重新进行原子性的获取操作,如果获取失败,就阻塞
  • CANCELLED:节点因为超时或者中断进入CANCELLED状态,节点如果进入该状态就不会再转为其他状态,该状态的线程不会被阻塞
  • CONDITION:该节点处于等待队列中,他不会作为同步队列中的普通节点使用(也就是不会被前驱节点唤醒或unpark()),除非他的状态被设置为0INITIAL
  • PROPAGETE:共享锁的释放(releaseShared)应该被传递到其他节点。在doReleaseShared中用来保证头结点一定会继续传播信息
  • INITIAL:初始状态或者说是中间状态

这几类状态可以用更简单的被区分:如果>0就是CANCELLED<=0就是可以使用的状态

CLH同步队列,结构图如下

  • prev为node的前驱节点,next为node的后驱节点

nextWaiter字段,保存的是同步状态的模式(Mode),tryAcquire(int)tryAcquireShared(int)方法通过独占方式或者共享方式进行状态获取,如果失败就调用addWaiter(Node mode)的方式进行入队。nextWaiter用于表示当前处于那种形式

  • SHARED 枚举共享模式,值为new Node(),这个值是唯一的,使用static进行修饰
  • EXCLUSIVE 枚举独占模式,值为null

#predecessor() 方法,获得 Node 节点的前一个 Node 节点。在方法的内部,Node p = prev 的本地拷贝,是为了避免并发情况下,prev 判断完 == null 时,恰好被修改,从而保证线程安全。

三、入队 addWaiter

3.1 基本步骤介绍

  1. 生成新的节点node
  2. 将新节点node的前驱指向原来的尾节点tail
    1. 通过UNSAFE设置尾节点tail为新的节点node
    2. 设置倒数第二个节点也就是原来的尾节点old tail的后驱节点next为新的尾节点node
  3. 如果失败通过enq再次进行重试
    1. 如果头结点为空,将新节点node设置到头结点
    2. 否者进行类似第2步的操作

使用图形化的形式来描述入队的问题

3.2 addWaiter()

深入到addWaiter()源码进行查看

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 先进行一次简单的入队尝试
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

3.3 enq(Node node)

enq方法部分方法和addWaiter中一致,主要不同是当考虑到节点未进行初始化时需要将当前节点设置为初始化节点head node

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 如果头部不存在就进行初始化
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;//该步骤和3.2中的步骤类似
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

四、出队

CLH 同步队列遵循 FIFO,首节点的线程释放同步状态后,将会唤醒它的下一个节点(Node.next)。而后继节点将会在获取同步状态成功时,将自己设置为首节点( head )。

这个过程非常简单,head 执行该节点并断开原首节点的 next 和当前节点的 prev 即可。注意,在这个过程是不需要使用 CAS 来保证的,因为只有一个线程,能够成功获取到同步状态。

过程图如下:

#setHead(Node node) 方法,实现上述的出列逻辑。代码如下:

private void setHead(Node node) {
    head = node;    node.thread = null;    node.prev = null;
                                }

参考

原文地址:https://www.cnblogs.com/Heliner/p/11575224.html

时间: 2024-07-31 17:36:21

AQS源码解析(1)-CLH的相关文章

AbstractQueuedSynchronizer(AQS)源码解析(一)

在JDK1.5版本,新增了并发包,其中包含了显示锁.并发容器.在这些锁和容器里,都有同步器(AQS)的身影.为了更好地理解JDK的并发包,我会用三个主题来详细描述AbstractQueuedSynchronizer的实现. 在AQS中,涉及到同步队列以及Condition对象,这也是我为什么要用三个主题来讲述的原因.本节将主要讲述同步队列,后面两节会分别讲述Condition对象以及AQS的主要功能实现. AQS同步队列的主要功能是将无法获得资源的线程放入同步队列中,进行等待,它是通过链表来

AQS源码解析

文大篇幅引用自HongJie大佬的一行一行源码分析清楚AbstractQueuedSynchronizer,这只是一篇简单的个人整理思路和总结(倒垃圾),如果觉得有些难懂的话,不要犹豫也不要疑惑,很明显是我这篇文章的问题,不是你的问题,这时你最好直接转去看HongJie大佬的原文,那个会好懂很多.还是看不懂的话建议隔一段时间再看,然后像我一样写(复制)一篇总结捋一下思路,加油! AQS 结构 属性 private transient volatile Node head; private tra

Java 并发之AbstractQueuedSynchronizer(AQS)源码解析

关键字:CLH,Node,线程,waitStatus,CAS,中断 目录 图解AQS的操作细节 0.前言 1.基本概念 1.1.CAS自旋 1.2.Node 1.3.CLH & AQS 1.4.ReentrantLock 2.图解AQS 2.1.线程A单独运行 2.2.线程B开始运行 2.3.线程C开始运行 2.4.线程A停止运行,线程B继续运行 2.5.1.线程B停止运行,线程C继续运行 2.5.2.线程C放弃竞争 3.问题总结 3.1.为什么在unparkSuccessor操作中从尾节点开始

AQS源码解析(一)-AtomicBoolean源码解析

基本类: AtomicInteger AtomicLong AtomicBoolean 数组类型: AtomicIntegerArray AtomicLongArray AtomicReferenceArray 介绍 由于在多线程条件下,如果对共享变量修改容易造成数据不一致的情况,所以对于共享变量需要保证线程安全有有如下几种方式: 使用lock或者synchronized进行同步共享变量 使用CAS方法来保证修改变量为原子性操作 该类为后者,基于CAS方式修改具有原子性. 实现原理 将boole

源码解析之AQS源码解析

要理解Lock首先要理解AQS,而要理解并发类最好的方法是先理解其并发控制量不同值得含义以及该类运作流程,然后配合一步步看源码.该类有一个重要的控制量是WaitStates. /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; //该节点被取消了 /** waitStatus value to indicate successor's thread needs unpar

AbstractQueuedSynchronizer源码解析

1.简介 AbstractQueuedSynchronizer队列同步器,用来实现锁或者其他同步组件的基础框架 AbstractQueuedSynchronizer使用int类型的volatile变量维护同步状态 一般使用AQS的主要方式是继承,子类通过实现它提供的抽象方法来管理同步状态,主要管理的方式是通过tryAcquire和tryRelease类似的方法来操作状态,同时,AQS提供以下线程安全的方法来对状态进行操作 protected final int getState() { retu

死磕 java同步系列之StampedLock源码解析

问题 (1)StampedLock是什么? (2)StampedLock具有什么特性? (3)StampedLock是否支持可重入? (4)StampedLock与ReentrantReadWriteLock的对比? 简介 StampedLock是java8中新增的类,它是一个更加高效的读写锁的实现,而且它不是基于AQS来实现的,它的内部自成一片逻辑,让我们一起来学习吧. StampedLock具有三种模式:写模式.读模式.乐观读模式. ReentrantReadWriteLock中的读和写都是

FutureTask 源码解析

FutureTask 源码解析 版权声明:本文为本作者原创文章,转载请注明出处.感谢 码梦为生| 刘锟洋 的投稿 站在使用者的角度,future是一个经常在多线程环境下使用的Runnable,使用它的好处有两个:1. 线程执行结果带有返回值2. 提供了一个线程超时的功能,超过超时时间抛出异常后返回. 那,怎么实现future这种超时控制呢?来看看代码: FutureTask的实现只是依赖了一个内部类Sync实现的,Sync是AQS (AbstractQueuedSynchronizer)的子类,

JDK 源码解析 —— Executors ExecutorService ThreadPoolExecutor 线程池

零. 简介 Executors 是 Executor.ExecutorService.ThreadFactory.Callable 类的工厂和工具方法. 一. 源码解析 创建一个固定大小的线程池:通过重用共享无界队列里的线程来减少线程创建的开销.当所有的线程都在执行任务,新增的任务将会在队列中等待,直到一个线程空闲.由于在执行前失败导致的线程中断,如果需要继续执行接下去的任务,新的线程会取代它执行.线程池中的线程会一直存在,除非明确地 shutdown 掉. public static Exec