Java线程与并发编程实践----并发工具类与Executor框架

java5之前,我们使用诸如synchronized,wait(),notify()方法对线程的操作属于对

底层线程的操作,这样会出现很多的问题:

  • 低级的并发原语,比如synchronized,wait(),notify()经常难以正确使用。误用会导致

竞态条件,线程饿死,死锁等风险。

  • 泰国依赖synchronized会影响程序性能以及程序的可扩展性
  • 开发者经常需要高级线程结构,如线程池,信号量。java对底层线程的操作不包含这些结。

为解决这些问题,java5引入并发工具类,该工具类主要有下面这些包构成:

并发工具类可以被分为executor、同步器(synchronized)、以及锁框架。

下面主要详解executor框架:

----------------------------------------------------------------------------------

Executor框架简介

在Java 5之后,并发编程引入了一堆新的启动、调度和管理线程的API。Executor框架便是Java 5中引入的,其内部使

用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。

因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,

节约开销)外,还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会

在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用Executor在构造器中。

Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。

Executor接口中之定义了一个方法execute(Runnable command),该方法接收一个Runable实例,它用来执行一个

任务,任务即一个实现了Runnable接口的类。ExecutorService接口继承自Executor接口,它提供了更丰富的实现多线程

的方法,比如,ExecutorService提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方

法。 可以调用ExecutorService的shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致ExecutorService

停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没

有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。

ExecutorService的生命周期包括三种状态:运行、关闭、终止。创建后便进入运行状态,当调用了shutdown()方法时,

便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交了的任务,当素有已经提交了的

任务执行完后,便到达终止状态。如果不调用shutdown()方法,ExecutorService会一直处在运行状态,不断接收新

的任务,执行新的任务,服务器端一般不需要关闭它,保持一直运行即可。

Executors提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

public static ExecutorService newFixedThreadPool(int nThreads)

创建固定数目线程的线程池。

public static ExecutorService newCachedThreadPool()

创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,

则创建一个新线   程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

public static ExecutorService newSingleThreadExecutor()

创建一个单线程化的Executor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

这四种方法都是用的Executors中的ThreadFactory建立的线程,下面就以上四个方法做个比较


newCachedThreadPool()                                                                                                                                         
-缓存型池子,先查看池中有没有以前建立的线程,如果有,就 reuse.如果没有,就建一个新的线程加入池中
-缓存型池子通常用于执行一些生存期很短的异步型任务
 因此在一些面向连接的daemon型SERVER中用得不多。但对于生存期短的异步任务,它是Executor的首选。
-能reuse的线程,必须是timeout IDLE内的池中线程,缺省     timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。
  注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。


newFixedThreadPool(int)                                                      
-newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程
-其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待直到当前的线程中某个线程终止直接被移出池子
-和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器
-从方法的源代码看,cache池和fixed 池调用的是同一个底层 池,只不过参数不同:
fixed池线程数固定,并且是0秒IDLE(无IDLE)    
cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE

newScheduledThreadPool(int) -调度型线程池
-这个池子里的线程可以按schedule依次delay执行,或周期执行

SingleThreadExecutor() -单例线程,任意时间池中只能有一个线程
-用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)

一般来说,CachedTheadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时

停止创建新线程,因此它是合理的Executor的首选,只有当这种方式会引发问题时(比如需要大量长时间面向连

接的线程时),才需要考虑用FixedThreadPool。(该段话摘自《Thinking in Java》第四版)

Executor执行Runnable任务

通过Executors的以上四个静态工厂方法获得 ExecutorService实例,而后调用该实例的execute

(Runnable command)方法即可。一旦Runnable任务传递到execute()方法,该方法便会自动在一

个线程上执行。下面是是Executor执行Runnable任务的示例代码:

[java] view plain copy

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class TestCachedThreadPool{
  4. public static void main(String[] args){
  5. ExecutorService executorService = Executors.newCachedThreadPool();
  6. //      ExecutorService executorService = Executors.newFixedThreadPool(5);
  7. //      ExecutorService executorService = Executors.newSingleThreadExecutor();
  8. for (int i = 0; i < 5; i++){
  9. executorService.execute(new TestRunnable());
  10. System.out.println("************* a" + i + " *************");
  11. }
  12. executorService.shutdown();
  13. }
  14. }
  15. class TestRunnable implements Runnable{
  16. public void run(){
  17. System.out.println(Thread.currentThread().getName() + "线程被调用了。");
  18. }
  19. }

某次执行后的结果如下:

从结果中可以看出,pool-1-thread-1和pool-1-thread-2均被调用了两次,这是随机的,execute会首先在

线程池中选择一个已有空闲线程来执行任务,如果线程池中没有空闲线程,它便会创建一个新的线程来执行任务。

Executor执行Callable任务

在Java 5之后,任务分两类:一类是实现了Runnable接口的类,一类是实现了Callable接口的类。两者都可

以被ExecutorService执行,但是Runnable任务没有返回值,而Callable任务有返回值。并且Callable的call()

方法只能通过ExecutorService的submit(Callable<T> task) 方法来执行,并且返回一个 <T>Future<T>,

是表示任务等待完成的 Future。

Callable接口类似于Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable

不会返回结果,并且无法抛出经过检查的异常而Callable又返回结果,而且当获取返回结果时可能会抛出异常。

Callable中的call()方法类似Runnable的run()方法,区别同样是有返回值,后者没有。

当将一个Callable的对象传递给ExecutorService的submit方法,则该call方法自动在一个线程上执行,并且

会返回执行结果Future对象。同样,将Runnable的对象传递给ExecutorService的submit方法,则该run方法

自动在一个线程上执行,并且会返回执行结果Future对象,但是在该Future对象上调用get方法,将返回null。

下面给出一个Executor执行Callable任务的示例代码:

[java] view plain copy

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.*;
  4. public class CallableDemo{
  5. public static void main(String[] args){
  6. ExecutorService executorService = Executors.newCachedThreadPool();
  7. List<Future<String>> resultList = new ArrayList<Future<String>>();
  8. //创建10个任务并执行
  9. for (int i = 0; i < 10; i++){
  10. //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
  11. Future<String> future = executorService.submit(new TaskWithResult(i));
  12. //将任务执行结果存储到List中
  13. resultList.add(future);
  14. }
  15. //遍历任务的结果
  16. for (Future<String> fs : resultList){
  17. try{
  18. while(!fs.isDone);//Future返回如果没有完成,则一直循环等待,直到Future返回完成
  19. System.out.println(fs.get());     //打印各个线程(任务)执行的结果
  20. }catch(InterruptedException e){
  21. e.printStackTrace();
  22. }catch(ExecutionException e){
  23. e.printStackTrace();
  24. }finally{
  25. //启动一次顺序关闭,执行以前提交的任务,但不接受新任务
  26. executorService.shutdown();
  27. }
  28. }
  29. }
  30. }
  31. class TaskWithResult implements Callable<String>{
  32. private int id;
  33. public TaskWithResult(int id){
  34. this.id = id;
  35. }
  36. /**
  37. * 任务的具体过程,一旦任务传给ExecutorService的submit方法,
  38. * 则该方法自动在一个线程上执行
  39. */
  40. public String call() throws Exception {
  41. System.out.println("call()方法被自动调用!!!    " + Thread.currentThread().getName());
  42. //该返回结果将被Future的get方法得到
  43. return "call()方法被自动调用,任务返回的结果是:" + id + "    " + Thread.currentThread().getName();
  44. }
  45. }

某次执行结果如下:

从结果中可以同样可以看出,submit也是首先选择空闲线程来执行任务,如果没有,才会创建新的线程来执行任务。

另外,需要注意:如果Future的返回尚未完成,则get()方法会阻塞等待,直到Future完成返回,可以通过调用

isDone()方法判断Future是否完成了返回。

自定义线程池

自定义线程池,可以用ThreadPoolExecutor类创建,它有多个构造方法来创建线程池,用该类很容易实现

自定义的线程池,这里先贴上示例程序:

[java] view plain copy

  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.ThreadPoolExecutor;
  4. import java.util.concurrent.TimeUnit;
  5. public class ThreadPoolTest{
  6. public static void main(String[] args){
  7. //创建等待队列
  8. BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
  9. //创建线程池,池中保存的线程数为3,允许的最大线程数为5
  10. ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
  11. //创建七个任务
  12. Runnable t1 = new MyThread();
  13. Runnable t2 = new MyThread();
  14. Runnable t3 = new MyThread();
  15. Runnable t4 = new MyThread();
  16. Runnable t5 = new MyThread();
  17. Runnable t6 = new MyThread();
  18. Runnable t7 = new MyThread();
  19. //每个任务会在一个线程上执行
  20. pool.execute(t1);
  21. pool.execute(t2);
  22. pool.execute(t3);
  23. pool.execute(t4);
  24. pool.execute(t5);
  25. pool.execute(t6);
  26. pool.execute(t7);
  27. //关闭线程池
  28. pool.shutdown();
  29. }
  30. }
  31. class MyThread implements Runnable{
  32. @Override
  33. public void run(){
  34. System.out.println(Thread.currentThread().getName() + "正在执行。。。");
  35. try{
  36. Thread.sleep(100);
  37. }catch(InterruptedException e){
  38. e.printStackTrace();
  39. }
  40. }
  41. }

运行结果如下:

从结果中可以看出,七个任务是在线程池的三个线程上执行的。这里简要说明下用到的ThreadPoolExecuror

类的构造方法中各个参数的含义。

public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long         keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)

corePoolSize:线程池中所保存的核心线程数,包括空闲线程。

maximumPoolSize:池中允许的最大线程数。

keepAliveTime:线程池中的空闲线程所能持续的最长时间。

unit:持续时间的单位。

workQueue:任务执行前保存任务的队列,仅保存由execute方法提交的Runnable任务。

根据ThreadPoolExecutor源码前面大段的注释,我们可以看出,当试图通过excute方法讲一个

Runnable任务添加到线程池中时,按照如下顺序来处理:

1、如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程

来执行新添加的任务;

2、如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的

任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列

中的任务交付给空闲的线程执行);

3、如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的

线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;

4、如果线程池中的线程数量等于了maximumPoolSize,有4种才处理方式(该构造方法调用了含

有5个参数的构造方法,并将最后一个构造方法为RejectedExecutionHandler类型,它在处理线程溢

出时有4种方式,这里不再细说,要了解的,自己可以阅读下源码)。

总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,

再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize。

另外,当线程池中的线程数量大于corePoolSize时,如果里面有线程的空闲时间超过了keepAliveTime

,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。

我们大致来看下Executors的源码,newCachedThreadPool的不带RejectedExecutionHandler参数

(即第五个参数,线程数量超过maximumPoolSize时,指定处理方式)的构造方法如下:

[java] view plain copy

  1. public static ExecutorService newCachedThreadPool() {
  2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
  3. 60L, TimeUnit.SECONDS,
  4. new SynchronousQueue<Runnable>());
  5. }

它将corePoolSize设定为0,而将maximumPoolSize设定为了Integer的最大值,线程空

闲超过60秒,将会从线程池中移除。由于核心线程数为0,因此每次添加任务,都会先从线

程池中找空闲线程,如果没有就会创建一个线程(SynchronousQueue<Runnalbe>决定的,

后面会说)来执行新的任务,并将该线程加入到线程池中,而最大允许的线程数为Integer的最

大值,因此这个线程池理论上可以不断扩大。

再来看newFixedThreadPool的不带RejectedExecutionHandler参数的构造方法,如下:

[java] view plain copy

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue<Runnable>());
  5. }

它将corePoolSize和maximumPoolSize都设定为了nThreads,这样便实现了线程池的大小的固定,

不会动态地扩大,另外,keepAliveTime设定为了0,也就是说线程只要空闲下来,就会被移除线程池,

敢于LinkedBlockingQueue下面会说。

下面说说几种排队的策略:

1、直接提交。缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果

不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失

败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界

maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool

采用的便是这种策略。

2、无界队列。使用无界队列(典型的便是采用预定义容量的 LinkedBlockingQueue,理论上是

该缓冲队列可以对无限多的任务排队)将导致在所有 corePoolSize 线程都工作的情况下将新任务加

入到缓冲队列中。这样,创建的线程就不会超过 corePoolSize,也因此,maximumPoolSize 的

值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。

newFixedThreadPool采用的便是这种策略。

3、有界队列。当使用有限的 maximumPoolSizes 时,有界队列(一般缓冲队列使用ArrayBlockingQueue,

并制定队列的最大长度)有助于防止资源耗尽,但是可能较难调整和控制,队列大小和最大池

大小需要相互折衷,需要设定合理的参数。

原文地址:http://blog.51cto.com/12222886/2061711

时间: 2024-11-08 09:01:41

Java线程与并发编程实践----并发工具类与Executor框架的相关文章

java并发编程之五、工具类

java在线程同步和互斥方面在语言和工具方面都提供了相应的支撑,与此同时,java还提供了一系列的并发容器和原子类,来使得并发编程更容易. 一.并发容器 (一).同步容器 同步容器指的是容器本身使用synchronized关键字来同步访问,包括我们都知道的HashTable,也包括Vector和Stack.另外,也可以通过工具类Collections.synchronizedList(List<T> list)这个方法将线程不安全的ArrayList转成线程安全的包装类,其他的set,map等

Java并发编程系列之十五:Executor框架

Java使用线程完成异步任务是很普遍的事,而线程的创建与销毁需要一定的开销,如果每个任务都需要创建一个线程将会消耗大量的计算资源,JDK 5之后把工作单元和执行机制区分开了,工作单元包括Runnable和Callable,而执行机制则由Executor框架提供.Executor框架为线程的启动.执行和关闭提供了便利,底层使用线程池实现.使用Executor框架管理线程的好处在于简化管理.提高效率,还能避免this逃逸问题--是指不完整的对象被线程调用. Executor框架使用了两级调度模型进行

Java线程与并发编程实践----锁框架

Java.util.concurrent.locks包提供了一个包含多种接口和类的框架,它 针对条件进行加锁和等待.不同于对象的内置加锁同步以及java.lang.Object的等 待/通知机制,包含锁框架的并发工具类通过轮询锁.显示等待及其它方式改善这种 机制. 锁框架包含了经常使用的锁.重入锁.条件.读写锁以及冲入读写锁等类别. 一.锁(Lock) Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作.此实 现允许更灵活的结构,可以具有差别很大的属性,可以

Java线程与并发编程实践----同步器(交换器、信号量)

一.交换器 交换器提供了一个线程之间能够交换对象的同步点.每个线程都会往这个 交换器的exchange()方法传入一些对象,匹配伙伴线程,同时接受伙伴对象作为返 回值.java.util.conurrent.Exchange<V>实现了交换器. 下面是一个代码小实例: import java.util.concurrent.Exchanger;   import java.util.concurrent.ExecutorService;   import java.util.concurren

[Java 并发] Java并发编程实践 思维导图 - 第二章 线程安全性

根据<Java并发编程实践>一书整理的思维导图.

[Java 并发] Java并发编程实践 思维导图 - 第一章 简介

阅读<Java并发编程实践>一书后整理的思维导图.

读Java并发编程实践中,向已有线程安全类添加功能--客户端加锁实现示例

在Java并发编程实践中4.4中提到向客户端加锁的方法.此为验证示例,写的不好,但可以看出结果来. package com.blackbread.test; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public

[Java 并发] Java并发编程实践 思维导图 - 第六章 任务执行

根据<Java并发编程实践>一书整理的思维导图.希望能够有所帮助. 第一部分: 第二部分: 第三部分:

[Java 并发] Java并发编程实践 思维导图 - 第五章 基础构建模块

根据<Java并发编程实践>一书整理的思维导图.希望能够有所帮助. 第一部分: 第二部分: