AQS与重入锁ReetrantLock原理

一、AQS原理

AQS(AbstractQueuedSynchronizer)队列同步器是用来构建锁、同步组件的基础框架。

AQS内部通过一个volatile int类型的成员变量state控制同步状态【0代表锁未被占用,1表示已占用】,通过内部类Node构成FIFO的同步队列实现等待获取锁的线程排队工作,通过内部类ConditionObject构建条件等待队列,来完成等待条件线程的排队工作。当线程调用Condition对象的wait方法后会被加入等待队列中,当有线程调用Condition的signal方法后,线程将从等待队列移动到同步队列进行锁竞争。AQS内部会有一个同步队列和可能多个等待队列,前者存放等待获取锁的线程,后者分别存放等待不同条件的线程。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer{
//指向同步队列队头
private transient volatile Node head;

//指向同步的队尾
private transient volatile Node tail;

//同步状态,0代表锁未被占用,1代表锁已被占用
private volatile int state;

static final class Node {
    //共享模式
    static final Node SHARED = new Node();
    //独占模式
    static final Node EXCLUSIVE = null;

    //标识线程已处于结束状态
    static final int CANCELLED =  1;
    //等待被唤醒状态
    static final int SIGNAL    = -1;
    //条件状态,
    static final int CONDITION = -2;
    //在共享模式中使用表示获得的同步状态会被传播
    static final int PROPAGATE = -3;

    //等待状态,存在CANCELLED、SIGNAL、CONDITION、PROPAGATE 4种
    volatile int waitStatus;

    //同步队列中前驱结点
    volatile Node prev;

    //同步队列中后继结点
    volatile Node next;

    //请求锁的线程
    volatile Thread thread;

    //等待队列中的后继结点,这个与Condition有关,稍后会分析
    Node nextWaiter;

    //判断是否为共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    //获取前驱结点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    //.....
}//上面Node head、tail对象构建同步队列,这里用ConditionObject类的对象创建条件等待队列
class ConditionObject implements Condition, java.io.Serializable {
    //等待队列第一个等待结点
    private transient Node firstWaiter;
    //等待队列最后一个等待结点
    private transient Node lastWaiter;
    //省略其他代码.......
}

//AQS中的模板方法,由其子类实现
 //独占模式下获取锁的方法
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    //独占模式下解锁的方法
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    //共享模式下获取锁的方法
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    //共享模式下解锁的方法
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    //判断是否为持有独占锁
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
}

二、AQS的应用

AQS通过state状态管理、同步队列、等待队列实现了多线程同步锁获取与释放,多线程并发排队、条件等待等复杂功能。

作为基础组件,它对锁的两种模式【独占模式和共享模式】都提供支持。设计上AQS采用模板方法模式构建,其内部提供了并发操作的核心方法、而将一些实现不同模式下实现可能有差异的操作定义为模板方法,让其子类实现。如ReentrantLock通过内部类Sync及其子类继承AQS实现tryAcuire()和tryRelease()方法来实现独占锁,而SemaPhore则通过内部类继承AQS实现tryAcquireShared()方法和tryReleaseShared()方法实现共享模式锁。AQS的继承关系图如下:

三、ReetrantLock非公平锁实现分析AQS的用法

ReetrantLock中非公平锁
//加锁操作
public void lock() {
     sync.lock();
}

/**
 * 非公平锁实现sync.lock()
 */
static final class NonfairSync extends Sync {
    //加锁
    final void lock() {
        //执行CAS操作,获取同步状态
        if (compareAndSetState(0, 1))
       //成功则将独占锁线程设置为当前线程
          setExclusiveOwnerThread(Thread.currentThread());
        else
            //否则再次请求同步状态
            acquire(1);
    }
}

//acquire为AQS本身实现的方法,其实现如下:
public final void acquire(int arg) {
    //再次尝试获取同步状态
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

//AQS子类Sync的子类NonfairSync中实现的tryAcquire方法
    protected final boolean tryAcquire(int acquires) {
         return nonfairTryAcquire(acquires);
     }

  //nonfairTryAcquire方法
  final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      //判断同步状态是否为0,并尝试再次获取同步状态
      if (c == 0) {
          //执行CAS操作
          if (compareAndSetState(0, acquires)) {
              setExclusiveOwnerThread(current);
              return true;
          }
      }
      //如果当前线程已获取锁,属于重入锁,再次获取锁后将status值加1
      else if (current == getExclusiveOwnerThread()) {
          int nextc = c + acquires;
          if (nextc < 0) // overflow
              throw new Error("Maximum lock count exceeded");
          //设置当前同步状态,当前只有一个线程持有锁,因为不会发生线程安全问题,可以直接执行 setState(nextc);
          setState(nextc);
          return true;
      }
      return false;
  }

//AQS本身实现的方法,将当前获取锁失败的线程构造成node结点加入的同步队列尾部,若队列为空或者并发入队失败,则调用enq方法重试。
private Node addWaiter(Node mode) {
    //将请求同步状态失败的线程封装成结点
    Node node = new Node(Thread.currentThread(), mode);

    Node pred = tail;
    //如果是第一个结点加入肯定为空,跳过。
    //如果非第一个结点则直接执行CAS入队操作,尝试在尾部快速添加
    if (pred != null) {
        node.prev = pred;
        //使用CAS执行尾部结点替换,尝试在尾部快速添加
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //如果第一次加入或者CAS操作没有成功执行enq入队操作
    enq(node);
    return node;
}
//AQS本身实现方法,通过循环CAS操作将当前线程构造的node结点入队,解决上面队列为空或者是并发入队失败的情况;
private Node enq(final Node node) {
    //死循环
    for (;;) {
         Node t = tail;
         //如果队列为null,即没有头结点
         if (t == null) { // Must initialize
             //创建并使用CAS设置头结点
             if (compareAndSetHead(new Node()))
                 tail = head;
         } else {//队尾添加新结点
             node.prev = t;
             if (compareAndSetTail(t, node)) {
                 t.next = node;
                 return t;
             }
         }
     }
    }
//AQS本身方法,队列中结点循环观察,当自己的前驱是head结点执行的结点时尝试获取锁,若成功将其设置为头结点,否则循环尝试;若当前结点前驱结点不是头结点,则在设置其前驱结点状态后将自己挂起
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋,死循环
        for (;;) {
            //获取前驱结点
            final Node p = node.predecessor();
            当且仅当p为头结点才尝试获取同步状态
            if (p == head && tryAcquire(arg)) {
                //将node设置为头结点
                setHead(node);
                //清空原来头结点的引用便于GC
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果前驱结点不是head,判断是否挂起线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            //最终都没能获取同步状态,结束该线程的请求
            cancelAcquire(node);
    }
}
//设置为头结点
private void setHead(Node node) {
        head = node;
        //清空结点数据
        node.thread = null;
        node.prev = null;
}
//如果前驱结点不是head,判断是否挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())

      interrupted = true;
}

//AQS本身的方法,若前驱结点状态为Node.SIGNAL则返回true,表示可以挂起当前结点,否则找到非结束状态的前驱结点,并设置其状态后,返回false
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取当前结点的等待状态
        int ws = pred.waitStatus;
        //如果为等待唤醒(SIGNAL)状态则返回true
        if (ws == Node.SIGNAL)
            return true;
        //如果ws>0 则说明是结束状态,
        //遍历前驱结点直到找到没有结束状态的结点
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果ws小于0又不是SIGNAL状态,
            //则将其设置为SIGNAL状态,代表该结点的线程正在等待唤醒。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
//AQS本身方法,挂起当前线程并检查其中断状态
private final boolean parkAndCheckInterrupt() {
        //将当前线程挂起
        LockSupport.park(this);
        //获取线程中断状态,interrupted()是判断当前中断状态,
        //并非中断线程,因此可能true也可能false,并返回
        return Thread.interrupted();
}

//ReentrantLock类的unlock
public void unlock() {
    sync.release(1);
}

//AQS类的release()方法
public final boolean release(int arg) {
    //尝试释放锁
    if (tryRelease(arg)) {

        Node h = head;
        if (h != null && h.waitStatus != 0)
            //唤醒后继结点的线程
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//ReentrantLock类中的内部类Sync实现的tryRelease(int releases)
protected final boolean tryRelease(int releases) {

      int c = getState() - releases;
      if (Thread.currentThread() != getExclusiveOwnerThread())
          throw new IllegalMonitorStateException();
      boolean free = false;
      //判断状态是否为0,如果是则说明已释放同步状态
      if (c == 0) {
          free = true;
          //设置Owner为null
          setExclusiveOwnerThread(null);
      }
      //设置更新同步状态
      setState(c);
      return free;
  }
//AQS本身方法,唤醒后续挂起的结点
private void unparkSuccessor(Node node) {
    //这里,node一般为当前线程所在的结点。
    int ws = node.waitStatus;
    if (ws < 0)//置零当前线程所在的结点状态,允许失败。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一个需要唤醒的结点s
    if (s == null || s.waitStatus > 0) {//如果为空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}

剖析基于并发AQS的重入锁(ReetrantLock)及其Condition实现原理

原文地址:https://www.cnblogs.com/doit8791/p/9095424.html

时间: 2024-10-02 20:30:32

AQS与重入锁ReetrantLock原理的相关文章

Java 重入锁 ReentrantLock 原理分析

1.简介 可重入锁ReentrantLock自 JDK 1.5 被引入,功能上与synchronized关键字类似.所谓的可重入是指,线程可对同一把锁进行重复加锁,而不会被阻塞住,这样可避免死锁的产生.ReentrantLock 的主要功能和 synchronized 关键字一致,均是用于多线程的同步.但除此之外,ReentrantLock 在功能上比 synchronized 更为丰富.比如 ReentrantLock 在加锁期间,可响应中断,可设置超时等. ReentrantLock 是我们

ReentrantLock重入锁

上次博客讲到了通过wait()方法和notify()方法来实现循环打印数字和字母得问题.其实使用重入锁也可以实现同样得功能,那么开始我们先通过源码来了解一下重入锁把. public void lock() { sync.lock(); } 首先它有一个lock()方法,它用来加锁,从代码中可以看到,它调用得是sync.lock()方法, public class ReentrantLock implements Lock, java.io.Serializable { private stati

ReentrantLock (重入锁) 源码浅析

一.ReentrantLock简介ReentrantLock重入锁,顾名思义,就是支持重入的锁,它表示能够支持一个线程对资源的重复加锁:我们之前学习过Synchronized锁,它也是支持重入的一种锁,参考我的另一篇Synchronized 锁的实现原理与应用,Synchronized支持隐式的重入锁,比如递归方法,在方法运行时,执行线程在获取到了锁之后仍能连续多次地获取锁:ReentrantLock虽然不能隐式重入,但是获取到锁的线程多次调用lock方法,不会阻塞进入同步队列:除此之外在获取锁

可重入锁的机制

1.听故事把知识掌握了 在一个村子里面,有一口井水,水质非常的好,村民们都想打井里的水.这井只有一口,村里的人那么多,所以得出个打水的规则才行.村长绞尽脑汁,最终想出了一个比较合理的方案,咱们来仔细的看看聪明的村长大人的智慧. 井边安排一个看井人,维护打水的秩序. 打水时,以家庭为单位,哪个家庭任何人先到井边,就可以先打水,而且如果一个家庭占到了打水权,其家人这时候过来打水不用排队.而那些没有抢占到打水权的人,一个一个挨着在井边排成一队,先到的排在前面.打水示意图如下 : 是不是感觉很和谐,如果

【死磕Java并发】-----J.U.C之重入锁:ReentrantLock

此篇博客所有源码均来自JDK 1.8 ReentrantLock,可重入锁,是一种递归无阻塞的同步机制.它可以等同于synchronized的使用,但是ReentrantLock提供了比synchronized更强大.灵活的锁机制,可以减少死锁发生的概率. API介绍如下: 一个可重入的互斥锁定 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大.ReentrantLock 将由最近成功获得锁定,并且还没有释放该锁定的线程所拥

聊聊高并发(二十七)解析java.util.concurrent各个组件(九) 理解ReentrantLock可重入锁

这篇讲讲ReentrantLock可重入锁,JUC里提供的可重入锁是基于AQS实现的阻塞式可重入锁.这篇 聊聊高并发(十六)实现一个简单的可重入锁 模拟了可重入锁的实现.可重入锁的特点是: 1. 是互斥锁,基于AQS的互斥模式实现,也就是说同时只有一个线程进入临界区,唤醒下一个线程时也只能释放一个等待线程 2. 可重入,通过设置了一个字段exclusiveOwnerThread来标示当前获得锁的线程.获取锁操作是,如果当前线程是已经获得锁的线程,那么获取操作成功.把当前状态作为获得锁次数的计数器

两种方式实现自己的可重入锁

本篇文章将介绍两种自己动手实现可重入锁的方法. 我们都知道JDK中提供了一个类ReentrantLock,利用这个类我们可以实现一个可重入锁,这种锁相对于synchronized来说是一种轻量级锁. 重入锁的概念 重入锁实际上指的就是一个线程在没有释放锁的情况下,可以多次进入加锁的代码块. public void a() { lock2.lock(); System.out.println("a"); b(); lock2.unlock(); } public void b() { l

多线程——重入锁

当某个线程请求一个由其它线程持有的锁时,该线程就会进入等待或者阻塞状态,一直到持有锁的线程释放锁,然后再去竞争获取锁.然而,内置锁(即Synchronized修饰的锁对象)是可重入的,因此如果某个线程试图获得一个已经由它自己持有的锁,那么它会成功获得此锁,这就是所谓的重入锁,也可以理解为锁的重入. "重入",意味着获取锁的操作的粒度是"线程",而不是"调用","调用"只是获取锁的途径. "重入"的一种实现方

Java 中15种锁的介绍:公平锁,可重入锁,独享锁,互斥锁,乐观锁,分段锁,自旋锁等等(转)

Java 中15种锁的介绍 在读很多并发文章中,会提及各种各样锁如公平锁,乐观锁等等,这篇文章介绍各种锁的分类.介绍的内容如下: 公平锁 / 非公平锁 可重入锁 / 不可重入锁 独享锁 / 共享锁 互斥锁 / 读写锁 乐观锁 / 悲观锁 分段锁 偏向锁 / 轻量级锁 / 重量级锁 自旋锁 上面是很多锁的名词,这些分类并不是全是指锁的状态,有的指锁的特性,有的指锁的设计,下面总结的内容是对每个锁的名词进行一定的解释. 公平锁 / 非公平锁 公平锁 公平锁是指多个线程按照申请锁的顺序来获取锁. 非公