Java并发编程之Condition

1.使用synchronized中的等待和唤醒实现消费者和生产者模式

/**
 * 使用Synchronized实现消费者生产者模式
 */
public class SynchronizedDemo {

    static List<Integer> list = new ArrayList<Integer>();

    private static int maxNum = 5;

    // 消费者
    private void Consumer(String name){
        synchronized (list){
            while(list.isEmpty()){
                // 如果list为空,调用wait等待,并且释放锁
                try {
                    System.out.println("----当前产品数量为0个,"+name+"无法消费");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.remove(0);
            System.out.println("当前产品数量为"+list.size()+"个,"+name+"消费1个产品");
            list.notifyAll();
        }
    }

    // 生产者
    private void Producer(String name){
        synchronized (list){
            while(list.size()>maxNum){
                // 如果list容量大于5个,那么就等待,直到有空余容量再生产产品
                try {
                    System.out.println("++++当前容量已满,"+name+"无法生产");
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            list.add(1);
            System.out.println("当前容量为,"+list.size()+","+name+"生产1个产品");
            list.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedDemo demo = new SynchronizedDemo();

        Thread consumer1 = new Thread(()->demo.Consumer("consumer1"));
        Thread consumer2 = new Thread(()->demo.Consumer("consumer2"));
        Thread consumer3 = new Thread(()->demo.Consumer("consumer3"));
        Thread consumer4 = new Thread(()->demo.Consumer("consumer4"));

        Thread producer1 = new Thread(()->demo.Producer("producer1"));
        Thread producer2 = new Thread(()->demo.Producer("producer2"));
        Thread producer3 = new Thread(()->demo.Producer("producer3"));
        Thread producer4 = new Thread(()->demo.Producer("producer4"));

        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();

        producer1.start();
        producer2.start();
        producer3.start();
        producer4.start();

        TimeUnit.SECONDS.sleep(5);
        System.out.println("最后剩余多少个产品:"+demo.list.size());

    }
}

如上图,假设有一个公共的容量有限的池子,有两种人,一种是生产者,另一种是消费者。需要满足如下条件:

  1. 生产者产生资源往池子里添加,前提是池子没有满,如果池子满了,则生产者暂停生产,直到自己的生成能放下池子。
  2. 消费者消耗池子里的资源,前提是池子的资源不为空,否则消费者暂停消耗,进入等待直到池子里有资源数满足自己的需求。

注意:

  1. wait()方法和notifyAll()方法必须放在同步块内调用(synchronized块内),否则会报错。
  2. 调用wait()方法,线程会进入等待状态,同时释放锁。
  3. 调用notify()或者notifyAll()会唤醒正在等待的线程。

2.Condition

????任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以
实现等待/通知模式
Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

Object的监视器方法与Condition接口的对比:

3.Condition接口与示例

????Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到
Condition对象关联的锁
Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创
建出来的
,换句话说,Condition是依赖Lock对象的

使用Condition接口改写上面的消费者生产者代码:

/**
 * 使用Condition实现消费者生产者模式
 */
public class ConditionDemo {

    static List<Integer> list = new ArrayList<Integer>();

    static ReentrantLock lock = new ReentrantLock();

    static Condition condition = lock.newCondition();

    private static int maxNum = 5;

    // 消费者
    private void Consumer(String name){
        lock.lock();
        try{
            while(list.isEmpty()){
                System.out.println("----当前产品数量为0个,"+name+"进入等待队列");
                condition.await();
            }
            list.remove(0);
            System.out.println("当前产品数量为"+list.size()+"个,"+name+"消费1个产品");
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    // 生产者
    private void Producer(String name){
        lock.lock();
        try{
            while(list.size()>maxNum){
                System.out.println("++++当前容量已满,"+name+"进入等待队列");
                condition.await();
            }
            list.add(1);
            System.out.println("当前容量为,"+list.size()+","+name+"生产1个产品");
            condition.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo demo = new ConditionDemo();

        Thread consumer1 = new Thread(()->demo.Consumer("consumer1"));
        Thread consumer2 = new Thread(()->demo.Consumer("consumer2"));
        Thread consumer3 = new Thread(()->demo.Consumer("consumer3"));
        Thread consumer4 = new Thread(()->demo.Consumer("consumer4"));

        Thread producer1 = new Thread(()->demo.Producer("producer1"));
        Thread producer2 = new Thread(()->demo.Producer("producer2"));
        Thread producer3 = new Thread(()->demo.Producer("producer3"));
        Thread producer4 = new Thread(()->demo.Producer("producer4"));

        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();

        producer1.start();
        producer2.start();
        producer3.start();
        producer4.start();

        TimeUnit.SECONDS.sleep(5);
        System.out.println("最后剩余多少个产品:"+demo.list.size());

    }
}

????这里创建了4个消费者4个生产者,有个容量池list,当list为空时,调用condition.await()进入等待队列,其他线程调用condition.signal()唤醒一个等待在Contidion上的线程,该线程从等待方法返回前必须获得与Contidion相关联的锁。当list大于最大容量时,生产者调用condition.await()进行等待,直到其他线程调用condition.signal()唤醒当前线程,并且去获取相关联的锁,只有获取到了锁才会从await方法返回。
????获取一个Condition必须通过Lock的newCondition()方法

????Condition的(部分)方法以及描述:

void await() throws InterruptedException

  1. 当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;
  2. 其他线程调用interrupt()可以中断正在等待的线程。
  3. 线程从await方法返回,说明线程已经获取到了锁。

void awaitUninterruptibly()

????当前线程进入等待状态直到被通知,该方法对中断不敏感,也就是在等待状态中不能被中断。

long awaitNanos(long nanosTimeout) throws InterruptedException

????当前线程进入等待状态,直到被通知,中断,或者超时。返回值表示剩余时间,如果返回值为0或者负数,说明已经超时了。如果在nanosTimeout之前就被唤醒了,那么返回值就是nanosTimeout-实际耗时。

boolean await(long time, TimeUnit unit) throws InterruptedException

????当前线程进入等待状态,直到被通知,中断,或者超时。支持自定义时间单位,false:表示方法超时之后自动返回的,true:表示等待还未超时时,await方法就返回了(超时之前,被其他线程唤醒了)。

boolean awaitUntil(Date deadline) throws InterruptedException

????当前线程进入等待状态,直到被通知,中断,或者到将来某个时间,如果没有到指定时间就被通知,返回true,如果到了某个时间,还未被唤醒,就返回false。

void signal()

????唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁。

void signalAll()

????唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须是获得了与Condition相关联的锁。

4.同一个锁支持创建多个Condition

????使用两个Condition改写上面的消费者生产者:

/**
 * 使用Condition实现消费者生产者模式
 */
public class ConditionDemo {

    static List<Integer> list = new ArrayList<Integer>();

    static ReentrantLock lock = new ReentrantLock();

    // 当队列已满时,生产者不能生产产品,在full队列中等待
    static Condition full = lock.newCondition();

    // 当队列为空时,消费者不能消费产品,在empty队列中等待
    static Condition empty = lock.newCondition();

    private static int maxNum = 5;

    // 消费者
    private void Consumer(String name){
        lock.lock();
        try{
            while(list.isEmpty()){
                System.out.println("----当前产品数量为0个,"+name+"进入等待队列");
                empty.await();
            }
            list.remove(0);
            System.out.println("当前产品数量为"+list.size()+"个,"+name+"消费1个产品");
            // full队列中等待的线程
            full.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    // 生产者
    private void Producer(String name){
        lock.lock();
        try{
            while(list.size()>maxNum){
                System.out.println("++++当前容量已满,"+name+"进入等待队列");
                full.await();
            }
            list.add(1);
            System.out.println("当前容量为,"+list.size()+","+name+"生产1个产品");
            empty.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo demo = new ConditionDemo();

        Thread consumer1 = new Thread(()->demo.Consumer("consumer1"));
        Thread consumer2 = new Thread(()->demo.Consumer("consumer2"));
        Thread consumer3 = new Thread(()->demo.Consumer("consumer3"));
        Thread consumer4 = new Thread(()->demo.Consumer("consumer4"));

        Thread producer1 = new Thread(()->demo.Producer("producer1"));
        Thread producer2 = new Thread(()->demo.Producer("producer2"));
        Thread producer3 = new Thread(()->demo.Producer("producer3"));
        Thread producer4 = new Thread(()->demo.Producer("producer4"));

        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();

        producer1.start();
        producer2.start();
        producer3.start();
        producer4.start();

        TimeUnit.SECONDS.sleep(5);
        System.out.println("最后剩余多少个产品:"+demo.list.size());

    }
}

????示例代码中用了两个Condition,因此会有两个等待队列,当消费者消费数据时,如果list为空,那么就进入empty队列中等待,当生产者生产数据时,如果list容量已满,那么就进入full队列中等待。此时生产者和消费者都在各自的等待队列中等待如果是synchronized的话,只支持一个等待队列,Condition可以支持多个等待队列

总结

1. 使用condition的步骤:创建condition对象,获取锁,然后调用condition的方法

2. 一个ReentrantLock支持创建多个condition对象

3. void await()throwsInterruptedException;方法会释放锁,让当前线程等待,支持唤醒,支持线程中断

4. void awaitUninterruptibly();方法会释放锁,让当前线程等待,支持唤醒,不支持线程中断

5. long awaitNanos(longnanosTimeout)throwsInterruptedException;参数为纳秒,此方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为负数;超时之前被唤醒返回的,结果为正数(表示返回时距离超时时间相差的纳秒数)

6. boolean await(longtime,TimeUnitunit)throwsInterruptedException;方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为false;超时之前被唤醒返回的,结果为true

7. boolean awaitUntil(Datedeadline)throwsInterruptedException;参数表示超时的截止时间点,方法会释放锁,让当前线程等待,支持唤醒,支持中断。超时之后返回的,结果为false;超时之前被唤醒返回的,结果为true

8. void signal();会唤醒一个等待中的线程,然后被唤醒的线程会被加入同步队列,去尝试获取锁

9. void signalAll();会唤醒所有等待中的线程,将所有等待中的线程加入同步队列,然后去尝试获取锁

原文资料:

java并发编程的艺术

https://mp.weixin.qq.com/s/n7OWc69dLAcesiBertbjDA

原文地址:https://www.cnblogs.com/chenshy/p/11669794.html

时间: 2024-11-05 06:04:14

Java并发编程之Condition的相关文章

java并发编程之Master-Worker模式

Master-Worker模式适合在一个任务可以拆分成多个小任务来进行的情况下使用. package cn.fcl.masterworker; import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public c

Java并发编程之volatile的理解

Java并发编程之volatile关键字的理解 Java中每个线程都有自己的工作内存,类比于处理器的缓存,线程的工作内存中保存了被该线程使用到的变量的主内存的拷贝.线程读写变量都是直接在自己的工作内存中进行的,而何时刷新数据(指将修改的结果更新到主存或者把主存的变量读取覆盖掉工作内存中的值)是不确定的. volatile关键字是修饰字段的关键字,貌似是JDK1.5之后才有的,在多线程编程中,很大的几率会用到这个关键字,volatile修饰变量后该变量有这么一种效果:线程每一次读该变量都是直接从主

java并发编程之future模式

1.当你想并发去执行一段代码,但是还想获取这段代码的返回结果,那么future多线程模式就可以派上用场了,代码实现如下. public class Client { public Data request() { final FutureData futureData = new FutureData(); new Thread(new Runnable() { @Override public void run() { futureData.setRealData(new RealData()

Java并发编程之ConcurrentHashMap

ConcurrentHashMap ConcurrentHashMap是一个线程安全的Hash Table,它的主要功能是提供了一组和HashTable功能相同但是线程安全的方法.ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁. ConcurrentHashMap的内部结构 ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment

java并发编程之Guarded Suspention

当客户端请求速度远远大于服务端的处理速度,这时候就非常适合使用Guarded Suspention模式 package cn.fcl.guardendSuspension; import java.util.ArrayList; import java.util.List; public class RequestQueue { private List<Integer> integers = new ArrayList<Integer>(); public synchronize

Java并发编程之set集合的线程安全类你知道吗

Java并发编程之-set集合的线程安全类 Java中set集合怎么保证线程安全,这种方式你知道吗? 在Java中set集合是 本篇是<凯哥(凯哥Java:kagejava)并发编程学习>系列之<并发集合系列>教程的第二篇: 本文主要内容:Set集合子类底层分别是什么?基于底层为什么set的子类可以存放一个数据?怎么解决set线程安全问题? 一:Set集合子类 Set的三个子类分别是:HaseSet.TreeSet.LinkedHashSet.这三个都是线程不安全的.那么这三个子类

java并发编程之CountDownLatch与CyclicBarrier

CountDownLatch和CyclicBarrier是jdk concurrent包下非常有用的两个并发工具类,它们提供了一种控制并发流程的手段.本文将会提供一些应用场景,结合源码,对它们的具体实现以及如何使用做一个具体分析. CountDownLatch CountDownLatch允许一个或多个线程等待其他线程完成操作. CountDownLatch使用案例 需求:解析一个文件下多个txt文件数据,可以考虑使用多线程并行解析以提高解析效率.每一个线程解析一个文件里的数据,等到所有数据解析

Java并发编程之Phaser类

Phaser这个类的使用场景为N个线程分阶段并行的问题.有这么一个任务为"做3道题",每个学生一个进程,5个学生可以并行做,这个就是常规的并发,但是如果加一个额外的 限制条件,必须等所有人都做完类第一题,才能开始做第二题,必须等所有人都做完了第二题,才能做第三题,这个问题就转变成了分阶段并发的问题,最适合用Phaser来解题,下面给出源代码,大家可以自己尝试: MyPhaser.java import java.util.concurrent.Phaser; public class

Java并发编程之ConcurrentHashMap原理分析

前言: 集合是编程中最常用的数据结构.而谈到并发,几乎总是离不开集合这类高级数据结构的支持.比如两个线程需要同时访问一个中间临界区(Queue),比如常会用缓存作为外部文件的副本(HashMap).这篇文章主要分析jdk1.5的3种并发集合类型(concurrent,copyonright,queue)中的ConcurrentHashMap,让我们从原理上细致的了解它们,能够让我们在深度项目开发中获益非浅. 在tiger之前,我们使用得最多的数据结构之一就是HashMap和Hashtable.大