Lock+Condition实现机制

前言:大部分多线程同步场景,在功能和性能层面,synchronized可以满足,少部分场景Lock可以满足,dubbo的源码也符合这个比例,需要使用到Condition的场景极少,整个dubbo源码中只在启动函数中,服务关闭这一处使用到了Lock+Condition机制。

1.Lock+Condition用法

生产者,消费者模式在面试coding中出场率很高,可以用synchronized+wait+ notify来实现,也可以使用Lock+Condition实现。直接上代码

public class LockConditionTest {
    private LinkedList<String> queue=new LinkedList<String>();

    private Lock lock = new ReentrantLock();

    private int maxSize = 5;

    private Condition providerCondition = lock.newCondition();

    private Condition consumerCondition = lock.newCondition();

    public void provide(String value){
        try {
            lock.lock();
            while (queue.size() == maxSize) {
                providerCondition.await();
            }
            queue.add(value);
            consumerCondition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public String consume(){
        String result = null;
        try {
            lock.lock();
            while (queue.size() == 0) {
                consumerCondition.await();
            }
            result = queue.poll();
            providerCondition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return result;
    }

    public static void main(String[] args) {
        LockConditionTest t = new LockConditionTest();
        new Thread(new Provider(t)).start();
        new Thread(new Consumer(t)).start();

    }

}

以两个问题驱动
1.队列满了,生产者线程怎么停下来的?队列从满又变为不满的时候,怎么重新激活。
2.队列空了,消费者线程如何停下来,又如何重新开始消费。
一步一步解答这两个问题

2.ReentrantLock

ReentrantLock初始化的时候,默认是初始化一个NonfairSync。

public ReentrantLock() {
        sync = new NonfairSync();
    }

  

NonfairSync类图

核心代码在AbstractQueuedSynchronizer中,只看数据结构的话,这是一个基于Node,带头指针和尾指针的双向链表,每一个Node里面存一个线程,以及该线程的等待状态

static final class Node {
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
 }
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;

  

那么,ReentrantLock在lock和unlock的时候,操作的就是这个双向链表sync queue。
先看获取锁的过程

final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

  

1.如果这个锁没有任何线程持有,那么当前线程直接可以获得。(这是非公平锁的设计,如果是公平锁,需要检查是否有线程在排队,如果有,当前线程不能直接抢占,也要加入排队。)
2.如果这个锁被占用了,占用线程是当前线程,那么state+1,这也是可重入锁之所以可以重入的由来。
3.都不满足,暂时获取锁失败,返回false

那么如果在3这一步获取锁失败,后续如何处理呢?

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  

1.addWaiter:在等待队列sync queue中添加一个节点
2.acquireQueued:节点自旋获取锁或者进入阻塞

addWaiter比较简单,即把当前等待线程加入sync queue的尾节点。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

  

acquireQueued具体做了什么呢?

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  

1.自旋
2.如果当前就一个线程在等待,那么尝试获取锁。(判断条件:当前节点的前驱为head,即head.next = 当前节点)
3.不满足2,如果满足进入阻塞的条件,调用LockSupport.park(this)进入阻塞。

一句话总结lock的过程:当前线程直接去尝试获取锁,不成功,则加入sync queue尾节点进行阻塞等待(非公平)。

在看unlock的过程

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

  

1.先释放当前线程占有的锁,核心就是维护state的值。加一次锁,state+1,释放反之。
2.unparkSuccessor :之前提到,lock的时候,会维护一个排队的双向队列sync queue,此时,会unpark唤醒其中的head.next,使其进入锁竞争状态。

注:公平锁,非公平锁加锁的过程小有区别,一个是抢占式的,一个是排队式的,释放锁的过程则是完全相同的。

3.Condition

先看类图

Condition

用过Object的wait,notify的对这些方法应该不陌生,对应这里的await和signal
先看await

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

  

1.构造一个Node,形成一个单向链表condition queue,存储用于await在某一个condition上的线程。
2.释放当前Node持有的锁。这个释放过程跟之前提到的unlock过程类似。
3.LockSupport.park进行阻塞,等待signal的唤醒。
4.如果被唤醒,继续加入锁的竞争中去。

在看signal

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

  

在某个condition进行await的时候,形成了一个单向链表condition queue。
那么在signal的时候,先对头结点firstWaiter进行唤醒。
唤醒的过程见下

final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

  

1.将这个头结点,从condition queue中移到之前提到的sync queue中。
2.LockSupport.unpark唤醒这个节点中的线程,进行锁争夺。

4 总结

ReentrantLock lock依赖的是一个双向链表sync queue
condition依赖的是一个单项链表condition queue
二者的阻塞和唤醒依赖的都是LockSupport的park和unpark方法。

公平锁与非公平锁的区别就在于获取锁的方式不同,公平锁获取,当前线程必须检查sync queue里面是否已经有排队线程。而非公平锁则不用考虑这一点,当前线程可以直接去获取锁。

开篇实现生产者消费者模型的时候,有两个问题,现在有答案了
1.队列满了,生产者线程怎么停下来的?队列从满又变为不满的时候,怎么重新激活。
---通过lock机制,保证同一时刻,只有一个线程获取到锁,要么生产,要么消费,队列满了之后,生产者线程调用providerCondition.await(),进入阻塞等待状态,使得生产者线程停下来。当消费线程消费的时候,调用 providerCondition.signal(),重新激活生产者线程。

2.队列空了,消费者线程如何停下来,又如何重新开始消费。
---与第一个问题同理。

作者:北交吴志炜
链接:https://www.jianshu.com/p/b60273eb71a9
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

原文地址:https://www.cnblogs.com/felixzh/p/11995958.html

时间: 2024-08-28 21:22:46

Lock+Condition实现机制的相关文章

Lock&Condition实现线程同步通信

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConditionCommunication {     final Business business = new Business();     public static void main(Stri

Lock+Condition 相对于 wait+notify 的一个优势案例分析

问题的描述 启动3个线程打印递增的数字, 线程1先打印1,2,3,4,5, 然后是线程2打印6,7,8,9,10, 然后是线程3打印11,12,13,14,15. 接着再由线程1打印16,17,18,19,20....以此类推, 直到打印到45. wait+notify实现: package com.tonyluis; public class NumberPrintDemo { static int n = 1; static int state = 0; public static void

Lock&amp;Condition实现线程同步通信

一,Lock Lock比传统的线程模型中的synchronized方式更加面向对象,因为"锁"本身就是一个对象. 两个线程执行的代码要实现同步互斥的效果,他们必须用同一个Lock对象. 读写锁:(1)读锁:多个读锁不互斥,读锁与写锁互斥,这是由jvm自己控制的,我们只需要代码中用对相应的锁即可.如果只读数据,那么可以很多人(线程)同时读,但是不能同时写,此时就加读锁.如果代码需要修改数据,此时只能一个人(一个线程)写,此时不能同时读,那么就加写锁. 总之,读时,上读锁:写时,上写锁.

线程间通信 Object/wait(),notify() 和 Lock/Condition/await(),signal()

基本前提知识: 一:Object/wait(), notify(), notifyAll() 1:wait() 方法暂停当前线程,并立即释放对象锁; 2:notify()/notifyAll() 方法唤醒其他等待该对象锁的线程,并在执行完同步代码块中的后续步骤后,释放对象锁 3:notify()和notifyAll()的区别在于: notify只会唤醒其中一个线程, notifyAll则会唤醒全部线程. 至于notify会唤醒哪个线程,是由线程调度器决定的. 例子: public class T

Java多线程——Lock&amp;Condition

Lock比传统线程模型中的synchronized方式更加面向对象,与生活中的锁类似,锁本身也应该是一个对象.两个线程执行的代码片段要实现同步互斥的效果,它们必须用同一个Lock对象. package java_thread; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class LockTest { /** * @param args */ pu

lock Condition简单示例

package wj; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /**  * Created by wangjia .  * Date:2015/8/12 0012  * Time:16:58  */ public class App {     static class

再谈AbstractQueuedSynchronizer:共享模式与基于Condition的等待/通知机制实现

共享模式acquire实现流程 上文我们讲解了AbstractQueuedSynchronizer独占模式的acquire实现流程,本文趁热打铁继续看一下AbstractQueuedSynchronizer共享模式acquire的实现流程.连续两篇文章的学习,也可以对比独占模式acquire和共享模式acquire的区别,加深对于AbstractQueuedSynchronizer的理解. 先看一下共享模式acquire的实现,方法为acquireShared和acquireSharedInte

Java 线程锁机制 -Synchronized Lock 互斥锁 读写锁

synchronized 是互斥锁: lock 更广泛,包含了读写锁 读写锁特点: 1)多个读者可以同时进行读2)写者必须互斥(只允许一个写者写,也不能读者写者同时进行)3)写者优先于读者(一旦有写者,则后续读者必须等待,唤醒时优先考虑写者) 互斥锁特点: 一次只能一个线程拥有互斥锁,其他线程只有等待 所谓互斥锁, 指的是一次最多只能有一个线程持有的锁. 在jdk1.5之前, 我们通常使用synchronized机制控制多个线程对共享资源的访问. 而现在, Lock提供了比synchronize

Lock的lock/unlock, condition的await/singal 和 Object的wait/notify 的区别

在使用Lock之前,我们都使用Object 的wait和notify实现同步的.举例来说,一个producer和consumer,consumer发现没有东西了,等待,produer生成东西了,唤醒. 线程consumer 线程producer synchronize(obj){     obj.wait();//没东西了,等待 } synchronize(obj){     obj.notify();//有东西了,唤醒 } 有了lock后,世道变了,现在是: lock.lock();   co