深入学习Java线程池

在前面的例子中,我们都是通过new Thread来创建一个线程,由于线程的创建和销毁都需要消耗一定的CPU资源,所以在高并发下这种创建线程的方式将严重影响代码执行效率。而线程池的作用就是让一个线程执行结束后不马上销毁,继续执行新的任务,这样就节省了不断创建线程和销毁线程的开销。

ThreadPoolExecutor

创建Java线程池最为核心的类为ThreadPoolExecutor

它提供了四种构造函数来创建线程池,其中最为核心的构造函数如下所示:

1234567
public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler)

这7个参数的含义如下:

  1. corePoolSize 线程池核心线程数。即线程池中保留的线程个数,即使这些线程是空闲的,也不会被销毁,除非通过ThreadPoolExecutor的allowCoreThreadTimeOut(true)方法开启了核心线程的超时策略;
  2. maximumPoolSize 线程池中允许的最大线程个数;
  3. keepAliveTime 用于设置那些超出核心线程数量的线程的最大等待时间,超过这个时间还没有新任务的话,超出的线程将被销毁;
  4. unit 超时时间单位;
  5. workQueue 线程队列。用于保存通过execute方法提交的,等待被执行的任务;
  6. threadFactory 线程创建工程,即指定怎样创建线程;
  7. handler 拒绝策略。即指定当线程提交的数量超出了maximumPoolSize后,该使用什么策略处理超出的线程。

在通过这个构造方法创建线程池的时候,这几个参数必须满足以下条件,否则将抛出IllegalArgumentException异常:

  1. corePoolSize不能小于0;
  2. keepAliveTime不能小于0;
  3. maximumPoolSize 不能小于等于0;
  4. maximumPoolSize不能小于corePoolSize;

此外,workQueue、threadFactory和handler不能为null,否则将抛出空指针异常。

下面举些例子来深入理解这几个参数的含义。

使用上面的构造方法创建一个线程池:

123456789101112131415161718192021
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(                1, 2, 10,                TimeUnit.SECONDS,                new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,                new ThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");

int activeCount = -1;int queueSize = -1;while (true) {    if (activeCount != threadPoolExecutor.getActiveCount()            || queueSize != threadPoolExecutor.getQueue().size()) {        System.out.println("活跃线程个数 " + threadPoolExecutor.getActiveCount());        System.out.println("核心线程个数 " + threadPoolExecutor.getCorePoolSize());        System.out.println("队列线程个数 " + threadPoolExecutor.getQueue().size());        System.out.println("最大线程数 " + threadPoolExecutor.getMaximumPoolSize());        System.out.println("------------------------------------");        activeCount = threadPoolExecutor.getActiveCount();        queueSize = threadPoolExecutor.getQueue().size();    }}

上面的代码创建了一个核心线程数量为1,允许最大线程数量为2,最大活跃时间为10秒,线程队列长度为1的线程池。

假如我们通过execute方法向线程池提交1个任务,看看结果如何:

1234567891011121314151617181920212223
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(                1, 2, 10,                TimeUnit.SECONDS,                new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,                new ThreadPoolExecutor.AbortPolicy());System.out.println("线程池创建完毕");

threadPoolExecutor.execute(() -> sleep(100));

int activeCount = -1;int queueSize = -1;while (true) {    if (activeCount != threadPoolExecutor.getActiveCount()            || queueSize != threadPoolExecutor.getQueue().size()) {        System.out.println("活跃线程个数 " + threadPoolExecutor.getActiveCount());        System.out.println("核心线程个数 " + threadPoolExecutor.getCorePoolSize());        System.out.println("队列线程个数 " + threadPoolExecutor.getQueue().size());        System.out.println("最大线程数 " + threadPoolExecutor.getMaximumPoolSize());        System.out.println("------------------------------------");        activeCount = threadPoolExecutor.getActiveCount();        queueSize = threadPoolExecutor.getQueue().size();    }}

ThreadPoolExecutor的execute和submit方法都可以向线程池提交任务,区别是,submit方法能够返回执行结果,返回值类型为Future

sleep方法代码:

12345678
private static void sleep(long value) {    try {        System.out.println(Thread.currentThread().getName() + "线程执行sleep方法");        TimeUnit.SECONDS.sleep(value);    } catch (InterruptedException e) {        e.printStackTrace();    }}

启动程序,控制台输出如下:

线程池核心线程数量为1,通过execute提交了一个任务后,由于核心线程是空闲的,所以任务被执行了。由于这个任务的逻辑是休眠100秒,所以在这100秒内,线程池的活跃线程数量为1。此外,因为提交的任务被核心线程执行了,所以并没有线程需要被放到线程队列里等待,线程队列长度为0。

假如我们通过execute方法向线程池提交2个任务,看看结果如何:

12
threadPoolExecutor.execute(() -> sleep(100));threadPoolExecutor.execute(() -> sleep(100));

线程池核心线程数量为1,通过execute提交了2个任务后,一开始核心线程是空闲的,Thread-0被执行。由于这个任务的逻辑是休眠100秒,所以在这100秒内,线程池的活跃线程数量为1。因为核心线程数量为1,所以另外一个任务在这100秒内不能被执行,于是被放到线程队列里等待,线程队列长度为1。

假如我们通过execute方法向线程池提交3个任务,看看结果如何:

123
threadPoolExecutor.execute(() -> sleep(100));threadPoolExecutor.execute(() -> sleep(100));threadPoolExecutor.execute(() -> sleep(100));

这三个任务都是休眠100秒,所以核心线程池中第一个任务正在被执行,第二个任务被放入到了线程队列。而当第三个任务被提交进来时,线程队列满了(我们定义的长度为1),由于该线程池允许的最大线程数量为2,所以线程池还可以再创建一个线程来执行另外一个任务,于是乎之前在线程队列里的线程被取出执行(FIFO),第三个任务被放入到了线程队列。

改变第二个和第三个任务的睡眠时间,观察输出:

123
threadPoolExecutor.execute(() -> sleep(100));threadPoolExecutor.execute(() -> sleep(5));threadPoolExecutor.execute(() -> sleep(5));

第二个任务提交5秒后,任务执行完毕,所以线程队列里的任务被执行,于是队列线程个数为0,活跃线程数量为2(第一个和第三个任务)。再过5秒后,第三个任务执行完毕,于是活跃线程数量为1(第一个100秒还没执行完毕)。

在第三个任务结束的瞬间,我们观察线程快照:

可以看到,线程池中有两个线程,Thread-0在执行第一个任务(休眠100秒,还没结束),Thread-1执行完第三个任务后并没有马上被销毁。过段时间后(10秒钟后)再观察线程快照:

可以看到,Thread-1这个线程被销毁了,因为我们在创建线程池的时候,指定keepAliveTime 为10秒,10秒后,超出核心线程池线程外的那些线程将被销毁。

假如一次性提交4个任务,看看会怎样:

1234
threadPoolExecutor.execute(() -> sleep(100));threadPoolExecutor.execute(() -> sleep(100));threadPoolExecutor.execute(() -> sleep(100));threadPoolExecutor.execute(() -> sleep(100));

因为我们设置的拒绝策略为AbortPolicy,所以最后提交的那个任务直接被拒绝了。更多拒绝策略下面会介绍到。

关闭线程池

线程池包含以下几个状态:

当线程池中所有任务都处理完毕后,线程并不会自己关闭。我们可以通过调用shutdownshutdownNow方法来关闭线程池。两者的区别在于:

  1. shutdown方法将线程池置为shutdown状态,拒绝新的任务提交,但线程池并不会马上关闭,而是等待所有正在折行的和线程队列里的任务都执行完毕后,线程池才会被关闭。所以这个方法是平滑的关闭线程池。
  2. shutdownNow方法将线程池置为stop状态,拒绝新的任务提交,中断正在执行的那些任务,并且清除线程队列里的任务并返回。所以这个方法是比较“暴力”的。

举两个例子观察下两者的区别:

shutdown例子:

123456789101112131415161718192021222324252627282930313233343536373839
public static void main(String[] args) {    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            2, 4, 10,            TimeUnit.SECONDS,            new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,            new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.execute(new shortTask());    threadPoolExecutor.execute(new longTask());    threadPoolExecutor.execute(new longTask());    threadPoolExecutor.execute(new shortTask());

    threadPoolExecutor.shutdown();    System.out.println("已经执行了线程池shutdown方法");}

static class shortTask implements Runnable {    @Override    public void run() {        try {            TimeUnit.SECONDS.sleep(1);            System.out.println(Thread.currentThread().getName() + "执行shortTask完毕");        } catch (InterruptedException e) {            System.err.println("shortTask执行过程中被打断" + e.getMessage());        }    }}

static class longTask implements Runnable {    @Override    public void run() {        try {            TimeUnit.SECONDS.sleep(5);            System.out.println(Thread.currentThread().getName() + "执行longTask完毕");        } catch (InterruptedException e) {            System.err.println("longTask执行过程中被打断" + e.getMessage());        }    }}

启动程序,控制台输出如下:

可以看到,虽然在任务都被提交后马上执行了shutdown方法,但是并不会马上关闭线程池,而是等待所有被提交的任务都执行完了才关闭。

shutdownNow例子:

1234567891011121314151617181920212223242526272829303132333435363738394041
public static void main(String[] args) {    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            2, 4, 10,            TimeUnit.SECONDS,            new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,            new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.execute(new shortTask());    threadPoolExecutor.execute(new longTask());    threadPoolExecutor.execute(new longTask());    threadPoolExecutor.execute(new shortTask());

    List<Runnable> runnables = threadPoolExecutor.shutdownNow(); // 马上关闭,并返回还未被执行的任务    System.out.println(runnables);

    System.out.println("已经执行了线程池shutdownNow方法");}

static class shortTask implements Runnable {    @Override    public void run() {        try {            TimeUnit.SECONDS.sleep(1);            System.out.println(Thread.currentThread().getName() + "执行shortTask完毕");        } catch (InterruptedException e) {            System.err.println("shortTask执行过程中被打断" + e.getMessage());        }    }}

static class longTask implements Runnable {    @Override    public void run() {        try {            TimeUnit.SECONDS.sleep(5);            System.out.println(Thread.currentThread().getName() + "执行longTask完毕");        } catch (InterruptedException e) {            System.err.println("longTask执行过程中被打断" + e.getMessage());        }    }}

启动程序,控制台输出如下:

可以看到,在执行shutdownNow方法后,线程池马上就被关闭了,正在执行中的两个任务被打断,并且返回了线程队列中等待被执行的两个任务。

通过上面两个例子我们还可以看到shutdownshutdownNow方法都不是阻塞的。常与shutdown搭配的方法有awaitTermination

awaitTermination方法接收timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。该方法是阻塞的:

123456789101112131415161718192021222324252627282930313233343536373839404142434445
public static void main(String[] args) throws InterruptedException {    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            2, 4, 10,            TimeUnit.SECONDS,            new ArrayBlockingQueue<>(2), (ThreadFactory) Thread::new,            new ThreadPoolExecutor.AbortPolicy());

    threadPoolExecutor.execute(new shortTask());    threadPoolExecutor.execute(new longTask());    threadPoolExecutor.execute(new longTask());    threadPoolExecutor.execute(new shortTask());

    threadPoolExecutor.shutdown();    boolean isShutdown = threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);    if (isShutdown) {        System.out.println("线程池在3秒内成功关闭");    } else {        System.out.println("等了3秒还没关闭,不等了╰(‵□′)╯");    }    System.out.println("------------");}

static class shortTask implements Runnable {    @Override    public void run() {        try {            TimeUnit.SECONDS.sleep(1);            System.out.println(Thread.currentThread().getName() + "执行shortTask完毕");        } catch (InterruptedException e) {            System.err.println("shortTask执行过程中被打断" + e.getMessage());        }    }}

static class longTask implements Runnable {    @Override    public void run() {        try {            TimeUnit.SECONDS.sleep(5);            System.out.println(Thread.currentThread().getName() + "执行longTask完毕");        } catch (InterruptedException e) {            System.err.println("longTask执行过程中被打断" + e.getMessage());        }    }}

启动程序输出如下:

4大拒绝策略

当线程池无法再接收新的任务的时候,可采取如下四种策略:

CallerRunsPolicy

CallerRunsPolicy策略:由调用线程处理该任务:

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
public static void main(String[] args) throws InterruptedException {    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            2, 3, 10,            TimeUnit.SECONDS,            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,            new ThreadPoolExecutor.CallerRunsPolicy());

    threadPoolExecutor.execute(new shortTask("任务1"));    threadPoolExecutor.execute(new longTask("任务2"));    threadPoolExecutor.execute(new longTask("任务3"));    threadPoolExecutor.execute(new shortTask("任务4"));    threadPoolExecutor.execute(new shortTask("任务5"));

    threadPoolExecutor.shutdown();}

static class shortTask implements Runnable {

    private String name;

    public shortTask(String name) {        this.name = name;    }

    @Override    public void run() {        try {            TimeUnit.SECONDS.sleep(1);            System.out.println(Thread.currentThread().getName() + "执行shortTask-name-" + name + "完毕");        } catch (InterruptedException e) {            System.err.println("shortTask执行过程中被打断" + e.getMessage());        }    }}

static class longTask implements Runnable {

    private String name;

    public longTask(String name) {        this.name = name;    }

    @Override    public void run() {        try {            TimeUnit.SECONDS.sleep(5);            System.out.println(Thread.currentThread().getName() + "执行longTask-name-" + name + "完毕");        } catch (InterruptedException e) {            System.err.println("longTask执行过程中被打断" + e.getMessage());        }    }}

上面的线程池最多只能一次性提交4个任务,第5个任务提交后会被拒绝策略处理。启动程序输出如下:

可以看到,第5个提交的任务由调用线程(即main线程)处理该任务。

AbortPolicy

AbortPolicy策略:丢弃任务,并抛出RejectedExecutionException异常。前面的例子就是使用该策略,所以不再演示。

DiscardOldestPolicy

DiscardOldestPolicy策略:丢弃最早被放入到线程队列的任务,将新提交的任务放入到线程队列末端:

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
public static void main(String[] args) throws InterruptedException {    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            2, 3, 10,            TimeUnit.SECONDS,            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,            new ThreadPoolExecutor.DiscardOldestPolicy());

    threadPoolExecutor.execute(new shortTask("任务1"));    threadPoolExecutor.execute(new longTask("任务2"));    threadPoolExecutor.execute(new longTask("任务3"));    threadPoolExecutor.execute(new shortTask("任务4"));    threadPoolExecutor.execute(new shortTask("任务5"));

    threadPoolExecutor.shutdown();}

static class shortTask implements Runnable {

    private String name;

    public shortTask(String name) {        this.name = name;    }

    @Override    public void run() {        try {            TimeUnit.SECONDS.sleep(1);            System.out.println(Thread.currentThread().getName() + "执行shortTask-name-" + name + "完毕");        } catch (InterruptedException e) {            System.err.println("shortTask执行过程中被打断" + e.getMessage());        }    }}

static class longTask implements Runnable {

    private String name;

    public longTask(String name) {        this.name = name;    }

    @Override    public void run() {        try {            TimeUnit.SECONDS.sleep(5);            System.out.println(Thread.currentThread().getName() + "执行longTask-name-" + name + "完毕");        } catch (InterruptedException e) {            System.err.println("longTask执行过程中被打断" + e.getMessage());        }    }}

启动程序输出如下:

可以看到最后提交的任务被执行了,而第3个任务是第一个被放到线程队列的任务,被丢弃了。

DiscardPolicy

DiscardPolicy策略:直接丢弃新的任务,不抛异常:

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
public static void main(String[] args) throws InterruptedException {    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            2, 3, 10,            TimeUnit.SECONDS,            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,            new ThreadPoolExecutor.DiscardPolicy());

    threadPoolExecutor.execute(new shortTask("任务1"));    threadPoolExecutor.execute(new longTask("任务2"));    threadPoolExecutor.execute(new longTask("任务3"));    threadPoolExecutor.execute(new shortTask("任务4"));    threadPoolExecutor.execute(new shortTask("任务5"));

    threadPoolExecutor.shutdown();}

static class shortTask implements Runnable {

    private String name;

    public shortTask(String name) {        this.name = name;    }

    @Override    public void run() {        try {            TimeUnit.SECONDS.sleep(1);            System.out.println(Thread.currentThread().getName() + "执行shortTask-name-" + name + "完毕");        } catch (InterruptedException e) {            System.err.println("shortTask执行过程中被打断" + e.getMessage());        }    }}

static class longTask implements Runnable {

    private String name;

    public longTask(String name) {        this.name = name;    }

    @Override    public void run() {        try {            TimeUnit.SECONDS.sleep(5);            System.out.println(Thread.currentThread().getName() + "执行longTask-name-" + name + "完毕");        } catch (InterruptedException e) {            System.err.println("longTask执行过程中被打断" + e.getMessage());        }    }}

启动程序,输出如下:

第5个任务直接被拒绝丢弃了,而没有抛出任何异常。

线程池工厂方法

除了使用ThreadPoolExecutor的构造方法创建线程池外,我们也可以使用Executors提供的工厂方法来创建不同类型的线程池:

newFixedThreadPool

查看newFixedThreadPool方法源码:

12345
public static ExecutorService newFixedThreadPool(int nThreads) {    return new ThreadPoolExecutor(nThreads, nThreads,                                  0L, TimeUnit.MILLISECONDS,                                  new LinkedBlockingQueue<Runnable>());}

可以看到,通过newFixedThreadPool创建的是一个固定大小的线程池,大小由nThreads参数指定,它具有如下几个特点:

  1. 因为corePoolSize和maximumPoolSize的值都为nThreads,所以线程池中线程数量永远等于nThreads,不可能新建除了核心线程数的线程来处理任务,即keepAliveTime实际上在这里是无效的。
  2. LinkedBlockingQueue是一个无界队列(最大长度为Integer.MAX_VALUE),所以这个线程池理论是可以无限的接收新的任务,这就是为什么上面没有指定拒绝策略的原因。

newCachedThreadPool

查看newCachedThreadPool方法源码:

12345
public static ExecutorService newCachedThreadPool() {    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                  60L, TimeUnit.SECONDS,                                  new SynchronousQueue<Runnable>());}

这是一个理论上无限大小的线程池:

  1. 核心线程数为0,SynchronousQueue队列是没有长度的队列,所以当有新的任务提交,如果有空闲的还未超时的(最大空闲时间60秒)线程则执行该任务,否则新增一个线程来处理该任务。
  2. 因为线程数量没有限制,理论上可以接收无限个新任务,所以这里也没有指定拒绝策略。

newSingleThreadExecutor

查看newSingleThreadExecutor源码:

123456
public static ExecutorService newSingleThreadExecutor() {    return new FinalizableDelegatedExecutorService        (new ThreadPoolExecutor(1, 1,                                0L, TimeUnit.MILLISECONDS,                                new LinkedBlockingQueue<Runnable>()));}
  1. 核心线程数和最大线程数都为1,每次只能有一个线程处理任务。
  2. LinkedBlockingQueue队列可以接收无限个新任务。

newScheduledThreadPool

查看newScheduledThreadPool源码:

123456789
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {    return new ScheduledThreadPoolExecutor(corePoolSize);}......

public ScheduledThreadPoolExecutor(int corePoolSize) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue());}

所以newScheduledThreadPool理论是也是可以接收无限个任务,DelayedWorkQueue也是一个无界队列。

使用newScheduledThreadPool创建的线程池除了可以处理普通的Runnable任务外,它还具有调度的功能:

1.延迟指定时间后执行:

123
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);// 延迟5秒执行executorService.schedule(() -> System.out.println("hello"), 5, TimeUnit.SECONDS);

2.按指定的速率执行:

12345
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);// 延迟1秒执行,然后每5秒执行一次executorService.scheduleAtFixedRate(        () -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS);

3.按指定的时延执行:

1234
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);executorService.scheduleWithFixedDelay(        () -> System.out.println(LocalTime.now()), 1, 5, TimeUnit.SECONDS);

乍一看,scheduleAtFixedRate和scheduleWithFixedDelay没啥区别,实际它们还是有区别的:

  • scheduleAtFixedRate按照固定速率执行任务,比如每5秒执行一个任务,即使上一个任务没有结束,5秒后也会开始处理新的任务;
  • scheduleWithFixedDelay按照固定的时延处理任务,比如每延迟5秒执行一个任务,无论上一个任务处理了1秒,1分钟还是1小时,下一个任务总是在上一个任务执行完毕后5秒钟后开始执行。

对于这些线程池工厂方法的使用,阿里巴巴编程规程指出:

因为这几个线程池理论是都可以接收无限个任务,所以这就有内存溢出的风险。实际上只要我们掌握了ThreadPoolExecutor构造函数7个参数的含义,我们就可以根据不同的业务来创建出符合需求的线程池。一般线程池的创建可以参考如下规则:

  • IO密集型任务,线程池线程数量可以设置为2 X CPU核心数;
  • 计算密集型任务,线程池线程数量可以设置为CPU核心数 + 1。

一些API的用法

ThreadPoolExecutor提供了几个判断线程池状态的方法:

12345678910111213141516171819202122
public static void main(String[] args) throws InterruptedException {    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            1, 2, 5, TimeUnit.SECONDS,            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,            new ThreadPoolExecutor.AbortPolicy()    );

    threadPoolExecutor.execute(() -> {        try {            TimeUnit.SECONDS.sleep(5);        } catch (InterruptedException e) {            e.printStackTrace();        }    });

    threadPoolExecutor.shutdown();    System.out.println("线程池为shutdown状态:" + threadPoolExecutor.isShutdown());    System.out.println("线程池正在关闭:" + threadPoolExecutor.isTerminating());    System.out.println("线程池已经关闭:" + threadPoolExecutor.isTerminated());    threadPoolExecutor.awaitTermination(6, TimeUnit.SECONDS);    System.out.println("线程池已经关闭" + threadPoolExecutor.isTerminated());}

程序输出如下:

前面我们提到,线程池核心线程即使是空闲状态也不会被销毁,除非使用allowCoreThreadTimeOut设置了允许核心线程超时:

12345678910111213141516
public static void main(String[] args) throws InterruptedException {       ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(               1, 2, 3, TimeUnit.SECONDS,               new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,               new ThreadPoolExecutor.AbortPolicy()       );       threadPoolExecutor.allowCoreThreadTimeOut(true);       threadPoolExecutor.execute(() -> {           try {               TimeUnit.SECONDS.sleep(5);               System.out.println("任务执行完毕");           } catch (InterruptedException e) {               e.printStackTrace();           }       });   }

程序输出如下所示:

5秒后任务执行完毕,核心线程处于空闲的状态。因为通过allowCoreThreadTimeOut方法设置了允许核心线程超时,所以3秒后(keepAliveTime设置为3秒),核心线程被销毁。核心线程被销毁后,线程池也就没有作用了,于是就自动关闭了。

值得注意的是,如果一个线程池调用了allowCoreThreadTimeOut(true)方法,那么它的keepAliveTime不能为0。

ThreadPoolExecutor提供了一remove方法,查看其源码:

12345
public boolean remove(Runnable task) {    boolean removed = workQueue.remove(task);    tryTerminate(); // In case SHUTDOWN and now empty    return removed;}

可看到,它删除的是线程队列中的任务,而非正在被执行的任务。举个例子:

123456789101112131415161718192021
public static void main(String[] args) throws InterruptedException {    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            1, 2, 3, TimeUnit.SECONDS,            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,            new ThreadPoolExecutor.AbortPolicy()    );    threadPoolExecutor.execute(() -> {        try {            TimeUnit.SECONDS.sleep(5);            System.out.println("任务执行完毕");        } catch (InterruptedException e) {            e.printStackTrace();        }    });

    Runnable r = () -> System.out.println("看看我是否会被删除");    threadPoolExecutor.execute(r);    threadPoolExecutor.remove(r);

    threadPoolExecutor.shutdown();}

执行程序,输出如下:

可看到任务并没有被执行,已经被删除,因为唯一一个核心线程已经在执行任务了,所以后提交的这个任务被放到了线程队列里,然后通过remove方法删除。

默认情况下,只有当往线程池里提交了任务后,线程池才会启动核心线程处理任务。我们可以通过调用prestartCoreThread方法,让核心线程即使没有任务提交,也处于等待执行任务的活跃状态:

1234567891011121314
public static void main(String[] args) throws InterruptedException {    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            2, 2, 3, TimeUnit.SECONDS,            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,            new ThreadPoolExecutor.AbortPolicy()    );    System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());    threadPoolExecutor.prestartCoreThread();    System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());    threadPoolExecutor.prestartCoreThread();    System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());    threadPoolExecutor.prestartCoreThread();    System.out.println("活跃线程数: " + threadPoolExecutor.getActiveCount());}

程序输出如下所示:

该方法返回boolean类型值,如果所以核心线程都启动了,返回false,反之返回true。

还有一个和它类似的prestartAllCoreThreads方法,它的作用是一次性启动所有核心线程,让其处于活跃地等待执行任务的状态。

ThreadPoolExecutor的invokeAny方法用于随机执行任务集合中的某个任务,并返回执行结果,该方法是同步方法:

123456789101112131415161718
public static void main(String[] args) throws InterruptedException, ExecutionException {    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            2, 5, 3, TimeUnit.SECONDS,            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,            new ThreadPoolExecutor.AbortPolicy()    );

    // 任务集合    List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {        TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));        return i;    }).collect(Collectors.toList());    // 随机执行结果    Integer result = threadPoolExecutor.invokeAny(tasks);    System.out.println("-------------------");    System.out.println(result);    threadPoolExecutor.shutdownNow();}

启动程序,输出如下:

ThreadPoolExecutor的invokeAll则是执行任务集合中的所有任务,返回Future集合:

1234567891011121314151617181920212223
public static void main(String[] args) throws InterruptedException, ExecutionException {    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            2, 5, 3, TimeUnit.SECONDS,            new ArrayBlockingQueue<>(1), (ThreadFactory) Thread::new,            new ThreadPoolExecutor.AbortPolicy()    );

    List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {        TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));        return i;    }).collect(Collectors.toList());

    List<Future<Integer>> futureList = threadPoolExecutor.invokeAll(tasks);    futureList.stream().map(f->{        try {            return f.get();        } catch (InterruptedException | ExecutionException e) {           return null;        }    }).forEach(System.out::println);

    threadPoolExecutor.shutdownNow();}

输出如下:

总结下这些方法:

方法 描述
allowCoreThreadTimeOut(boolean value) 是否允许核心线程空闲后超时,是的话超时后核心线程将销毁,线程池自动关闭
awaitTermination(long timeout, TimeUnit unit) 阻塞当前线程,等待线程池关闭,timeout用于指定等待时间。
execute(Runnable command) 向线程池提交任务,没有返回值
submit(Runnable task) 向线程池提交任务,返回Future
isShutdown() 判断线程池是否为shutdown状态
isTerminating() 判断线程池是否正在关闭
isTerminated() 判断线程池是否已经关闭
remove(Runnable task) 移除线程队列中的指定任务
prestartCoreThread() 提前让一个核心线程处于活跃状态,等待执行任务
prestartAllCoreThreads() 提前让所有核心线程处于活跃状态,等待执行任务
getActiveCount() 获取线程池活跃线程数
getCorePoolSize() 获取线程池核心线程数
threadPoolExecutor.getQueue() 获取线程池线程队列
getMaximumPoolSize() 获取线程池最大线程数
shutdown() 让线程池处于shutdown状态,不再接收任务,等待所有正在运行中的任务结束后,关闭线程池。
shutdownNow() 让线程池处于stop状态,不再接受任务,尝试打断正在运行中的任务,并关闭线程池,返回线程队列中的任务。

原文地址:https://www.cnblogs.com/7788IT/p/11426443.html

时间: 2024-09-29 01:13:50

深入学习Java线程池的相关文章

从使用到原理学习Java线程池

来源:SilenceDut http://www.codeceo.com/article/java-threadpool-learn.html 线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收. 所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务就是一个需要解决的关键问题,其实这

Java 线程池学习

Reference: <创建Java线程池>[1],<Java线程:新特征-线程池>[2], <Java线程池学习>[3],<线程池ThreadPoolExecutor使用简介>[4],<Java5中的线程池实例讲解>[5],<ThreadPoolExecutor使用和思考>[6] [1]中博主自己通过ThreadGroup实现一个线程池(挺方便理解的),使用的是jdk1.4版本,Jdk1.5版本以上提供了现成的线程池. [2]中介绍

java线程池的学习

package advancedJava;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit; * java 线程池学习 * @author: cuiH * Date: 13-12-7public class ThreadPoolTest { * 线程池的概念 * jdk5提出了ThreadPool的概念 * 之

【Java线程池快速学习教程】

1. Java线程池 线程池:顾名思义,用一个池子装载多个线程,使用池子去管理多个线程. 问题来源:应用大量通过new Thread()方法创建执行时间短的线程,较大的消耗系统资源并且系统的响应速度变慢.[在一个什么程度上能够判断启用线程池对系统的资源消耗比启动定量的new Thread()资源消耗低?这个怎么测试?][用户体验卡顿?慢?观察CPU百分比?] 解决办法:使用线程池管理短时间执行完毕的大量线程,通过重用已存在的线程,降低线程创建和销毁造成的消耗,提高系统响应速度. 2. Java线

Java深入学习13:Java线程池

Java深入学习13:Java线程池 一.线程池的作用 线程池提供一个线程队列,队列中保存着所有等待状态的线程.避免了创建与销毁等额外开销,提交了响应的速度. 二.类关系 Java线程池相关的接口和类均在 java.util.concurrent 包下,其相关关系(部分)如下 三.Executors类以及相关常用方法介绍 1-Executors类简介:简单的说是线程方法的工具类,提供了 创建线程池等方法. 2-ExecutorService 类创建线程池 //创建缓存线程池,线程数量不固定,可以

Java线程池学习

一.实现Java多线程的方法 1.继承Thread类创建多线程 Thread类本质实现了Runnable接口.启动线程为start0()方法.是个native方法. 1 public class ThreadProcess extends Thread { 2 3 @Override 4 public void run(){ 5 long lastTime = System.currentTimeMillis(); 6 for(int i = 0; i < 1; i++){ 7 int ele

Java 线程池的原理与实现

最近在学习线程池.内存控制等关于提高程序运行性能方面的编程技术,在网上看到有一哥们写得不错,故和大家一起分享. [分享]Java 线程池的原理与实现 这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧.线程池就是其中之一,一提到线程,我们会想到以前<操作系统>的生产者与消费者,信号量,同步控制等等.一提到池,我们会想到数据库连接池,但是线程池又如何呢? 建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字的用

JAVA线程池ThreadPoolExecutor与阻塞队列BlockingQueue .

从Java5开始,Java提供了自己的线程池.每次只执行指定数量的线程,java.util.concurrent.ThreadPoolExecutor 就是这样的线程池.以下是我的学习过程. 首先是构造函数签名如下: [java] view plain copy print ? public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<

Java线程池与java.util.concurrent

Java(Android)线程池 介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端执行一个异步任务你还只是如下new Thread吗? new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } }).start(); 那你就out太多了,new Thre