Java多线程20:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger

前言

在多线程环境下,JDK给开发者提供了许多的组件供用户使用(主要在java.util.concurrent下),使得用户不需要再去关心在具体场景下要如何写出同时兼顾线程安全性与高效率的代码。之前讲过的线程池、BlockingQueue都是在java.util.concurrent下的组件,Timer虽然不在java.util.concurrent下,但也算是。后两篇文章将以例子的形式简单讲解一些多线程下其他组件的使用,不需要多深刻的理解,知道每个组件大致什么作用就行。

本文主要讲解的是CountDownLatch、Semaphore、Exchanger。

CountDownLatch

CountDownLatch主要提供的机制是当多个(具体数量等于初始化CountDownLatch时count参数的值)线程都达到了预期状态或完成预期工作时触发事件,其他线程可以等待这个事件来触发自己的后续工作。值得注意的是,CountDownLatch是可以唤醒多个等待的线程的。

到达自己预期状态的线程会调用CountDownLatch的countDown方法,等待的线程会调用CountDownLatch的await方法。如果CountDownLatch初始化的count值为1,那么这就退化为一个单一事件了,即是由一个线程来通知其他线程,效果等同于对象的wait和notifyAll,count值大于1是常用的方式,目的是为了让多个线程到达各自的预期状态,变为一个事件进行通知,线程则继续自己的行为。

看一个例子:

private static class WorkThread extends Thread
{
    private CountDownLatch cdl;
    private int sleepSecond;

    public WorkThread(String name, CountDownLatch cdl, int sleepSecond)
    {
        super(name);
        this.cdl = cdl;
        this.sleepSecond = sleepSecond;
    }

    public void run()
    {
        try
        {
            System.out.println(this.getName() + "启动了,时间为" + System.currentTimeMillis());
            Thread.sleep(sleepSecond * 1000);
            cdl.countDown();
            System.out.println(this.getName() + "执行完了,时间为" + System.currentTimeMillis());
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

private static class DoneThread extends Thread
{
    private CountDownLatch cdl;

    public DoneThread(String name, CountDownLatch cdl)
    {
        super(name);
        this.cdl = cdl;
    }

    public void run()
    {
        try
        {
            System.out.println(this.getName() + "要等待了, 时间为" + System.currentTimeMillis());
            cdl.await();
            System.out.println(this.getName() + "等待完了, 时间为" + System.currentTimeMillis());
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args) throws Exception
{
    CountDownLatch cdl = new CountDownLatch(3);
    DoneThread dt0 = new DoneThread("DoneThread1", cdl);
    DoneThread dt1 = new DoneThread("DoneThread2", cdl);
    dt0.start();
    dt1.start();
    WorkThread wt0 = new WorkThread("WorkThread1", cdl, 2);
    WorkThread wt1 = new WorkThread("WorkThread2", cdl, 3);
    WorkThread wt2 = new WorkThread("WorkThread3", cdl, 4);
    wt0.start();
    wt1.start();
    wt2.start();
}

看一下运行结果:

DoneThread2要等待了, 时间为1444563077434
DoneThread1要等待了, 时间为1444563077434
WorkThread1启动了,时间为1444563077434
WorkThread3启动了,时间为1444563077435
WorkThread2启动了,时间为1444563077435
WorkThread1执行完了,时间为1444563079435
WorkThread2执行完了,时间为1444563080435
WorkThread3执行完了,时间为1444563081435
DoneThread1等待完了, 时间为1444563081435
DoneThread2等待完了, 时间为1444563081435

效果十分明显,解释一下:

1、启动2个线程DoneThread线程等待3个WorkThread全部执行完

2、3个WorkThread全部执行完,最后执行完的WorkThread3执行了秒符合预期

3、后三句从时间上看几乎同时出现,说明CountDownLatch设置为3,WorkThread3执行完,两个wait的线程马上就执行后面的代码了

这相当于是一种进化版本的等待/通知机制,它可以的实现的是多个工作线程完成任务后通知多个等待线程开始工作,之前的都是一个工作线程完成任务通知一个等待线程或者一个工作线程完成任务通知所有等待线程。

CountDownLatch其实是很有用的,特别适合这种将一个问题分割成N个部分的场景,所有子部分完成后,通知别的一个/几个线程开始工作。比如我要统计C、D、E、F盘的文件,可以开4个线程,分别统计C、D、E、F盘的文件,统计完成把文件信息汇总到另一个/几个线程中进行处理

Semaphore

Semaphore是非常有用的一个组件,它相当于是一个并发控制器,是用于管理信号量的。构造的时候传入可供管理的信号量的数值,这个数值就是控制并发数量的,我们需要控制并发的代码,执行前先通过acquire方法获取信号,执行后通过release归还信号 。每次acquire返回成功后,Semaphore可用的信号量就会减少一个,如果没有可用的信号,acquire调用就会阻塞,等待有release调用释放信号后,acquire才会得到信号并返回。

Semaphore分为单值和多值两种:

1、单值的Semaphore管理的信号量只有1个,该信号量只能被1个,只能被一个线程所获得,意味着并发的代码只能被一个线程运行,这就相当于是一个互斥锁了

2、多值的Semaphore管理的信号量多余1个,主要用于控制并发数

看一下代码例子:

public static void main(String[] args)
{
    final Semaphore semaphore = new Semaphore(5);

    Runnable runnable = new Runnable()
    {
        public void run()
        {
            try
            {
                semaphore.acquire();                              System.out.println(Thread.currentThread().getName() + "获得了信号量,时间为" + System.currentTimeMillis());
                Thread.sleep(2000);
          System.out.println(Thread.currentThread().getName() + "释放了信号量,时间为" + System.currentTimeMillis());
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            finally
            {
                semaphore.release();
            }
        }
    };

    Thread[] threads = new Thread[10];
    for (int i = 0; i < threads.length; i++)
        threads[i] = new Thread(runnable);
    for (int i = 0; i < threads.length; i++)
        threads[i].start();
}

看一下运行结果:

 1 Thread-1获得了信号量,时间为1444557040464
 2 Thread-2获得了信号量,时间为1444557040465
 3 Thread-0获得了信号量,时间为1444557040464
 4 Thread-3获得了信号量,时间为1444557040465
 5 Thread-4获得了信号量,时间为1444557040465
 6 Thread-2释放了信号量,时间为1444557042466
 7 Thread-4释放了信号量,时间为1444557042466
 8 Thread-0释放了信号量,时间为1444557042466
 9 Thread-1释放了信号量,时间为1444557042466
10 Thread-3释放了信号量,时间为1444557042466
11 Thread-9获得了信号量,时间为1444557042467
12 Thread-7获得了信号量,时间为1444557042466
13 Thread-6获得了信号量,时间为1444557042466
14 Thread-5获得了信号量,时间为1444557042466
15 Thread-8获得了信号量,时间为1444557042467
16 Thread-9释放了信号量,时间为1444557044467
17 Thread-6释放了信号量,时间为1444557044467
18 Thread-7释放了信号量,时间为1444557044467
19 Thread-5释放了信号量,时间为1444557044468
20 Thread-8释放了信号量,时间为1444557044468

前10行为一部分,运行的线程是1 2 0 3 4,看到时间差也都是代码约定的2秒;后10行为一部分,运行的线程是9 7 6 5 8,时间差也都是约定的2秒,这就体现出了Semaphore的作用了。

这种通过Semaphore控制并发并发数的方式和通过控制线程数来控制并发数的方式相比,粒度更小,因为Semaphore可以通过acquire方法和release方法来控制代码块的并发数。

最后注意两点:

1、Semaphore可以指定公平锁还是非公平锁

2、acquire方法和release方法是可以有参数的,表示获取/返还的信号量个数

Exchanger

Exchanger,从名字上理解就是交换。Exchanger用于在两个线程之间进行数据交换,注意也只能在两个线程之间进行数据交换。线程会阻塞在Exchanger的exchange方法上,直到另外一个线程也到了同一个Exchanger的exchange方法时,二者进行数据交换,然后两个线程继续执行自身相关的代码。

Exchanger只有一个exchange方法,用于交换数据。看一下例子:

public static class ExchangerThread extends Thread
{
    private String str;
    private Exchanger<String> exchanger;
    private int sleepSecond;

    public ExchangerThread(String str, Exchanger<String> exchanger, int sleepSecond)
    {
        this.str = str;
        this.exchanger = exchanger;
        this.sleepSecond = sleepSecond;
    }

    public void run()
    {
        try
        {
            System.out.println(this.getName() + "启动, 原数据为" + str + ", 时间为" + System.currentTimeMillis());
            Thread.sleep(sleepSecond * 1000);
            str = exchanger.exchange(str);
            System.out.println(this.getName() + "交换了数据, 交换后的数据为" + str + ", 时间为" + System.currentTimeMillis());
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args)
{
    Exchanger<String> exchanger = new Exchanger<String>();
    ExchangerThread et0 = new ExchangerThread("111", exchanger, 3);
    ExchangerThread et1 = new ExchangerThread("222", exchanger, 2);
    et0.start();
    et1.start();
}

看一下运行结果:

Thread-0启动, 原数据为111, 时间为1444560972303
Thread-1启动, 原数据为222, 时间为1444560972303
Thread-0交换了数据, 交换后的数据为222, 时间为1444560975303
Thread-1交换了数据, 交换后的数据为111, 时间为1444560975303

看到两个线程交换了数据,由于一个线程睡2秒、一个线程睡3秒,既然要交换数据,肯定是睡2秒的要等待睡3秒的,所以看到时间差是3000ms即3s。

从这个例子看来,Exchanger有点像之前Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型一文中的SynchronousQueue的双向形式,它可能在遗传算法和管道设计中很有用。

时间: 2024-12-05 12:09:12

Java多线程20:多线程下的其他组件之CountDownLatch、Semaphore、Exchanger的相关文章

自定义View——利用下拉刷新组件实现上拉加载

注:本文demo已经提交github,地址完整代码如下,demo工程已经上传至GitHub, github地址https://github.com/wsclwps123/UpLoadSwipeRefreshLayout 感谢大家支持! 在Android开发中,我们经常会用到列表下拉刷新和上拉加载的功能. Google在support.v4包中提供了一个组件可以用来进行下来刷新,这个组件是SwipeRefreshLayout. 下面我们来看一下这个组件的使用: 在布局文件中加上xml代码 <and

Java多线程21:多线程下的其他组件之CyclicBarrier、Callable、Future和FutureTask

CyclicBarrier 接着讲多线程下的其他组件,第一个要讲的就是CyclicBarrier.CyclicBarrier从字面理解是指循环屏障,它可以协同多个线程,让多个线程在这个屏障前等待,直到所有线程都达到了这个屏障时,再一起继续执行后面的动作.看一下CyclicBarrier的使用实例: public static class CyclicBarrierThread extends Thread { private CyclicBarrier cb; private int sleep

设计模式——单例模式(Java)——考虑多线程环境下的线程安全问题

设计模式--单例模式(Java)--考虑多线程环境下的线程安全问题 一:单例模式概念 单例模式是一种常用的软件设计模式.在它的核心结构中只包含一个被称为单例的特殊类.通过单例模式可以保证系统中一个类只有一个实例 二:单例模式的实现方式 特别注意,在多线程环境下,需要对获取对象实例的方法加对象锁(synchronized) 方式一:(懒汉式)程序执行过程中需要这个类的对象,再实例化这个类的对象 步骤: 1.定义静态私有对象 2.构造方法私有化保证在类的外部无法实例化该类的对象 3.定义对外开放的静

java 多线程 30: 多线程组件之 CyclicBarrier

CyclicBarrier 接着讲多线程下的其他组件,第一个要讲的就是CyclicBarrier.CyclicBarrier从字面理解是指循环屏障,它可以协同多个线程,让多个线程在这个屏障前等待,直到所有线程都达到了这个屏障时,再一起继续执行后面的动作.看一下CyclicBarrier的使用实例: public static class CyclicBarrierThread extends Thread { private CyclicBarrier cb; private int sleep

Java线程及多线程技术及应用

第6 章 Java线程及多线程技术及应用 6.1线程基本概念 1.进程和线程的基础知识 l 进程:运行中的应用程序称为进程,拥有系统资源(cpu.内存) l 线程:进程中的一段代码,一个进程中可以哦有多段代码.本身不拥有资源(共享所在进程的资源) 在java中,程序入口被自动创建为主线程,在主线程中可以创建多个子线程. 区别: 1.是否占有资源问题 2.创建或撤销一个进程所需要的开销比创建或撤销一个线程所需要的开销大. 3.进程为重量级组件,线程为轻量级组件 l 多进程: 在操作系统中能同时运行

[Android] 图片JNI(C++\Java)高斯模糊 多线程

在我的博客中,曾经发布了一篇高斯模糊(堆栈模糊)的文章:在其中使用了国外的一个堆栈模糊来实现对图片的模糊处理:同时弄了一个JNI C++ 的版本. 这篇文章依然是堆栈模糊:可以说最原始的地方还是堆栈模糊部分:只不过是支持多线程的. 纳尼??感情是之前那个不支持多线程?Sorry,我说错了:两个都是支持多线程调用的.不过新讲的这个是能在内部采用多线程进行分段模糊. 原来的:[Android]-图片JNI(C++\Java)高斯模糊的实现与比较 开工吧 说明:其中代码大部分来源于网络,不过都是开源的

[转]Java中的多线程你只要看这一篇就够了

如果对什么是线程.什么是进程仍存有疑惑,请先Google之,因为这两个概念不在本文的范围之内. 用多线程只有一个目的,那就是更好的利用cpu的资源,因为所有的多线程代码都可以用单线程来实现.说这个话其实只有一半对,因为反应“多角色”的程序代码,最起码每个角色要给他一个线程吧,否则连实际场景都无法模拟,当然也没法说能用单线程来实现:比如最常见的“生产者,消费者模型”. 很多人都对其中的一些概念不够明确,如同步.并发等等,让我们先建立一个数据字典,以免产生误会. 多线程:指的是这个程序(一个进程)运

java中的多线程入门

本文主要是想学习下java中多线程的东西. 一.理解多线程. 多线程是怎么样的机制?他是允许在程序中并发执行多个指令流,每个指令流都称为一个线程,彼此之间相互独立. 线程又称为轻量级进程,它和进程一样拥有独立的执行控制,有操作系统负责调度,区别在于线程诶有独立的存储空间,而是和所属进程中的其它线程共享一个存储空间,这使得线程间的通信远较进程简单. 多个线程的执行是并发的,也就是在逻辑上“同时”,而不管是否是物理上的“同时”,如果系统只有一个CPU,那么真正的“同时”是不可能的,但是由于CPU的速

java并发与多线程API学习

Executor接口 public interface Executor { void execute(Runnable command); } Executor接口中之定义了一个方法execute(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类. 在Java 5之后,任务分两类:一类是实现了Runnable接口的类,一类是实现了Callable接口的类.两者都可以被ExecutorService执行,但是Run