四、线程的并发工具类

线程的并发工具类

一、CountDownLatch

【1】CountDownLatch是什么?

CountDownLatch,英文翻译为倒计时锁存器,是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行:

  • 确保某个计算在其需要的所有资源都被初始化之后才继续执行;
  • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动;
  • 等待直到某个操作所有参与者都准备就绪再继续执行;

CountDownLatch有一个正数计数器,countDown()方法对计数器做减操作,await()方法等待计数器达到0。所有await的线程都会阻塞直到计数器为0或者等待线程中断或者超时。

闭锁(倒计时锁)主要用来保证完成某个任务的先决条件满足。是一个同步工具类,用来协调多个线程之间的同步。这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束,再开始执行。

【2】CountDownLatch的两种典型用法

某一线程在开始运行前等待n个线程执行完毕。

将 CountDownLatch 的计数器初始化为n :new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

实现多个线程开始执行任务的最大并行性。

注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 :new CountDownLatch(1),多个线程在开始执行任务前首先 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为0,多个线程同时被唤醒。

如下例所示,在多线程运行的情况下,计算多线程耗费的时间:

public class TestCountDownLatch {
    //CountDownLatch 为唯一的、共享的资源
    static CountDownLatch countDownLatch = new CountDownLatch(5);

    static class LatchDemo extends Thread{
        @Override
        public void run() {
            int sum = 0;
            for (int i = 0; i < 1000000; i++) {
                sum++;
            }
            System.out.println(getName()+"计算结果:"+sum);
            countDownLatch.countDown();
        }
    }
    public static void main(String[] args) throws InterruptedException {

        long begin = System.currentTimeMillis();
        System.out.println("开始了-----"+begin);
        for (int i = 0; i < 5; i++) {
            new LatchDemo().start();
        }
        countDownLatch.await();

        long end = System.currentTimeMillis();
        System.out.println("结束了-----"+end);
        System.out.println("总共用时:"+(end-begin));
    }
}

/**
开始了-----1571144894551
Thread-3计算结果:1000000
Thread-0计算结果:1000000
Thread-1计算结果:1000000
Thread-2计算结果:1000000
Thread-4计算结果:1000000
结束了-----1571144894559
总共用时:8
*/

二、CyclicBarrier

【1】CyclicBarrier是什么?

? CyclicBarrier即栅栏类,与CountDownLatch类似。它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。

? CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

【2】CyclicBarrier构造方法

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程使用await()方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier的另一个构造函数CyclicBarrier(int parties, Runnable barrierAction),用于线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

【3】CyclicBarrier应用示例

public class CyclicBarrierTest {
    // 自定义工作线程
    private static class Worker extends Thread {
        private CyclicBarrier cyclicBarrier;

        public Worker(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            super.run();

            try {
                System.out.println(Thread.currentThread().getName() + "开始等待其他线程");
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + "开始执行");
                // 工作线程开始处理,这里用Thread.sleep()来模拟业务处理
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + "执行完毕");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);

        for (int i = 0; i < threadCount; i++) {
            System.out.println("创建工作线程" + i);
            Worker worker = new Worker(cyclicBarrier);
            worker.start();
        }
    }
}
/**
创建工作线程0
创建工作线程1
Thread-0开始等待其他线程
创建工作线程2
Thread-1开始等待其他线程
Thread-2开始等待其他线程
Thread-2开始执行
Thread-0开始执行
Thread-1开始执行
Thread-1执行完毕
Thread-0执行完毕
Thread-2执行完毕
*/

? 在上述代码中,我们自定义的工作线程必须要等所有参与线程开始之后才可以执行,我们可以使用CyclicBarrier类来帮助我们完成。从程序的执行结果中也可以看出,所有的工作线程都运行await()方法之后都到达了栅栏位置,然后,3个工作线程才开始执行业务处理。

【4】CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;
  • CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断;
  • CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置。

三、Semaphore

【1】Semaphore是什么?

信号量(Semaphore),又被称为信号灯,在多线程环境下用于协调各个线程, 以保证它们能够正确、合理的使用公共资源。信号量维护了一个许可集,我们在初始化Semaphore时需要为这个许可集传入一个数量值,该数量值代表同一时间能访问共享资源的线程数量。

【2】Semaphore基本用法

线程可以通过acquire()方法获取到一个许可,然后对共享资源进行操作,注意如果许可集已分配完了,那么线程将进入等待状态,直到其他线程释放许可才有机会再获取许可,线程释放一个许可通过release()方法完成,"许可"将被归还给Semaphore。

【3】Semaphore实现互斥锁

public class TestSemaphore {
    //初始化为1,互斥信号量
    private final static Semaphore mutex = new Semaphore(1);

    static class thread extends Thread{
        @Override
        public void run() {
            try {
                mutex.acquire();
                System.out.println(getName()+"开始工作");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //使用完成释放锁
                mutex.release();
                System.out.println("锁释放!!!");
            }
        }
    }
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(new thread(),String.valueOf(i)).start();
        }
    }
}

创建一个数量为1的互斥信号量Semaphore,然后并发执行10个线程,在线程中利用Semaphore控制线程的并发执行,因为信号量数值只有1,因此每次只能一条线程执行,其他线程进入等待状态。

四、Callable、Future和FutureTask

Future接口,一般都是取回Callable执行的状态用的。其中的主要方法:

  • cancel,取消Callable的执行,当Callable还没有完成时
  • get,获得Callable的返回值
  • isCanceled,判断是否取消了
  • isDone,判断是否完成

原文地址:https://www.cnblogs.com/lee0527/p/11686936.html

时间: 2024-11-08 21:09:17

四、线程的并发工具类的相关文章

Java并发编程系列-(2) 线程的并发工具类

2.线程的并发工具类 2.1 Fork-Join JDK 7中引入了fork-join框架,专门来解决计算密集型的任务.可以将一个大任务,拆分成若干个小任务,如下图所示: Fork-Join框架利用了分而治之的思想:什么是分而治之?规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解. 具体使用中,需要向ForkJoinPool线程池提交一个ForkJoinTask任务.ForkJoinTask任务有两个重要

并发编程(2)--线程的并发工具类

1.线程的并发工具类 Fork-Join 什么是分而治之? 规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解 动态规范 工作密取 workStealing Fork/Join使用的标准范式 下面演示第一种用法:由于上下文切换的原因,所以性能上有可能不如单线程效果好. package com.xiangxue.ch2.forkjoin.sum; import java.util.Random; /** *

Java并发编程-线程的并发工具类

Fork-Join 什么是分而治之?规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解动态规范工作密取workStealing Fork/Join使用的标准范式 常用的并发工具类CountDownLatch作用:是一组线程等待其他的线程完成工作以后在执行,加强版joinawait用来等待,countDown负责计数器的减一CyclicBarrier让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏

线程的并发工具类

一.fork/join 1.分而治之和工作密取概念 工作密取就是A干完了帮B线程干点活,并把结果返回给B: 2.代码 原文地址:https://www.cnblogs.com/wmqiang/p/11668883.html

并发工具类(四)线程间的交换数据 Exchanger

前言 ??JDK中为了处理线程之间的同步问题,除了提供锁机制之外,还提供了几个非常有用的并发工具类:CountDownLatch.CyclicBarrier.Semphore.Exchanger.Phaser: ??CountDownLatch.CyclicBarrier.Semphore.Phaser 这四个工具类提供一种并发流程的控制手段:而Exchanger工具类则提供了在线程之间交换数据的一种手段. 简介 ?? Exchanger的功能是使2个线程之间交换数据(有不少文章的说法是"传输数

Java线程与并发编程实践----并发工具类与Executor框架

java5之前,我们使用诸如synchronized,wait(),notify()方法对线程的操作属于对 底层线程的操作,这样会出现很多的问题: 低级的并发原语,比如synchronized,wait(),notify()经常难以正确使用.误用会导致 竞态条件,线程饿死,死锁等风险. 泰国依赖synchronized会影响程序性能以及程序的可扩展性 开发者经常需要高级线程结构,如线程池,信号量.java对底层线程的操作不包含这些结. 为解决这些问题,java5引入并发工具类,该工具类主要有下面

并发工具类(二)同步屏障CyclicBarrier

前言 ??JDK中为了处理线程之间的同步问题,除了提供锁机制之外,还提供了几个非常有用的并发工具类:CountDownLatch.CyclicBarrier.Semphore.Exchanger.Phaser: ??CountDownLatch.CyclicBarrier.Semphore.Phaser 这四个工具类提供一种并发流程的控制手段:而Exchanger工具类则提供了在线程之间交换数据的一种手段. 简介 ??CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Ba

Java并发(十六):并发工具类——Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类.Exchanger用于进行线程间的数据交换.它提供一个同步点,在这个同步点两个线程可以交换彼此的数据.这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方. 举例 class ExchangerTest { static class Producer implements R

【Java并发工具类】Java并发容器

前言 Java并发包有很大一部分都是关于并发容器的.Java在5.0版本之前线程安全的容器称之为同步容器.同步容器实现线程安全的方式:是将每个公有方法都使用synchronized修饰,保证每次只有一个线程能访问容器的状态.但是这样的串行度太高,将严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重降低.因此,在Java 5.0版本时提供了性能更高的容器来改进之前的同步容器,我们称其为并发容器. 下面我们先来介绍Java 5.0之前的同步容器,然后再来介绍Java 5.0之后的并发容器. Ja