JAVA并发编程8_线程池的使用

不使用线程池

1.串行执行任务

class SingleThreadWebServer {
	public static void main(String[] args) {
		ServerSocket socket = new ServerSocket(80);
		while (true) {
			Socket conn = socket.accept();
			handleRequest(conn);
		}
	}
}

需要等待一个连接处理完成再处理下一个用户请求,无法提供高的吞吐量和快速的响应速度,cpu利用率低。

2.为每个请求启动一个新线程

class ThreadPerTaskWebServer {
	public static void main(String[] args) {
		ServerSocket socket = new ServerSocket(80);
		while (true) {
			final Socket conn = socket.accept();
			Runnable runnable = new Runnable() {
				@Override
				public void run() {
					handleRequest(conn);
				}
			};
			new Thread(runnable).start();
		}
	}
}

为每个用户请求开启新线程,将任务的处理从主线程中分离出来,使得任务可以并行处理。提升了串行执行的性能和响应速度。

不足:

线程生命周期的开销非常高。线程的创建和销毁都是有代价的,不同平台开销也不同。

资源消耗。太多线程会消耗系统资源,如空闲线程的内存占用,大量线程竞争CPU时产生其他性能开销等。

稳定性。可创建线程数会受到限制。

Executor框架

任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。前面两种方式都存在一些严格的限制:串行执行的问题在于糟糕的响应性和吞吐量,而“为每个任务分配一个线程”的问题在于资源管理的复杂性。

java.util.concurrent包提供了灵活的线程池实现作为Executor框架的一部分

public interface Executor {
    void execute(Runnable command);
}

Executor是个简单的接口,但却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程和执行过程解耦开来,并用Runnable来表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息、应用管理机制和性能监视等机制。

基于线程池的Web服务器

public class TaskExecutionWebServer {
    private static final int NTHREADS = 100;
    private static final Executor exec =
            Executors.newFixedThreadPool(NTHREADS);
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(80);
        while (true){
            final Socket connection = server.accept();
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    handlerRequest(connection);
                }
            };
            exec.execute(task);
        }
    }
}

利用Executor提供的框架,我们可以修改TaskExecutionWebServer为类似于SingleThreadWebServer的单线程(串行执行)行为。

public class SingleThreadExecutor implements Executor{

	@Override
	public void execute(Runnable command) {
		command.run();
	}
}

同样的,我们也可以修改TaskExecutionWebServer为类似于ThreadPerTaskWebServer的多线程(为每一个请求启动一个线程)行为。

public class ThreadPerTaskExecutor implements Executor{
	@Override
	public void execute(Runnable command) {
		new Thread(command).start();
	}
}

可以通过Executor中的静态工厂方法创建线程池

创建固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,若有线程发生异常,则会重新创建

public static ExecutorServicenewFixedThreadPool(int nThreads) {...}

创建单个线程来执行任务,若该线程发生异常,会创建一个新的线程。该池可按顺序执行队列中的任务(如FIFO,LIFO,优先级等)

public static ExecutorServicenewSingleThreadExecutor() {...}

该线程池无长度限制,在线程过多时会回收,过少时会创建

public static ExecutorServicenewCachedThreadPool() {...}

创建一个固定长度的线程池,并以延迟或定时的方式执行任务

public static ScheduledExecutorServicenewScheduledThreadPool{...}

Executor是最顶层的接口

ExecutorService继承自Executor,添加了一些用语生命周期管理的方法和一些任务提交的方法。

AbstractExecutorService实现了ExecutorService接口,ThreadPollExecutor继承自AbstractExecutorService。

Callable和Future

使用Runnable有很大的局限性,不能返回一个值或抛出一个受检查的异常。这时Callable就可以胜任。使用Callable可以返货线程运行的结果,这个结果在Fuuture里面通过get方法获得。

Future表示一个任务的生命周期,并提供了相应的方法判断是否已经完成或取消以及获取任务的结果和取消任务。get方法的行为取决于任务的状态,如果任务已经完成,那么get立即返回或抛出Exception。如果没有完成,get将阻塞并直到任务完成。若果任务抛出了异常那么get也会抛出异常。

public interface Callable<V> {
    V call() throws Exception;
}

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

可以通过多种方法创建一个Future来描述任务。ExecutorService里面的所有submit方法都将返回一个Future,从而讲一个Runnable或Callable提交给Executor,并得到一个Future用于获得任务执行结果或取消任务。

class MyCallable implements Callable<String> {

	@Override
	public String call() {

		try {
			TimeUnit.SECONDS.sleep(2);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return "haha-callable";
	}

}

public class TestExecutor {
	public static void main(String[] args) {
		List<Future<String> > futureList = new ArrayList<Future<String>>();
		ExecutorService exec = Executors.newCachedThreadPool();
		for (int i = 0; i < 5; i++) {
			futureList.add(exec.submit(new MyCallable()));
		}

		for (Future<String> future : futureList) {
			try {
				System.out.println(future.get());
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		}
	}
}

输出

haha-callable
haha-callable
haha-callable
haha-callable
haha-callable

还可以显式的为某个指定的Runnable或Callable实例化一个FutureTask,FutureTask实现了Runnable,因此可以交给Thread或者Executor来执行。

class MyCallable implements Callable<String> {

	@Override
	public String call() {

		try {
			Random r = new Random();
			TimeUnit.SECONDS.sleep(r.nextInt(5));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return "haha-callable";
	}
}

public class TestExecutor {
	public static void main(String[] args) {
		List<Future<String> > futureList = new ArrayList<Future<String>>();
		ExecutorService exec = Executors.newCachedThreadPool();

		for (int i = 0; i < 5; i++) {
			FutureTask<String> future = new FutureTask<String>(new MyCallable());
			futureList.add(future);
			new Thread(future).start();
		}

		for (Future<String> future : futureList) {
			try {
				System.out.println(future.get());
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		}
	}
}

如同上面的例子,向Executor提交了一组任务,并且希望计算完成后获得结果。可以按照上面的方法保留每个任务相关联的Future,然后在for循环里面使用get方法,存在的问题是get方法会阻塞等待返回结果。因此会按照任务添加进去的顺序等待计算的结果。

比如如果添加进去的任务第一个任务耗时5s,第二个4s…第五个1s,输出这五个任务的计算结果却不能按照先获取先执行完的任务的值,因为get方法会阻塞直到任务完成。

class MyCallable implements Callable<String> {

	private int id;
	public MyCallable(int id) {
		this.id = id;
	}

	@Override
	public String call() {
		try {
			TimeUnit.SECONDS.sleep(5-id);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return id + " -- haha-callable";
	}
}

public class TestExecutor {
	public static void main(String[] args) {
		List<Future<String> > futureList = new ArrayList<Future<String>>();
		ExecutorService exec = Executors.newCachedThreadPool();
		for (int i = 0; i < 5; i++) {
			futureList.add(exec.submit(new MyCallable(i)));
		}
		for (Future<String> future : futureList) {
			try {
				System.out.println(future.get());
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		}
	}
}

5S钟后会马上打印,并且不是先完成的先打印

0 -- haha-callable
1 -- haha-callable
2 -- haha-callable
3 -- haha-callable
4 -- haha-callable

事实上可以按照上面的方法保留每个任务相关联的Future,然后在for循环里面反复使用get方法,同时将timeout设置为0,或者判断当前任务isDone再get,通过轮询的方式判断任务是否完成。这种方法孙然可行但是却非常繁琐。

class MyCallable implements Callable<String> {
	SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	private int id;
	public MyCallable(int id) {
		this.id = id;
	}

	@Override
	public String call() {
		try {
			TimeUnit.SECONDS.sleep(5-id);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return df.format(new Date()) +" " + id + " -- haha-callable";
	}
}

public class TestExecutor {
	public static void main(String[] args) {
		List<Future<String> > futureList = new ArrayList<Future<String>>();
		List<Future<String> > finishedFutureList = new ArrayList<Future<String>>();
		ExecutorService exec = Executors.newCachedThreadPool();
		for (int i = 0; i < 5; i++) {
			futureList.add(exec.submit(new MyCallable(i)));
		}

		while (finishedFutureList.size()<5) {
			for (Future<String> future : futureList) {
				if (finishedFutureList.contains(future))
					continue;

				try {
					if (future.isDone()) {
						System.out.println(future.get(0,TimeUnit.SECONDS));
						finishedFutureList.add(future);
					}
				} catch (InterruptedException | ExecutionException | TimeoutException e) {
					e.printStackTrace();
				}
			}
		}

	}
}

每隔一秒输出,并且先完成的任务先打印

2015-08-24 10:12:08 4 -- haha-callable
2015-08-24 10:12:09 3 -- haha-callable
2015-08-24 10:12:10 2 -- haha-callable
2015-08-24 10:12:11 1 -- haha-callable
2015-08-24 10:12:12 0 -- haha-callable

通过轮询的方式可以将先完成的任务结果得到。但是比较麻烦。另一种更好的方式是完成服务(CompletetionService)

class MyCallable implements Callable<String> {
	SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	private int id;

	public MyCallable(int id) {
		this.id = id;
	}

	@Override
	public String call() {
		try {
			TimeUnit.SECONDS.sleep(5 - id);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return df.format(new Date()) + " " + id + " -- haha-callable";
	}
}

public class TestExecutor {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newCachedThreadPool();
		CompletionService<String> completionService = new ExecutorCompletionService<String>(
				exec);
		for (int i = 0; i < 5; i++) {
			completionService.submit(new MyCallable(i));
		}

		 for (int j = 0; j < 5; j++) {
			try {
				Future<String> f = completionService.take();
				System.out.println(f.get());
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
		}
	}
}

每隔一秒输出,并且先完成的任务先打印

2015-08-24 10:29:53 4 -- haha-callable
2015-08-24 10:29:54 3 -- haha-callable
2015-08-24 10:29:55 2 -- haha-callable
2015-08-24 10:29:56 1 -- haha-callable
2015-08-24 10:29:57 0 -- haha-callable

CompletetionService通过take取出已经完成的任务,被封装成Future。

时间: 2024-10-16 15:42:32

JAVA并发编程8_线程池的使用的相关文章

【转】Java并发编程:线程池的使用

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

Java并发编程:线程池的使用(转)

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

Java并发编程之线程池

一.概述 在执行并发任务时,我们可以把任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程,只要池里有空闲的线程,任务就会分配一个线程执行.在线程池的内部,任务被插入一个阻塞队列(BlockingQueue),线程池里的线程会去取这个队列里的任务. 利用线程池有三个好处: 降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗 提高响应速度.当任务到达时,任务可以不需要的等到线程创建就能立即执行 提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,

Java并发编程:线程池的使用

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,

Java 并发编程之线程池的使用 (二)

设置线程池的大小 如果线程池过大,那么可能会耗尽资源 ,如果过小,那么 将导致许多空闲的处理器无法工作,从而降低吞吐率. 要设置正确的线程池大小,需要分析计算环境,资源预算和任务的特性,cpu数量,内存大小,任务是计算密集型还是I/O密集型,还是二者皆可.它们是否需要像JDBC连接这样的稀缺资源,下面给出一个计算公式 N(threads)=N(cpu)*U(cpu)*(1+w/c); N(threads)是最后得到的结果大小 . N(cpu)是cpu数量,我的电脑是双核四线程,cpu的数量会是4

Java 并发编程之线程池的使用

在任务与执行策略之间的隐性耦合 Executor框架可以将任务的提交与任务的执行策略解耦开来(就是独立化).虽然Executor框架为制定和修改执行策略都提供了相当大的灵活性,但并非所有的任务都能适用所有的执行策略 比如: 依赖性任务 比如依赖于执行时序,执行结果或者其他效果,那么任务就带有隐含的依赖性.此时必须小心 地维持这些执行策略以避免产生活跃性问题(死锁等造成执行困难的问题) 使用线程封闭机制的任务 与线程池相比,单线程的Executor能够对并发性做出更强的承诺,它们能确保任务不会并发

Java并发编程中线程池源码分析及使用

当Java处理高并发的时候,线程数量特别的多的时候,而且每个线程都是执行很短的时间就结束了,频繁创建线程和销毁线程需要占用很多系统的资源和时间,会降低系统的工作效率. 参考http://www.cnblogs.com/dolphin0520/p/3932921.html 由于原文作者使用的API 是1.6 版本的,参考他的文章,做了一些修改成 jdk 1.8版本的方法,涉及到的内容比较多,可能有少许错误. API : jdk1.8.0_144 ThreadPoolExecutor类 Java中线

Java并发编程:线程池

一.为什么使用线程池 使用线程的时候直接就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 通过使用线程池可以达到这样的效果:空闲下来的线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务. 二.Java中的ThreadPoolExecutor类 首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然

Java 并发编程之线程池的使用 (三)

线程工厂 每当线程池需要创建一个线程时,都是通过线程工厂方法来完善的.默认的线程工厂方法将创建一个新的.非守护的线程,并且不包含特殊的配置信息,通过指定一个线程工厂方法,可以线程池的配置信息. 需要定制线程工厂方法的情景 : 需要为线程池里面的线程指定 个UncaughtExceptionHandler 实例化一个定制的Thread类执行调试信息的记录 需要修改线程的优先级或者守护线程的状态(这建设使用这两个功能,线程优先级会增加平台依赖性,并且导致活跃性问题,在大多数并发应用程序中,都可以使用