在ThreadPool 类中定义了一个LinkedList 类型的 workQueue 成员变量, 它表示工作队列, 用来存放线程池要执行的任务, 每个任务都是 Runnable 实例. ThreadPool 类的客户程序(利用 ThreadPool 来执行任务的程序) 只要调用 ThreadPool 类的execute(Runnable task) 方法, 就能向线程池提交任务. 在 ThreadPool 类的 execute() 方法中, 先判断线程池是否已经关闭. 如果线程池已经关闭, 就不再接受任务,
负责就把任务加入到工作队列中, 并且呼醒正在等待任务的工作线程.
在 ThreadPool 的构造方法中, 会创建并启动若干工作线程, 工作线程的数目由构造方法的参数 poolSize 决定. WorkThread 类表示工作线程, 它是 ThreadPool 类的内部类. 工作线程从工作队列中取出一个任务, 接着执行该任务, 然后再从工作队列中取出下一个任务并执行它, 如此反复.
工作线程从工作队列中取任务的操作是由 ThreadPool 类的 getTask() 方法实现的, 它的处理逻辑如下:
- 如果队列为空并且线程池已关闭, 那就返回 null, 表示已经没有任务可以执行了;
- 如果队列为空并且线程池没有关闭, 那就在此等待, 直到其他线程将其呼醒或者中断;
- 如果队列中有任务, 就取出第一个任务并将其返回.
线程池的 join() 和 close() 方法都可用来关闭线程池. join() 方法确保在关闭线程之前, 工作线程把队列中的所有任务都执行完. 而 close() 方法则立即清空队列, 并且中断所有的工作线程.
ThreadPool 类是 ThreadGroup 类的子类, ThreadGroup 类表示线程组, 它提供了一些管理线程组中线程的方法. 例如, interrupt() 方法相当于调用线程组中所有活着的线程的 interrupt() 方法. 线程池中的所有工作线程都加入到当前 ThreadPool 对象表示的线程组中. ThreadPool 类在 close() 方法中调用了interrupt() 方法:
/** 关闭线程池 */
public synchronized void close(){
if(!isClosed){
isClosed = true;
workQueue.clear(); //清空工作队列
interrupt(); //中断所有的的工作线程, 该方法继承自 ThreadGroup 类
}
}
以上 interrupt() 方法用于中断所有的工作线程. interrupt() 方法会对工作线程造成以下影响:
- 如果此时一个工作线程正在ThreadPool 的 getTask() 方法中因为执行 wait() 方法而阻塞, 则会抛出 InterruptedException;
- 如果此时一个工作线程正在执行任务, 并且这个任务不会被阻塞, 那么这个工作线程会正常执行完任务, 但是在执行下一轮 while(!isInterrupted()){.....} 循环时, 由于 isInterrupted() 方法返回 true, 因此退出 while 循环.
ThreadPoolTester 类用于测试 ThreadPool 的用法.
ThreadPoolTester 略..............
利用线程池ThreadPool 来完成与客户的通信任务的 EchoServer 类
EchoServer.java(利用线程池 ThreadPool 类)
package multithread2;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.NET.ServerSocket;
import java.Net.Socket;
public class EchoServer {
private int port = 8000;
private ServerSocket serverSocket;
private ThreadPool threadPool; //线程池
private final int POOL_SIZE = 4; //单个CPU 时线程池中工作线程的数目
public EchoServer() throws IOException{
serverSocket = new ServerSocket(port);
//创建线程池
//Runtime 的 availablePocessors() 方法返回当前系统的CPU 的数目
//系统的CPU 越多, 线程池中工作线程的数目也越多
threadPool = new ThreadPool(Runtime.getRuntime().availableProcessors()* POOL_SIZE);
System.out.println("服务器启动");
}
public void service(){
while(true){
Socket socket = null;
try{
socket = serverSocket.accept();
threadPool.execute(new Handler(socket)); //把与可以通信的任务交给线程池
}catch(IOException e){
e.printStackTrace();
}
}
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
new EchoServer().service();
}
/** 负责与单个客户通信的任务, 代码与 6.1 的例子相同*/
class Handler implements Runnable{....}
在以上 EchoServer 的 service() 方法中, 每接收到一个客户连接, 就向线程池 ThreadPool 提交一个与客户通信的任务. ThreadPool 把任务加入到工作队列中, 工作线程会在适当的时候从队列中取出这个任务并执行它.
6.3 使用 JDK 类库提供的线程池
java.util.concurrent 包提供现成的线程池的实现, 它比 6.2 节介绍的线程池更加健壮, 而且功能也更强大. 如图3-4 所示是线程池的类框图.
图3-4 JDK 类库中的线程池的类框图
Executor 接口表示线程池, 它的 execute(Runable task) 方法用来执行 Runable 类型的任务. Executor 的子接口 ExecutorService 中声明了管理线程池的一些方法, 比如用于关闭线程池的 shutdown() 方法等. Executors 类中包含一些静态方法, 他们负责生成各种类型的线程池 ExecutorService 实例, 入表 3-1 所示.
表3-1 Executors 类生成的 ExecutorService 实例的静态方法
Executors类的静态方法 |
创建的ExecutorService线程池的类型 |
newCachedThreadPool() |
在有任务时才创建新线程,空闲线程被保留60秒 |
newFixedThreadPool(int nThreads) |
线程池中包含固定数目的线程,空闲线程会一直保留。参数nThreads设定线程池中线程的数目 |
newSingleThreadExecutor() |
线程池中只有一个工作线程,它依次执行每个任务 |
newScheduledThreadPool(int corePoolSize) |
线程池能按时间计划来执行任务,允许用户设定计划执行任务的时间。参数corePoolSize设定线程池中线程的最小数目。当任务较多时,线程池可能会创建更多的工作线程来执行任务 |
newSingleThreadScheduledExecutor() |
线程池中只有一个工作线程,它能按时间计划来执行任务 |
以下是利用上述线程池来负责与客户通信任务的 EchoServer
EchoServer.java( 使用 java.util.concurrent 包中的线程池类)
package multithread3;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class EchoServer {
private int port = 8000;
private ServerSocket serverSocket;
private ExecutorService executoService; //线程池
private final int POOL_SIZE = 4; //单个CPU 时线程池中工作线程的数目
public EchoServer() throws IOException{
serverSocket = new ServerSocket(port);
//创建线程池
//Runtime 的 availablePocessors() 方法返回当前系统的CPU 的数目
//系统的CPU 越多, 线程池中工作线程的数目也越多
executoService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()* POOL_SIZE);
System.out.println("服务器启动");
}
public void service(){
while(true){
Socket socket = null;
try{
socket = serverSocket.accept();
executoService.execute(new Handler(socket)); //把与可以通信的任务交给线程池
}catch(IOException e){
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new EchoServer().service();
}
/** 负责与单个客户通信的任务, 代码与 6.1 的例子相同*/
class Handler implements Runnable{..}
在 EchoServer 的构造方法中, 调用 Executors.newFixedThreadPool() 创建了具有固定工作线程数目的线程池. 在EchoServer 的 service() 方法中, 通过调用 executorService.execute() 方法, 把与客户通信的任务交给了 ExecutorService 线程池来执行.
6.4 使用线程池的注意事项
虽然线程池能大大提高服务器的并发性能, 但使用它也会存在一定风险. 与所有多线程应用程序用于, 用线程池构建的应用程序容易产生各种并发问题, 如对共享资源的竞争和死锁. 此外, 如果线程池本身的实现不健壮, 或者没有合理地使用线程池, 还容易导致与线程池有关的死锁、系统资源不足和线程泄露等问题.
1. 死锁
任何多线程应用程序都有死锁风险. 造成死锁的最简单的情形是, 线程 A 持有对象 X 的锁, 并且在等待对象 Y 的锁, 而线程 B 持有对象Y 的锁, 并且在等待对象 X 的锁. 线程 A 和 线程 B 都不释放自己持有的锁, 并且等待对方的锁, 这就导致两个线程永远等待下去, 死锁就这样产生了.
虽然任何多线程程序都有死锁的风险, 但线程池还会导致另外一种死锁. 在这种情况下, 假定线程池中的所有工作线程都在执行各自任务时被阻塞, 他们都在等待某个任务 A 的执行结果. 而任务 A 依然在工作队列中, 由于没有空闲线程, 使得任务 A 一直不能被执行. 这使得线程池中的所有工作线程都永远阻塞下去, 死锁就这样产生了.
2. 系统资源不足
如果线程池中的线程数目非常多, 这些线程会消耗包括内存和其他系统资源在内的大量资源, 从而严重影响系统性能.
3. 并发错误
线程池的工作队列依靠 wait() 和 notify() 方法来使工作线程及时取得任务, 但这两个方法都难于使用. 如果编码不正确, 可能会丢失通知, 导致工作线程一直保持空闲状态, 无视工作队列中需要处理的任务. 因此使用这些方法时, 必须格外小心, 即便是专家也可能在这方面出错. 最好使用现有的、 比较成熟的线程池. 例如, 直接使用java.util.concurrent 包中的线程池类.
4. 线程泄露
使用线程池的一个严重风险是线程泄露. 对于工作线程数目固定的线程池, 如果工作线程在执行任务时抛出 RuntimeException 或 Error, 并且这些异常或错误没有被捕获, 那么这个工作线程就会异常终止, 使得线程池永远失去了一个工作线程. 如果所有的工作线程都异常终止, 线程池就最终变为空, 没有任何可用的工作线程来处理任务.
导致线程泄露的另一种情形是, 工作线程在执行一个任务时被阻塞, 如等待用户的输入数据, 但是由于用户一直不输入数据(可能是因为用户走开了), 导致这个工作线程一直被阻塞. 这样的工作线程名存实亡, 它实际上不执行任何任务了. 假如线程池中所有的工作线程都处于这样的阻塞状态, 那么线程池就无法处理新加入的任务了.
5. 任务过载
当工作队列中有大量排队等待执行的任务时, 这些任务本身可能会消耗太多的系统资源而引起系统资源缺乏.
综上所述, 线程池可能会带来种种风险, 为了尽可能避免他们, 使用线程池时需要遵循以下原则.
⑴ 如果任务 A 在执行过程中需要同步等待任务 B 的执行结果, 那么任务 A 不适合加入到线程池的工作队列中. 如果把像任务 A 一样的需要等待其他任务执行结果的任务加入到工作队列中, 可能会导致线程池的死锁.
⑵ 如果执行某个任务时可能会阻塞,并且是长时间的阻塞, 则应该设定超时时间, 避免工作线程永久的阻塞下去而导致线程泄露. 在服务器程序中, 当线程等待客户连接, 或者等待客户发送的数据时, 都可能会阻塞. 可以通过以下方式设定超时时间:
- 调用 ServerSocket 的 setSoTimeout(int milliseconds)方法, 设定等待客户连接的超时时间, 参见 5.1 节(SO_TIMEOUT 选项);
- 对于每个与客户连接的 Socket, 调用该 Socket 的 setSoTimeou(int milliseconds) 方法, 设定等待客户发送数据的超时时间, 参见本书第二章的 5.3 节(SO_TIMEOUT 选项) .
⑶ 了解任务的特点, 分析任务是执行经常会阻塞的 I/O 操作, 还是执行一直不会阻塞的运算操作. 前者时断时续地占用 CPU , 而后者对 CPU 具有更高的利用率. 预计完成任务大概需要多长时间? 是短时间任务还是长时间任务?
根据任务的特点, 对任务进行分类, 然后把不同类型的任务分别加入到不同线程池的工作队列中, 这样可以根据任务的特点, 分别调整每个线程池.
⑷ 调整线程池的大小. 线程池的最佳大小主要取决于系统的可用 CPU 的数目, 以及工作队列中任务的特点. 假如在一个具有N个CPU 的系统上只有一个工作队列, 并且其中全部是运算性质(不会阻塞) 的任务, 那么当线程池具有 N 或 N+1 个工作线程时, 一般会获得最大的 CPU 利用率.
如果工作队列中包含会执行 I/O 操作并常常阻塞的任务, 则要让线程池的大小超过可用的CPU 的数目, 因为并不是所有工作线程都一直在工作. 选择一个典型的任务, 然后估计在执行这个任务的过程中, 等待时间( WT ) 与实际占用 CPU 进行运算的时间( ST ) 之间的比例 WT/ST. 对于一个具有 N 个 CPU 的系统, 需要设置大约 N * (1+WT/ST) 个线程来保证 CPU 得到充分利用.
当然, CPU 利用率不是调整线程池大小过程中唯一要考虑的事项. 随着线程池中工作线程数目的增长, 还会碰到内存或者其他资源的限制, 如套接字, 打开的文件句柄或数据库连接数目等.
要保证多线程消耗的系统资源在系统的承载范围之内.
⑸ 避免任务过载. 服务器应根据系统的承载能力, 限制客户并发连接的数目. 当客户并发连接的数目超过了限制值, 服务器可以拒绝连接请求, 并友好地告知客户: 服务器正忙, 请稍后再试.
七. 关闭服务器
前面介绍的 EchoServer 服务器都无法关闭自身, 只有依靠操作系统来强行终止服务器程序. 这种强行终止服务器程序的方式尽管简单方便,
但是会导致服务器中正在执行的任务被突然中断. 如果服务器处理的任务不是非常重要, 允许随时中断, 则可以依靠操作系统来强行终止服务器程序; 如果服务器处理的任务非常重要, 不允许被突然中断, 则应该由服务器自身在恰当的时刻关闭自己.
本节介绍的 EchoServer 服务器就具有关闭自己的功能. 它除了在 8000 端口监听普通客户程序 EchoClient 的连接外, 还会在 8001 端口监听管理程序 AdminClient 的连接. 当 EchoServer 服务器在8001 端口接收到了 AdminClient 发送的 "shutdown" 命令时, EchoServer 就会开始关闭服务器, 它不会再接收任何新的 EchoClient 进程的连接请求, 对于那些已经接收但是还没有处理的客户连接, 则会丢弃与该客户的通信任务, 而不会把通信任务加入到线程池的工作队列中.
另外, EchoServer 会等到线程池把当前工作队列中的所有任务执行完, 才结束程序.
下面是具有关闭服务器功能的 EchoServer 的源代码, 其中关闭服务器的任务是由 shutdown-thread 线程来负责的.
EchoServer.java (具有关闭服务器的功能)
package multithread4;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
public class EchoServer {
private int port = 8000;
private ServerSocket serverSocket;
private ExecutorService executorService;
private final int POOL_SIZE = 4;
private int portForShutdown = 8001; //用于监听关闭服务器命令的端口
private ServerSocket serverSocketForShutdown;
private boolean isShutdown = false; //服务器是否关闭标志
private Thread shutdownThread = new Thread() { //负责关闭服务器的进程
public void start() {
this.setDaemon(true); //设置为守护进程(也称为后台进程)
super.start();
}
public void run() {
while (!isShutdown) {
Socket socketForShutdown = null;
try {
socketForShutdown = serverSocketForShutdown.accept();
BufferedReader br = new BufferedReader(
new InputStreamReader(socketForShutdown
.getInputStream()));
String command = br.readLine();
if (command != null && command.equalsIgnoreCase("shutdown")) {
long beginTime = System.currentTimeMillis();
socketForShutdown.getOutputStream().write(
"服务器正在关闭/r/n".getBytes());
isShutdown = true;
//请求关闭线程池
//线程池不再接收新的任务, 但是会继续执行完工作队列中现有的任务
executorService.shutdown();
//等待关闭线程池, 每次等待的超时时间为 30 秒
while (!executorService.isTerminated())
executorService.awaitTermination(30, TimeUnit.SECONDS);
serverSocket.close(); //关闭与EchoClient 客户通信的 ServerSocket
long endTime = System.currentTimeMillis();
socketForShutdown.getOutputStream().write(("服务器已经关闭,关闭服务器所用的时间:"
+ (endTime - beginTime) + "毫秒/r/n").getBytes());
socketForShutdown.close();
serverSocketForShutdown.close();
}else{
//接到其他命令的处理
socketForShutdown.getOutputStream().write("错误的命令/r/n".getBytes());
socketForShutdown.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
};
public EchoServer() throws IOException{
serverSocket = new ServerSocket(port);
serverSocket.setSoTimeout(60000); //设定等待客户连接的超时时间为 60 秒
serverSocketForShutdown = new ServerSocket(portForShutdown); //启动关闭服务器的服务, 监听 8001 端口
shutdownThread.start(); //启动负责关闭服务器的线程
//创建线程池
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * POOL_SIZE);
System.out.println("服务器启动");
}
public void service() {
while (!isShutdown) {
Socket socket = null;
try {
socket = serverSocket.accept();
//可能会抛出 SocketTimeoutExcepiton 和 SocketException
socket.setSoTimeout(60000); //把等待客户发送数据的超时时间设为 60 秒
//如果线程池被标示为停止 或者 任务为null, 执行execute()方法会抛出 RejectedException,
//有兴趣的可以看看ThreadPoolExecutor 的execute() 和 shutdown()方法
executorService.execute(new Handler(socket));
} catch (SocketTimeoutException e) {
//不必处理等待客户连接时出现的超时异常
} catch (RejectedExecutionException e) {
//这个是线程池被标示为停止后, 执行executorService.execute() 抛出的异常
try{
if(socket != null) socket.close();
}catch(IOException xe){}
return;
}catch (SocketException e) {
//serverSocket 被 ShutdownThread 线程关闭后,
//在执行 serverSocket.accept() 方法时, 将会抛出SocketException,
//如果确实是这个原因导致的异常, 退出 service() 方法
//作者是写 socket closed, 其实应该是 Socket is closed
if(e.getMessage().indexOf("Socket is closed") != -1) return;
}catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new EchoServer().service();
}
}
/** 负责与单个客户通信的任务, 代码与 6.1 的例子相同 */
class Handler implements Runnable {....}
shutdownThread 线程负责关闭服务器. 它一直监听 8001 端口, 如果接收到了 Adminclient 发送的"shutdown" 命令, 就把 isShutdown 变量设为 true. shutdownThread 线程接着执行 executorService.shutdown() 方法, 该方法请求关闭线程池 线程池将不再接收新任务, 但是会继续执行完工作队列中现有的任务. shutdownThread 线程接着等待线程池关闭:
while (!executorService.isTerminated())
executorService.awaitTermination(30,TimeUnit.SECONDS);
当线程池的工作队列中的所有任务执行完毕, executorService.isTerminated() 方法就会返回 true.
shutdownThread 线程接着关闭监听 8000 端口的 ServerSocket, 最后再关闭监听 8001 端口的 ServerSocket.
shutdownThread 线程在执行上述代码时, 主线程正在执行 EchoServer 的 service() 方法. shutdownThread 线程一系列操作会对主线程造成以下影响:
- 如果 shutdownThread 线程已经把 isShutdown 变量设为 true, 而主线程正准备执行 service() 方法的下一轮 while(!isShutdown){...} 循环时, 由于 isShutdwon 变量为 true, 就会退出循环.
- 如果 shutdownThread 线程已经执行了监听 8000 端口的 serverSocket 的 close() 方法, 而主线程正在执行该 ServerSocket 的 accept() 方法, 那么该方法会抛出 SocketException. EchoServer 的 service() 方法捕获了该异常, 在异常处理代码块中退出了 service() 方法.
- 如果 shutdownThread 线程已经执行了 executorService.shutdown() 方法, 而主线程正在执行 executorService.execute() 方法, 那么该方法会抛出 RejectedExecutionException. EchoServer 的 service() 方法捕获了该异常, 在异常处理代码块中退出 service() 方法.
- 如果 shutdownThread 线程已经把 isShutdown 变量设为 true, 但还没有调用监听 8000 端口的 serverSocket 的 close() 方法, 而主线程正在执行 serverSocket 的 accept() 方法, 主线程阻塞 60 秒后会抛出 SocketTimeoutException. 在准备执行 service() 方法的下一轮 while(!isShutdown){...} 循环时, 由于 isShutdown 变量为 true, 就会退出循环.
- 由此可见, 当 shutdownThread 线程开始执行关闭服务器的操作时, 主线程尽管不会立即终止, 但是迟早会结束运行.
下面是 AdminClient 的源代码, 它负责向 EchoServer 发送 "shutdown" 命令, 从而关闭 EchoServer.
AdminClient.java
package multithread4;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
public class AdminClient {
public static void main(String[] args) {
Socket socket = null;
try {
socket = new Socket("localhost", 8001);
//发送关闭命令
OutputStream socketOut = socket.getOutputStream();
socketOut.write("shutdown/r/n".getBytes());
//接收服务器的反馈
BufferedReader br = new BufferedReader(new InputStreamReader(socket
.getInputStream()));
String msg = null;
while ((msg = br.readLine()) != null) {
System.out.println(msg);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socket != null)
socket.close(); // 断开连接
} catch (IOException e) {
}
}
}
}
下面按照以下方式运行 EchoServer, EchoClient 和 AdminClient, 以观察 EchoServer 服务器的关闭过程.
⑴ 先运行 EchoServer , 然后运行 AdminClient. EchoServer 与 AdminClient 进程都结束运行, 并且在 AdminClient 的控制台打印如下结果:
服务器正在关闭
服务器已经关闭,关闭服务器所用的时间:0毫秒
⑵ 先运行 EchoServer, 再运行 EchoClient, 然后再运行 AdminClient. EchoServer 程序不会立即结束, 因为它与 EchoClient 的通信任务还没有结束. 在 EchoClient 的控制台中输入 "bye" , EchoServer, EchoClient 和 AdminClient 进程都会结束运行.
⑶ 先运行 EchoServer, 再运行 EchoClient , 然后再运行 AdminClient. EchoServe 程序不会立即结束, 因为它与 EchoClient 的通信任务还没有结束. 不要在 EchoClient 的控制台输入任何字符串, 过 60 秒后, EchoServer 等待 EchoClient 的发送数据超时, 结束与 EchoClient 的通信任务, EchoServer 和 AdminClient 进程结束运行. 如果在 EchoClient 的控制台再输入字符串,
则会抛出 "连接已断开" 的 SocketException.(最后一句有问题, EchoClient 是不会抛出 SocketException 异常的)