深入浅出 Java Concurrency (8): 锁机制 part 3[转]

接上篇,这篇从Lock.lock/unlock开始。特别说明在没有特殊情况下所有程序、API、文档都是基于JDK 6.0的。

public void java.util.concurrent.locks.ReentrantLock.lock()

获取锁。

如果该锁没有被另一个线程保持,则获取该锁并立即返回,将锁的保持计数设置为 1。

如果当前线程已经保持该锁,则将保持计数加 1,并且该方法立即返回。

如果该锁被另一个线程保持,则出于线程调度的目的,禁用当前线程,并且在获得锁之前,该线程将一直处于休眠状态,此时锁保持计数被设置为 1。

从上面的文档可以看出ReentrantLock是可重入锁的实现。而内部是委托java.util.concurrent.locks.ReentrantLock.Sync.lock()实现的。java.util.concurrent.locks.ReentrantLock.Sync是抽象类,有java.util.concurrent.locks.ReentrantLock.FairSync和java.util.concurrent.locks.ReentrantLock.NonfairSync两个实现,也就是常说的公平锁和不公平锁。

公平锁和非公平锁

如果获取一个锁是按照请求的顺序得到的,那么就是公平锁,否则就是非公平锁。

在没有深入了解内部机制及实现之前,先了解下为什么会存在公平锁和非公平锁。公平锁保证一个阻塞的线程最终能够获得锁,因为是有序的,所以总是可以按照请求的顺序获得锁。不公平锁意味着后请求锁的线程可能在其前面排列的休眠线程恢复前拿到锁,这样就有可能提高并发的性能。这是因为通常情况下挂起的线程重新开始与它真正开始运行,二者之间会产生严重的延时。因此非公平锁就可以利用这段时间完成操作。这是非公平锁在某些时候比公平锁性能要好的原因之一。

二者在实现上的区别会在后面介绍,我们先从公平锁(FairSync)开始。

前面说过java.util.concurrent.locks.AbstractQueuedSynchronizer (AQS)是Lock的基础,对于一个FairSync而言,lock()就直接调用AQS的acquire(int arg);

public final void acquire(int arg) 以独占模式获取对象,忽略中断。通过至少调用一次 tryAcquire(int) 来实现此方法,并在成功时返回。否则在成功之前,一直调用 tryAcquire(int) 将线程加入队列,线程可能重复被阻塞或不被阻塞。

在介绍实现之前先要补充上一节的知识,对于一个AQS的实现而言,通常情况下需要实现以下方法来描述如何锁定线程。

  • tryAcquire(int) 试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。

    此方法总是由执行 acquire 的线程来调用。如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。也就是说此方法是一种尝试性方法,如果成功获取锁那最好,如果没有成功也没有关系,直接返回false。

  • tryRelease(int) 试图设置状态来反映独占模式下的一个释放。 此方法总是由正在执行释放的线程调用。释放锁可能失败或者抛出异常,这个在后面会具体分析。
  • tryAcquireShared(int) 试图在共享模式下获取对象状态。
  • tryReleaseShared(int) 试图设置状态来反映共享模式下的一个释放。
  • isHeldExclusively() 如果对于当前(正调用的)线程,同步是以独占方式进行的,则返回 true

除了tryAcquire(int)外,其它方法会在后面具体介绍。首先对于ReentrantLock而言,不管是公平锁还是非公平锁,都是独占锁,也就是说同时能够有一个线程持有锁。因此对于acquire(int arg)而言,arg==1。在AQS中acquire的实现如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这个看起来比较复杂,我们分解以下4个步骤。

    1. 如果tryAcquire(arg)成功,那就没有问题,已经拿到锁,整个lock()过程就结束了。如果失败进行操作2。
    2. 创建一个独占节点(Node)并且此节点加入CHL队列末尾。进行操作3。
    3. 自旋尝试获取锁,失败根据前一个节点来决定是否挂起(park()),直到成功获取到锁。进行操作4。
    4. 如果当前线程已经中断过,那么就中断当前线程(清除中断位)。

这是一个比较复杂的过程,我们按部就班一个一个分析。

tryAcquire(acquires)

对于公平锁而言,它的实现方式如下:

protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (isFirst(current) &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

在这段代码中,前面说明对于AQS存在一个state来描述当前有多少线程持有锁。由于AQS支持共享锁(例如读写锁,后面会继续讲),所以这里state>=0,但是由于ReentrantLock是独占锁,所以这里不妨理解为0<=state,acquires=1。isFirst(current)是一个很复杂的逻辑,包括踢出无用的节点等复杂过程,这里暂且不提,大体上的意思是说判断AQS是否为空或者当前线程是否在队列头(为了区分公平与非公平锁)。

    1. 如果当前锁有其它线程持有,c!=0,进行操作2。否则,如果当前线程在AQS队列头部,则尝试将AQS状态state设为acquires(等于1),成功后将AQS独占线程设为当前线程返回true,否则进行2。这里可以看到compareAndSetState就是使用了CAS操作。
    2. 判断当前线程与AQS的独占线程是否相同,如果相同,那么就将当前状态位加1(这里+1后结果为负数后面会讲,这里暂且不理它),修改状态位,返回true,否则进行3。这里之所以不是将当前状态位设置为1,而是修改为旧值+1呢?这是因为ReentrantLock是可重入锁,同一个线程每持有一次就+1。
    3. 返回false。

比较非公平锁的tryAcquire实现java.util.concurrent.locks.ReentrantLock.Sync.nonfairTryAcquire(int),公平锁多了一个判断当前节点是否在队列头,这个就保证了是否按照请求锁的顺序来决定获取锁的顺序(同一个线程的多次获取锁除外)。

现在再回头看公平锁和非公平锁的lock()方法。公平锁只有一句acquire(1);而非公平锁的调用如下:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

很显然,非公平锁在第一次获取锁,或者其它线程释放锁后(可能等待),优先采用compareAndSetState(0,1)然后设置AQS独占线程而持有锁,这样有时候比acquire(1)顺序检查锁持有而要高效。即使在重入锁上,也就是compareAndSetState(0,1)失败,但是是当前线程持有锁上,非公平锁也没有问题。

addWaiter(mode)

tryAcquire失败就意味着入队列了。此时AQS的队列中节点Node就开始发挥作用了。一般情况下AQS支持独占锁和共享锁,而独占锁在Node中就意味着条件(Condition)队列为空(上一篇中介绍过相关概念)。在java.util.concurrent.locks.AbstractQueuedSynchronizer.Node中有两个常量,

static final Node EXCLUSIVE = null; //独占节点模式

static final Node SHARED = new Node(); //共享节点模式

addWaiter(mode)中的mode就是节点模式,也就是共享锁还是独占锁模式。

前面一再强调ReentrantLock是独占锁模式。

private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;
     if (pred != null) {
         node.prev = pred;
         if (compareAndSetTail(pred, node)) {
             pred.next = node;
             return node;
         }
     }
     enq(node);
     return node;
}

上面是节点如队列的一部分。当前仅当队列不为空并且将新节点插入尾部成功后直接返回新节点。否则进入enq(Node)进行操作。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            Node h = new Node(); // Dummy header
            h.next = node;
            node.prev = h;
            if (compareAndSetHead(h)) {
                tail = node;
                return h;
            }
        }
        else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq(Node)去队列操作实现了CHL队列的算法,如果为空就创建头结点,然后同时比较节点尾部是否是改变来决定CAS操作是否成功,当且仅当成功后才将为不节点的下一个节点指向为新节点。可以看到这里仍然是CAS操作。

acquireQueued(node,arg)

自旋请求锁,如果可能的话挂起线程,直到得到锁,返回当前线程是否中断过(如果park()过并且中断过的话有一个interrupted中断位)。

final boolean acquireQueued(final Node node, int arg) {
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (RuntimeException ex) {
        cancelAcquire(node);
        throw ex;
    }
}

下面的分析就需要用到上节节点的状态描述了。acquireQueued过程是这样的:

    1. 如果当前节点是AQS队列的头结点(如果第一个节点是DUMP节点也就是傀儡节点,那么第二个节点实际上就是头结点了),就尝试在此获取锁tryAcquire(arg)。如果成功就将头结点设置为当前节点(不管第一个结点是否是DUMP节点),返回中断位。否则进行2。
    2. 检测当前节点是否应该park(),如果应该park()就挂起当前线程并且返回当前线程中断位。进行操作1。

一个节点是否该park()是关键,这是由方法java.util.concurrent.locks.AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node, Node)实现的。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int s = pred.waitStatus;
    if (s < 0) return true;
    if (s > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
    return false;
}

    1. 如果前一个节点的等待状态waitStatus<0,也就是前面的节点还没有获得到锁,那么返回true,表示当前节点(线程)就应该park()了。否则进行2。
    2. 如果前一个节点的等待状态waitStatus>0,也就是前一个节点被CANCELLED了,那么就将前一个节点去掉,递归此操作直到所有前一个节点的waitStatus<=0,进行4。否则进行3。
    3. 前一个节点等待状态waitStatus=0,修改前一个节点状态位为SINGAL,表示后面有节点等待你处理,需要根据它的等待状态来决定是否该park()。进行4。
    4. 返回false,表示线程不应该park()。

selfInterrupt()

private static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

如果线程曾经中断过(或者阻塞过)(比如手动interrupt()或者超时等等,那么就再中断一次,中断两次的意思就是清除中断位)。

大体上整个Lock.lock()就这样一个流程。除了lock()方法外,还有lockInterruptibly()/tryLock()/unlock()/newCondition()等,在接下来的章节中会一一介绍。

时间: 2024-12-25 22:08:06

深入浅出 Java Concurrency (8): 锁机制 part 3[转]的相关文章

深入浅出 Java Concurrency (9): 锁机制 part 4[转]

本小节介绍锁释放Lock.unlock(). Release/TryRelease unlock操作实际上就调用了AQS的release操作,释放持有的锁. public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);       

深入浅出 Java Concurrency (15): 锁机制 part 10 锁的一些其它问题[转]

主要谈谈锁的性能以及其它一些理论知识,内容主要的出处是<Java Concurrency in Practice>,结合自己的理解和实际应用对锁机制进行一个小小的总结. 首先需要强调的一点是:所有锁(包括内置锁和高级锁)都是有性能消耗的,也就是说在高并发的情况下,由于锁机制带来的上下文切换.资源同步等消耗是非常可观的.在某些极端情况下,线程在锁上的消耗可能比线程本身的消耗还要多.所以如果可能的话,在任何情况下都尽量少用锁,如果不可避免那么采用非阻塞算法是一个不错的解决方案,但是却也不是绝对的.

深入浅出 Java Concurrency (6): 锁机制 part 1[转]

前面的章节主要谈谈原子操作,至于与原子操作一些相关的问题或者说陷阱就放到最后的总结篇来整体说明.从这一章开始花少量的篇幅谈谈锁机制. 上一个章节中谈到了锁机制,并且针对于原子操作谈了一些相关的概念和设计思想.接下来的文章中,尽可能的深入研究锁机制,并且理解里面的原理和实际应用场合. 尽管synchronized在语法上已经足够简单了,在JDK 5之前只能借助此实现,但是由于是独占锁,性能却不高,因此JDK 5以后就开始借助于JNI来完成更高级的锁实现. JDK 5中的锁是接口java.util.

深入浅出 Java Concurrency(二)—锁机制(一)

前面的章节主要谈谈原子操作,至于与原子操作一些相关的问题或者说陷阱就放到最后的总结篇来整体说明.从这一章开始花少量的篇幅谈谈锁机制. 上一个章节 中谈到了锁机制,并且针对于原子操作谈了一些相关的概念和设计思想.接下来的文章中,尽可能的深入研究锁机制,并且理解里面的原理和实际应用场合. 尽管synchronized在语法上已经足够简单了,在JDK 5之前只能借助此实现,但是由于是独占锁,性能却不高,因此JDK 5以后就开始借助于JNI来完成更高级的锁实现. JDK 5中的锁是接口java.util

深入浅出 Java Concurrency (7): 锁机制 part 2 AQS[转]

在理解J.U.C原理以及锁机制之前,我们来介绍J.U.C框架最核心也是最复杂的一个基础类:java.util.concurrent.locks.AbstractQueuedSynchronizer. AQS AbstractQueuedSynchronizer,简称AQS,是J.U.C最复杂的一个类,导致绝大多数讲解并发原理或者实战的时候都不会提到此类.但是虚心的作者愿意借助自己有限的能力和精力来探讨一二(参考资源中也有一些作者做了部分的分析.). 首先从理论知识开始,在了解了相关原理后会针对源

深入浅出 Java Concurrency (12): 锁机制 part 7 信号量(Semaphore)[转]

Semaphore 是一个计数信号量.从概念上讲,信号量维护了一个许可集.如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可.每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者.但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动. 说白了,Semaphore是一个计数器,在计数器不为0的时候对线程就放行,一旦达到0,那么所有请求资源的新线程都会被阻塞,包括增加请求到许可的线程,也就是说Semaphore不是可重

深入浅出 Java Concurrency (13): 锁机制 part 8 读写锁 (ReentrantReadWriteLock) (1)[转]

从这一节开始介绍锁里面的最后一个工具:读写锁(ReadWriteLock). ReentrantLock 实现了标准的互斥操作,也就是一次只能有一个线程持有锁,也即所谓独占锁的概念.前面的章节中一直在强调这个特点.显然这个特点在一定程度上面减低了吞吐量,实际上独占锁是一种保守的锁策略,在这种情况下任何“读/读”,“写/读”,“写/写”操作都不能同时发生.但是同样需要强调的一个概念是,锁是有一定的开销的,当并发比较大的时候,锁的开销就比较客观了.所以如果可能的话就尽量少用锁,非要用锁的话就尝试看能

深入浅出 Java Concurrency (11): 锁机制 part 6 CyclicBarrier[转]

如果说CountDownLatch是一次性的,那么CyclicBarrier正好可以循环使用.它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).所谓屏障点就是一组任务执行完毕的时刻. 清单1 一个使用CyclicBarrier的例子 package xylz.study.concurrency.lock; import java.util.concurrent.BrokenBarrierException;import java.util.concur

深入浅出 Java Concurrency (10): 锁机制 part 5 闭锁 (CountDownLatch)[转]

此小节介绍几个与锁有关的有用工具. 闭锁(Latch) 闭锁(Latch):一种同步方法,可以延迟线程的进度直到线程到达某个终点状态.通俗的讲就是,一个闭锁相当于一扇大门,在大门打开之前所有线程都被阻断,一旦大门打开所有线程都将通过,但是一旦大门打开,所有线程都通过了,那么这个闭锁的状态就失效了,门的状态也就不能变了,只能是打开状态.也就是说闭锁的状态是一次性的,它确保在闭锁打开之前所有特定的活动都需要在闭锁打开之后才能完成. CountDownLatch是JDK 5+里面闭锁的一个实现,允许一