线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
Java常用的线程池有四种。Executors.newCachedThreadPool()
(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)
(固定大小线程池)、Executors.newSingleThreadExecutor()
(单个后台线程),以及ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
1.Executors.newCachedThreadPool()
(无界线程池,可以进行自动线程回收)
创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。
2.Executors.newFixedThreadPool(int nThreads)
(固定大小线程池)
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭
之前,池中的线程将一直存在。
3.Executors.newSingleThreadExecutor()
(单个后台线程)
创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
4.ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor 将根据 corePoolSize(参见 getCorePoolSize()
)和 maximumPoolSize(参见 getMaximumPoolSize()
)设置的边界自动调整池大小。当新任务在方法 execute(java.lang.Runnable)
中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int)
和 setMaximumPoolSize(int)
进行动态更改。
如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止这提供了当池处于非活动状态时减少资源消耗的方法。
所有 BlockingQueue
都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
排队有三种通用策略:
(1)直接提交。工作队列的默认选项是 SynchronousQueue
,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
(2)无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue
)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
(3)有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue
)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
将有限边界用于最大线程和工作队列容量,排队任务已经饱和时,在方法 execute(java.lang.Runnable)
中提交的新任务将被拒绝。在以上两种情况下,execute 方法都将调用其 RejectedExecutionHandler
的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
方法。下面提供了四种预定义的处理程序策略:
(1)在默认的ThreadPoolExecutor.AbortPolicy
中,处理程序遭到拒绝将抛出运行时 RejectedExecutionException
。
(2)在ThreadPoolExecutor.CallerRunsPolicy
中,线程调用运行该任务的execute本身,相当于直接调用run()方法。
(3)在ThreadPoolExecutor.DiscardPolicy
中,不能执行的任务将被删除。
(4)在ThreadPoolExecutor.DiscardOldestPolicy
中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
代码举例如下
1 import java.text.SimpleDateFormat; 2 import java.util.Date; 3 import java.util.concurrent.ArrayBlockingQueue; 4 import java.util.concurrent.ThreadPoolExecutor; 5 import java.util.concurrent.TimeUnit; 6 7 public class MyThread extends Thread 8 { 9 private String name; 10 11 private static SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss"); 12 13 public MyThread(String name) 14 { 15 this.name = name; 16 } 17 18 public void run() 19 { 20 try 21 { 22 System.out.println("Thread " + this.name + ":" + df.format(new Date())); 23 Thread.sleep(1000); // 增加代码执行时间 24 } 25 catch (InterruptedException e) 26 { 27 e.printStackTrace(); 28 } 29 } 30 31 public static void main(String[] args) 32 { 33 // 生成一个线程池。核心线程数3,最大线程数5,空闲线程回收时间1000ms,阻塞队列长度5,队列满后新任务放弃 34 ThreadPoolExecutor threadPool = 35 new ThreadPoolExecutor(3, 5, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), 36 new ThreadPoolExecutor.DiscardPolicy()); 37 threadPool.execute(new MyThread("01"));// 需要返回结果时,用submit 38 threadPool.execute(new MyThread("02")); 39 threadPool.execute(new MyThread("03")); 40 threadPool.execute(new MyThread("04")); 41 threadPool.execute(new MyThread("05")); 42 threadPool.execute(new MyThread("06")); 43 threadPool.execute(new MyThread("07")); 44 threadPool.execute(new MyThread("08")); 45 threadPool.execute(new MyThread("09")); 46 threadPool.execute(new MyThread("10")); 47 threadPool.execute(new MyThread("11")); 48 threadPool.execute(new MyThread("12")); 49 50 } 51 }
打印结果
1 Thread 01:12:49:25 2 Thread 03:12:49:25 3 Thread 10:12:49:25 4 Thread 02:12:49:25 5 Thread 09:12:49:25 6 Thread 04:12:49:26 7 Thread 05:12:49:26 8 Thread 06:12:49:26 9 Thread 07:12:49:26 10 Thread 08:12:49:26
依次添加12个任务时,
(1)前三个直接创建线程执行;
(2)4-8这5个任务进入队列排队等待执行;
(3)由于队列已满且maximumPoolSize=5,任务9、10创建线程执行(总线程数小于等于MaxSize),11、12直接抛弃;
(4)1、2、3、9、10任务执行结束,队列中的任务执行;
(5)执行结束后,两条线程在超过超时时间(1000ms)后被终止,存活的线程保持corePoolSize=3大小,即使此时没有任务。