JDK之ReentrantLock,AbstractQueuedSynchronizer源码分析

ReentrantLock默认使用非公平锁

    public ReentrantLock() {
        sync = new NonfairSync();
    }

所谓公平锁就是先阻塞的程序先获得锁。非公平锁则有操作系统的调度系统来调度。

NonfairSync就是一个同步器

    final static class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

而Sync则是这样实现的

    static abstract class Sync extends AbstractQueuedSynchronizer {

现在来关注下ReentrantLock的lock方法

    public void lock() {
        sync.lock();
    }

实际就是调用的NonFairSync的lock方法

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

而这个用的AbstractQueueSynchronizer中的方法来实现的

    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

意思就是如果内存中的state这个字段是0,则将它置为1,否则执行acquire这个函数。注意compareAndSwapInt这句话是原子性的。所以保证只能有一个线程能成功将state置为1,从而获得锁。

同时执行

    protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }

这个函数,置当前获得锁的函数。

那执行compareAndSetState失败的函数则执行acquire这个函数。看一下这个函数都干嘛了呢

<pre name="code" class="java">    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

看一个tryAcquire这个函数

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

此时会先尝试着compareAndSetState这个函数,若又失败return false

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

那么会先执行addWaiter这个函数。这个函数就是将当前线程生成一个节点对象,然后放入队列里头

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

如果队列不是空则放在队列尾部

否则初始化一个队列

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

在看下acquireQueued都干了些什么呢

    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

就是如果当前线程的前驱是头节点的话,就试着去再次获得锁,因为head节点是当前获得锁的线程。

若失败,则执行这个方法shouldParkAfterFailedAcquire,parkAndCheckInterrupt

意思就是如果获取失败,就挂起当前线程,进去这个方法里面,看一看

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
	    do {
		node.prev = pred = pred.prev;
	    } while (pred.waitStatus > 0);
	    pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE. Indicate that we
             * need a signal, but don't park yet. Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

就是调用了LockSupport的park方法来挂起当前线程。interrupted的作用是干这个用的

  1. //清除中断标记,并返回上一次中断标记的值
  2. public static boolean interrupted() { ... }

若当成的中断是true的话,下面就将执行这句话selfInterrupt方法

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    /**
     * Convenience method to interrupt current thread.
     */
    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

设置当前线程的中断标记,这里只是在线程上设置了一个中断标记,具体怎么处理这个中断标记,由应用程序来确定。

时间: 2024-10-08 10:08:49

JDK之ReentrantLock,AbstractQueuedSynchronizer源码分析的相关文章

Java并发系列[2]----AbstractQueuedSynchronizer源码分析之独占模式

在上一篇<Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析>中我们介绍了AbstractQueuedSynchronizer基本的一些概念,主要讲了AQS的排队区是怎样实现的,什么是独占模式和共享模式以及如何理解结点的等待状态.理解并掌握这些内容是后续阅读AQS源码的关键,所以建议读者先看完我的上一篇文章再回过头来看这篇就比较容易理解.在本篇中会介绍在独占模式下结点是怎样进入同步队列排队的,以及离开同步队列之前会进行哪些操作.AQS为在独占模

多线程之美5一 AbstractQueuedSynchronizer源码分析&lt;一&gt;

AQS的源码分析 目录结构 1.什么是CAS ? 2.同步器类结构 3.CLH同步队列 4.AQS中静态内部类Node 5.方法分析 ? 5.1.acquire(int arg ) ? 5.2.release(int arg) 释放锁 6.总结 前言 在多线程环境下,我们一般会对临界区资源(共享资源)进行加锁,释放锁,保证同一时刻最多只有一个线程(独占模式),就如去公共厕所里,在使用一个小房间时会加锁避免自己在使用的时候,别人突然闯进来一样,引起不必要的麻烦,在使用完后,再打开锁,其他人才可使用

jdk并发包 CopyOnWriteArrayList源码分析

CopyOnWriteArrayList是jdk1.5并法包里面用于处理高并发下,读多写少的情况下,降低锁等待的集合类.下面对该类实现做一个简要的分析 1,首先CopyOnWriteArrayList是实现了List接口,对=List接口的相关方法进行了实现. 2,下面的它的add方法,会首先加锁,然后copy原List内部的数组,然后对新数组长度加1后释放锁.由于数组copy速度很快,切在读多写少的情况下锁开销比较少 public boolean add(E e) { final Reentr

jdk 1.7 ArrayList 源码分析

1.首先看看ArrayList 的基类有哪些 public class ArrayList<E> extends AbstractList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable 可以总结为: ArrayList 继承了AbstractList 实现了 List . RondomAcess . cloneable . serializable 对于一般的容器都会实现clon

jdk 1.7 LinkedList 源码分析

1 linkedList 的定义 public class LinkedList<E> extends AbstractSequentialList<E> implements List<E>, Deque<E>, Cloneable, java.io.Serializable 从这段代码中我们可以清晰地看出LinkedList继承AbstractSequentialList,实现List.Deque.Cloneable.Serializable.其中Abs

Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析

学习Java并发编程不得不去了解一下java.util.concurrent这个包,这个包下面有许多我们经常用到的并发工具类,例如:ReentrantLock, CountDownLatch, CyclicBarrier, Semaphore等.而这些类的底层实现都依赖于AbstractQueuedSynchronizer这个类,由此可见这个类的重要性.所以在Java并发系列文章中我首先对AbstractQueuedSynchronizer这个类进行分析,由于这个类比较重要,而且代码比较长,为了

Java并发编程-AbstractQueuedSynchronizer源码分析

简介 提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架.该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础.使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态.然而多线程环境中对状态的操纵必须确保原子性,因此子类对于状态的把握,需要使用这个同步器提供的以下三个方法对状态进行操作: java.util.concurrent.locks.Abstra

AbstractQueuedSynchronizer源码分析

/* * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. * * * * * * * * * * * * * * * * * * * * */ /* * * * * * * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as expl

Java并发系列[5]----ReentrantLock源码分析

在Java5.0之前,协调对共享对象的访问可以使用的机制只有synchronized和volatile.我们知道synchronized关键字实现了内置锁,而volatile关键字保证了多线程的内存可见性.在大多数情况下,这些机制都能很好地完成工作,但却无法实现一些更高级的功能,例如,无法中断一个正在等待获取锁的线程,无法实现限定时间的获取锁机制,无法实现非阻塞结构的加锁规则等.而这些更灵活的加锁机制通常都能够提供更好的活跃性或性能.因此,在Java5.0中增加了一种新的机制:Reentrant