java.util.concurrent常用类(CountDownLatch,Semaphore,CyclicBarrier,Future)

CyclicBarrier

CyclicBarrier是用来一个关卡来阻挡住所有线程,等所有线程全部执行到关卡处时,再统一执行下一步操作。假设一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家就等待 。

代码示例:

public class UseCyclicBarrier {

    static class Runner implements Runnable {
        private CyclicBarrier barrier;
        private String name;  

        public Runner(CyclicBarrier barrier, String name) {
            this.barrier = barrier;
            this.name = name;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(1000 * (new Random()).nextInt(5));
                System.out.println(name + " 准备OK.");
                barrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(name + " Go!!");
        }
    } 

    public static void main(String[] args) throws IOException, InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(3);  // 3
        ExecutorService executor = Executors.newFixedThreadPool(3);  

        executor.submit(new Thread(new Runner(barrier, "zhangsan")));
        executor.submit(new Thread(new Runner(barrier, "lisi")));
        executor.submit(new Thread(new Runner(barrier, "wangwu")));  

        executor.shutdown();
    }  

}  

结果:只有都准备OK了以后才继续执行await后面的代码

wangwu 准备OK.
lisi 准备OK.
zhangsan 准备OK.
zhangsan Go!!
lisi Go!!
wangwu Go!!

CountDownLacth

CountDownLatch是一个计数器闭锁,主要的功能就是通过await()方法来阻塞住当前线程,然后等待计数器减少到0了,再唤起这些线程继续执行。常用于监听某些初始化操作,等待初始化执行完毕后,通知主线程继续工作。

代码示例:

public class UseCountDownLatch {

    public static void main(String[] args) {

        final CountDownLatch countDown = new CountDownLatch(2);

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("进入线程t1" + "等待其他线程处理完成...");
                    countDown.await();
                    System.out.println("t1线程继续执行...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"t1");

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t2线程进行初始化操作...");
                    Thread.sleep(3000);
                    System.out.println("t2线程初始化完毕,通知t1线程继续...");
                    countDown.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("t3线程进行初始化操作...");
                    Thread.sleep(4000);
                    System.out.println("t3线程初始化完毕,通知t1线程继续...");
                    countDown.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t1.start();
        t2.start();
        t3.start();
    }
}

结果:

t2线程进行初始化操作...
t3线程进行初始化操作...
进入线程t1等待其他线程处理完成...
t2线程初始化完毕,通知t1线程继续...
t3线程初始化完毕,通知t1线程继续...
t1线程继续执行...

CyclicBarrier和CountDownLatch的区别

CountDownLacth的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,若计算发生错误,可以重置计数器,并让线程重新执行一次。

CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

Semaphore

Semaphore与CountDownLatch相似,不同的地方在于Semaphore的值被获取到后是可以释放的,并不像CountDownLatch那样一直减到底。它也被更多地用来限制流量,类似阀门的 功能。如果限定某些资源最多有N个线程可以访问,那么超过N个主不允许再有线程来访问,同时当现有线程结束后,就会释放,然后允许新的线程进来。有点类似于锁的lock与 unlock过程。相对来说他也有两个主要的方法:

  1. 用于获取权限的acquire(),其底层实现与CountDownLatch.countdown()类似;
  2. 用于释放权限的release(),其底层实现与acquire()是一个互逆的过程。

代码层面的限流策略

Semaphore sema = new Semaphore(5);//这里的5就表示最多接受5个线程。

sema.aquire();//获取授权

代码块;

sema.release();//释放

代码示例:

public class UseSemaphore {  

    public static void main(String[] args) {
        // 线程池
        ExecutorService exec = Executors.newCachedThreadPool();
        // 只能5个线程同时访问
        final Semaphore semp = new Semaphore(5);
        // 模拟20个客户端访问
        for (int index = 0; index < 20; index++) {
            final int NO = index;
            Runnable run = new Runnable() {
                public void run() {
                    try {
                        // 获取许可
                        semp.acquire();
                        System.out.println("Accessing: " + NO);
                        //模拟实际业务逻辑
                        Thread.sleep((long) (Math.random() * 10000));
                        // 访问完后,释放
                        semp.release();
                    } catch (InterruptedException e) {
                    }
                }
            };
            exec.execute(run);
        } 

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //System.out.println(semp.getQueueLength());

        // 退出线程池
        exec.shutdown();
    }
}

Future

它的原理在之前介绍过了,下面看下concurrent包下的Future是怎么用的?

代码示例:

public class UseFuture implements Callable<String>{
    private String para;

    public UseFuture(String para){
        this.para = para;
    }

    /**
     * 这里是真实的业务逻辑,其执行可能很慢
     */
    @Override
    public String call() throws Exception {
        //模拟执行耗时
        Thread.sleep(5000);
        String result = this.para + "处理完成";
        return result;
    }

    //主控制函数
    public static void main(String[] args) throws Exception {
        String queryStr = "query";
        //构造FutureTask,并且传入需要真正进行业务逻辑处理的类,该类一定是实现了Callable接口的类
        FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));

        FutureTask<String> future2 = new FutureTask<String>(new UseFuture(queryStr));
        //创建一个固定线程的线程池且线程数为1,
        ExecutorService executor = Executors.newFixedThreadPool(2);
        //这里提交任务future,则开启线程执行RealData的call()方法执行
        //submit和execute的区别: 第一点是submit可以传入实现Callable接口的实例对象, 第二点是submit方法有返回值

        Future f1 = executor.submit(future);        //单独启动一个线程去执行的
        Future f2 = executor.submit(future2);
        System.out.println("请求完毕");

        try {
            //这里可以做额外的数据操作,也就是主程序执行其他业务逻辑
            System.out.println("处理实际的业务逻辑...");
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        //调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待
        System.out.println("数据:" + future.get());
        System.out.println("数据:" + future2.get());

        executor.shutdown();
    }

}

原文地址:https://www.cnblogs.com/lostyears/p/8426989.html

时间: 2024-07-31 10:16:20

java.util.concurrent常用类(CountDownLatch,Semaphore,CyclicBarrier,Future)的相关文章

java同步并发工具类CountDownLatch、CyclicBarrier和Semaphore

闭锁CountDownLatch 闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态.闭锁的作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过.当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态.闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如: 确保某个计算在其需要的所有资源都被初始化之后才继续执行.二元闭锁(包括两个状态)可以用来表示"资源R已经被初始化",而

1.3.4 并发工具类CountDownLatch/Semaphore/CyclicBarrier/FutureTask

CountDownLatch的2个用途: 1. 所有线程都到达相同的起跑线后,再一起开始跑(并非同时开始,而是队列中一个唤醒另一个)[此情况需到达起跑线后再调用await()等待其他线程] 2. 所有线程都到达终点(执行完)后,再一起庆祝 (并非同时开始,而是队列中一个唤醒另一个)[此情况需到达起终点后再调用await()等待其他线程] package com.study.concurrent_utils; import java.util.concurrent.CountDownLatch;

[转载] java多线程学习-java.util.concurrent详解(二)Semaphore/FutureTask/Exchanger

转载自http://janeky.iteye.com/blog/770393 ----------------------------------------------------------------------------- 3. Semaphore     我们先来学习一下JDK1.5 API中关于这个类的详细介绍: “一个计数信号量.从概念上讲,信号量维护了一个许可集.如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可.每个 release() 添加一个许可,从

使用java.util.concurrent.ThreadFactory类创建线程

工厂设计模式是Java中最常用的设计模式之一.它是一种创建型设计模式,能够用于创建一个或多个类所需要的对象.有了这个工厂,我们就能集中的创建对象. 集中创建方式给我们带来了一些好处,例如: 1. 能够很容易的改变类创建的对象或者创建对象的方式: 2. 能够很容易限制对象的创建,例如:我们只能为a类创建N个对象: 3. 能够很容易的生成有关对象创建的统计数据. 在Java中,我们通常使用两种方式来创建线程:继承Thread类和实现Runnable接口.Java还提供了一个接口,既ThreadFac

java多线程学习-java.util.concurrent详解(三) Semaphore

转载于:http://janeky.iteye.com/blog/769965 我们先来学习一下JDK1.5 API中关于这个类的详细介绍: “一个计数信号量.从概念上讲,信号量维护了一个许可集.如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可.每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者.但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动.” 我们一般用它来控制某个对象的线程访问对象 例如,对于某个容

Jakob Jenkov 写的 java.util.concurrent API 指南

1. java.util.concurrent - Java 并发工具包 Java 5 添加了一个新的包到 Java 平台,java.util.concurrent 包.这个包包含有一系列能够让 Java 的并发编程变得更加简单轻松的类.在这个包被添加以前,你需要自己去动手实现自己的相关工具类.本文我将带你一一认识 java.util.concurrent 包里的这些类,然后你可以尝试着如何在项目中使用它们.本文中我将使用 Java 6 版本,我不确定这和 Java 5 版本里的是否有一些差异.

Java 并发工具包 java.util.concurrent 大全

1. java.util.concurrent - Java 并发工具包 Java 5 添加了一个新的包到 Java 平台,java.util.concurrent 包.这个包包含有一系列能够让 Java 的并发编程变得更加简单轻松的类.在这个包被添加以前,你需要自己去动手实现自己的相关工具类. 本文我将带你一一认识 java.util.concurrent 包里的这些类,然后你可以尝试着如何在项目中使用它们.本文中我将使用 Java 6 版本,我不确定这和 Java 5 版本里的是否有一些差异

Java_并发工具包 java.util.concurrent 用户指南(转)

译序 本指南根据 Jakob Jenkov 最新博客翻译,请随时关注博客更新:http://tutorials.jenkov.com/java-util-concurrent/index.html.本指南已做成中英文对照阅读版的 pdf 文档,有兴趣的朋友可以去 Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf[带书签] 进行下载. 1. java.util.concurrent - Java 并发工具包 Java 5 添加了一个新的包到 Java 平台

同步工具类 java.util.concurrent.CountDownLatch

1.类介绍 java.util.concurrent.CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待. 用给定的计数 初始化 CountDownLatch.由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞.之后,会释放所有等待的线程,await 的所有后续调用都将立即返回.这种现象只出现一次--计数无法被重置.如果需要重置计数,请考虑使用 CyclicBarrier. 2.使用场景