多线程编程学习九(并发工具类).

CountDownLatch

  1. CountDownLatch 允许一个或多个线程等待其他线程完成操作。
  2. CountDownLatch 可以替代 join 的作用,并提供了更丰富的用法。
  3. CountDownLatch 的 countDown 方法,N 会减1;CountDownLatch 的 await 方法会阻塞当前线程,直到 N 变成零。
  4. CountDownLatch 不可能重新初始化或者修改 CountDownLatch 对象的内部计数器的值。
  5. CountDownLatch 内部由 AQS 共享锁实现。
public class CountDownLatchTest {

    private static final CountDownLatch DOWN_LATCH = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> {
            System.out.println(1);
            DOWN_LATCH.countDown();
            System.out.println(2);
            DOWN_LATCH.countDown();

        }).start();
        DOWN_LATCH.await();
        System.out.println("3");
    }
}

CyclicBarrier

  1. CyclicBarrier 设置一个屏障(也可以叫同步点),拦截阻塞一组线程,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
  2. CyclicBarrier 默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞。
  3. CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。
  4. getNumberWaiting 方法可以获得 CyclicBarrier 阻塞的线程数量;isBroken()方法用来了解阻塞的线程是否被中断。
  5. CyclicBarrier 的计数器可以使用 reset() 方法重置(CountDownLatch 的计数器只能使用一次)。所以 CyclicBarrier 能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。
  6. CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的场景。
  7. CyclicBarrier 内部采用重入锁 ReentrantLock 实现。
public class BankWaterService implements Runnable {

    // 创建4个屏障,处理完之后执行当前类的run方法
    private CyclicBarrier barrier = new CyclicBarrier(4, this);
    // 假设有4个计算任务,所以只启动4个线程
    private Executor executor = Executors.newFixedThreadPool(4);
    // 保存每个任务的计算结果
    private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();

    private AtomicInteger atomicInteger = new AtomicInteger(1);

    private void count() {
        for (int i = 0; i < 4; i++) {
            Thread thread = new Thread(() -> {
                // 当前任务的计算结果,计算过程忽略
                sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                // 计算完成,插入一个屏障
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

            }, "线程" + atomicInteger.getAndIncrement());
            executor.execute(thread);
        }
    }

    @Override
    public void run() {
        int result = 0;
        // 汇总每个任务计算出的结果
        for (Map.Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
            result += sheet.getValue();
        }
        //将结果输出
        sheetBankWaterCount.put("result", result);
        System.out.println(result);
    }

    public static void main(String[] args) {
        BankWaterService bankWaterCount = new BankWaterService();
        bankWaterCount.count();
    }
}

Semaphore

  1. Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
  2. Semaphore 可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。
  3. Semaphore的构造方法 Semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。
  4. 首先线程使用 Semaphore 的 acquire() 方法获取一个许可证,使用完之后调用 release() 方法归还许可证。还可以用 tryAcquire() 方法尝试获取许可证。
  5. intavailablePermits():返回此信号量中当前可用的许可证数。
  6. intgetQueueLength():返回正在等待获取许可证的线程数。
  7. booleanhasQueuedThreads():是否有线程正在等待获取许可证。
  8. Semaphore 内部使用 AQS 共享锁实现。
public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;
    private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore SEMAPHORE = new Semaphore(10);
    private static AtomicInteger ATOMICINTEGER = new AtomicInteger(1);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            EXECUTOR.execute(() -> {
                try {
                    SEMAPHORE.acquire();
                    System.out.println("save data" + ATOMICINTEGER.getAndIncrement());
                    SEMAPHORE.release();
                } catch (InterruptedException e) {
                }

            });
        }
        EXECUTOR.shutdown();
    }
}

Exchanger

  1. Exchanger(交换者)是一个用于线程间协作的工具类 —— 用于线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange 方法交换数据,如果第一个线程先执行 exchange() 方法,它会一直等待第二个线程也执行 exchange 方法。
  2. 可简单地将 Exchanger 对象理解为一个包含两个格子的容器,通过 exchanger 方法可以向两个格子中填充信息。当两个格子中的均被填充时,该对象会自动将两个格子的信息交换,然后返回给线程,从而实现两个线程的信息交换。
  3. Exchanger 可用于遗传算法。(遗传算法:需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出交配结果)
  4. Exchanger 可用于校对工作,比如一份数据需要两个人同时进行校对,都校对无误后,才能进行后续处理。这时,就可以使用 Exchanger 比较两份校对结果。
  5. Exchanger 内部采用无锁 CAS 实现,Exchange 使用了内部对象 Node 的两个属性 — item 、match,分布存储两个线程的值。
public class ExchangerTest {

    private static final Exchanger<String> exchange = new Exchanger<>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(() -> {
            try {
                String result = exchange.exchange("数据A");
                System.out.println("A的exchange结果:" + result);
            } catch (InterruptedException e) {
            }

        });
        threadPool.execute(() -> {
            try {
                String result = exchange.exchange("数据B");
                System.out.println("B的exchange结果:" + result);
            } catch (InterruptedException e) {
            }
        });
        threadPool.shutdown();
    }
}

原文地址:https://www.cnblogs.com/jmcui/p/11511413.html

时间: 2024-10-09 04:16:39

多线程编程学习九(并发工具类).的相关文章

Java并发编程-线程的并发工具类

Fork-Join 什么是分而治之?规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解动态规范工作密取workStealing Fork/Join使用的标准范式 常用的并发工具类CountDownLatch作用:是一组线程等待其他的线程完成工作以后在执行,加强版joinawait用来等待,countDown负责计数器的减一CyclicBarrier让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏

多线程系列五:并发工具类和并发容器

一.并发容器 1.ConcurrentHashMap 为什么要使用ConcurrentHashMap 在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表 形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry. HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下Hash

Java学习笔记—多线程(并发工具类,java.util.concurrent.atomic包)

在JDK的并发包里提供了几个非常有用的并发工具类.CountDownLatch.CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段.本章会配合一些应用场景来介绍如何使用这些工具类. CountDownLatch CountDownLatch允许一个或多个线程等待其他线程完成操作.假如有这样一个需求:我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,

多线程编程学习笔记——使用并发集合(三)

接上文 多线程编程学习笔记——使用并发集合(一) 接上文 多线程编程学习笔记——使用并发集合(二) 四.   使用ConcurrentBag创建一个可扩展的爬虫 本示例在多个独立的即可生产任务又可消费任务的工作者间如何扩展工作量. 1.程序代码如下. using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Sy

Java线程与并发编程实践----并发工具类与Executor框架

java5之前,我们使用诸如synchronized,wait(),notify()方法对线程的操作属于对 底层线程的操作,这样会出现很多的问题: 低级的并发原语,比如synchronized,wait(),notify()经常难以正确使用.误用会导致 竞态条件,线程饿死,死锁等风险. 泰国依赖synchronized会影响程序性能以及程序的可扩展性 开发者经常需要高级线程结构,如线程池,信号量.java对底层线程的操作不包含这些结. 为解决这些问题,java5引入并发工具类,该工具类主要有下面

Java并发编程系列-(2) 线程的并发工具类

2.线程的并发工具类 2.1 Fork-Join JDK 7中引入了fork-join框架,专门来解决计算密集型的任务.可以将一个大任务,拆分成若干个小任务,如下图所示: Fork-Join框架利用了分而治之的思想:什么是分而治之?规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解. 具体使用中,需要向ForkJoinPool线程池提交一个ForkJoinTask任务.ForkJoinTask任务有两个重要

并发编程(2)--线程的并发工具类

1.线程的并发工具类 Fork-Join 什么是分而治之? 规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解 动态规范 工作密取 workStealing Fork/Join使用的标准范式 下面演示第一种用法:由于上下文切换的原因,所以性能上有可能不如单线程效果好. package com.xiangxue.ch2.forkjoin.sum; import java.util.Random; /** *

并发编程—2并发工具类

目录 2.线程的工具类 2.1 fork/join框架 2.2 CountDownLatch 一般用法 2.3 CycliBarrier 2.4 Semaphore 2.5 Exchange 使用举例 2.6 Callable Future and FutureTask 2.线程的工具类 2.1 fork/join框架 ### 什么是分而治之 简单地说把一个大的问题,拆分成若干个子问题,每个问题相互独立,且和原来问题形式相同.最后将每个子问题的解合并得到原问题的解答. ### 什么是工作密取 #

多线程编程学习笔记——线程同步(三)

接上文 多线程编程学习笔记——线程同步(一) 接上文 多线程编程学习笔记——线程同步(二) 七.使用Barrier类 Barrier类用于组织多个线程及时在某个时刻会面,其提供一个回调函数,每次线程调用了SignalAndWait方法后该回调函数就会被执行. 1.代码如下: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; //