线程复用:线程池

一、核心线程池内部实现
为了能够更好地控制多线程,JDK提供了一套Executor框架,帮助开发人员有效地进行线程控制,其本质就是一个线程池。它的核心成员如图

以上成员均在java.util.concurrent包中,是JDK并发包的核心类。其中ThreadPoolExecutor表示一个线程池。Executors类则扮演着线程池工厂的角色,通过Executors可以取得一个拥有特定功能的线程池。

Executor框架提供了各种类型的线程池,主要有以下工厂方法:
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

?newFixedThreadPool()方法:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。 ?newSingleThreadExecutor()方法:该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
?newCachedThreadPool()方法:该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线
程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
?newSingleThreadScheduledExecutor()方法:该方法返回一个ScheduledExecutorSer-vice对象,线程池大小为1。ScheduledEx-ecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能,如在某个固定的延时之后执行,或者周期性执行某个任务。
?newScheduledThreadPool()方法:该方法也返回一个ScheduledExecutorService对象,但该线程池可以指定线程数量。

另外一个值得注意的方法是newSched-uledThreadPool()。它返回一个ScheduledExecu-torService对象,可以根据时间需要对线程进行调度。它会在指定的时间,对任务进行调度。
它的一些主要方法如下:
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay, long delay,TimeUnit unit);

对于FixedRate方式来说,任务调度的频率是一定的。它是以上一个任务开始执行时间为起点,之后的period时间,调度下一次任务。而FixDe-lay则是在上一个任务结束后,再经过delay时间进行任务调度。
对于FixedRate 周期如果太短,那么任务就会在上一个任务结束后,立即被调用
另外一个值得注意的问题是,调度程序实际上并不保证任务会无限期的持续调用。如果任务本身抛出了异常,那么后续的所有执行都会被中断,因此,如果你想让你的任务持续稳定的执行,那么做好异常处理就非常重要,否则,你很有可能观察到你的调度器无疾而终。

ThreadPoolExecutor最重要的构造函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler
函数的参数含义如下。
?corePoolSize:指定了线程池中的线程数量。
?maximumPoolSize:指定了线程池中的最大线程数量。
?keepAliveTime:当线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间。即,超过corePoolSize的空闲线程,在多长时间内,会被销毁。
?unit:keepAliveTime的单位。
?workQueue:任务队列,被提交但尚未被执行的任务。
?threadFactory:线程工厂,用于创建线程,一般用默认的即可。
?handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。

参数workQueue指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象。根据队列功能分类,在ThreadPoolExecutor的构造函数中可使用以下几种BlockingQueue。
1.直接提交的队列:该功能由Syn-chronousQueue对象提供。Syn-chronousQueue是一个特殊的Block-ingQueue。SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。如果使用SynchronousQueue,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲的进程,则尝试创建新的进程,如果进程数量已经达到最大值,则执行拒绝策略。因此,使用Syn-chronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。
2.有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现。ArrayBlock-ingQueue的构造函数必须带一个容量参数,表示该队列的最大容量,如下所示。public ArrayBlockingQueue(int capacity)当使用有界的任务队列时,若有新的任务需要执行,如果线程池的实际线程数小于core-PoolSize,则会优先创建新的线程,若大于corePoolSize,则会将新任务加入等待队列。若等待队列已满,无法加入,则在总线程数不大于maximumPoolSize的前提下,创建新的进程执行任务。若大于maximumPoolSize,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提升到core-PoolSize以上,换言之,除非系统非常繁忙,否则确保核心线程数维持在在corePoolSize。
3.无界的任务队列:无界任务队列可以通过LinkedBlockingQueue类实现。与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续仍有新的任务加入,而又没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。
4.优先任务队列:优先任务队列是带有执行优先级的队列。它通过PriorityBlockingQueue实现,可以控制任务的执行先后顺序。它是一个特殊的无界队列。无论是有界队列Array-BlockingQueue,还是未指定大小的无界队列LinkedBlockingQueue都是按照先进先出算法处理任务的。而PriorityBlockingQueue则可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证(总是确保高优先级的任务先执

ThreadPoolExecutor线程池的核心调度代码,这段代码也充分体现了上述线程池的工作逻辑:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn‘t, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

代码第5行的workerCountOf()函数取得了当前线程池的线程总数。当线程总数小于corePool-Size核心线程数时,会将任务通过addWorker()方法直接调度执行。否则,则在第10行代码处(workQueue.offer())进入等待队列。如果进入等待队列失败(比如有界队列到达了上限,或者使用了SynchronousQueue),则会执行第17行,将任务直接提交给线程池。如果当前线程数已经达到maximumPoolSize,则提交失败,就执行第18行的拒绝策略。

二、超负荷下的拒绝策略

JDK内置的拒绝策略如下:
?AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
?CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
?DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。 ?DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,我觉得这可能是最好的一种方案了吧!以 上内置的策略均实现了RejectedExecution-Handler接口,若以上策略仍无法满足实际应用需要,完全可以自己扩展RejectedExecutionHandler接口。

三、ThreadFactory
ThreadFactory是一个接口,它只有一个方法,用来创建线程:Thread newThread(Runnable r);当线程池需要新建线程时,就会调用这个方法。
自定义线程池可以帮助我们做不少事。比如,我们可以跟踪线程池究竟在何时创建了多少线程,也可以自定义线程的名称、组以及优先级等信息,甚至可以任性地将所有的线程设置为守护线程。

四、ThreadPoolExecutor线程池扩展
ThreadPoolExecutor也是一个可以扩展的线程池。它提供了beforeExecute()、af-terExecute()和terminated()三个接口对线程池进行控制。
beforeExecute()、afterExecute()和ter-miniated()三个方法。这三个方法分别用于记录一个任务的开始、结束和整个线程池的退出。
以beforeExecute()、afterExecute()为例,
在ThreadPoolExecutor.Worker.runTask()方法内部提供了这样的实现:
boolean ran = false;
beforeExecute(thread, task);//运行前
try {
task.run(); //运行任务
ran = true;
afterExecute(task, null); //运行结束后
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);//运行结束
throw ex;}

五、优化线程池线程数量
在《Java Concurrency in Practice》一书中给出了一个估算线程池大小的经验公式:
Ncpu=CPU的数量
Ucpu=目标CPU的使用率,0≤Ucpu≤1
W/C=等待时间与计算时间的比率为保持处理器达到期望的使用率,最优的池的大小等于:
Nthreads=Ncpu*Ucpu*(1+W/C)
在Java中,可以通过:Runtime.getRuntime().availableProcessors()取得可用的CPU数量。

六、分而治之:Fork/Join框架
在实际使用中,如果毫无顾忌地使用fork()开启线程进行处理,那么很有可能导致系统开启过多的线程而严重影响性能。所以,在JDK中,给出了一个ForkJoinPool线程池,对于fork()方法并不急着开启线程,而是提交给ForkJoinPool线程池进行处理,以节省系统资源。fork()用来开启线程,join()用来等待。
下ForkJoinPool的一个重要的接口:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
你可以向ForkJoinPool线程池提交一个ForkJoinTask任务。所谓ForkJoinTask任务就是支持fork()分解以及join()等待的任务。ForkJoinTask有两个重要的子类,RecursiveAction和Recur-siveTask。它们分别表示没有返回值的任务和可以携带返回值的任务。

下面我们简单地展示Fork/Join框架的使用,这里用来计算数列求和。
01 public class CountTask extends RecursiveTask<Long>{
02 private static final int THRESHOLD = 10000;
03 private long start;
04 private long end;
05
06 public CountTask(long start,long end){
07 this.start=start;
08 this.end=end;
09 }
10
11 public Long compute(){
12 long sum=0;
13 boolean canCompute = (end-start)<THRESHOLD;
14 if(canCompute){
15 for(long i=start;i<=end;i++){
16 sum +=i;
17 }
18 }else{
19 //分成100个小任务
20 long step=(start+end)/100;
21 ArrayList<CountTask> subTasks=new ArrayList<CountTask>();
22 long pos=start;
23 for(int i=0;i<100;i++){
24 long lastOne=pos+step;
25 if(lastOne>end)lastOne=end;
26 CountTask subTask=new CountTask(pos,lastOne);
27 pos+=step+1;
28 subTasks.add(subTask);
29 subTask.fork();
30 }
31 for(CountTask t:subTasks){
32 sum+=t.join();
33 }
34 }
35 return sum;
36 }
37
38 public static void main(String[]args){
39 ForkJoinPool forkJoinPool = new ForkJoinPool();
40 CountTask task = new CountTask(0,200000L);
41 ForkJoinTask<Long> result = forkJoinPool.submit(task);
42 try{
43 long res = result.get();
44 System.out.println("sum="+res);
45 }catch(InterruptedException e){
46 e.printStackTrace();
47 }catch(ExecutionException e){
48 e.printStackTrace();
49 }
50 }
51 }

由于计算数列的和必然是需要函数返回值的,因此选择RecursiveTask作为任务的模型。上述代码第39行,建立ForkJoinPool线程池。在第40行,构造一个计算1到200000求和的任务。在第41行将任务提交给线程池,线程池会返回一个携带结果的任务,通过get()方法可以得到最终结果(第43行)。如果在执行get()方法时,任务没有结束,那么主线程就会在get()方法时等待。

时间: 2024-12-28 11:57:26

线程复用:线程池的相关文章

线程的复用:线程池(读书笔记)

多线程的软件设计方法确实可以最大限度的发挥现代多核心处理器的计算能力,提高生产系统的吞吐量和性能,但是若不加控制和管理的随意使用线程,对熊的性能反而产生了不力的影响. 在实际生产环境中,线程的数量必须得到控制,盲目的大量创建线程对系统性能是有伤害的. 什么是线程池: 为了避免系统频繁的创建和销毁线程,我们可以让创建的线程进行复用,大家对数据库连接池肯定不陌生,线程池也是一个目的,线程池中,总有那么几条活跃的线程,当你需要线程的时候,可以从池子中随便哪一个空闲线程,当完成工作时,并不着急关闭线程,

java 线程、线程池基本应用示例代码回顾

package org.rui.thread; /** * 定义任务 * * @author lenovo * */ public class LiftOff implements Runnable { protected int countDown=10; private static int taskCount=0; private final int id=taskCount++; public LiftOff(){} public LiftOff(int countDown) { thi

线程与线程池,实例比较。

线程池: int count = 200000; long startTime = System.currentTimeMillis(); final List<Integer> l = new LinkedList<Integer>(); ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(count))

java 线程、线程池基本应用演示样例代码回想

package org.rui.thread; /** * 定义任务 * * @author lenovo * */ public class LiftOff implements Runnable { protected int countDown=10; private static int taskCount=0; private final int id=taskCount++; public LiftOff(){} public LiftOff(int countDown) { thi

InheritableThreadLocal线程复用

引自:http://www.cnblogs.com/sweetchildomine/p/6575666.html 虽然使用AOP可以获取方法签名,但是如果要获取方法中计算得出的数据,那么就得使用ThreadLocal,如果还涉及父线程,那么可以选择InheritableThreadLocal. 注意:理解一些原理能够减少很多不可控问题,最简单的使用方式就是不要交给线程池处理.为了提高一点性能,而导致数据错误得不偿失. 2018年4月12日 12:44:41更新 关于InheritableThre

InnoDB 存储引擎的线程与内存池

InnoDB 存储引擎的线程与内存池 InnoDB体系结构如下: 后台线程: 1.后台线程的主要作用是负责刷新内存池中的数据,保证缓冲池中的内存缓存的是最近的数据: 2.另外,将以修改的数据文件刷新到磁盘文件: 3.同时,保证在数据库发生异常的情况下,InnoDB能恢复到正常运行状态. 内存池:InnoDB有多个内存块,这些内存块组成了一个大的内存池.这些内存块包括有:缓冲池(innodb_buffer_pool)和日志缓冲(log_buffer)以及额外内存池(innodb_addtional

线程和线程池

首先线程有守护线程和用户线程两种,区别就是用户线程是否保持程序的运行状态.当程序在运行时,必定有一个或以上的线程是用户线程,而当程序结束时,所有守护线程也都将被关闭.使用Thread.setDaemon(ture)可以把线程标记为守护线程,默认线程状态继承自创建它的线程.线程的两种创建方法不多说了. 线程安全一般指的是共享变量被多个线程访问读写造成的数据不一致或者是数据不完整性.一般有如下几种方法可供参考: 1.synchronized方法,提供只能供一个线程访问的类,方法或语句块,控制变量的修

Android的线程和线程池

原文链接,转载请注明出处 http://sparkyuan.me/2016/03/25/Android的线程和线程池/ 在Java中默认情况下一个进程只有一个线程,也就是主线程,其他线程都是子线程,也叫工作线程.Android中的主线程主要处理和界面相关的事情,而子线程则往往用于执行耗时操作.线程的创建和销毁的开销较大,所以如果一个进程要频繁地创建和销毁线程的话,都会采用线程池的方式. Android中线程的形态 传统的Thread AsyncTask HandlerThread IntentS

线程系列04,传递数据给线程,线程命名,线程异常处理,线程池

本篇体验:如何传递数据给线程,如何给线程命名,线程的异常处理,线程池.实在是太基础的部分. □ 传递数据给线程 ※ 使用Lambda表达式 class Program { static void Main(string[] args) { Thread t = new Thread(() => Say("hello", "world")); t.Start(); } static void Say(string msg, string msg1) { Cons