java并发系列(二)-----线程之间的协作(wait、notify、join、CountDownLatch、CyclicBarrier)

在java中,线程之间的切换是由操作系统说了算的,操作系统会给每个线程分配一个时间片,在时间片到期之后,线程让出cpu资源,由其他线程一起抢夺,那么如果开发想自己去在一定程度上(因为没办法100%控制它)让线程之间互相协作、通信,有哪些方式呢?

wait、notify、notifyAll

1、void wait( )
导致当前的线程等待,直到其他线程调用此对象的notify( ) 方法或 notifyAll( ) 方法
2、void wait(long timeout)
导致当前的线程等待,直到其他线程调用此对象的notify() 方法或 notifyAll() 方法,或者指定的时间过完。
3、void notify()
唤醒在此对象监视器上等待的单个线程
4、void notifyAll()
唤醒在此对象监视器上等待的所有线程

举例说明:

public class WaitTest1 {

    public static void main(String[] args) {

        ThreadA ta = new ThreadA("ta");

        synchronized(ta) { // 通过synchronized(ta)获取“对象ta的同步锁”
            try {
                System.out.println(Thread.currentThread().getName()+" start ta");
                ta.start();

                System.out.println(Thread.currentThread().getName()+" block");
                ta.wait();    // 等待

                System.out.println(Thread.currentThread().getName()+" continue");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class ThreadA extends Thread{

        public ThreadA(String name) {
            super(name);
        }

        public void run() {
            synchronized (this) { // 通过synchronized(this)获取“当前对象的同步锁”。这个this代表是内部类ThreadA的对象。
                System.out.println(Thread.currentThread().getName()+" wakup others");
                notify();    // 唤醒“当前对象上的等待线程”
            }
        }
    }
}

运行结果

main start ta
main block
ta wakup others
main continue

注:notify()与notifyAll()区别:notify()会使等待获取某对象锁的一个线程到Runnable状态,但是notifyAll()会使所有的线程到Runnable状态,虽然notifyAll()方法性能开销略大,但是不存在信号丢失问题,因此优先推荐使用notifyAll()。

Thread.join()

1、作用

当一个线程希望等待另外一个或多个线程运行结束时可以使用。

2、方法签名

public final void join()throws InterruptedException
//等待多少秒
public void join(long millis)throwsInterruptedException
//看了下源码,也不知道这方法是干啥的。。。
public final void join(long millis, int nanos)throws InterruptedException

3、示例

package com.ty.thread;

public class JoinDemo {

    class Boss implements Runnable {

        @Override
        public void run() {
            System.out.println("当前线程为:" + Thread.currentThread().getName() + "老板给工人发工资");
        }

    }

    static class Worker implements Runnable {

        @Override
        public void run() {
            System.out.println("当前线程为:" + Thread.currentThread().getName() + "工人准备干活喽*****");

            try {
                //模拟工人干活所花费的时间
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("当前线程为:" + Thread.currentThread().getName() + "工人干活结束*****");
        }

    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker1 = new Thread(new Worker(), "工人1线程");
        Thread worker2 = new Thread(new Worker(), "工人2线程");
        Thread worker3 = new Thread(new Worker(), "工人3线程");
        //worker与boss一个为static,一个不为static,只是为了展示下不同的写法
        Thread boss = new Thread(new JoinDemo().new Boss(), "boss线程");
        worker1.start();
        worker2.start();
        worker3.start();
        //等待工人干完活,老板才能给钱啊,可恶的资本主义。。。
        worker1.join();
        worker2.join();
        worker3.join();
        boss.start();
    }
}

运行结果如下:

当前线程为:工人1线程工人准备干活喽*****
当前线程为:工人3线程工人准备干活喽*****
当前线程为:工人2线程工人准备干活喽*****
当前线程为:工人1线程工人干活结束*****
当前线程为:工人2线程工人干活结束*****
当前线程为:工人3线程工人干活结束*****
当前线程为:boss线程老板给工人发工资

反正boss线程肯定是等所有工人线程运行结束之后才开始运行

Condition

wait()/notify()存在的问题:
1、过于底层,并且不好控制
2、存在过早唤醒的情况
3、wait(long)无法区分是等待超时还是被通知线程唤醒

过早唤醒:上图中Notify1线程调用了notifyAll()或notify(),唤醒了wait4线程,但是其实这时候wait4不满足唤醒条件,这种情况就叫做过早唤醒。可以这么理解,你在大街上看到一个美女,然后你大喊一声美女,N个女生回头,就是这个意思。

因此出现了Condition接口,它作为wait/notify的替代品,解决了过早唤醒的情况,并且解决了wait(long)不能区分其返回是否由等待超时而导致的问题。Condition中的await()、signal()以及signalAll()分别替代wait()、notify()以及notifyAll()。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。

主要方法:

// 造成当前线程在接到信号或被中断之前一直处于等待状态。
void await()
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout)
// 造成当前线程在接到信号之前一直处于等待状态。
void awaitUninterruptibly()
// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
boolean awaitUntil(Date deadline)
// 唤醒一个等待线程。
void signal()
// 唤醒所有等待线程。
void signalAll()

示例:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionTest1 {

    private static Lock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) {

        ThreadA ta = new ThreadA("ta");

        lock.lock(); // 获取锁
        try {
            System.out.println(Thread.currentThread().getName()+" start ta");
            ta.start();

            System.out.println(Thread.currentThread().getName()+" block");
            condition.await();    // 等待

            System.out.println(Thread.currentThread().getName()+" continue");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();    // 释放锁
        }
    }

    static class ThreadA extends Thread{

        public ThreadA(String name) {
            super(name);
        }

        public void run() {
            lock.lock();    // 获取锁
            try {
                System.out.println(Thread.currentThread().getName()+" wakup others");
                condition.signal();    // 唤醒“condition所在锁上的其它线程”
            } finally {
                lock.unlock();    // 释放锁
            }
        }
    }
}

运行结果:

main start ta
main block
ta wakup others
main continue

那么Condition是如何预防过早唤醒的呢?如下图所示:

其实本质就是不同的notify线程跟wait线程之间使用不同的condition,与wait/notify相比,粒度更细,因此预防过早唤醒的情况出现。

并且针对一个锁,可以有多个condition,示例如下:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition();
    final Condition notEmpty = lock.newCondition(); 

    final Object[] items = new Object[5];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();    //获取锁
        try {
            // 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
            while (count == items.length)
                notFull.await();
            // 将x添加到缓冲中
            items[putptr] = x;
            // 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
            if (++putptr == items.length) putptr = 0;
            // 将“缓冲”数量+1
            ++count;
            // 唤醒take线程,因为take线程通过notEmpty.await()等待
            notEmpty.signal();

            // 打印写入的数据
            System.out.println(Thread.currentThread().getName() + " put  "+ (Integer)x);
        } finally {
            lock.unlock();    // 释放锁
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();    //获取锁
        try {
            // 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
            while (count == 0)
                notEmpty.await();
            // 将x从缓冲中取出
            Object x = items[takeptr];
            // 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
            if (++takeptr == items.length) takeptr = 0;
            // 将“缓冲”数量-1
            --count;
            // 唤醒put线程,因为put线程通过notFull.await()等待
            notFull.signal();

            // 打印取出的数据
            System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
            return x;
        } finally {
            lock.unlock();    // 释放锁
        }
    }
}

public class ConditionTest2 {
    private static BoundedBuffer bb = new BoundedBuffer();

    public static void main(String[] args) {
        // 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
        // 启动10个“读线程”,从BoundedBuffer中不断的读数据。
        for (int i=0; i<10; i++) {
            new PutThread("p"+i, i).start();
            new TakeThread("t"+i).start();
        }
    }

    static class PutThread extends Thread {
        private int num;
        public PutThread(String name, int num) {
            super(name);
            this.num = num;
        }
        public void run() {
            try {
                Thread.sleep(1);    // 线程休眠1ms
                bb.put(num);        // 向BoundedBuffer中写入数据
            } catch (InterruptedException e) {
            }
        }
    }

    static class TakeThread extends Thread {
        public TakeThread(String name) {
            super(name);
        }
        public void run() {
            try {
                Thread.sleep(10);                    // 线程休眠1ms
                Integer num = (Integer)bb.take();    // 从BoundedBuffer中取出数据
            } catch (InterruptedException e) {
            }
        }
    }
}

运行结果如下;

p1 put  1
p4 put  4
p5 put  5
p0 put  0
p2 put  2
t0 take 1
p3 put  3
t1 take 4
p6 put  6
t2 take 5
p7 put  7
t3 take 0
p8 put  8
t4 take 2
p9 put  9
t5 take 3
t6 take 6
t7 take 7
t8 take 8
t9 take 9

先描述下这种场景,有一个缓冲区,另外有一个读线程,一个写线程。读线程与写线程互不干扰的工作,但是当写线程开始工作后,通知读线程去读,并且读线程开始读之后,通知写线程工作。这种场景用wait/notify机制根本无法实现,但是可以通过多个condition可以实现。也就是说比如A、B两个线程,A需要唤醒B,使用一个condition,但同时B也需要唤醒A,就得使用另一个condition。

CountDownLatch

countDownLatch与thread.join()功能类似,但是join的粒度太大,必须要等整个线程执行结束才能执行后续相关动作,但是countDownLatch可以实现更精细的粒度。

1、原理

其实这种方式也就是所谓的闭锁,一种同步方法,可以延迟线程的进度直到线程到达某个终点状态。通俗的讲就是,一个闭锁相当于一扇大门,在大门打开之前所有线程都被阻断,一旦大门打开所有线程都将通过,但是一旦大门打开,所有线程都通过了,那么这个闭锁的状态就失效了,门的状态也就不能变了,只能是打开状态。也就是说闭锁的状态是一次性的,它确保在闭锁打开之前所有特定的活动都需要在闭锁打开之后才能完成。内部维护一个用于表示未完成的先决操作数量的计数器,通过countDown()方法来完成的。每调用一次这个方法,在构造函数中初始化的count值就减1,所以当N个线程都调用了这个方法count的值等于0,然后主线程就能通过await方法,恢复自己的任务。

2、主要方法

public CountDownLatch(int count); //指定计数的次数,只能被设置1次
public void countDown();          //调用此方法则计数减1
public void await() throws InterruptedException   //调用此方法会一直阻塞当前线程,直到计时器的值为0,除非线程被中断。
public Long getCount();           //得到当前的计数
public boolean await(long timeout, TimeUnit unit) //调用此方法会一直阻塞当前线程,直到计时器的值为0,除非线程被中断或者计数器超时,返回false代表计数器超时。

3、示例

public class CountDownLatchDemo {
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch=new CountDownLatch(2);//两个工人的协作
        Worker worker1=new Worker("zhang san", 5000, latch);
        Worker worker2=new Worker("li si", 8000, latch);
        worker1.start();//
        worker2.start();//
        latch.await();//等待所有工人完成工作
        System.out.println("all work done at "+sdf.format(new Date()));
    }  

    static class Worker extends Thread{
        String workerName;
        int workTime;
        CountDownLatch latch;
        public Worker(String workerName ,int workTime ,CountDownLatch latch){
             this.workerName=workerName;
             this.workTime=workTime;
             this.latch=latch;
        }
        public void run(){
            System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date()));
            doWork();//工作了
            System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date()));
            latch.countDown();//工人完成工作,计数器减一  

        }  

        private void doWork(){
            try {
                Thread.sleep(workTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

CyclicBarrier(栅栏)

1、概念

字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

2、构造器

public CyclicBarrier(int parties, Runnable barrierAction) {
}

public CyclicBarrier(int parties) {
}

3、主要方法

public int await() throws InterruptedException, BrokenBarrierException { };
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };

4、示例

场景:假若有若干个线程都要进行写数据操作,并且只有所有线程都完成写数据操作之后,这些线程才能继续做后面的事情

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }
}

运行结果:

线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...

原文地址:https://www.cnblogs.com/alimayun/p/10909074.html

时间: 2024-10-16 07:05:01

java并发系列(二)-----线程之间的协作(wait、notify、join、CountDownLatch、CyclicBarrier)的相关文章

Java 并发编程:线程间的协作(wait/notify/sleep/yield/join)

Java并发编程系列[未完]: Java 并发编程:核心理论 Java并发编程:Synchronized及其实现原理 Java并发编程:Synchronized底层优化(轻量级锁.偏向锁) Java 并发编程:线程间的协作(wait/notify/sleep/yield/join) 一.线程的状态 Java中线程中状态可分为五种:New(新建状态),Runnable(就绪状态),Running(运行状态),Blocked(阻塞状态),Dead(死亡状态). New:新建状态,当线程创建完成时为新

【Java并发系列02】Object的wait()、notify()、notifyAll()方法使用

一.前言 对于并发编程而言,除了Thread以外,对Object对象的wati和notify对象也应该深入了解其用法,虽然知识点不多. 二.线程安全基本知识 首先应该记住以下基本点,先背下来也无妨: 同一时间一个锁只能被一个线程持有 调用对象的wait()和notify()前必须持有它 三.wait()和notify()理解 3.1 wait()和notify()方法简介 wait()和notify()都是Object的方法,可以认为任意一个Object都是一种资源(或者资源的一个代表),当多个

19、Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition

Java并发编程:线程间协作的两种方式:wait.notify.notifyAll和Condition 在前面我们将了很多关于同步的问题,然而在现实中,需要线程之间的协作.比如说最经典的生产者-消费者模型:当队列满时,生产者需要等待队列有空间才能继续往里面放入商品,而在等待的期间内,生产者必须释放对临界资源(即队列)的占用权.因为生产者如果不释放对临界资源的占用权,那么消费者就无法消费队列中的商品,就不会让队列有空间,那么生产者就会一直无限等待下去.因此,一般情况下,当队列满时,会让生产者交出对

Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition

在前面我们将了很多关于同步的问题,然而在现实中,需要线程之间的协作.比如说最经典的生产者-消费者模型:当队列满时,生产者需要等待队列有空间才能继续往里面放入商品,而在等待的期间内,生产者必须释放对临界资源(即队列)的占用权.因为生产者如果不释放对临界资源的占用权,那么消费者就无法消费队列中的商品,就不会让队列有空间,那么生产者就会一直无限等待下去.因此,一般情况下,当队列满时,会让生产者交出对临界资源的占用权,并进入挂起状态.然后等待消费者消费了商品,然后消费者通知生产者队列有空间了.同样地,当

【转】Java并发编程:线程池的使用

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

Java并发编程:线程池的使用(转)

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

并发编程之线程共享和协作(一)

更多Android架构进阶视频学习请点击:https://space.bilibili.com/474380680本篇文章将从以下几个内容来阐述线程共享和协作: [基础概念之CPU核心数.线程数,时间片轮转机制解读][线程之间的共享][线程间的协作] 一.基础概念 CPU核心数.线程数两者的关系:cpu的核心数与线程数是1:1的关系,例如一个8核的cpu,支持8个线程同时运行.但在intel引入超线程技术以后,cpu与线程数的关系就变成了1:2.此外在开发过程中并没感觉到线程的限制,那是因为cp

JAVA并发编程3_线程同步之synchronized关键字

在上一篇博客里讲解了JAVA的线程的内存模型,见:JAVA并发编程2_线程安全&内存模型,接着上一篇提到的问题解决多线程共享资源的情况下的线程安全问题. 不安全线程分析 public class Test implements Runnable { private int i = 0; private int getNext() { return i++; } @Override public void run() { // synchronized while (true) { synchro

Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析

学习Java并发编程不得不去了解一下java.util.concurrent这个包,这个包下面有许多我们经常用到的并发工具类,例如:ReentrantLock, CountDownLatch, CyclicBarrier, Semaphore等.而这些类的底层实现都依赖于AbstractQueuedSynchronizer这个类,由此可见这个类的重要性.所以在Java并发系列文章中我首先对AbstractQueuedSynchronizer这个类进行分析,由于这个类比较重要,而且代码比较长,为了