源码解析之AQS源码解析

要理解Lock首先要理解AQS,而要理解并发类最好的方法是先理解其并发控制量不同值得含义以及该类运作流程,然后配合一步步看源码。该类有一个重要的控制量是WaitStates。
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;   //该节点被取消了
/** waitStatus value to indicate successor‘s thread needs unparking */
static final int SIGNAL    = -1;   //该节点后续节点需要被唤醒
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;  //该节点进入了等待队列,即Condition的队列里
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate
 */
static final int PROPAGATE = -3;  //共享节点,该节点进锁后会释放锁,。

AQS流程图:

Condition与Lock配合:

源码分析:核心方法 aquaire和release及他们方法体里使用到的方法。

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&  //如果tryacquire失败 且 队列里获取节点成功 且被中断过
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();//当前线程中断 interrupt()            
        //说下中断的事,1、如果在acquireQueued过程线程被interrupt,如果线程没进入等待状态,并不会中断线程。只是改变interrupt变量
        // 且被传回到这里(因为是用interrupted,所以返回true之后又把线程的interrupt变量设为false)。然后selfinterrupt,将interrupt变量设为true。
        // 2、如果线程被park了然后被interrupt,则被唤醒,循环一次发现还是阻塞又进入park等待状态。直到被unpark,interrupt参数为true传回到这里。
        //然后interrupt参数又变回false(受interrupted影响),selfinterrupt则又把它设为true。
    }
    final boolean acquireQueued(final Node node, int arg) {//从队列里尝试获取锁
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//前节点
                if (p == head && tryAcquire(arg)) {//前节点为头节点,且尝试获取锁获取到了。则清理前节点,
                    setHead(node);                  //head指向现节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //shouldParkAfterFailedAcquire如果前节点为-1则返回true,如果0(初始),-3(共享)则设为-1,如果1则
                //  找到前面<-0的节点连他后面
                //parkAndCheckInterrupt 阻塞当前线程,并返回interrupted状态(阻塞则设置失败),并清除中断状态
                if (shouldParkAfterFailedAcquire(p, node) &&//前节点状态为-1 return true
                        parkAndCheckInterrupt()) //中断状态为true  return true
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);//节点取消获取,队列中删除节点,
        }
    }
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//获取失败后判断是否暂停该线程
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)//前节点处于当前节点后面的节点需要被唤醒状态
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {//前节点处于取消状态
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {//当前节点找到前面状态<=0的节点连他后面。
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {//如果前节点不处于取消状态,则设为signal -1.(0或-3设为-1)
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don‘t park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//阻塞该线程,至此该线程进入等待状态,等着unpark和interrupt叫醒他
        return Thread.interrupted();//叫醒之后返回该线程是否在中断状态, 并会清除中断记号。

    }
private void cancelAcquire(Node node) {
        // Ignore if node doesn‘t exist
        if (node == null)
            return;

        node.thread = null;//thread指向null

        // Skip cancelled predecessors
        Node pred = node.prev;//当前节点找到前面status<=0的节点连它后面
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;//status状态设为1 取消状态

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {//节点为末尾,把找到的status<0节点后面节点都切掉
            compareAndSetNext(pred, predNext, null);
        } else {//节点不为末尾,前节点连上当前节点后节点
            // If successor needs signal, try to set pred‘s next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            //节点不为末尾,找到status<=0的前节点不是头节点且该节点线程不是null、且status为<=0的前节点状态为-1(不是(-3,0)则设为-1)
            if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                            (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {//节点不为末尾,status<=0的前节点是头节点 或status<=0的前节点线程为null,
                unparkSuccessor(node);//专门给头节点用的启动继任者函数,只有前status<=0节点是头节点,且
                //现节点后有节点才需unpark后续节点。(因为前节点可能唤醒的是当前线程,如果你删除当前节点
                //不unpark后面节点可能就停止工作。如果你是尾节点,那无所谓,反正你后面也没线程需要unpark)
            }

            node.next = node; // help GC
        }
    }
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//释放操作,启动继任者
        return true;
    }
    return false;
}
    private void unparkSuccessor(Node node) {//启动继任者线程
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)//-2,-3等待队列或共享锁线程改为0 空白状态
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {//如果后节点为null或waitstatus>0(线程取消状态)
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)//找到节点后面status<=0的节点启动它
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

原文地址:https://www.cnblogs.com/llsblog/p/10629784.html

时间: 2024-10-10 19:29:45

源码解析之AQS源码解析的相关文章

AbstractQueuedSynchronizer(AQS)源码解析(一)

在JDK1.5版本,新增了并发包,其中包含了显示锁.并发容器.在这些锁和容器里,都有同步器(AQS)的身影.为了更好地理解JDK的并发包,我会用三个主题来详细描述AbstractQueuedSynchronizer的实现. 在AQS中,涉及到同步队列以及Condition对象,这也是我为什么要用三个主题来讲述的原因.本节将主要讲述同步队列,后面两节会分别讲述Condition对象以及AQS的主要功能实现. AQS同步队列的主要功能是将无法获得资源的线程放入同步队列中,进行等待,它是通过链表来

AQS源码解析

文大篇幅引用自HongJie大佬的一行一行源码分析清楚AbstractQueuedSynchronizer,这只是一篇简单的个人整理思路和总结(倒垃圾),如果觉得有些难懂的话,不要犹豫也不要疑惑,很明显是我这篇文章的问题,不是你的问题,这时你最好直接转去看HongJie大佬的原文,那个会好懂很多.还是看不懂的话建议隔一段时间再看,然后像我一样写(复制)一篇总结捋一下思路,加油! AQS 结构 属性 private transient volatile Node head; private tra

Java 并发之AbstractQueuedSynchronizer(AQS)源码解析

关键字:CLH,Node,线程,waitStatus,CAS,中断 目录 图解AQS的操作细节 0.前言 1.基本概念 1.1.CAS自旋 1.2.Node 1.3.CLH & AQS 1.4.ReentrantLock 2.图解AQS 2.1.线程A单独运行 2.2.线程B开始运行 2.3.线程C开始运行 2.4.线程A停止运行,线程B继续运行 2.5.1.线程B停止运行,线程C继续运行 2.5.2.线程C放弃竞争 3.问题总结 3.1.为什么在unparkSuccessor操作中从尾节点开始

浩哥解析MyBatis源码(十一)——Parsing解析模块之通用标记解析器(GenericTokenParser)与标记处理器(TokenHandler)

原创作品,可以转载,但是请标注出处地址:http://www.cnblogs.com/V1haoge/p/6724223.html 1.回顾 上面的几篇解析了类型模块,在MyBatis中类型模块包含的就是Java类型与Jdbc类型,和其间的转换处理.类型模块在整个MyBatis功能架构中属于基础组件之一,是提前注册到注册器中,并配置到Configuration中备用. 从这一篇开始解析Parsing解析模块,这个模块不同于Type模块,这个模块更像是一套工具模块.本篇先解析通用标记解析器Gene

structs2源码解读(6)之解析package标签

structs2源码解读之解析package标签 上面讨论过,在创建Dispacher对象时,调用dispacher.init()方法完成初始化,在这个方法中先创建各种配置文件的解析器(ConfigurationProvider),然后循环遍历这些解析器的register()方法解析各个配置文件.  for (final ContainerProvider containerProvider : providers)         {             containerProvider

struct2源码解读(3)之解析配置文件

struct2源码解读之解析配置文件 上篇博文讲到struct2在初始化时,在创建Dispacher之后,会Dispacher.init()中会对配置文件进行解析,下面就让我们一起探讨下struct2是如何解析配置文件的. public Dispatcher initDispatcher( HostConfig filterConfig ) {           //创建Dispacher实例         Dispatcher dispatcher = createDispatcher(f

观V8源码中的array.js,解析 Array.prototype.slice为什么能将类数组对象转为真正的数组?

在官方的解释中,如[mdn] The slice() method returns a shallow copy of a portion of an array into a new array object. 简单的说就是根据参数,返回数组的一部分的copy.所以了解其内部实现才能确定它是如何工作的.所以查看V8源码中的Array.js     可以看到如下的代码: 一.方法  ArraySlice,源码地址,直接添加到Array.prototype上的"入口",内部经过参数.类型

Spring 源码解析之HandlerAdapter源码解析(二)

Spring 源码解析之HandlerAdapter源码解析(二) 前言 看这篇之前需要有Spring 源码解析之HandlerMapping源码解析(一)这篇的基础,这篇主要是把请求流程中的调用controller流程单独拿出来了 解决上篇文章遗留的问题 getHandler(processedRequest) 这个方法是如何查找到对应处理的HandlerExecutionChain和HandlerMapping的,比如说静态资源的处理和请求的处理肯定是不同的HandlerMapping ge

mybatis源码追踪1——Mapper方法用法解析

Mapper中的方法执行时会构造为org.apache.ibatis.binding.MapperMethod$MethodSignature对象,从该类源码中可以了解如何使用Mapper方法. [支持的特殊参数类型] RowBounds.ResultHandler.普通参数 (作为sql执行时使用的变量) 其中普通参数可以是单一的model.查询条件的map或直接将一到多个查询条件作为参数(多个条件在框架中最终将封装为map使用) 另外普通参数支持添加@Param注解以修改参数名,如不修改则参