Java线程池入门

为什么要用线程池?什么情况下才会用到线程池?

并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

因此,就用到了线程池;线程池中的线程可以复用,就是执行完一个任务,并不被销毁,而是继续执行下一个任务。

如下使用线程:

public class Test{

 

   public static void main(String[] args) {

 

      long start = System.currentTimeMillis();

      for(int i=0;i<10000;i++){

        new Thread(new Runnable() {

 

           @Override

           public void run() {

 

              System.out.println(Thread.currentThread().getName());

           }

        }).start();

      }

      long end = System.currentTimeMillis();

      System.out.println("运行时间:"+(end-start)+"毫秒");

   }

 

}

新建10000个线程来执行任务,测试结果得出需要时间是0.7秒左右。

使用线程池:

public class ThreadPoolTest{

 

   private static int produceTaskSleepTime = 2;

    private static int produceTaskMaxNumber = 20;

 

    public static void main(String[] args) {

       int corePoolSize = 2;

     

        int maximumPoolSize = 4;

      

        long keepAliveTime = 3;

       

        int blockingQueueSize = 3;

       

        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,

             new LinkedBlockingQueue<Runnable>(blockingQueueSize),

                new ThreadPoolExecutor.CallerRunsPolicy());

        System.out.println(threadPool.getPoolSize());

        long start = System.currentTimeMillis();

        for (int i = 1; i <= produceTaskMaxNumber; i++){

            try{

              

                String task = "任务 #" + i;

                System.out.println("放入 " + task);

                threadPool.execute(new ThreadPoolTask(task));

            catch (Exception e){

                e.printStackTrace();

            }

        }

       

        System.out.println("关闭线程池");

        long end = System.currentTimeMillis();

      System.out.println("运行时间:"+(end-start)+"毫秒");

    }

 

}

public class ThreadPoolTask implements Runnable, Serializable {

   private static final long serialVersionUID = 0;

   private static int consumeTaskSleepTime = 2000;

   // 保存任务所需要的数据

   private Object threadPoolTaskData;

 

   ThreadPoolTask(Object tasks) {

      this.threadPoolTaskData = tasks;

   }

 

   public void run() {

      System.err.println("线程 " + Thread.currentThread().getName() + " 开始做 " + threadPoolTaskData);

     

      threadPoolTaskData = null;

   }

 

   public Object getTask() {

      return this.threadPoolTaskData;

   }

}

同样的使用线程池,只需要0.1秒左右的时间。很容易就可以发现这两者中间的差距。

正文

上面的线程池的例子中用到的ThreadPoolExecutor是JDK并发包提供的一个线程池服务,基于ThreadPoolExecutor可以很容易将一个Runnable接口的任务放入线程池中。

ThreadPoolExecutor提供了四个构造器,看源码可以发现前三个构造器都是调用的第四个构造器进行的初始化工作:

public class ThreadPoolExecutor extends AbstractExecutorService {

   ...

    public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue) {

        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

             Executors.defaultThreadFactory(), defaultHandler);

    }

    public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue,

                              ThreadFactory threadFactory) {

        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

             threadFactory, defaultHandler);

    }

    public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue,

                              RejectedExecutionHandler handler) {

        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

             Executors.defaultThreadFactory(), handler);

    }

    public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue,

                              ThreadFactory threadFactory,

                              RejectedExecutionHandler handler) {

        if (corePoolSize < 0 ||

            maximumPoolSize <= 0 ||

            maximumPoolSize < corePoolSize ||

            keepAliveTime < 0)

            throw new IllegalArgumentException();

        if (workQueue == null || threadFactory == null || handler == null)

            throw new NullPointerException();

        this.corePoolSize = corePoolSize;

        this.maximumPoolSize = maximumPoolSize;

        this.workQueue = workQueue;

        this.keepAliveTime = unit.toNanos(keepAliveTime);

        this.threadFactory = threadFactory;

        this.handler = handler;

}

...

}

构造器中各个参数的含义:

corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

unit:参数keepAliveTime(线程池维护线程所允许的空闲时间的单位),可选参数值有7种取值,在TimeUnit类中有7种静态属性:

TimeUnit.DAYS;               //天

TimeUnit.HOURS;             //小时

TimeUnit.MINUTES;           //分钟

TimeUnit.SECONDS;           //秒

TimeUnit.MILLISECONDS;      //毫秒

TimeUnit.MICROSECONDS;      //微妙

TimeUnit.NANOSECONDS;       //纳秒

workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

ArrayBlockingQueue;

LinkedBlockingQueue;

SynchronousQueue;

ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

threadFactory:线程工厂,主要用来创建线程;

handler:表示当拒绝处理任务时的策略,有以下四种取值(默认ThreadPoolExecutor.AbortPolicy):

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。

ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务,线程调用运行该任务的execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。而ExecutorService又是继承了Executor接口。(以上具体代码可以自行翻阅JDK源码,因为太多就不一一粘贴)

Executor是一个顶层类,里面只声明了一个execute()方法,返回值为void,参数为Runable类型,作用就是执行传进去的任务的。

在ThreadPoolExecutor类中有几个非常重要的方法:

execute()

submit()

shutdown()

shutdownNow()

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。

shutdown()和shutdownNow()是用来关闭线程池的;调用shutdown()方法,线程池处于SHUTDOWN状态,此时线程池不能接受新的任务,它会等待所有任务执行完毕;调用shutdownNow()方法,线程池处于STOP状态,此时线程池不能接受新的任务,并且回去尝试终止正在执行的任务,返回尚未执行的任务。

线程池的状态:

volatile int runState;

static final int RUNNING    = 0;

static final int SHUTDOWN   = 1;

static final int STOP       = 2;

static final int TERMINATED = 3;

RUNNING    :

当创建线程池后,初始时,线程池处于RUNNING状态

RUNNING -> SHUTDOWN   :

调用shutdown()方法,线程池处于SHUTDOWN状态

RUNNING or SHUTDOWN   -> STOP   :

调用shutdownNow()方法,线程池处于STOP状态

SHUTDOWN  or STOP  -> TERMINATED    :

当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态

需要注意的是默认情况下,创建线程池之后线程池中是没有线程的,需要提交任务之后才会创建线程。

在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法:

prestartCoreThread():初始化一个核心线程

prestartAllCoreThreads():初始化所有核心线程

线程池中比较重要的成员变量:

private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务

private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁

private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集

 

private volatile long  keepAliveTime;    //线程存活时间  

private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间

private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)

private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数(防止任务量突然过大)

 

private volatile int   poolSize;       //线程池中当前的线程数

 

private volatile RejectedExecutionHandler handler; //任务拒绝策略

 

private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程

 

private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数,跟线程池的容量没有关系

 

private long completedTaskCount;   //用来记录已经执行完毕的任务个数

任务的执行过程:

线程池最基本的差不多说完了,下面再说下任务的执行过程。

在ThreadPoolExecutor中,最核心的任务提交方法就是execute()方法,虽然上面提到的submit()方法也可以提交任务,但是实际上submit()方法调用的还是execute()方法。

我们看下execute()方法的源码:

    public void execute(Runnable command) {

        if (command == null)

            throw new NullPointerException();

        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

            if (runState == RUNNING && workQueue.offer(command)) {

                if (runState != RUNNING || poolSize == 0)

                    ensureQueuedTaskHandled(command);

            }

            else if (!addIfUnderMaximumPoolSize(command))

                reject(command);

        }

    }

这段代码首先判断提交的任务command是否为null,如果是null,则抛空指针异常;

接着往下,这句代码比较有意思,

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

由于使用的是或条件运算符,所以会先计算前半部分的值,如果线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块;如果线程池中当前线程数小于核心池大小,则接着执行后半部分:

addIfUnderCorePoolSize(command)

    private boolean addIfUnderCorePoolSize(Runnable firstTask) {

        Thread t = null;

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            if (poolSize < corePoolSize && runState == RUNNING)

                t = addThread(firstTask);

        finally {

            mainLock.unlock();

        }

        if (t == null)

            return false;

        t.start();

        return true;

    }

addIfUnderCorePoolSize()方法首先调用mainLock加锁,再次判断当前线程数小于corePoolSize并且线程池处于RUNNING状态,则调用addThread()增加线程:

    private Thread addThread(Runnable firstTask) {

        Worker w = new Worker(firstTask);

        Thread t = threadFactory.newThread(w);

        if (t != null) {

            w.thread = t;

            workers.add(w);

            int nt = ++poolSize;

            if (nt > largestPoolSize)

                largestPoolSize = nt;

        }

        return t;

    }

addThread()方法首先创建Work对象,然后调用threadFactory创建新的线程,如果创建的线程不为null,将Work对象的thread属性设置为此创建出来的线程,并将此Work对象放入workers中,然后在增加当前线程池的中线程数,增加后回到addIfUnderCorePoolSize方法 ,释放mainLock,最后启动这个新创建的线程来执行新传入的任务。

回到execute()方法,如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕;

如果执行完addIfUnderCorePoolSize这个方法返回false,然后接着判断:

if (runState == RUNNING && workQueue.offer(command))

如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列,如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:

addIfUnderMaximumPoolSize(command)

    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {

        Thread t = null;

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            if (poolSize < maximumPoolSize && runState == RUNNING)

                t = addThread(firstTask);

        finally {

            mainLock.unlock();

        }

        if (t == null)

            return false;

        t.start();

        return true;

    }

如果执行addIfUnderMaximumPoolSize()方法返回false,则执行reject()方法进行任务拒绝处理。addIfUnderMaximumPoolSize()方法和addIfUnderCorePoolSize()方法类似,不同的是addIfUnderMaximumPoolSize()方法是判断的maximumPoolSize进行比较,如果超过最大线程数则返回false,而addIfUnderCorePoolSize()方法判断的是corePoolSize。留心的同学可能会发现这两个方法都有判断当前线程数目的大小是否小于corePoolSize(或者maximumPoolSize),原因很简单,前面的判断过程中并没有加锁,因此可能在execute()方法判断的时候poolSize小于corePoolSize,而判断完之后,在其他线程中又向线程池提交了任务,就可能导致poolSize不小于corePoolSize了,所以需要在这个地方继续判断。然后接着判断线程池的状态是否为RUNNING,原因也很简单,因为有可能在其他线程中调用了shutdown()或者shutdownNow()方法。

继续说if (runState == RUNNING && workQueue.offer(command))这句,如果当前线程池处于RUNNING状态且将任务放入任务缓存队列成功,则继续进行判断:

if (runState != RUNNING || poolSize == 0)

这句判断是为了防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是这样就执行:

ensureQueuedTaskHandled(command);

ensureQueuedTaskHandled()方法是进行应急处理,从名字可以看出是保证添加到任务缓存队列中的任务得到处理。

    private void ensureQueuedTaskHandled(Runnable command) {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        boolean reject = false;

        Thread t = null;

        try {

            int state = runState;

            if (state != RUNNING && workQueue.remove(command))

                reject = true;

            else if (state < STOP &&

                     poolSize < Math.max(corePoolSize, 1) &&

                     !workQueue.isEmpty())

                t = addThread(null);

        finally {

            mainLock.unlock();

        }

        if (reject)

            reject(command);

        else if (t != null)

            t.start();

    }

ensureQueuedTaskHandled()方法判断线程池运行,如果状态不为运行状态,从workQueue中删除, 并调用reject做拒绝处理。

任务缓存队列及排队策略:

任务缓存队列,即workQueue,它用来存放等待执行的任务。

  workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:

  1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

  2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

  3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

线程池容量调整:

ThreadPoolExecutor提供了动态调整线程池容量大小的方法:

setCorePoolSize()//设置核心池大小

setMaximumPoolSize()//设置线程池最大能创建的线程数目大小

当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

注:

以上Demo及源码基于JDK1.6版本。

转载请注明:李锋镝的个人博客 >> http://www.lifengdi.com/article/10044.html

时间: 2024-10-19 05:11:14

Java线程池入门的相关文章

Java线程池应用

Executors工具类用于创建Java线程池和定时器. newFixedThreadPool:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程.在任意点,在大多数 nThreads 线程会处于处理任务的活动状态.如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待.如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要).在某个线程被显式地关闭之前,池中的线程将一直存在. 创建一个固定大小的线程池来执

java线程池

1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? 1 new Thread(new Runnable() { 2 3 @Override 4 public void run() { 5 // TODO Auto-generated method stub 6 } 7 }).start(); 那你就out太多了,new Thread的弊端如下: a. 每次new Thread新建对象性能差. b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过

Java线程池使用说明

Java线程池使用说明 一 简介 线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的.在jdk1.5之后这一情况有了很大的改观.Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用.为我们在开发中处理线程的问题提供了非常大的帮助. 二:线程池 线程池的作用: 线程池作用就是限制系统中执行线程的数量.     根 据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果:少

Java 线程池的原理与实现

最近在学习线程池.内存控制等关于提高程序运行性能方面的编程技术,在网上看到有一哥们写得不错,故和大家一起分享. [分享]Java 线程池的原理与实现 这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧.线程池就是其中之一,一提到线程,我们会想到以前<操作系统>的生产者与消费者,信号量,同步控制等等.一提到池,我们会想到数据库连接池,但是线程池又如何呢? 建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字的用

Java 线程池学习

Reference: <创建Java线程池>[1],<Java线程:新特征-线程池>[2], <Java线程池学习>[3],<线程池ThreadPoolExecutor使用简介>[4],<Java5中的线程池实例讲解>[5],<ThreadPoolExecutor使用和思考>[6] [1]中博主自己通过ThreadGroup实现一个线程池(挺方便理解的),使用的是jdk1.4版本,Jdk1.5版本以上提供了现成的线程池. [2]中介绍

JAVA线程池的分析和使用

http://www.infoq.com/cn/articles/java-threadPool/ 1. 引言 合理利用线程池能够带来三个好处.第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗.第二:提高响应速度.当任务到达时,任务可以不需要等到线程创建就能立即执行.第三:提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控.但是要做到合理的利用线程池,必须对其原理了如指掌. 2. 线程池

java线程池分析和应用

比较 在前面的一些文章里,我们已经讨论了手工创建和管理线程.在实际应用中我们有的时候也会经常听到线程池这个概念.在这里,我们可以先针对手工创建管理线程和通过线程池来管理做一个比较.通常,我们如果手工创建线程,需要定义线程执行对象,它实现的接口.然后再创建一个线程对象,将我们定义好的对象执行部分装载到线程中.对于线程的创建.结束和结果的获取都需要我们来考虑.如果我们需要用到很多的线程时,对线程的管理就会变得比较困难.我们手工定义线程的方式在时间和空间效率方面会存在着一些不足.比如说我们定义好的线程

Java线程池:ExecutorService,Executors

简单的Java线程池可以从Executors.newFixedThreadPool( int n)获得.此方法返回一个线程容量为n的线程池.然后ExecutorService的execute执行之. 现给出一个示例. package zhangphil.executorservice; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ZhangPhil

JAVA线程池ThreadPoolExecutor与阻塞队列BlockingQueue .

从Java5开始,Java提供了自己的线程池.每次只执行指定数量的线程,java.util.concurrent.ThreadPoolExecutor 就是这样的线程池.以下是我的学习过程. 首先是构造函数签名如下: [java] view plain copy print ? public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<