Callable+Future+newFixedThreadPool的应用

  最近在处理很多的数据,数据量比较大,但是处理的相对简单一些,没有什么复杂的业务逻辑,然后就使用了多线程去处理。因为一直停留在Thread和Runnable的知识中,项目中使用Callable,刚好可以学习新的东西,就使用了Callable和Future结合加上Executors.newFixedThreadPool()。

一、Callable和Future基础知识

  Thread和Runnable这2个很多人都知道并且使用过,可能Callable相对陌生一些,future应该更加陌生,他们2个一个生成结果一个接受结果。Thread和Runnable实现的线程不会返回结果,Callable相对特殊一些,他会返回结果,这个结果可以被Future拿到,也就是说,Future可以拿到异步执行任务的结果。我们先看一下Callable类:

package java.util.concurrent;

/**
 *一个带有返回结果并可能引发异常的任务.实现定义了一个没有调用参数
 *的方法call。
 *Callable接口类似于{@link java.lang.Runnable},因为它们都是为其
 *实例可能被另一个线程执行的类设计的。然而,Runnable不返回结果,
 *也不能抛出被检查的异常。{@link Executors}类包含从其他常用形式
 *转换为Callable类的实用程序方法。
  */
public interface Callable<V> {

    /**
    * 计算一个结果,如果不能这样做,就会抛出一个异常。.
    *
    * @return 结算结果
    * @throws 如果无法计算结果,则抛出异常
    */
    V call() throws Exception;
}

  Future表示一个任务的生命周期,并提供了方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。Future接口:

public interface Future<V> {

    /**
    * 尝试取消执行此任务。 如果任务已经完成,已经被取消或由于某种其他原因而无法取消,则此尝试将失败。
    * 如果成功,并且调用<tt> cancel </ tt>时,此任务尚未启动,则此任务不应运行。 如果任务已经开始,
    * 那么<tt> mayInterruptIfRunning </ tt>参数决定了执行该任务的线程是否应该被中断以试图停止该任务。
    */
    boolean cancel( boolean mayInterruptIfRunning );

    /**
     * 如果此任务在正常完成之前已被取消,返回true
     * @return <tt>true</tt> 如果此任务在正常完成之前已被取消,返回true
     */
    boolean isCancelled();

    /**
     * 如果任务已经完成返回true
     * 完成可能是由于正常终止,异常或异常 - 在所有这些情况下,此方法都将返回<tt> true </ tt>。
     * @return <tt>true</tt> 如果任务已经完成返回true
     */
    boolean isDone();

    /**
     * 等待计算完成,然后获得其结果。
     *
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。
     *
     */
    V get( long timeout, TimeUnit unit )
    throws InterruptedException, ExecutionException, TimeoutException;
}

二、线程池之固定线程池newFixedThreadPool

  创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。Executors.newFixedThreadPool(10)的实现如下:

public static ExecutorService newFixedThreadPool( int nThreads )
{
    return(new ThreadPoolExecutor( nThreads, nThreads,
                       0L, TimeUnit.MILLISECONDS,
                       new LinkedBlockingQueue<Runnable>() ) );
}

  nThreads :是固定线程数,在了解newFixedThreadPool之前我们先了解一下ThreadPoolExecutor,ThreadPoolExecutor作为java.util.concurrent包对外提供基础实现,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务;Executors方法提供的线程服务,都是通过参数设置来实现不同的线程池机制。ThreadPoolExecutor的构造方法如下:

public ThreadPoolExecutor( int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue )
{
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor( int corePoolSize,
               int maximumPoolSize,
               long keepAliveTime,
               TimeUnit unit,
               BlockingQueue<Runnable> workQueue,
               ThreadFactory threadFactory,
               RejectedExecutionHandler handler )
{
    if ( corePoolSize < 0 ||
         maximumPoolSize <= 0 ||
         maximumPoolSize < corePoolSize ||
         keepAliveTime < 0 )
        throw new IllegalArgumentException();
    if ( workQueue == null || threadFactory == null || handler == null )
        throw new NullPointerException();
    this.corePoolSize    = corePoolSize;
    this.maximumPoolSize    = maximumPoolSize;
    this.workQueue        = workQueue;
    this.keepAliveTime    = unit.toNanos( keepAliveTime );
    this.threadFactory    = threadFactory;
    this.handler        = handler;
}

ThreadPoolExecutor构造方法参数讲解:

ThreadPoolExecutor构造方法参数
参数名 作用
corePoolSize 核心线程池大小
maximumPoolSize 最大线程池大小
keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
TimeUnit keepAliveTime时间单位
workQueue 阻塞任务队列
threadFactory 新建线程工厂
RejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理

  这样我们在回过头来看newFixedThreadPool的实现,核心线程池大小和最大线程池大小都是传入进去的数字,keepAliveTime为0,时间单位为TimeUnit.MILLISECONDS毫秒,对列为LinkedBlockingQueue,线程池工厂为默认,RejectedExecutionHandler为默认。这样我们就知道newFixedThreadPool的代码实现了。关于ThreadPoolExecutor自己的构建请自行了解。
三、实际应用

package com.roc.thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FixedThreadPoolTest {
    public static void main( String[] args )
    {
        FixedThreadPoolTest fixedThreadPoolTest = new FixedThreadPoolTest();
        fixedThreadPoolTest.execut();
    }

    private void execut()
    {
        ExecutorService        executorService = Executors.newFixedThreadPool( 10 );/* 创建为10个线程的固定线程池 */
        List<Integer>        datas        = new ArrayList<Integer>( 100 );
        List<Future<Integer> >    results        = new ArrayList<Future<Integer> >();
        int            count        = 0;
        for ( int i = 0; i < 100; i++ )/* 实际项目中数据可以查数据库或者文件,这里仅仅表示模拟 */
        {
            datas.add( i );
        }
        for ( int i = 0; i < datas.size(); i++ )
        {
            results.add( executorService.submit( new executTask( datas.get( i ) ) ) );
        }
        try {
            for ( Future<Integer> future : results )
            {
                count += future.get();
            }
        } catch ( InterruptedException e ) {
            /* TODO Auto-generated catch block */
            e.printStackTrace();
        } catch ( ExecutionException e ) {
            /* TODO Auto-generated catch block */
            e.printStackTrace();
        }
        System.out.println( Thread.currentThread() + "处理数据总数:" + count );
    }

    class executTask implements Callable<Integer> {
        private int data;

        public executTask( int data )
        {
            this.data = data;
        }

        @Override
        public Integer call() throws Exception
        {
            try {
                System.out.println( Thread.currentThread() + "处理完数据:" + data ); /* 实际项目中这里可以处理业务逻辑 */
            } catch ( Exception e ) {
                return(-1);
            }
            return(1);
        }
    }
}            

结果:

Thread[pool-1-thread-10,5,main]处理完数据:9
Thread[pool-1-thread-4,5,main]处理完数据:97
Thread[pool-1-thread-9,5,main]处理完数据:95
Thread[pool-1-thread-3,5,main]处理完数据:96
Thread[pool-1-thread-7,5,main]处理完数据:94
Thread[pool-1-thread-4,5,main]处理完数据:98
Thread[pool-1-thread-5,5,main]处理完数据:99
Thread[main,5,main]处理数据总数:100

时间: 2024-12-28 01:37:23

Callable+Future+newFixedThreadPool的应用的相关文章

Callable/Future

Callable 和 Future 是比较有趣的一对组合.当我们需要获取线程的执行结果时,就需要用到它们.Callable用于产生结果,Future用于获取结果. 第1部分 Callable Callable 是一个接口,它只包含一个call()方法.Callable是一个返回结果并且可能抛出异常的任务. 为了便于理解,我们可以将Callable比作一个Runnable接口,而Callable的call()方法则类似于Runnable的run()方法. Callable的源码如下: public

java 并发runable,callable,future,futureTask

转载自:http://www.cnblogs.com/dolphin0520/p/3949310.html package future_call; import java.util.concurrent.Callable; /** * Created by luozhitao on 2017/8/10. */ public class Task implements Callable<Integer> { // @Override public Integer call() throws E

12 Callable &amp; Future &amp; FutureTask

创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口. 这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果. 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦. 而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果. 今天我们就来讨论一下Callable.Future和FutureTask三个类的使用方法. 1 Callable与Runnable

并发实现-Callable/Future 实现返回值控制的线程

package chartone; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * 并发实现-Callable/Future 实现返回值控制的线程 * * @author suntao * @history 2020-0

多线程编程(六)--Callable&amp;Future

Thread类和Runnable接口和Java内存管理模型使得多线程编程简单直接.但是Thread类和Runnable接口都不允许声明检查型异常,也不能定义返回值. Callable接口和Future接口的引入以及它们对线程池的支持优雅的解决了这两个问题. Callable接口类似于Runnable接口,Callable接口被线程执行后,可以返回值,这个返回值可以被Future拿到,也就是说,Future可以拿到异步执行任务的返回值.Future取得的结果类型和Callable返回的结果类型必须

多线程-Callable&amp;Future

Callable和Future出现的原因 创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口. 这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果. 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦. 而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果. Callable和Future介绍 Callable接口代表一段可以调用并返回结果的代码

Java 并发编程——Callable+Future+FutureTask

项目中经常有些任务需要异步(提交到线程池中)去执行,而主线程往往需要知道异步执行产生的结果,这时我们要怎么做呢?用runnable是无法实现的,我们需要用callable实现. import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Ex

Java线程池(Callable+Future模式)

Java通过Executors提供四种线程池 1)newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程. 2)newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待. 3)newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行. 4)newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行

java.util.concuttent Callable Future详解

在传统的多线程实现方式中(继承Thread和实现Runnable)无法直接获取线程执行的返回结果,如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦. 从Java 1.5开始,java.util.concurrent包中提供了 Callable和 Future两个接口,通过它们就可以在任务执行完毕之后得到任务执行结果. Callable Callable与Runnable的功能大致相似,Callable中有一个call()函数,但是call()函数有