【Java并发编程实战】—–“J.U.C”:ReentrantLock之二lock方法分析

前一篇博客简介了ReentrantLock的定义和与synchronized的差别,以下尾随LZ的笔记来扒扒ReentrantLock的lock方法。我们知道ReentrantLock有公平锁、非公平锁之分,所以lock()我也已公平锁、非公平锁来进行阐述。首先我们来看ReentrantLock的结构【图来自Java多线程系列–“JUC锁”03之 公平锁(一)】:

从上图我们能够看到,ReentrantLock实现Lock接口。Sync与ReentrantLock是组合关系,且FairSync(公平锁)、NonfairySync(非公平锁)是Sync的子类。Sync继承AQS(AbstractQueuedSynchronizer)。在详细分析lock时。我们须要了解几个概念:

AQS(AbstractQueuedSynchronizer):为java中管理锁的抽象类。该类为实现依赖于先进先出 (FIFO) 等待队列的堵塞锁和相关同步器(信号量、事件,等等)提供一个框架。该类提供了一个非常重要的机制。在JDK API中是这样描写叙述的:为实现依赖于先进先出 (FIFO) 等待队列的堵塞锁和相关同步器(信号量、事件。等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个实用基础。子类必须定义更改此状态的受保护方法。并定义哪种状态对于此对象意味着被获取或被释放。

假定这些条件之后,此类中的其他方法就能够实现全部排队和堵塞机制。

子类能够维护其他状态字段,但仅仅是为了获得同步而仅仅追踪使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法来操作以原子方式更新的 int 值。 这么长的话用一句话概括就是:维护锁的当前状态和线程等待列表。

CLH:AQS中“等待锁”的线程队列。

我们知道在多线程环境中我们为了保护资源的安全性常使用锁将其保护起来,同一时刻仅仅能有一个线程能够訪问,其余线程则须要等待,CLH就是管理这些等待锁的队列。

CAS(compare and swap):比較并交换函数,它是原子操作函数,也就是说全部通过CAS操作的数据都是以原子方式进行的。

公平锁(FairSync):lock

lock()定义例如以下:

final void lock() {
            acquire(1);
        }

lock()内部调用acquire(1),为何是”1”呢?首先我们知道ReentrantLock是独占锁,1表示的是锁的状态state。对于独占锁而言。假设所处于可获取状态,其状态为0,当锁初次被线程获取时状态变成1。

acquire()是AbstractQueuedSynchronizer中的方法。其源代码例如以下:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

从该方法的实现中我们能够看出,它做了非常多的工作,详细工作我们先晾着。先看这些方法的实现:

tryAcquire

tryAcquire方法是在FairySync中实现的,其源代码例如以下:

protected final boolean tryAcquire(int acquires) {
        //当前线程
        final Thread current = Thread.currentThread();
        //获取锁状态state
        int c = getState();
        /*
         * 当c==0表示锁没有被不论什么线程占用。在该代码块中主要做例如以下几个动作:
         * 则推断“当前线程”是不是CLH队列中的第一个线程线程(hasQueuedPredecessors)。
         * 若是的话,则获取该锁。设置锁的状态(compareAndSetState),
         * 并切设置锁的拥有者为“当前线程”(setExclusiveOwnerThread)。

*/
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        /*
         * 假设c != 0,表示该锁已经被线程占有,则推断该锁是否是当前线程占有。若是设置state,否则直接返回false
         */
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

在这里我们能够肯定tryAcquire主要是去尝试获取锁,获取成功则设置锁状态并返回true。否则返回false。

hasQueuedPredecessors:”当前线程”是不是在CLH队列的队首。来返回AQS中是不是有比“当前线程”等待更久的线程(公平锁)。

public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

Node是AbstractQueuedSynchronizer的内部类。它代表着CLH列表的一个线程节点。对于Node以后LZ会详细阐述的。

compareAndSetState:设置锁状态

protected final boolean compareAndSetState(int expect, int update) {
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }

compareAndSwapInt() 是sun.misc.Unsafe类中的一个本地方法。

对此,我们须要了解的是 compareAndSetState(expect, update) 是以原子的方式操作当前线程。若当前线程的状态为expect。则设置它的状态为update。

setExclusiveOwnerThread:设置当前线程为锁的拥有者

protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }

addWaiter(Node.EXCLUSIVE)

private Node addWaiter(Node mode) {
        //new 一个Node节点
        Node node = new Node(Thread.currentThread(), mode);

        //CLH队列尾节点
        Node pred = tail;

        //CLH尾节点!= null,表示CLH队列 != null,则将线程增加到CLH队列队尾
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”增加到CLH队列中。
        enq(node);
        return node;
    }

addWaiter()主要是将当前线程增加到CLH队列队尾。

当中compareAndSetTail和enq的源代码例如以下:

/**
     * 推断CLH队列的队尾是不是为expect。是的话,就将队尾设为update
     * @param expect
     * @param update
     * @return
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /**
     * 假设CLH队列为空,则新建一个CLH表头;然后将node增加到CLH末尾。

否则,直接将node增加到CLH末尾
     * @param node
     * @return
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter的实现比較简单且实现功能明了:当前线程增加到CLH队列队尾。

acquireQueued

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            //线程中断标志位
            boolean interrupted = false;
            for (;;) {
                //上一个节点。由于node相当于当前线程,所以上一个节点表示“上一个等待锁的线程”
                final Node p = node.predecessor();
                /*
                 * 假设当前线程是head的直接后继则尝试获取锁
                 * 这里不会和等待队列中其他线程发生竞争,但会和尝试获取锁且尚未进入等待队列的线程发生竞争。

这是非公平锁和公平锁的一个重要差别。
                 */
                if (p == head && tryAcquire(arg)) {
                    setHead(node);     //将当前节点设置设置为头结点
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                /* 假设不是head直接后继或获取锁失败。则检查是否要堵塞当前线程,是则堵塞当前线程
                 * shouldParkAfterFailedAcquire:推断“当前线程”是否须要堵塞
                 * parkAndCheckInterrupt:堵塞当前线程
                 */
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在这个for循环中。LZ不是非常明确为什么要加p==head,Java多线程系列–“JUC锁”03之 公平锁(一)这篇博客有一个较好的解释例如以下:

p == head && tryAcquire(arg) 
首先,推断“前继节点”是不是CHL表头。假设是的话,则通过tryAcquire()尝试获取锁。 
事实上,这样做的目的是为了“让当前线程获取锁”,可是为什么须要先推断p==head呢?理解这个对理解“公平锁”的机制非常重要。由于这么做的原因就是为了保证公平性! 
      (a) 前面,我们在shouldParkAfterFailedAcquire()我们推断“当前线程”是否须要堵塞; 
      (b) 接着。“当前线程”堵塞的话。会调用parkAndCheckInterrupt()来堵塞线程。当线程被解除堵塞的时候,我们会返回线程的中断状态。而线程被解决堵塞,可能是由于“线程被中断”,也可能是由于“其他线程调用了该线程的unpark()函数”。 
      (c) 再回到p==head这里。

假设当前线程是由于其他线程调用了unpark()函数而被唤醒,那么唤醒它的线程。应该是它的前继节点所相应的线程(关于这一点,后面在“释放锁”的过程中会看到)。

OK,是前继节点调用unpark()唤醒了当前线程!

此时,再来理解p==head就非常easy了:当前继节点是CLH队列的头节点,而且它释放锁之后。就轮到当前节点获取锁了。然后。当前节点通过tryAcquire()获取锁。获取成功的话,通过setHead(node)设置当前节点为头节点。并返回。

总之,假设“前继节点调用unpark()唤醒了当前线程”而且“前继节点是CLH表头”。此时就是满足p==head,也就是符合公平性原则的。否则,假设当前线程是由于“线程被中断”而唤醒,那么显然就不是公平了。这就是为什么说p==head就是保证公平性!

在该方法中有两个方法比較重要。shouldParkAfterFailedAcquire和parkAndCheckInterrupt。当中

shouldParkAfterFailedAcquire:推断“当前线程”是否须要堵塞,源代码例如以下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
           //当前节点的状态
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                return true;
            if (ws > 0) {
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

waitStatus是节点Node定义的,她是标识线程的等待状态。他主要有例如以下四个值:

CANCELLED = 1:线程已被取消;

SIGNAL = -1:当前线程的后继线程须要被unpark(唤醒);

CONDITION = -2 :线程(处在Condition休眠状态)在等待Condition唤醒;

PROPAGATE = –3:(共享锁)其他线程获取到“共享锁”.

有了这四个状态,我们再来分析上面代码,当ws == SIGNAL时表明当前节点须要unpark(唤醒),直接返回true,当ws > 0 (CANCELLED),表明当前节点已经被取消了。则通过回溯的方法(do{}while())向前找到一个非CANCELLED的节点并返回false。其他情况则设置该节点为SIGNAL状态。我们再回到if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())。p是当前节点的前继节点。当该前继节点状态为SIGNAL时返回true。表示当前线程须要堵塞。则调用parkAndCheckInterrupt()堵塞当前线程。

parkAndCheckInterrupt:堵塞当前线程,而且返回“线程被唤醒之后”的中断状态,源代码例如以下:

private final boolean parkAndCheckInterrupt() {
    //通过LockSupport的park()堵塞“当前线程”。
        LockSupport.park(this);
        return Thread.interrupted();
    }

从上面我们能够总结,acquireQueued()是当前线程会依据公平性原则来进行堵塞等待,直到获取锁为止。而且返回当前线程在等待过程中有没有并中断过。

selfInterrupt

private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

selfInterrupt()产生一个中断。假设在acquireQueued()中当前线程被中断过。则须要产生一个中断。

Fairy lock()总结

我们再看acquire()源代码:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

首先通过tryAcquire方法尝试获取锁,假设成功直接返回。否则通过acquireQueued()再次获取。在acquireQueued()中会先通过addWaiter将当前线程增加到CLH队列的队尾,在CLH队列中等待。在等待过程中线程处于休眠状态,直到成功获取锁才会返回。例如以下:

非公平锁(NonfairSync):lock

非公平锁NonfairSync的lock()与公平锁的lock()在获取锁的流程上是一直的,可是由于它是非公平的,所以获取锁机制还是有点不同。通过前面我们了解到公平锁在获取锁时採用的是公平策略(CLH队列),而非公平锁则採用非公平策略它无视等待队列,直接尝试获取。

例如以下:

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

lock()通过compareAndSetState尝试设置所状态,若成功直接将锁的拥有者设置为当前线程(简单粗暴),否则调用acquire()尝试获取锁;

acquire

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

在非公平锁中acquire()的实现和公平锁一模一样,可是他们尝试获取锁的机制不同(也就是tryAcquire()的实现不同)。

protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

tryAcquire内部调用nonfairyTryAcquire:

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

与公平锁相比,非公平锁的不同之处就体如今if(c==0)的条件代码块中:

//----------------非公平锁-----
  if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
  //----------------公平锁-----
 if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }

是否已经发现了不同之处。公平锁中要通过hasQueuedPredecessors()来推断该线程是否位于CLH队列中头部,是则获取锁;而非公平锁则无论你在哪个位置都直接获取锁。

參考文献:

1、Java多线程系列–“JUC锁”03之 公平锁(一)

2、ReentrantLock源代码之中的一个lock方法解析(锁的获取)

时间: 2024-12-22 13:51:29

【Java并发编程实战】—–“J.U.C”:ReentrantLock之二lock方法分析的相关文章

【Java并发编程实战】—–“J.U.C”:CountDownlatch

上篇博文([Java并发编程实战]-–"J.U.C":CyclicBarrier)LZ介绍了CyclicBarrier.CyclicBarrier所描述的是"允许一组线程互相等待,直到到达某个公共屏障点,才会进行后续任务".而CountDownlatch和它也有一点点相似之处:CountDownlatch所描述的是"在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待".在JDK API中是这样阐述的: 用给定的计数 初始化 Co

【Java并发编程实战】—–“J.U.C”:ReentrantReadWriteLock

ReentrantLock实现了标准的互斥操作,也就是说在某一时刻只有有一个线程持有锁.ReentrantLock采用这种独占的保守锁直接,在一定程度上减低了吞吐量.在这种情况下任何的"读/读"."读/写"."写/写"操作都不能同时发生.然而在实际的场景中我们就会遇到这种情况:有些资源并发的访问中,它大部分时间都是执行读操作,写操作比较少,但是读操作并不影响数据的一致性,如果在进行读操作时采用独占的锁机制,这样势必会大大降低吞吐量.所以如果能够做

【Java并发编程实战】—–“J.U.C”:Semaphore

信号量Semaphore是一个控制访问多个共享资源的计数器,它本质上是一个"共享锁". Java并发提供了两种加锁模式:共享锁和独占锁.前面LZ介绍的ReentrantLock就是独占锁.对于独占锁而言,它每次只能有一个线程持有,而共享锁则不同,它允许多个线程并行持有锁,并发访问共享资源. 独占锁它所采用的是一种悲观的加锁策略,  对于写而言为了避免冲突独占是必须的,但是对于读就没有必要了,因为它不会影响数据的一致性.如果某个只读线程获取独占锁,则其他读线程都只能等待了,这种情况下就限

【Java并发编程实战】—–“J.U.C”:CyclicBarrier

在上篇博客([Java并发编程实战]-–"J.U.C":Semaphore)中,LZ介绍了Semaphore,下面LZ介绍CyclicBarrier.在JDK API中是这么介绍的: 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用.因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier. Cy

【Java并发编程实战】—– AQS(四):CLH同步队列

在[Java并发编程实战]-–"J.U.C":CLH队列锁提过,AQS里面的CLH队列是CLH同步锁的一种变形. 其主要从双方面进行了改造:节点的结构与节点等待机制.在结构上引入了头结点和尾节点,他们分别指向队列的头和尾,尝试获取锁.入队列.释放锁等实现都与头尾节点相关.而且每一个节点都引入前驱节点和后兴许节点的引用:在等待机制上由原来的自旋改成堵塞唤醒. 其结构例如以下: 知道其结构了,我们再看看他的实现.在线程获取锁时会调用AQS的acquire()方法.该方法第一次尝试获取锁假设

《Java并发编程实战》要点笔记及java.util.concurrent 的结构介绍

买了<java并发编程实战>这本书,看了好几遍都不是很懂,这个还是要在实战中找取其中的要点的,后面看到一篇文章笔记做的很不错分享给大家!! 原文地址:http://blog.csdn.net/cdl2008sky/article/details/26377433 Subsections  1.线程安全(Thread safety) 2.锁(lock) 3.共享对象 4.对象组合 5.基础构建模块 6.任务执行 7.取消和关闭 8.线程池的使用 9.性能与可伸缩性 10.并发程序的测试 11.显

《Java并发编程实战》第十五章 原子变量与非阻塞同步机制 读书笔记

一.锁的劣势 锁定后如果未释放,再次请求锁时会造成阻塞,多线程调度通常遇到阻塞会进行上下文切换,造成更多的开销. 在挂起与恢复线程等过程中存在着很大的开销,并且通常存在着较长时间的中断. 锁可能导致优先级反转,即使较高优先级的线程可以抢先执行,但仍然需要等待锁被释放,从而导致它的优先级会降至低优先级线程的级别. 二.硬件对并发的支持 处理器填写了一些特殊指令,例如:比较并交换.关联加载/条件存储. 1 比较并交换 CAS的含义是:"我认为V的值应该为A,如果是,那么将V的值更新为B,否则不需要修

《Java并发编程实战》读书笔记

Subsections 线程安全(Thread safety) 锁(lock) 共享对象 对象组合 基础构建模块 任务执行 取消和关闭 线程池的使用 性能与可伸缩性 并发程序的测试 显示锁 原子变量和非阻塞同步机制 一.线程安全(Thread safety) 无论何时,只要多于一个线程访问给定的状态变量.而且其中某个线程会写入该变量,此时必须使用同步来协助线程对该变量的访问. 线程安全是指多个线程在访问一个类时,如果不需要额外的同步,这个类的行为仍然是正确的. 线程安全的实例: (1).一个无状

java并发编程实战学习(3)--基础构建模块

转自:java并发编程实战 5.3阻塞队列和生产者-消费者模式 BlockingQueue阻塞队列提供可阻塞的put和take方法,以及支持定时的offer和poll方法.如果队列已经满了,那么put方法将阻塞直到空间可用:如果队列为空,那么take方法将阻塞直到有元素可用.队列可以是有界的也可以是无界的. 如果生产者生成工作的速率比消费者处理工作的速率款,那么工作项会在队列中累计起来,最终好紧内存.同样,put方法的阻塞特性也极大地简化了生产者的编码.如果使用有界队列,当队列充满时,生产者将阻