[笔记][Java7并发编程实战手册]4.5 运行多个任务并处理第一个结果ThreadPoolExecutor

[笔记][Java7并发编程实战手册]系列目录


简介

  看到这个标题的时候,我也很纳闷,因为没有明白所表达的意思。

  ok,并发编程比较常见的一个问题是:当采用多个并发任务来解决一个问题的时候,往往只需要关心这个任务的第一个结果,例如:验证一个算法的时候,假如一个执行5个算法,那么最先返回结果的,就是最快的。

在本章将会学习,如何使用ThreadPoolExecutor来实现类似场景;


本章ThreadPoolExecutor使用心得

  1. 使用 ThreadPoolExecutor.invokeAny(list); 让线程池来帮我们拿到最快返回结果的结果。//执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。如果此操作正在进行时修改了给定的 collection,则此方法的结果是不确定的。
  2. jdk 中说明了:任务成的标识是:未抛出异常,正常返回;
  3. 拿到第一个结果后,执行器会取消未完成的任务,而在多线程中:当线程在活动之前或活动期间处于正在等待、休眠或占用状态且该线程被中断时,抛出InterruptedException异常
  4. 如果所有任务都抛出了异常,那么最终返回结果的时候也会抛出异常。(抛出的异常按照最后返回task)
  5. 还有一个invokeAll方法:当所有任务完成时返回所有任务的future列表。

示例

场景描述:以下程序来模拟在数据库中查找两个用户名(根据用户名和密码),获取最快查询到的用户。

/**
 * Created by zhuqiang on 2015/8/30 0030.
 */
public class Client {
    public static void main(String[] args) {
        ArrayList<Task> list = new ArrayList<Task>();
        list.add(new Task(new UserValidator(),"小强","123456"));
        list.add(new Task(new UserValidator(), "小小强", "123456"));
        list.add(new Task(new UserValidator(), "小小小小强", "123456"));

        ThreadPoolExecutor es = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            String s = es.invokeAny(list);//执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。一旦正常或异常返回后,则取消尚未完成的任务。如果此操作正在进行时修改了给定的 collection,则此方法的结果是不确定的。
            System.out.println(s + "   Main——用户验证通过");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        es.shutdown();
    }
}

// 用户验证对象,用来模拟在数据库中查询的过程
class UserValidator{
    public boolean validator(String name,String password){
        long  time = (long)(Math.random() * 10);
        try {
            TimeUnit.SECONDS.sleep(time);
            System.out.println("耗时=========" + Thread.currentThread().getName() + " 耗时:" + time);
        } catch (InterruptedException e) {
            System.out.println("取消=========" +Thread.currentThread().getName()+ " 该任务被中断");
            return false;
        }
//        return new Random().nextBoolean();  //返回一个随机值。表示是否验证通过的标识
        return true;  //始终返回true。用来验证,第一个被找到之后,后面的线程是否会被取消任务
    }
}

class Task implements Callable<String>{
    private UserValidator uv;
    private String name;
    private String password;

    public Task(UserValidator uv, String name, String password) {
        this.uv = uv;
        this.name = name;
        this.password = password;
    }

    @Override
    public String call() throws Exception {
        if (!uv.validator(name,password)){ //没有找到,并抛出一个异常
            System.out.println("没有找到用户========="  + "Task:" + Thread.currentThread().getName()  + "   " + name);
            throw new Exception("没有找到该用户:" + name);
        }
        System.out.println("找到用户========="  + "Task:" + Thread.currentThread().getName()  + "   " + name);
        return name;
    }
}

某一次运行结果:

耗时=========pool-1-thread-2 耗时:1
找到用户=========Task:pool-1-thread-2   小小强
小小强   Main——用户验证通过
取消=========pool-1-thread-2 该任务被中断
没有找到用户=========Task:pool-1-thread-2   小小小小强
取消=========pool-1-thread-1 该任务被中断
没有找到用户=========Task:pool-1-thread-1   小强

结果说明

1. 可以看到,第一个找到之后,结果就被返回了,并且后面未完成的任务被中断了。

2. 此示例的关键点es.invokeAny(list)

查看如果所有任务都没有返回结果呢,就是都抛出了异常

  修改上面的代码,让所有任务都抛出异常,会得到下面的某一次的运行结果:

耗时=========pool-1-thread-1 耗时:3
没有找到用户=========Task:pool-1-thread-1   小强
耗时=========pool-1-thread-1 耗时:0
没有找到用户=========Task:pool-1-thread-1   小小小小强
耗时=========pool-1-thread-2 耗时:6
java.util.concurrent.ExecutionException: java.lang.Exception: 没有找到该用户:小小强
没有找到用户=========Task:pool-1-thread-2   小小强
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at java.util.concurrent.AbstractExecutorService.doInvokeAny(AbstractExecutorService.java:193)
    at java.util.concurrent.AbstractExecutorService.invokeAny(AbstractExecutorService.java:215)
    at java7Concurrency.sync4_4.Client.main(Client.java:18)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.Exception: 没有找到该用户:小小强
    at java7Concurrency.sync4_4.Task.call(Client.java:60)
    at java7Concurrency.sync4_4.Task.call(Client.java:45)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-10-10 17:00:32

[笔记][Java7并发编程实战手册]4.5 运行多个任务并处理第一个结果ThreadPoolExecutor的相关文章

[笔记][Java7并发编程实战手册]3.2 资源的并发访问控制Semaphore信号量

[笔记][Java7并发编程实战手册]系列目录 简介 本文学习信号量Semaphore机制. Semaphore 本质是一个共享锁 内部维护一个可用的信号集,获取信号量之前需要先申请获取信号数量:用完之后,则需要释放信号量:如果不释放,那么其他等待线程则一直阻塞直到获取信号量或则被中断为止 本人的理解是:互斥锁是同一时间只能一个线程访问,而在这里,是同一时间允许获取到了信号量的线程并发访问,而没有获取到信号量的则必须等待信号量的释放: 将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,

[笔记][Java7并发编程实战手册]4.3 创建固定的线程执行器newFixedThreadPool线程池

[笔记][Java7并发编程实战手册]系列目录 简介 newFixedThreadPool(int nThreads, ThreadFactory threadFactory) 创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程. newCachedThreadPool()创建的线程池的特性是:自动回收不使用的线程(终止并从缓存中移除那些已有 60 秒钟未被使用的线程),(在无可用线程的情况下)自动的为新来的task创

[笔记][Java7并发编程实战手册]3.3 资源的多副本并发访问控制Semaphore

[笔记][Java7并发编程实战手册]系列目录 简介 本文继续学习信号量Semaphore机制. 在3.2中其实已经讲解完了,之前对于信号量并发的使用场景不知道,看了本章节才想到一些: 下面就以 租车为列子来讲解并发访问的控制.(示例都很简单或许不符合现实逻辑) 信号量(非二进制信号量)是不保证同步的,需要额外的同步 示例 场景:有一个出租车公司,有三台车,有十个司机,每个司机工作的时间不一致,可以说是司机等待着别人还车后,接着租用汽车. /** * Created by zhuqiang on

[笔记][Java7并发编程实战手册]2.5使用Lock实现同步二

[笔记][Java7并发编程实战手册]系列目录 概要 接上一篇文章,练习修改锁的公平性,和在所中使用条件. 修改锁的公平性ReentrantLock /** *构造一个锁对象,默认为非公平锁 */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } 根据ReentrantLock的构造可以看出来,默认会构造非公平锁: 公平锁与非公平锁有什么区别 公平锁 :有多个线程并发访

[笔记][Java7并发编程实战手册]系列目录

Java7并发编程实战手册 这一本实战的书籍.本笔记记录是看了该书.并且简化了书中的示例.的一些随笔记录 我觉得能给我更好的感觉.我觉得先看博客中转载的多线程系列 Java多线程系列-目录源码分析和理论.有时候真的觉得好烦躁.可是,没有这些理论实战中又觉得太多的未知. 所以本人觉得.先粗略的过一遍理论和源码分析.再来看学习实战,在写代码的过程中,去回想和联想理论就能更好的把知识串联起来了: 也可以看到本人在记录这些笔记的时候也会引用到博客中转载的多线程系列文章. [笔记][Java7并发编程实战

[笔记][Java7并发编程实战手册]第三章-线程同步辅助类-概要

[笔记][Java7并发编程实战手册]系列目录 有点着急了,没有太注重质量,自己也没有理解透,从本章起,读书和随笔笔记的质量会更好. 第三章 在本章中,我们将学习: 资源的并发访问控制 资源的多副本的并发访问控制 等待多个并发事件的完成 在集合点的同步 并发阶段任务的运行 并发阶段任务中的阶段交换 并发任务间的数据交换 回顾 在第二章中主要学习了以下接口 synchronized关键字 Lock接口以及实现类,如ReentrantLock.ReentrantReadWriteLock中的Read

[笔记][Java7并发编程实战手册]4.4 在执行器中执行任务并返回结果Callable、Future

[笔记][Java7并发编程实战手册]系列目录 简介 执行框架(Executor Framework)的优势之一就是,可以在运行并发任务的时候返回结果.但是需要以下两个类来实现功能: 1. 接口 Callable<V> 返回结果并且可能抛出异常的任务.实现者定义了一个不带任何参数的叫做 call 的方法. Callable 接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的.但是 Runnable 不会返回结果,并且无法抛出经过检查的异常. Executors 类

[笔记][Java7并发编程实战手册]2.4在同步代码中使用条件-生产者与消费者

说明 在并发编程中一个典型的问题是生产者–消费者问题.在程序中,有可能会需要用到两个线程通信的情况,比如生产者消费者中,获取一个共享数据,有就消费.没有就等待着生产者生产之后再继续消费.那么这个实现过程就可以使用wait();notify();notifyAll()来达到效果: 以上方法详细解说请查看: Java多线程系列–"基础篇"05之 线程等待与唤醒 例子 /** * Created by zhuqiang on 2015/8/8 0008. */ public class Cl

[笔记][Java7并发编程实战手册]2.2使用syncronized实现同步方法

学习多线程之前,我觉得很有必要去学习下 [笔记][思维导图]读深入理解JAVA内存模型整理的思维导图 基础知识 锁除了让临界区互斥执行外, 还可以让释放锁的线程向获取同一个锁的线程发送消息 当线程获取锁时,JMM会把该线程对应的本地内存置为无效. 从而使得监视器保护的临界区代码必须要从主内存中去读取共享变量. 当线程释放锁时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存中. 加锁使用的是同一个锁的话,使用syncronized的静态方法和普通方法需要注意的是:静态方法:同时只能被一个线