使用ReentrantLock和Condition来代替内置锁和wait(),notify(),notifyAll()

使用ReentrantLock可以替代内置锁,当使用内置锁的时候,我们可以使用wait() nitify()和notifyAll()来控制线程之间的协作,那么,当我们使用ReentrantLock的时候,我们怎么来处理线程之间的写作呢?
JDK5.0为我们提供了Condition对象来替代内置锁的 wait(),notify()和notifyAll()方法

内置锁的话,就只能有一个等待队列,所有的在某个对象上执行wait()方法的线程都会被加入到该对象的等待队列中去(线程会被挂起),需要其他的线程在同一个对象上调用notify()或者是notifyAll()方法来唤醒等待队列中的线程

而使用Condition的话,可以使用不同的等待队列,只需要使用lock.newCondition()即可定义一个Condition对象,每一个Condition对象上都会有一个等待队列(底层使用AQS),调用某个Condition对象的await()方法,就可以把当前线程加入到这个Condition对象的等待队列上

其他的线程调用同一个Condition对象的sinal()或者是signalAll()方法则会唤醒等待队列上的线程,使其能够继续执行

我们以一个现实中的例子来说明若何使用ReentrantLock和Condition如何替代synchronized和wait(),notify(),notifyAll():

我们模拟两个线程,一个线程执行登录操作,该登录操作会阻塞,然后等待另外一个线程将其唤醒(类似扫描登录的场景,页面会阻塞,等待扫码和确认,然后页面才会跳转)

首先是使用内置锁的例子:

package com.jiaoyiping.baseproject.condition;

import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/**
 * Created with Intellij IDEA
 *
 * @author: jiaoyiping
 * Mail: [email protected]
 * Date: 2019/04/12
 * Time: 15:29
 * To change this template use File | Settings | Editor | File and Code Templates
 */

//使用内置锁来实现的等待/通知模型

public class LoginServiceUseInnerLock {
    private ConcurrentHashMap<String, Result> loginMap = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        LoginServiceUseInnerLock loginService = new LoginServiceUseInnerLock();
        String uuid = UUID.randomUUID().toString();
        System.out.println("[" + Thread.currentThread().getName() + "] 使用的UUID是: " + uuid);
        new Thread(() -> {
            loginService.login(uuid, 20_000);
            countDownLatch.countDown();
        }, "登录线程").start();
        Thread.sleep(2_000);
        new Thread(() -> {
            loginService.confirm(uuid);
            countDownLatch.countDown();
        }, "确认线程").start();
        countDownLatch.await();
        System.out.println("[" + Thread.currentThread().getName() + "] 两个线程都执行完毕了");

    }

    public void login(String code, int timeout) {
        Result result = new Result();
        result.setMessage("超时");
        loginMap.put(code, result);
        synchronized (result) {
            try {
                //超时的话,会自动返回,程序继续
                System.out.println("[" + Thread.currentThread().getName() + "] 登录线程挂起");
                result.wait(timeout);
                System.out.println("[" + Thread.currentThread().getName() + "] 登录线程继续执行,得到的结果是:" + result.getMessage());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                loginMap.remove(code);
            }
        }
    }

    public void confirm(String code) {
        assert code != null;
        Result result = loginMap.get(code);
        if (result == null) {
            System.out.println("[" + Thread.currentThread().getName() + "] 请求不存在或者已经过期");
            return;
        }
        result.setMessage("成功");
        synchronized (result) {
            //唤醒等待队列上的线程
            System.out.println("[" + Thread.currentThread().getName() + "] 确认线程开始唤醒阻塞的线程");
            result.notify();
        }

    }

    class Result implements Serializable {
        private static final long serialVersionUID = -4279280559711939661L;
        String message;

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public Result() {
        }
        public Result(String message) {
            this.message = message;
        }
    }

使用内置锁的时候,我们把random生成的key和一个自己定义的Result对象放置到ConcurrentHashMap中去,登录线程调用 Result对象的wait(timeout) 方法将当前线程挂起,并加入到Result对象的等待队列上去

确认线程根据key值,找到对应的Result对象,设置好message,然后调用Result对象的notify()方法唤醒等待队列上的线程,登录线程得以继续执行

那我们如何使用ReentrantLock和Condition来重写这个例子:

package com.jiaoyiping.baseproject.condition;

import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created with Intellij IDEA
 *
 * @author: jiaoyiping
 * Mail: [email protected]
 * Date: 2019/04/12
 * Time: 14:56
 * To change this template use File | Settings | Editor | File and Code Templates
 */

//使用ReentrantLock和Condition来实现的等待/通知模型

public class LoginServiceUseCondition {

    private ReentrantLock lock = new ReentrantLock();
    ConcurrentHashMap<String, Result> conditions = new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        LoginServiceUseCondition loginService = new LoginServiceUseCondition();
        String uuid = UUID.randomUUID().toString();
        System.out.println("[" + Thread.currentThread().getName() + "] 使用的UUID是:" + uuid);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        new Thread(() -> {
            loginService.login(uuid, 30_000);
            countDownLatch.countDown();
        }, "登录线程").start();
        Thread.sleep(5_000);
        new Thread(() -> {
            try {
                Thread.sleep(3_000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            loginService.confirm(uuid);
            countDownLatch.countDown();
        }, "确认线程").start();
        countDownLatch.await();
        System.out.println("[" + Thread.currentThread().getName() + "] 两个线程都执行完毕了,退出");
    }

    /**
     * 过了超时时间之后,锁会自动释放
     *
     * @param code
     * @param timeout
     */
    public void login(String code, int timeout) {
        assert code != null;
        try {
            lock.tryLock(timeout, TimeUnit.MILLISECONDS);
            Condition condition = lock.newCondition();
            Result result = new Result("超时", condition);
            conditions.put(code, result);
            System.out.println("[" + Thread.currentThread().getName() + "] login()的请求开始阻塞");
            condition.await();
            System.out.println("[" + Thread.currentThread().getName() + "] 结束等待,继续执行,拿到的结果是" + result.getMessage());

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

    //确认线程(拿这个UUID,去找到对应的Condition,唤醒上边的等待队列,并把Condition对象移除掉)

    public void confirm(String code) {
        assert code != null;
        Result result = conditions.get(code);
        Condition condition = result.getCondition();
        if (condition != null) {
            try {
                System.out.println("[" + Thread.currentThread().getName() + "] 找到对应的Condition对象,将其等待队列中的线程唤醒");
                lock.lock();
                result.setMessage("成功");
                condition.signal();
                conditions.remove(code);
            } finally {
                lock.unlock();
            }
        }
    }

    class Result implements Serializable {
        String message;
        final Condition condition;

        public Result(String message, Condition condition) {
            this.message = message;
            this.condition = condition;
        }

        public Result(Condition condition) {
            this.condition = condition;
        }

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public Condition getCondition() {
            return condition;
        }
    }

}    

上边的例子说明了怎么使用ReentrantLock和Condition来代替内置锁和wait(),notify(),notifyAll()

下边的一个来自jdk中的例子,演示了如何使用同一个ReentrantLock上的多个等待队列的情况

来自JDK文档中的示例(我稍加改造,加上了main方法和一些日志):

package com.jiaoyiping.baseproject.condition;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

/**
 * Created with Intellij IDEA
 *
 * @author: jiaoyiping
 * Mail: [email protected]
 * Date: 2019/04/12
 * Time: 21:17
 * To change this template use File | Settings | Editor | File and Code Templates
 */

public class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[10];
    int putptr, takeptr, count;

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer boundedBuffer = new BoundedBuffer();
        CountDownLatch countDownLatch = new CountDownLatch(40);
        //分别启动20个put线程和20个take线程

        IntStream.rangeClosed(1, 20).forEach(i -> {
            new Thread(() -> {
                try {
                    boundedBuffer.put(new Object());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }, "put线程 - " + i).start();
        });

        IntStream.rangeClosed(1, 20).forEach(i -> {
            new Thread(() -> {
                try {
                    boundedBuffer.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }, "take线程-" + i).start();
        });

        countDownLatch.await();
        System.out.println("[" + Thread.currentThread().getName() + "] 所有线程都执行完毕,退出");
    }

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            //put的线程,当队列满的时候挂起
            while (count == items.length) {
                System.out.println("[" + Thread.currentThread().getName() + "] 线程挂起");
                notFull.await();
            }
            Thread.sleep(1_000);
            items[putptr] = x;
            if (++putptr == items.length) {
                putptr = 0;
            }
            ++count;
            System.out.println("[" + Thread.currentThread().getName() + "] 执行完毕写操作,唤醒take线程");
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            //take的线程,当队列为空的时候,挂起
            while (count == 0) {
                System.out.println("[" + Thread.currentThread().getName() + "] 线程挂起");
                notEmpty.await();
            }
            Thread.sleep(1_000);
            Object x = items[takeptr];
            if (++takeptr == items.length) {
                takeptr = 0;
            }
            --count;
            System.out.println("[" + Thread.currentThread().getName() + "] 执行完毕读操作,唤醒put线程");
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}    

我们看到,以上代码中,使用到了两个Condition:notFull和notEmpty,都是通过lock对象的newCondition()方法得来的

items被放满之后,put的线程会在notFull的等待队列上进行等待(执行了notFull.await()方法) put线程执行完操作之后,会调用 notEmpty.signal()来试图唤醒在notEmpty上等待的线程(也就是给take线程发了一个信号,告诉它,items不是空的了,你可以过来take了)

当item空了之后,take线程会在notEmpty的等待队列上进行等待(执行了notEmpty的await()方法) 当take线程执行完操作之后,会调用notFull.signal()来唤醒在notFull上等待的线程(也就是给put线程发一个信号,告诉它,items不满了,你可以进行put操作了)

和内置方法类似,在调用await(),signal(),signalAll()等方法的时候,也必须要获得锁,也就是必须在 lock.lock()和lock.unlock()代码块儿之间才能调用这些方法,否则就会抛出IllegalMonitorStateException

原文地址:https://www.cnblogs.com/jiaoyiping/p/10699273.html

时间: 2024-08-17 06:16:36

使用ReentrantLock和Condition来代替内置锁和wait(),notify(),notifyAll()的相关文章

Java 并发:内置锁 Synchronized

摘要: 在多线程编程中,线程安全问题是一个最为关键的问题,其核心概念就在于正确性,即当多个线程访问某一共享.可变数据时,始终都不会导致数据破坏以及其他不该出现的结果.而所有的并发模式在解决这个问题时,采用的方案都是序列化访问临界资源 .在 Java 中,提供了两种方式来实现同步互斥访问:synchronized 和 Lock.本文针对 synchronized 内置锁 详细讨论了其在 Java 并发 中的应用,包括它的具体使用场景(同步方法.同步代码块.实例对象锁 和 Class 对象锁).可重

转:【Java并发编程】之一:可重入内置锁

每个Java对象都可以用做一个实现同步的锁,这些锁被称为内置锁或监视器锁.线程在进入同步代码块之前会自动获取锁,并且在退出同步代码块时会自动释放锁.获得内置锁的唯一途径就是进入由这个锁保护的同步代码块或方法. 当某个线程请求一个由其他线程持有的锁时,发出请求的线程就会阻塞.然而,由于内置锁是可重入的,因此如果摸个线程试图获得一个已经由它自己持有的锁,那么这个请求就会成功."重入"意味着获取锁的操作的粒度是"线程",而不是调用.重入的一种实现 方法是,为每个锁关联一个

《java并发编程实战》读书笔记1--线程安全性,内置锁,重入,状态

什么是线程安全? 当多个线程访问某个类时,不管这些的线程的执行顺序如何,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的. 哈哈书上的解释,还是翻译过来的,看了半天还是觉得有点奇怪.比如说 "类都能表现出正确的行为" 是毛线意思?在网上搜了一番 "线程安全就是说多线程访问同一代码,不会产生不确定的结果" 这样反而跟容易理解,果然读书要么读原版中文书要么读原版英文书,看英文的翻译版真的是蛋疼无比.说到这里,我以前貌似把

内置锁(二)synchronized下的等待通知机制

一.等待/通知机制的简介 线程之间的协作: ??为了完成某个任务,线程之间需要进行协作,采取的方式:中断.互斥,以及互斥上面的线程的挂起.唤醒:如:生成者--消费者模式.或者某个动作完成,可以唤醒下一个线程.管道流已准备等等: 等待/通知机制: ? ?等待/通知机制 是线程之间的协作一种常用的方式之一,在显示锁Lock 和 内置锁synchronized都有对应的实现方式. 等待/通知机制 经典的使用方式,便是在生产者与消费者的模式中使用: 1.生产者负责生产商品,并送到仓库中存储: 2.消费者

012 内置锁和synchronized

一 . 概述 在前面我们说到线程安全性问题解决的核心就是同步,同步的核心就是保证原子性. 在java之中最早就支持语法层面的同步解决了,并且提供了synchronized的方式解决问题. 二 .内置锁 在java之中每一个对象都是一个内置锁,这个在JVM的体系之中就规定好了. 内置锁的规定也就决定我们可以拿任意的对象进行同步操作. 内置锁常常配合synchronized使用. 三 .synchronized 该关键词的作用是同步,需要配合内置锁进行使用. 常见的synchronized的使用方式

jvm内置锁synchronized不能被中断

很久没看技术书籍了,今天看了一下<七周七并发模型>前面两章讲的java,写的还是有深度的.看到了一个有demo,说jvm内置锁synchronized是不能被中断的.照着书上写了个demo,验证了一下,是不能被中断 /** * @Author: * @Description: jdk内置锁不能被中断 * @Date: Created in : 2018/10/4 下午11:34 **/ public class Uninterruptible { private static final Ob

java 并发——内置锁

坚持学习,总会有一些不一样的东西. 一.由单例模式引入 引用一下百度百科的定义-- 线程安全是多线程编程时的计算机程序代码中的一个概念.在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况. 文字定义总是很含糊,举个反例就很清楚了,想起之前总结过单例模式,就从单例模式开始吧.如果不清楚单例模式的新同学,可以看一下这篇总结: java中全面的单例模式多种实现方式总结 单例模式中,懒汉式的实现方案如下: public c

Java并发编程(1):可重入内置锁

每个Java对象都可以用做一个实现同步的锁,这些锁被称为内置锁或监视器锁.线程在进入同步代码块之前会自动获取锁,并且在退出同步代码块时会自动释放锁.获得内置锁的唯一途径就是进入由这个锁保护的同步代码块或方法. 当某个线程请求一个由其他线程持有的锁时,发出请求的线程就会阻塞.然而,由于内置锁是可重入的,因此如果摸个线程试图获得一个已经由它自己持有的锁,那么这个请求就会成功."重入"意味着获取锁的操作的粒度是"线程",而不是调用.重入的一种实现方法是,为每个锁关联一个获

Synchronzied(内置锁)

原文地址:深入JVM锁机制1-synchronized 1. 线程的状态与转换 当多个线程同时请求某个对象监视器时,对象监视器会设置几种状态用来区分请求的线程: Contention List:所有请求锁的线程将被首先放置到该竞争队列 Entry List:Contention List 中那些有资格成为候选人(怎么才算有资格成为候选人)的线程被移到 Entry List Wait Set:那些调用wait方法被阻塞的线程被放置到Wait Set OnDeck:任何时刻最多只能有一个线程正在竞争