Java 多线程与并发(六):AQS

我们前面几张提到过,JUC 这个包里面的工具类的底层就是使用 CAS 和 volatile 来保证线程安全的,整个 JUC 包里面的类都是基于它们构建的。今天我们介绍一个非常重要的同步器,这个类是 JDK 在 CAS 和 volatile 的基础上为我们提供的一个同步工具类。

背景

AbstractQueuedSynchronizer,JDK 1.5 引入了 JUC 包,这个包提供了一些列支持并发的组件,这些组件是一些列同步器,他们主要完成以下功能:

  • 内部状态的管理和更新,比如表示一个锁的状态是获取还是释放。
  • 线程同步状态阻塞。
  • 线程同步状态释放。

AQS 是一个小框架,基于这个框架我们可以实现很多的同步器,ReentrantLock,CountDownLatch,Semaphore 等都是基于 AQS 实现的。

功能

  • 独占锁:每次只有一个线程能够持有锁,比如前面给大家演示的 ReentrantLock 就是以独占方式实现的互斥锁。
  • 共享锁:允许多个线程同时获取锁,并发访问共享资源,比如 ReentrantReadWriteLock。

设计思想

同步器的核心方法是 acquire 和 release 操作。

acquire

while(当前同步器的状态不允许获取操作){

? 如果当前线程不再队列中,将其加入队列

? 阻塞当前线程

}

线程如果位于队列中,将其移出队列

release

更新同步器的状态

if(新的状态允许某个被阻塞的线程获取成功)

? 解除队列中一个或多个线程的阻塞状态。

从上面的操作思想中我们可以提出三大关键操作:同步器状态变更,线程阻塞和释放,插入和移出队列。由此可以引申出三个基本组件:

  • 同步器状态的原子性管理
  • 线程阻塞与解除阻塞
  • 队列的管理

同步状态
AQS 类使用 int 值来保存同步状态,并且暴露出 getState,setState 和 compareAndSet 操作来读取和更新这个同步状态。线程通过修改(加/减指定的数量)码是否成功来决定当前线程是否成功获取到同步状态。

State 被声明成了 volatile,保证了可见性和有序性。又通过 CAS 指令来实现 compareAndSet ,使得当且仅当同步状态拥有一个一致的期望值的时候,才会被原子地设置成新值,这样就保证了同步状态的原子性。

阻塞
直到 JSR166,阻塞线程和解除线程阻塞都是基于 Java 的内置管程。

JUC 包使用 LockSupport 类来解决这个问题。LockSupport.park 阻塞当前线程直到有 LockSupport.unpark 方法被调用。

队列
整个框架的核心就是如何管理线程阻塞队列,该队列是严格的 FIFO 队列,因此不支持线程优先级的同步。同步队列的最佳选择是自身没有使用底层锁来构造的非阻塞数据结构。这里采用了 CLH 锁。

CLH队列实际并不那么像队列,它的入队和出队与实际的业务密切相关。它是一个链表队列。用过 AQS 的两个字段 head(头节点) 和 tail(尾节点)来存取,这两个字段初始化的时候都指向了一个空节点。
入队操作:

CLH 队列是 FIFO 队列,所以新的节点来到的时候,是要插入到当前队列的尾节点之后。当一个线程获取到同步状态之后,其他线程无法获取,转而被构造成节点加入到同步队列中,而且这个加入队列的过程必须要保证线程安全,因此使用了 CAS方法,它需要传递当前线程认为的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

出队操作:

因为是 FIFO 队列,所以能成功获取到 AQS 同步状态的必定是首节点,首节点的线程在释放同步状态时,会唤醒后续节点,而后续节点会在获取 AQS 同步状态成功的时候将自己设置为首届点。设置首节点是由获取同步成功的线程来完成的,所以不需要像入队这样的 CAS 操作。

条件队列

上一节是 AQS 的同步队列,这一节是条件队列。AQS 只有一个同步队列,但是可以有多个条件队列。AQS 框架提供了一个 ConditionObject 类,给维护独占同步的类以及实现 Lock 接口的类使用。

ConditionObject 类 和 AQS 共用了内部节点,有自己单独的条件队列。Singal 操作是通过将节点从条件队列转移到同步队列来实现的。
singal:

await:

方法结构

组件 数据结构
同步状态 volatile int state
阻塞 LockSupport类
队列 Node节点
条件队列 ConditionObject

源代码

我们通过独占式同步状态的释放和获取,以及共享式同步状态的释放和获取来看看 AQS 是如何实现的。

独占式

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上述代码主要完成了同步状态的获取,节点构造,加入同步队列以及在同步队列中自旋等待等相关工作。

  1. 调用子类实现的 tryAcquire 方法,该方法保证线程安全同时获取同步状态。
  2. 获取同步状态失败,则构造独占式同步节点。
  3. 通过 addWriter 将该节点加入到同步队列的尾部。
  4. 最后通过 acquireQueued 方法,使得该节点以自选的方式获取同步状态。

来看看节点构造和加入队列的实现:

private Node addWaiter(Node mode) {
        // 当前线程构造成Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 尝试快速在尾节点后新增节点 提升算法效率 先将尾节点指向pred
        Node pred = tail;
        if (pred != null) {
            //尾节点不为空  当前线程节点的前驱节点指向尾节点
            node.prev = pred;
            //并发处理 尾节点有可能已经不是之前的节点 所以需要CAS更新
            if (compareAndSetTail(pred, node)) {
                //CAS更新成功 当前线程为尾节点 原先尾节点的后续节点就是当前节点
                pred.next = node;
                return node;
            }
        }
        //第一个入队的节点或者是尾节点后续节点新增失败时进入enq
        enq(node);
        return node;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //尾节点为空  第一次入队  设置头尾节点一致 同步队列的初始化
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //所有的线程节点在构造完成第一个节点后 依次加入到同步队列中
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

节点进入同步队列后,就进入了一个自旋的过程,每个线程节点都在自旋地观察,当条件满足,获取到了同步状态,就可以从自旋过程中退出,否则依旧自旋。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取当前线程节点的前驱节点
            final Node p = node.predecessor();
            //前驱节点为头节点且成功获取同步状态
            if (p == head && tryAcquire(arg)) {
                //设置当前节点为头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //是否阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 阻塞线程的过程。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //前驱节点的状态决定后续节点的行为
     int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*前驱节点为-1 后续节点可以被阻塞
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*前驱节点是初始或者共享状态就设置为-1 使后续节点阻塞
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
private final boolean parkAndCheckInterrupt() {
        //阻塞线程
        LockSupport.park(this);
        return Thread.interrupted();
    }


当获取同步状态成功之后,对于锁这种并发组件而言,就意味着当前线程获取到了锁。

再看 release 方法:

head节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点,节点的变化过程如下。修改head节点指向下一个获得锁的节点,新的获得锁的节点,将prev的指针指向null。

public final boolean release(int arg) {
        if (tryRelease(arg)) {//同步状态释放成功
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //直接释放头节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*寻找符合条件的后续节点
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //唤醒后续节点
            LockSupport.unpark(s.thread);
    }

总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中进行自旋。移除的条件是前驱节点是头节点并且成功获取了同步状态。释放时,会唤醒头节点的后继节点。

应用

ReentrantLock:ReentrantLock 类使用 AQS 同步状态来保存锁重复持有的次数。当锁被一个线程获取时,ReentrantLock 也会记录下当前获得锁的线程表示,以便检查是否重复获取。

ReentrantReadWriteLock:ReentrantReadWriteLock 使用 AQS 同步状态中的 16 为来保存写锁的持有次数,剩下的 16 为来保存读锁的持有次数。WriteLock 的构建方式和 ReentrantLock 一样。ReadLock 则通过使用 acquireShared 方法来支持同时允许多个读线程。

Semaphore:信号量使用 AQS 同步状态来保存信号量当前计数。它里面定义的 acquireShared 方法会减少计数,当计数为非正值时阻塞线程。tryRelease 会增加技术,在计数为正值时还要解除线程的阻塞。

CountDownLatch:使用 AQS 同步状态来表示计数。当该计数为 0 时,所有的 acquire 方法才能通过。

FutureTask:使用 AQS 的同步状态来表示某个异步计算任务的运行状态(初始化,运行中,被取消和完成)。设置(FutureTask 的 set 方法)或取消(FutureTask 的 cancel 方法)一个 FutureTask 时会调用 AQS 的 release 操作。等待计算结果的线程阻塞解除是通过 AQS 的 acquire 实现的。

SynchronousQueues:SynchronousQueues类使用了内部的等待节点,这些节点可以用于协调生产者和消费者。同时,它使用AQS同步状态来控制当某个消费者消费当前一项时,允许一个生产者继续生产,反之亦然。

流程图

  1. 多线程并发修改同步状态,修改成功的线程标记为拥有同步状态。

  2. 获取失败的线程,加入到同步队列的队尾;加入到队列中后,如果当前节点的前驱节点为头节点再次尝试获取同步状态(下文代码:p == head && tryAcquire(arg))。

  3. 如果头节点的下一个节点尝试获取同步状态失败后,会进入等待状态;其他节点则继续自旋。

  4. 当线程执行完相应逻辑后,需要释放同步状态,使后继节点有机会同步状态(让出资源,让排队的线程使用)。这时就需要调用release(int arg)方法。调用该方法后,会唤醒后继节点。

  5. 后继节点获取同步状态成功,头节点出队。需要注意的事,出队操作是间接的,有节点获取到同步状态时,会将当前节点设置为head,而原本的head设置为null。

  6. 当同步队列中头节点唤醒后继节点时,此时可能有其他线程尝试获取同步状态。

  7. 假设获取成功,将会被设置为头节点。

  8. 头节点后续节点获取同步状态失败。

  9. 共享模式和独占模式最主要的区别是在支持同一时刻有多个线程同时获取同步状态。为了避免带来额外的负担,在上文中提到的同步队列中都是用独占模式进行讲述,其实同步队列中的节点应该是独占和共享节点并存的。

  10. 共享节点尝试获取同步状态。

  11. 当一个同享节点获取到同步状态,并唤醒后面等待的共享状态的结果如下图所示:

  12. 最后,获取到同步状态的线程执行完毕,同步队列中只有一个独占节点:

总结

  1. AQS通过一个int同步状态码,和一个(先进先出)队列来控制多个线程访问资源
  2. 支持独占和共享两种模式获取同步状态码
  3. 当线程获取同步状态失败会被加入到同步队列中
  4. 当线程释放同步状态,会唤醒后继节点来获取同步状态
  5. 共享模式下的节点获取到同步状态或者释放同步状态时,不仅会唤醒后继节点,还会向后传播,唤醒所有同步节点
  6. 使用volatile关键字保证状态码在线程间的可见性,CAS操作保证修改状态码过程的原子性。

原文地址:https://www.cnblogs.com/paulwang92115/p/12168023.html

时间: 2024-07-31 08:58:45

Java 多线程与并发(六):AQS的相关文章

JAVA多线程和并发性知识点总结

转载请注明出处:http://blog.csdn.net/zhoubin1992/article/details/46861397 上次我总结了一份JAVA 面向对象和集合知识点总结: http://blog.csdn.net/zhoubin1992/article/details/46481759 受到了博友们的激励,这次对JAVA多线程和并发性相关知识点进行总结,方便各位博友学习以及自己复习之用. 一. 什么是进程.线程?线程和进程的区别? 1. 进程 当一个程序进入内存运行时,即变成一个进

Java多线程视频教程并发编程面试知识

课程目录:  1-1.并发编程入门到实战课程简介1-2.什么是并发编程1-3.并发编程的挑战之频繁的上下文切换1-4.并发编程的挑战之死锁1-5.并发编程的挑战之线程安全1-6.并发编程的挑战之资源限制2-1.进程与线程的区别2-2.线程的状态及其相互转换2-3.创建线程的方式(上)2-4.创建线程的方式(下)2-5.线程的挂起及其恢复2-6.线程的中断操作2-7.线程的优先级2-8.守护线程3-1.什么是线程安全性3-2.从字节码角度剖析线程不安全操作3-3.原子性操作3-4.深入理解sync

JAVA多线程和并发基础面试问答【转】

JAVA多线程和并发基础面试问答 多线程和并发问题是Java技术面试中面试官比较喜欢问的问题之一.在这里,从面试的角度列出了大部分重要的问题,但是你仍然应该牢固的掌握Java多线程基础知识来对应日后碰到的问题.(校对注:非常赞同这个观点) Java多线程面试问题 1. 进程和线程之间有什么不同? 一个进程是一个独立(self contained)的运行环境,它可以被看作一个程序或者一个应用.而线程是在进程中执行的一个任务.Java运行环境是一个包含了不同的类和程序的单 一进程.线程可以被称为轻量

JAVA 多线程和并发学习笔记(三)

Java并发编程中使用Executors类创建和管理线程的用法 1.类 Executors Executors类可以看做一个“工具类”.援引JDK1.6 API中的介绍: 此包中所定义的 Executor.ExecutorService.ScheduledExecutorService.ThreadFactory 和 Callable 类的工厂和实用方法.此类支持以下各种方法: 创建并返回设置有常用配置字符串的 ExecutorService 的方法. 创建并返回设置有常用配置字符串的 Sche

Java多线程与并发应用-(5)-如何优雅的使用ThreadLocal类

内容来自,张孝祥老师的张孝祥-Java多线程与并发库高级应用>视频教程 package com.lipeng; public class MyThreadLocalTest2 { private final static MyThreadLocal<String> myThreadLocal=new MyThreadLocal<String>(); /** * @param args */ public static void main(String[] args) { ne

Java多线程与并发---学习总结(很详细)

Java多线程与并发---学习总结(很详细) 1.      计算机系统 使用高速缓存来作为内存与处理器之间的缓冲,将运算需要用到的数据复制到缓存中,让计算能快速进行:当运算结束后再从缓存同步回内存之中,这样处理器就无需等待缓慢的内存读写了. 缓存一致性:多处理器系统中,因为共享同一主内存,当多个处理器的运算任务都设计到同一块内存区域时,将可能导致各自的缓存数据不一致的情况,则同步回主内存时需要遵循一些协议. 乱序执行优化:为了使得处理器内部的运算单位能尽量被充分利用. 2.      JAVA

17、JAVA多线程和并发基础面试问答

JAVA多线程和并发基础面试问答 原文链接:http://ifeve.com/java-multi-threading-concurrency-interview-questions-with-answers/ 多线程和并发问题是Java技术面试中面试官比较喜欢问的问题之一.在这里,从面试的角度列出了大部分重要的问题,但是你仍然应该牢固的掌握Java多线程基础知识来对应日后碰到的问题.(校对注:非常赞同这个观点) Java多线程面试问题 1. 进程和线程之间有什么不同? 一个进程是一个独立(se

Java多线程与并发库高级应用之公共屏障点CyclicBarrier

一个小队去登山,每位队员登山的速度不同.山上有几个集合点,在每一集合点处,先到达的队员只有等后面的队员全部到达集合点后才能继续向下一个集合点出发. JDK1.5提供的CyclicBarrier模拟了这种情况.每一个线程相当于一个登山队员,CyclicBarrier相当于山上的集合点.只有等所有线程都执行到了CyclicBarrier后才可以继续向下执行. CyclicBarrier允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程

Java多线程与并发库高级应用之信号量Semaphore

JDK1.5提供了一个计数信号量Semaphore类.Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目,并提供了同步机制. Semaphore提供了两个构造器来创建对象: 1)Semaphore(int permits):创建具有给定的许可数和非公平的公平设置的Semaphore. 2)Semaphore(int permits, boolean fair):创建具有给定的许可数和给定的公平设置的Semaphore.如果此信号量保证在争用时按先进先出的顺序授予许可,则为

Java多线程与并发库高级应用之阻塞队列BlockingQueue

JDK1.5提供了阻塞队列接口BlockingQueue,它是一个有界阻塞队列.BlockingQueue实现是线程安全的,可以安全地与多个生产者和多个使用者一起使用. 使用时用其实现类 ArrayBlockingQueue,它一个由数组支持的有界阻塞队列.此队列按 FIFO(先进先出)原则对元素进行排序.队列的头部 是在队列中存在时间最长的元素.队列的尾部是在队列中存在时间最短的元素.新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素. 这是一个典型的"有界缓存区",固定