Java并发编程系列-(2) 线程的并发工具类

2.线程的并发工具类

2.1 Fork-Join

JDK 7中引入了fork-join框架,专门来解决计算密集型的任务。可以将一个大任务,拆分成若干个小任务,如下图所示:

Fork-Join框架利用了分而治之的思想:什么是分而治之?规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解.

具体使用中,需要向ForkJoinPool线程池提交一个ForkJoinTask任务。ForkJoinTask任务有两个重要的子类,RecursiveAction类和RecursiveTask类,他们分别表示没有返回值的任务和可以有返回值的任务。

RecursiveAction类

下面的例子中,我们使用RecursiveAction遍历指定目录来寻找特定类型文件,需要实现compute方法。

public class FindDirsFiles extends RecursiveAction{

    private File path;//当前任务需要搜寻的目录

    public FindDirsFiles(File path) {
        this.path = path;
    }

    public static void main(String [] args){
        try {
            // 用一个 ForkJoinPool 实例调度总任务
            ForkJoinPool pool = new ForkJoinPool();
            FindDirsFiles task = new FindDirsFiles(new File("F:/"));

            pool.execute(task);//异步调用

            System.out.println("Task is Running......");
            Thread.sleep(1);
            int otherWork = 0;
            for(int i=0;i<100;i++){
                otherWork = otherWork+i;
            }
            System.out.println("Main Thread done sth......,otherWork="+otherWork);
            task.join();//阻塞的方法
            System.out.println("Task end");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void compute() {

        List<FindDirsFiles> subTasks = new ArrayList<>();

        File[] files = path.listFiles();
        if(files!=null) {
            for(File file:files) {
                if(file.isDirectory()) {
                    subTasks.add(new FindDirsFiles(file));
                }else {
                    //遇到文件,检查
                    if(file.getAbsolutePath().endsWith("txt")) {
                        System.out.println("文件:"+file.getAbsolutePath());
                    }
                }
            }
            if(!subTasks.isEmpty()) {
                for(FindDirsFiles subTask:invokeAll(subTasks)) {
                    subTask.join();//等待子任务执行完成
                }
            }
        }
    }
}

RecursiveTask类

下面的例子中,利用RecursiveTask来实现数值累加。

public class SumArray {
    private static class SumTask extends RecursiveTask<Integer>{

        private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10;
        private int[] src; //表示我们要实际统计的数组
        private int fromIndex;//开始统计的下标
        private int toIndex;//统计到哪里结束的下标

        public SumTask(int[] src, int fromIndex, int toIndex) {
            this.src = src;
            this.fromIndex = fromIndex;
            this.toIndex = toIndex;
        }

        @Override
        protected Integer compute() {
            if(toIndex-fromIndex < THRESHOLD) {
                int count = 0;
                for(int i=fromIndex;i<=toIndex;i++) {
                    //SleepTools.ms(1);
                    count = count + src[i];
                }
                return count;
            }else {
                //fromIndex....mid....toIndex
                //1...................70....100
                int mid = (fromIndex+toIndex)/2;
                SumTask left = new SumTask(src,fromIndex,mid);
                SumTask right = new SumTask(src,mid+1,toIndex);
                invokeAll(left,right);
//              left.fork();
//              right.fork();
                return left.join()+right.join();
            }
        }
    }

    public static void main(String[] args) {

        ForkJoinPool pool = new ForkJoinPool();
        int[] src = MakeArray.makeArray();

        SumTask innerFind = new SumTask(src,0,src.length-1);

        long start = System.currentTimeMillis();

        pool.invoke(innerFind);//同步调用
        System.out.println("Task is Running.....");

        System.out.println("The count is "+innerFind.join()
                +" spend time:"+(System.currentTimeMillis()-start)+"ms");

    }
}

注意到fork和invokeAll都能达到相同的效果,只是fork将task交给工作线程后立刻返回;但是invokeAll会fork其中一个任务后,同时同步的调用另外一个任务,然后等待两个任务完成,可以参考invokeAll的实现:

ublic static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
        int s1, s2;
        t2.fork();
        if (((s1 = t1.doInvoke()) & ABNORMAL) != 0)
            t1.reportException(s1);
        if (((s2 = t2.doJoin()) & ABNORMAL) != 0)
            t2.reportException(s2);
    }

工作密取(Work Stealing)

在后台,fork-join框架使用了一种有效的方法来平衡可用线程的负载,称为工作密取(Work stealing)。每个工作线程都有一个双端队列(deque)来完成任务,一个工作线程将子任务压入其双端队列的队头。当一个工作线程空闲时,它会从另一个双端队列的队尾密取一个任务。

ForkJoinPool内部利用循环数组实现了一个双端队列,称为WorkQueue。对于这个Queue,有3种操作方法,分别是push、pop和poll。对于push和pop操作,只能被拥有该Queue的线程所调用。poll操作被用于其他工作线程从该Queue中获得task。

考虑到多线程steal work的情况,当进行poll操作时,会通过CAS操作来保证多线程下的安全性。如果CAS操作成功,则说明窃取成功。

更多细节可以查看ForkJoinPool的实现以及论文https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf。

invoke && execute && submit 区别

invoke是同步调用,它会马上执行执行,并且将task join到当前线程,也就是阻塞当前线程。

    /**
     * Performs the given task, returning its result upon completion.
     * If the computation encounters an unchecked Exception or Error,
     * it is rethrown as the outcome of this invocation.  Rethrown
     * exceptions behave in the same way as regular exceptions, but,
     * when possible, contain stack traces (as displayed for example
     * using {@code ex.printStackTrace()}) of both the current thread
     * as well as the thread actually encountering the exception;
     * minimally only the latter.
     *
     * @param task the task
     * @param <T> the type of the task's result
     * @return the task's result
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalSubmit(task);
        return task.join();
    }

execute和submit是异步调用,它会将Task送到Work Queue中等待运行。如果需要看到运行结果,可以在execute和submit后调用join方法。两者的区别只是submit会返回task,execute返回空值。

    /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        return externalSubmit(task);
    }

下面是execute的实现:

    /**
     * Arranges for (asynchronous) execution of the given task.
     *
     * @param task the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public void execute(ForkJoinTask<?> task) {
        externalSubmit(task);
    }

2.2 CountDownLatch

Latch是门栓的意思,顾名思义,CountDownLatch是一个多线程的控制工具类。通常用于让一组线程等待直到倒计时结束,再开始执行。

CountDownLatch的用法如下,

  1. 初始化count down的次数;
  2. 在初始化线程中调用countDown对计数器进行减1;
  3. 工作线程中调用await进行等待,当计时器为0时,工作线程开始工作。
public class UseCountDownLatch {

    static CountDownLatch latch = new CountDownLatch(6);

    // 初始化线程(只有一步,有4个)
    private static class InitThread implements Runnable{

        @Override
        public void run() {
            System.out.println("Thread_"+Thread.currentThread().getId() +" finish init work......");

            latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次;

            // We can add some tasks after the countDown is invoked
            for(int i =0;i<2;i++) {
                System.out.println("Thread_"+Thread.currentThread().getId() +" ........continue do its work");
            }
        }
    }

    // 业务线程
    private static class BusinessThread implements Runnable{
        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            for(int i =0;i<3;i++) {
                System.out.println("BusinessThread_"+Thread.currentThread().getId() +" start to do business-----");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 单独的初始化线程,初始化分为2步,需要扣减两次
        new Thread(new Runnable() {
            @Override
            public void run() {
                SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId() +" finish init work step 1st......");
                latch.countDown();//每完成一步初始化工作,扣减一次

                System.out.println("begin step 2nd.......");
                SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId() +" finish init work step 2nd......");
                latch.countDown();//每完成一步初始化工作,扣减一次
            }
        }).start();

        new Thread(new BusinessThread()).start();

        // Start 3 new init thread
        for(int i=0;i<=3;i++){
            Thread thread = new Thread(new InitThread());
            thread.start();
        }

        latch.await();

        System.out.println("Main do ites work........");
    }
}

2.3 CyclicBarrier

CyclicBarrier类实现了一个集结点,称为屏障(barrier)。当一个线程完成了那部分任务之后,它运行到屏障处,一旦所有线程都到达了这个屏障,屏障就撤销,线程就可以继续运行了。

CyclicBarrier的用法如下:

  1. 构造一个Barrier,需要给出参与的线程数。JDK里提供了两个构造函数,barrierAction为屏障打开之后需要执行的action。
CyclicBarrier(int parties) 

CyclicBarrier(int parties, Runnable barrierAction)
  1. 每个线程做一些事情,完成后在屏障上调用await等待,
public void run() {
    doSomeWork();
    barrier.await();
    ...
}
  1. 当所有线程都到达了await后,此时屏障打开。如果有定义屏障打开后执行的action,则会先执行action。然后其他线程继续往下执行await后面的部分。

下面是具体的例子:

在打开屏障后,输出了各个线程的id。

public class UseCyclicBarrier {

    private static CyclicBarrier barrier = new CyclicBarrier(5,new TaskAfterBarrierIsOpenThread());

    private static ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<>();//存放子线程工作结果的容器

    public static void main(String[] args) {
        for(int i=0;i< 5;i++){
            Thread thread = new Thread(new SubThread());
            thread.start();
        }
    }

    //负责屏障开放以后的工作
    private static class TaskAfterBarrierIsOpenThread implements Runnable{

        @Override
        public void run() {
            StringBuilder result = new StringBuilder();
            for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
                result.append("["+workResult.getValue()+"]");
            }
            System.out.println(" the result = "+ result);
            System.out.println("do other business........");
        }
    }

    //工作线程
    private static class SubThread implements Runnable{

        @Override
        public void run() {
            long id = Thread.currentThread().getId();//线程本身的处理结果
            resultMap.put(Thread.currentThread().getId()+"",id);
            Random r = new Random();//随机决定工作线程的是否睡眠
            try {
                if(r.nextBoolean()) {
                    Thread.sleep(2000+id);
                    System.out.println("Thread_"+id+" ....do something ");
                }
                System.out.println(id+"....is await");
                barrier.await();
                Thread.sleep(1000+id);
                System.out.println("Thread_"+id+" ....do its business ");
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}

注意屏障是可以重复使用的,当所有等待线程被释放后可以被重用。

CountDownLatch和CyclicBarrier对比
countdownlatch的放行由第三者控制,CyclicBarrier放行由一组线程本身控制
countdownlatch放行条件 >= 线程数,CyclicBarrier放行条件 = 线程数

2.4 Semaphore

Semaphore也叫信号量,在JDK1.5被引入,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。

  • 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
  • 访问资源后,使用release释放许可。

示例程序如下:

public class MySemaphoreTest {
    static Semaphore semaphore = new Semaphore(4);

    private static class BusinessThread extends Thread {
        String name = "";

        BusinessThread(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println(name + " try to acquire lock...");
                System.out.println(name + " : available Semaphore permits now: " + semaphore.availablePermits());

                semaphore.acquire();

                System.out.println(name + " : got the permit!");            

                // Do some business work
                Thread.sleep(1000);

                System.out.println(name + " : release lock...");
                semaphore.release();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 6; i++) {
            new BusinessThread((i+1) + "").start();
        }
    }
}

2.5 Exchanger

当两个线程在同一个数据缓冲区的两个实例上工作时,就可以使用Exchanger。典型的情况是,一个线程向缓冲区填入数据,另一个线程消耗这些数据。当他们都完成之后,相互交换缓冲区。

下面的例子中,在两个线程中分别填入数据,然后交换数据,最后打印从对方线程交换得来的数据。

public class UseExchange {
    private static final Exchanger<Set<String>> exchange
        = new Exchanger<Set<String>>();

    public static void main(String[] args) {

        //第一个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set<String> setA = new HashSet<String>();//存放数据的容器
                try {
                    setA.add("1");
                    System.out.println(Thread.currentThread().getName()  + " Add 1 to the set");
                    setA.add("2");
                    System.out.println(Thread.currentThread().getName()  + " Add 2 to the set");
                    setA.add("3");
                    System.out.println(Thread.currentThread().getName()  + " Add 3 to the set");

                    setA = exchange.exchange(setA);//交换setA出去,返回交换来的数据setB

                    /*处理交换后的数据*/
                    System.out.println(Thread.currentThread().getName()  + " print the data after exchange ");
                    setA.forEach(string -> System.out.println(Thread.currentThread().getName() + " print" + string));
                } catch (InterruptedException e) {
                }
            }
        }).start();

      //第二个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set<String> setB = new HashSet<String>();//存放数据的容器
                try {
                    setB.add("A");
                    System.out.println(Thread.currentThread().getName()  + " Add A to the set");

                    setB.add("B");
                    System.out.println(Thread.currentThread().getName()  + " Add B to the set");

                    setB.add("C");
                    System.out.println(Thread.currentThread().getName()  + " Add C to the set");

                    setB = exchange.exchange(setB);//交换setB出去,返回交换来的数据setA

                    /*处理交换后的数据*/
                    System.out.println(Thread.currentThread().getName()  + " print the data after exchange ");
                    setB.forEach(string -> System.out.println(Thread.currentThread().getName() + " print" + string));
                } catch (InterruptedException e) {
                }
            }
        }).start();
    }
}

2.6 Future、Callable和FutureTask

Future是多线程开发中的一种常见设计模式,核心思想是异步调用。当需要调用一个函数方法时,如果这个函数很慢,需要进行等待,这时可以先处理一些其他任务,在真正需要数据的时候再去尝试获得需要的数据。

以上是Future的基本结构,RunnableFuture继承了Future和Runnable接口,FutureTask可以接收一个Callable实例作为运行的任务。

Future的使用比较简单,例子如下:

public class UseFuture {

    /*实现Callable接口,允许有返回值*/
    private static class UseCallable implements Callable<Integer>{

        private int sum;
        @Override
        public Integer call() throws Exception {
            System.out.println("Callable子线程开始计算");
            Thread.sleep(2000);
            for(int i=0;i<5000;i++) {
                sum = sum+i;
            }
            System.out.println("Callable子线程计算完成,结果="+sum);
            return sum;
        }

    }

    public static void main(String[] args)
            throws InterruptedException, ExecutionException {

        UseCallable useCallable = new UseCallable();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(useCallable);
        new Thread(futureTask).start();

        Random r = new Random();
        SleepTools.second(1);
        if(r.nextBoolean()) {//随机决定是获得结果还是终止任务
            System.out.println("Get UseCallable result = "+futureTask.get());
        }else {
            System.out.println("中断计算");
            futureTask.cancel(true);
        }
    }
}

注意Future接口中声明了5个方法,分别为:

  • cancel方法:用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
  • isCancelled方法:表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
  • isDone方法:表示任务是否已经完成,若任务完成,则返回true;
  • get()方法:用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  • get(long timeout, TimeUnit unit):用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。


本文由『后端精进之路』原创,首发于博客 http://teckee.github.io/ , 转载请注明出处

搜索『后端精进之路』关注公众号,立刻获取最新文章和价值2000元的BATJ精品面试课程

原文地址:https://www.cnblogs.com/way2backend/p/12004018.html

时间: 2024-11-10 23:24:27

Java并发编程系列-(2) 线程的并发工具类的相关文章

Java并发编程系列(一)-线程的基本使用

最近在学习java并发编程基础.一切从简,以能理解概念为主. 并发编程肯定绕不过线程.这是最基础的. 那么就从在java中,如何使用线程开始. 继承Thread类 继承Thread类,重写run方法,new出对象,调用start方法. 在新启的线程里运行的就是重写的run方法. 1 /** 2 * 集成Thread类 实现run() 3 */ 4 public class C1 extends Thread { 5 6 @Override 7 public void run() { 8 try

Java 并发编程(四)常用同步工具类

同步工具类可以使任何一种对象,只要该对象可以根据自身的状态来协调控制线程的控制流.阻塞队列可以作为同步工具类,其他类型的同步工具类还包括:信号量(Semaphore).栅栏(Barrier)以及闭锁(Latch). 闭锁 首先我们来介绍闭锁. 闭锁作用相当于一扇门:在闭锁到达某一状态之前,这扇门一直是关闭的,所有的线程都会在这扇门前等待(阻塞).只有门打开后,所有的线程才会同时继续运行. 闭锁可以用来确保某些活动直到其它活动都完成后才继续执行,例如: 1.确保某个计算在其所有资源都被初始化之后才

Java并发编程的艺术,解读并发编程的优缺点

并发编程的优缺点 使用并发的原因 多核的CPU的背景下,催生了并发编程的趋势,通过并发编程的形式可以将多核CPU的计算能力发挥到极致,性能得到提升. 在特殊的业务场景下先天的就适合于并发编程. 比如在图像处理领域,一张1024X768像素的图片,包含达到78万6千多个像素.即时将所有的像素遍历一边都需要很长的时间, 面对如此复杂的计算量就需要充分利用多核的计算的能力.又比如当我们在网上购物时,为了提升响应速度,需要拆分,减库存, 生成订单等等这些操作,就可以进行拆分利用多线程的技术完成. 面对复

深入探索并发编程系列(八)-Acquire与Release语义

一般来说,在无锁(lock-free)注1编程中,线程有两种方法来操作共享内存:线程间相互竞争一种资源或者相互合作传递消息.Acquire与Release语义对后者来说很关键:保证在线程间可靠地相互传递消息.实际上,我大胆地猜测,不正确的或者缺乏Acquire与Release语义是导致无锁编程产生错误的最常见 原因. 在这篇文章中,我会去探讨许多在C++中获得Acquire与Release 语义的方法.还会简单介绍一下C++11原子库标准.所以,你事先不必具备这方面的知识.简明起见,这里的讨论仅

Java多线程编程基础之线程对象

在进入java平台的线程对象之前,基于基础篇(一)的一些问题,我先插入两个基本概念. [线程的并发与并行] 在单CPU系统中,系统调度在某一时刻只能让一个线程运行,虽然这种调试机制有多种形式(大多数是时间片轮巡为主),但无论如何,要通过不断切换需要运行的线程让其运行的方式就叫并发(concurrent).而在多CPU系统中,可以让两个以上的线程同时运行,这种可以同时让两个以上线程同时运行的方式叫做并行(parallel). 在上面包括以后的所有论述中,请各位朋友谅解,我无法用最准确的词语来定义储

多媒体编程——ios摄像头图像抓取工具类

工具类提供预览图像画面,自动处理旋转,并且以主动方式抓取图像(这样帧率可以无限大) 系统的接口多是异步接收图像,像我这种强迫症怎么受得了,必须吧被动接收图像的方式改成主动抓取. 头文件 #import <Foundation/Foundation.h> #import <AVFoundation/AVFoundation.h> //这些比例都是4:3的比例. typedef enum TKVideoFrameSize { tkVideoFrame480x360 = 480 <

JavApi 以类似 Java API 的形式提供一组 .NET 的工具类

javApi 以类似 Java API 的形式提供一组 .NET 的工具类. 下面是一些示例代码: SampleGZIPOutputStream2File.cs: GZip a file content into other file.?(Example for: FileInputStream, FileOutputStream, GZIPOutputStream) SampleTextFileOutputStream.cs: Write text or binaries to file.?(

java 实体对象与Map之间的转换工具类(自己还没看)

java实体对象与Map之间的转换工具类 import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; public class EntityUtils { /** * 实体类转Map * @param object * @return */ public static Map<String, Object> entityToMap(Object object) { Map<String,

Java并发编程系列之二十五:线程池

线程池简介 在之前介绍Executor框架的文章中对线程池有一个初步的认识,实际上线程池这种设计思想在Java中很普遍,比如JVM中常量池,以及Web开发使用到的数据库连接池.这些池本质上还是Java中的对象池,因为池中存放的都是Java对象.回到线程池,几乎所有需要异步或者执行并发任务的程序都可以使用到线程池.使用线程池带来的好处主要包括以下几个方面: 一,提高资源利用率.由于线程池中的线程使可以重复利用的,所以达到了循环利用的目的 二,提高响应速度.由于线程的创建也是需要开销的,如果请求到来