JAVA的Executor框架

Executor框架分离了任务的创建和执行。JAVA SE5的java.util.concurrent包中的执行器(Executor)管理Thread对象,从而简化了并发编程。Executor引入了一些功能类来管理和使用线程Thread,其中包括线程 池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等.

1.Executor接口

public interface Executor{   void execute(Runnable task);};Executor接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法。它没有直接的实现类,有一个重要的子接口ExecutorService.执行已提交的Runnable任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用  Executor而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start()
 Executor executor = anExecutor;
 executor.execute(new RunnableTask1());
 executor.execute(new RunnableTask2());
 ...
内存一致性效果:线程中将Runnable 对象提交到 Executor之前的操作 happen-before其执行开始(可能在另一个线程中)。

2.ExecutorService
public interface ExecutorService extends Executor {
    void shutdown();//关闭方法,调用后执行之前提交的任务,不再接受新的任务
    List<Runnable> shutdownNow();//从语义上可以看出是立即停止的意思,将暂停所有等待处理的任务并返回这些任务的列表
    boolean isShutdown();// 判断执行器是否已经关闭
    boolean isTerminated();//关闭后所有任务是否都已完成
    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);//提交一个Callable任务
    <T> Future<T> submit(Runnable task, T result);//提交一个Runable任务,result要返回的结果
    Future<?> submit(Runnable task);//提交一个任务
    /**
     *执行所有给定的任务,当所有任务完成,返回保持任务状态和结果的Future列表
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    /**
     *执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
            long timeout, TimeUnit unit) throws InterruptedException;
    /**
     *执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    /**
     *执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果.
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
            TimeUnit unit) throws InterruptedException, ExecutionException,
            TimeoutException;
}
ExecutorService接口继承自Executor接口,定义了终止、提交任务、跟踪任务返回结果等方法。

ExecutorService涉及到Runnable、Callable、Future接口,这些接口的具体内容如下。
 1 // 实现Runnable接口的类将被Thread执行,表示一个基本的任务
 2 public interface Runnable {
 3     // run方法就是它所有的内容,就是实际执行的任务
 4     public abstract void run();
 5 }
 6 // Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容
 7 public interface Callable<V> {
 8     // 相对于run方法的带有返回值的call方法
 9     V call() throws Exception;
10 }

3.Future:代表异步任务执行的结果

public interface Future<V> {
    //尝试取消一个任务,如果这个任务不能被取消(通常是因为已经执行完了),返回false,否则返回true。
    boolean cancel(boolean mayInterruptIfRunning);
    // 返回代表的任务是否在完成之前被取消了
    boolean isCancelled();
    //如果任务已经完成,返回true
    boolean isDone();
    //获取异步任务的执行结果(如果任务没执行完将等待
    V get() throws InterruptedException, ExecutionException;
    //获取异步任务的执行结果(有最常等待时间的限制) 26 * 27 * timeout表示等待的时间,unit是它时间单位 28
    V get(long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException;
}

4.ScheduledExecutorService

//可以安排指定时间或周期性的执行任务的ExecutorService
public interface ScheduledExecutorService extends ExecutorService {
    //在指定延迟后执行一个任务,只执行一次
    public ScheduledFuture<?> schedule(Runnable command, long delay,TimeUnit unit);
    //与上面的方法相同,只是接受的是Callable任务
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
            TimeUnit unit);
    //创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit
    // 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
            long initialDelay, long period, TimeUnit unit);
    // 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务,
    // 时间单位都是unit.每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay),
    // initialDelay + 2 * (任务运行时间+delay)...
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
            long initialDelay, long delay, TimeUnit unit);
}

 5.AbstractExecutorService

//提供ExecutorService的默认实现
public abstract class AbstractExecutorService implements ExecutorService {
    //为指定的Runnable和value构造一个FutureTask, value表示默认被返回的Future
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    //为指定的Callable创建一个FutureTask
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    // 提交Runnable任务
    public Future<?> submit(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        // 通过newTaskFor方法构造RunnableFuture,默认的返回值是null
        RunnableFuture<Object> ftask = newTaskFor(task, null);
        // 调用具体实现的execute方法
        execute(ftask);
        return ftask;
    }
    // 提交Runnable任务
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        //通过newTaskFor方法构造RunnableFuture,默认的返回值是result
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    //提交Callable任务
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null)
            throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    // doInvokeAny的具体实现(核心内容),其它几个方法都是重载方法,都对这个方法进行调用
    // tasks是被执行的任务集,timed标志是否定时的,nanos表示定时的情况下执行任务的限制时间
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed,long nanos)
         throws InterruptedException, ExecutionException, TimeoutException {
         // tasks空判断
         if (tasks==null)
             throw new NullPointerException();
        // 任务数量
         int ntasks=tasks.size();
         if (ntasks == 0)
             throw new IllegalArgumentException();
         // 创建对应数量的Future返回集
         List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
         ExecutorCompletionService<T> ecs =new ExecutorCompletionService<T>(this);
         try {
             // 执行异常
             ExecutionException ee = null;
             // System.nanoTime()根据系统计时器当回当前的纳秒值
             long lastTime = (timed)? System.nanoTime(): 0;
             // 获取任务集的遍历器
             Iterator<? extends Callable<T>> it = tasks.iterator();
             // 向执行器ExecutorCompletionService提交一个任务,并将结果加入futures中
             futures.add(ecs.submit(it.next0));
             // 修改任务计数器
             --ntasks;
             // 活跃任务计数器
             int active =1;
             for (;;) {
                 // 获取并移除代表已完成任务的Future,如果不存在,返回null
                 Future<T> f = ecs.poll();
                 if (f == null) {
                     // 没有任务完成,且任务集中还有未提交的任务
                     if (ntasks > 0) {
                         // 剩余任务计数器减
                         --ntasks;
                         // 提交任务并添加结果
                         futures.add(ecs.submit(it.next()));
                         // 活跃任务计数器加
                         ++active;
                     }
                     // 没有剩余任务,且没有活跃任务(所有任务可能都会取消),跳过这一次循环
                     else if (active==0)
                         break;
                     else if (timed) {
                         // 获取并移除代表已完成任务的Future,如果不存在,会等待nanos指定的纳秒数
                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                         if (f == null)
                             throw new TimeoutException();
                         // 计算剩余可用时间
                         long now = System.nanoTime();
                         nanos -= now - lastTime;
                         lastTime = now;
                     }
                     else
                         // 获取并移除表示下一个已完成任务的未来,等待,如果目前不存在。
                         // 执行到这一步说明已经没有任务任务可以提交,只能等待某一个任务的返回
                         f = ecs.take();
                 }
                 // f不为空说明有一个任务完成了
                 if (f != null) {
                     // 已完成一个任务,所以活跃任务计数减
                     --active;
                     try {
                         // 返回该任务的结果
                         return f.get();
                     } catch (InterruptedException ie) {
                         throw ie;
                     } catch (ExecutionException eex) {
                         ee = eex;
                     } catch (RuntimeException rex) {
                         ee = new ExecutionException(rex);
                     }
                 }
             }
             // 如果没有成功返回结果则抛出异常
             if (ee == null)
                 ee = new ExecutionException();
             throw ee;
         } finally {
             // 无论执行中发生异常还是顺利结束,都将取消剩余未执行的任务
             for (Future<T> f : futures)
                 f.cancel(true);
         }
     }
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
         throws InterruptedException, ExecutionException {
         try {
             // 非定时任务的doInvokeAny调用
             return doInvokeAny(tasks, false,0);
         } catch (TimeoutException cannotHappen) {
             assert false;
             return null;
         }
     }
    // 定时任务的invokeAny调用,timeout表示超时时间,unit表示时间单位
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit)
                    throws InterruptedException,
            ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }
    // 无超时设置的invokeAll方法
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        // 空任务判断
        if (tasks == null)
            throw new NullPointerException();
        // 创建大小为任务数量的结果集
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        // 是否完成所有任务的标记
        boolean done = false;
        try {
            // 遍历并执行任务
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            // 遍历结果集
            for (Future<T> f : futures) {
                // 如果某个任务没完成,通过f调用get()方法
                if (!f.isDone()) {
                    try {
                        // get方法等待计算完成,然后获取结果(会等待)。所以调用get后任务就会完成计算,否则会等待
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            // 标志所有任务执行完成
            done = true;
            // 返回结果
            return futures;
        } finally {
            // 假如没有完成所有任务(可能是发生异常等情况),将任务取消
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }
    // 超时设置的invokeAll方法
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                          long timeout, TimeUnit unit)
         throws InterruptedException {
         // 需要执行的任务集为空或时间单位为空,抛出异常
         if (tasks == null || unit == null)
             throw new NullPointerException();
         // 将超时时间转为纳秒单位
         long nanos = unit.toNanos(timeout);
         // 创建任务结果集
         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
         // 是否全部完成的标志
         boolean done = false;
         try {
             // 遍历tasks,将任务转为RunnableFuture
             for (Callable<T> t : tasks)
                 futures.add(newTaskFor(t));
             // 记录当前时间(单位是纳秒)
             long lastTime = System.nanoTime();
             // 获取迭代器
             Iterator<Future<T>> it = futures.iterator();
             // 遍历
             while (it.hasNext()) {
                 // 执行任务
                 execute((Runnable)(it.next()));
                 // 记录当前时间
                 long now = System.nanoTime();
                 // 计算剩余可用时间
                 nanos -= now - lastTime;
                 // 更新上一次执行时间
                 lastTime = now;
                 // 超时,返回保存任务状态的结果集
                 if (nanos <= 0)
                     return futures;
             }

             for (Future<T> f : futures) {
                 // 如果有任务没完成
                 if (!f.isDone()) {
                     // 时间已经用完,返回保存任务状态的结果集
                     if (nanos <= 0)
                         return futures;
                     try {
                         // 获取计算结果,最多等待给定的时间nanos,单位是纳秒
                         f.get(nanos, TimeUnit.NANOSECONDS);
                     } catch (CancellationException ignore) {
                     } catch (ExecutionException ignore) {
                     } catch (TimeoutException toe) {
                         return futures;
                     }
                     // 计算可用时间
                     long now = System.nanoTime();
                     nanos -= now - lastTime;
                     lastTime = now;
                 }
             }
             // 修改是否全部完成的标记
             done = true;
             // 返回结果集
             return futures;
         } finally {
             // 假如没有完成所有任务(可能是时间已经用完、发生异常等情况),将任务取消
             if (!done)
                 for (Future<T> f : futures)
                     f.cancel(true);
         }
     }
}
时间: 2024-08-24 21:36:45

JAVA的Executor框架的相关文章

Java的Executor框架和线程池实现原理

一,Java的Executor框架 1,Executor接口 public interface Executor { void execute(Runnable command); } Executor接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法,它没有实现类只有另一个重要的子接口ExecutorService 2,ExecutorService接口 //继承自Executor接口 public interface ExecutorServic

Java多线程—Executor框架概述

1. 任务Task相关的接口与类 1.1 Runnable 表示一个可被执行的命令,通常用于在不同线程中执行任务. package java.lang; public interface Runnable { public void run(); } 1.2 Callable<V> 表示一个有返回结果的任务 package java.util.concurrent; public interface Callable<V> { V call() throws Exception;

Java并发Executor框架

1 Executor框架简介 从JDK5开始,工作单元和执行机制隔离开来,工作单元包括Runnable和Callable,执行机制由Executor提供. 调用关系:Java线程一对一映射到本地操作系统的系统线程,当多线程程序分解若干任务,使用用户级的调度器(Executor框架)将任务映射为固定数量的线程,底层,操作系统吧.内核将这些线程映射到硬件处理器上. 2.EXecutor结构成员 Executor是一个接口,它将任务的提交与任务的执行分离开来. ThreadPoolExecutor是线

Java Executor 框架

Java Executor 框架 Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor.Executors. ExecutorService.CompletionService.Future.Callable等.(图片引用自 http://www.javaclubcn.com/a/jichuzhishi/2012/1116/170.html) 本篇博文分析Executor中几个比较重要的接口和类. Executor 1 public int

JAVA 1.5 并发之 Executor框架 (二)execute VS submit

http://www.cnblogs.com/rockman12352/p/3788688.html 上一篇对于整体框架讲了很多东西,但是具体在使用时有一些细节并没有说出来 首先是执行任务 execute(); 执行任务,返回空,相当于 new Thread(task).start(); submit();   执行任务,但是会返回一个future<T>,就是计算好的结果,如果没有计算好则会阻塞,还有一个好处是可以管理exception public static void main(Stri

Java并发和多线程(二)Executor框架

Executor框架 1.Task?Thread? 很多人在学习多线程这部分知识的时候,容易搞混两个概念:任务(task)和线程(thread). 并发编程可以使我们的程序可以划分为多个分离的.独立运行的任务.而这些任务具体得由线程来驱动.Java中,Thread类自身不执行任何操作,它只是驱动赋予它的任务,任务由Runnable接口提供. 2.executor Executor是个简单的接口,但它却提供了一种标准的方法将任务的提交过程与任务的执行过程解耦开来,从而无须太大困难就可以为某种类型的

Java并发(基础知识)—— Executor框架及线程池

在Java并发(基础知识)—— 创建.运行以及停止一个线程中讲解了两种创建线程的方式:直接继承Thread类以及实现Runnable接口并赋给Thread,这两种创建线程的方式在线程比较少的时候是没有问题的,但是当需要创建大量线程时就会出现问题,因为这种使用方法把线程创建语句随意地散落在代码中,无法统一管理线程,我们将无法管理创建线程的数量,而过量的线程创建将直接使系统崩溃. 从高内聚角度讲,我们应该创建一个统一的创建以及运行接口,为我们管理这些线程,这个统一的创建与运行接口就是JDK 5的Ex

Java并发编程系列之十五:Executor框架

Java使用线程完成异步任务是很普遍的事,而线程的创建与销毁需要一定的开销,如果每个任务都需要创建一个线程将会消耗大量的计算资源,JDK 5之后把工作单元和执行机制区分开了,工作单元包括Runnable和Callable,而执行机制则由Executor框架提供.Executor框架为线程的启动.执行和关闭提供了便利,底层使用线程池实现.使用Executor框架管理线程的好处在于简化管理.提高效率,还能避免this逃逸问题--是指不完整的对象被线程调用. Executor框架使用了两级调度模型进行

转:【Java并发编程】之十九:并发新特性—Executor框架与线程池(含代码)

  Executor框架简介 在Java5之后,并发编程引入了一堆新的启动.调度和管理线程的API.Executor框架便是Java 5中引入的,其内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动.执行和关闭,可以简化并发编程的操作.因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this逃逸问题--如果我们在构造器中启动