java 线程池(1)

ThreadPoolExecutor概述

ThreadPoolExecutor 下文简称 TPE ,我们使用它都是从Executror 这个类中的方法 :

  1     public static ExecutorService newFixedThreadPool(int nThreads) {
  2         return new ThreadPoolExecutor(nThreads, nThreads,
  3                                       0L, TimeUnit.MILLISECONDS,
  4                                       new LinkedBlockingQueue<Runnable>());
  5     }
  6
  7
  8     public static ExecutorService newSingleThreadExecutor() {
  9         return new FinalizableDelegatedExecutorService
 10             (new ThreadPoolExecutor(1, 1,
 11                                     0L, TimeUnit.MILLISECONDS,
 12                                     new LinkedBlockingQueue<Runnable>()));
 13     }
 14
 15
 16
 17     public static ExecutorService newCachedThreadPool() {
 18         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 19                                       60L, TimeUnit.SECONDS,
 20                                       new SynchronousQueue<Runnable>());
 21     }
 22
 23
 24     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
 25         return new ScheduledThreadPoolExecutor(corePoolSize);
 26     }
 27
 28     //ScheduledExecutorService
 29     public ScheduledThreadPoolExecutor(int corePoolSize) {
 30         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
 31               new DelayedWorkQueue());
 32     }
 33
 34
 35     public class ScheduledThreadPoolExecutor
 36         extends ThreadPoolExecutor
 37         implements ScheduledExecutorService
 38
 39 

Executror 的方法名很明显地说明了创建的对象的用途,我们也可以看到它们实际的都是走到了TLE构造函数,只是传入的参数不同。

  1     public ThreadPoolExecutor(int corePoolSize,
  2                               int maximumPoolSize,
  3                               long keepAliveTime,
  4                               TimeUnit unit,
  5                               BlockingQueue<Runnable> workQueue,
  6                               ThreadFactory threadFactory) {
  7         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
  8              threadFactory, defaultHandler);
  9     }
 10
 11
 12     public ThreadPoolExecutor(int corePoolSize,
 13                               int maximumPoolSize,
 14                               long keepAliveTime,
 15                               TimeUnit unit,
 16                               BlockingQueue<Runnable> workQueue,
 17                               ThreadFactory threadFactory,
 18                               RejectedExecutionHandler handler) {
 19         if (corePoolSize < 0 ||
 20             maximumPoolSize <= 0 ||
 21             maximumPoolSize < corePoolSize ||
 22             keepAliveTime < 0)
 23             throw new IllegalArgumentException();
 24         if (workQueue == null || threadFactory == null || handler == null)
 25             throw new NullPointerException();
 26         this.corePoolSize = corePoolSize;
 27         this.maximumPoolSize = maximumPoolSize;
 28         this.workQueue = workQueue;
 29         this.keepAliveTime = unit.toNanos(keepAliveTime);
 30         this.threadFactory = threadFactory;
 31         this.handler = handler;
 32     }

由此可以推断通过配置TLE的各个参数,实现不同的功能。

ThreadPoolExecutor重要知识点

      官方文档中有详细介绍,细节请看官方文档

Core and maximum pool sizes

这两个在构造方法中需要指定,核心线程数和最大线程池线程数,很好理解,就像两条上限线,当来任务时没达到核心线程数,那么就开启一条新线程去执行,要是达到核心数量了,怎么办,任务入列,要是超过了我设定的最大线程数量,那么不再接受任务。这两个值可以动态设置:   setCorePoolSize(int)setMaximumPoolSize(int).

On-demand construction

默认情况下,甚至核心线程最初只在新任务到达时创建并启动,但可以使用方法prestartCoreThread()或prestartAllCoreThreads()动态覆盖。 如果使用非空队列构造池,则可能需要预启动线程。

Creating new threads

使用ThreadFactory创建新线程。 如果没有另外指定,则使用Executors.defaultThreadFactory(),它将所有线程创建在同一个ThreadGroup中,并具有相同的NORM_PRIORITY优先级和非守护进程状态。 通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护程序状态等。如果ThreadFactory在通过从newThread返回null请求时无法创建线程,则执行程序将继续,但可能无法 执行任何任务。 线程应该拥有“modifyThread”RuntimePermission。 如果使用池的工作线程或其他线程不具有此权限,则服务可能会降级:配置更改可能不会及时生效,并且关闭池可能保持可以终止但未完成的状态。(取至官方文档谷歌翻译)

Keep-alive times

          前面说到  Core and maximum pool sizes  就像两个上限线,当超过了核心线程数后,任务开始执行完成,那么线程就空闲了,此时要是空闲达到了 Keep-alive times 这个设定值,那么线程就会被回收。使用Long.MAX_VALUE类型的TimeUnit.NANOSECONDS有效地禁止空闲线程在关闭之前终止。

the keep-alive policy applies only when there are more than corePoolSize threads. But method allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero

      
Queuing

队列的使用和线程池的线程数有关。

  • 如果现有线程少于 corePoolSize 线程数量,尝试开启一条新的线程执行(能执行任务就不排队解决的原则)
  • 达到或超过 corePoolSize ,任务入列
  • 任务队列满了,开新线程直到 maximumPoolSize,当线程数达到 maximumPoolSize,拒绝请求。

队列处理策略 :

  • 直接交付

  • 无界队列
  • 有界队列

状态标识

          状态控制有一个变量 ctl ,它又两部分组成,
 
  • (后29位)workCount : 指示当前有效的线程数量,但是并不能代表当前存活的活跃的线程数。
  • (高3位)runStatus : 指示线程池本身的状态

下面是状态变量的定义

  1     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  2     private static final int COUNT_BITS = Integer.SIZE - 3;
  3     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
  4
  5     // runState is stored in the high-order bits
  6     private static final int RUNNING    = -1 << COUNT_BITS;
  7     private static final int SHUTDOWN   =  0 << COUNT_BITS;
  8     private static final int STOP       =  1 << COUNT_BITS;
  9     private static final int TIDYING    =  2 << COUNT_BITS;
 10     private static final int TERMINATED =  3 << COUNT_BITS;
 11
 12     // Packing and unpacking ctl
 13     private static int runStateOf(int c)     { return c & ~CAPACITY; }
 14     private static int workerCountOf(int c)  { return c & CAPACITY; }
 15     private static int ctlOf(int rs, int wc) { return rs | wc; }
 16
 17     /*
 18      * Bit field accessors that don‘t require unpacking ctl.
 19      * These depend on the bit layout and on workerCount being never negative.
 20      */
 21
 22     private static boolean runStateLessThan(int c, int s) {
 23         return c < s;
 24     }
 25
 26     private static boolean runStateAtLeast(int c, int s) {
 27         return c >= s;
 28     }
 29
 30     private static boolean isRunning(int c) {
 31         return c < SHUTDOWN;
 32     }

runStatus 提供这几种状态 :

  • RUNNING : 运行
  • SHUTDOWN : 停止接收新任务,执行已经入队的任务。
  • STOP :  不再接受新任务,不执行已经入列的任务,中断正在执行的任务。
  • TIDING : 停止接收新任务,所有任务被暂停,workCount 变为 0 ,线程状态变为 TIDYING.将执行hook 方法 terminated.
  • TERMINATED : terminated方法完成。

ThreadPoolExecutor源码分析

看 execute 方法。

  1      //可以创建线程处理就处理,不行就入列,入列也失败,拒绝!
  2      public void execute(Runnable command) {
  3         if (command == null)
  4             throw new NullPointerException();
  5         /*
  6          * Proceed in 3 steps:
  7          *
  8          * 1. If fewer than corePoolSize threads are running, try to
  9          * start a new thread with the given command as its first
 10          * task.  The call to addWorker atomically checks runState and
 11          * workerCount, and so prevents false alarms that would add
 12          * threads when it shouldn‘t, by returning false.
 13          *
 14          * 2. If a task can be successfully queued, then we still need
 15          * to double-check whether we should have added a thread
 16          * (because existing ones died since last checking) or that
 17          * the pool shut down since entry into this method. So we
 18          * recheck state and if necessary roll back the enqueuing if
 19          * stopped, or start a new thread if there are none.
 20          *
 21          * 3. If we cannot queue task, then we try to add a new
 22          * thread.  If it fails, we know we are shut down or saturated
 23          * and so reject the task.
 24          */
 25         int c = ctl.get();
 26         //未达到 corePoolSize 增加新线程执行
 27         if (workerCountOf(c) < corePoolSize) {
 28             if (addWorker(command, true))
 29                 return;
 30             c = ctl.get();
 31         }
 32         //增加线程失败,或者有可能 worker的数量大于等于 core ,或是大于 maxSize ,任务入列
 33         if (isRunning(c) && workQueue.offer(command)) {
 34             //注意 :此时任务已成功入列!!!
 35             int recheck = ctl.get();
 36             //再次检查,要是 此时是 SHUTDOWN 状态(线程池关闭),那么移除这个任务,同时拒绝这个请求
 37             if (! isRunning(recheck) && remove(command))
 38                 reject(command);
 39             //非running 状态 ,同时 workerCounter 为 0 (可能线程池里的任务都执行完了),那么新建一个线程,而不去处理,为什么要这样呢?
 40             //因为任务此时在队列中了,创建线程后,自动会去获取任务并处理
 41             //要是都不是就退出这个方法,此时任务在队列中等待被处理
 42             else if (workerCountOf(recheck) == 0)
 43                 addWorker(null, false);
 44         }
 45         //线程池 shut down 或是队列满了,再次新建一个线程执行,但是这次的线程数的判断边界是 maxSize ,即是
 46         // addWorker的第二个参数来指定
 47         else if (!addWorker(command, false))
 48             reject(command);
 49     }

假设目前还未达到core 的数量,那么进入 addWorker方法 。

  1     private boolean addWorker(Runnable firstTask, boolean core) {
  2         retry:
  3         for (;;) {
  4             int c = ctl.get();
  5             int rs = runStateOf(c);
  6
  7             // Check if queue empty only if necessary.
  8             // 线程池满足如下条件中的任意一种时, 就会直接结束该方法, 并且返回 false
  9            // 表示没有创建新线程, 新提交的任务也没有被执行.
 10            // 1 .处于 STOP, TYDING 或 TERMINATD 状态
 11            // 2 .处于 SHUTDOWN 状态, 并且参数 firstTask != null
 12           // 3 .处于 SHUTDOWN 状态, firstTask == null 且阻塞队列 workQueue为空
 13             if (rs >= SHUTDOWN &&
 14                 ! (rs == SHUTDOWN &&
 15                    firstTask == null &&
 16                    ! workQueue.isEmpty()))
 17                 return false;
 18             for (;;) {
 19                 int wc = workerCountOf(c);
 20                 //此处可以看到 第二个参数,core 是用来 选定线程数边界的
 21                 if (wc >= CAPACITY ||
 22                     wc >= (core ? corePoolSize : maximumPoolSize))
 23                     return false;
 24                 //自旋增加 c , ctl的值 ,成功就 break 退出
 25                 if (compareAndIncrementWorkerCount(c))
 26                     break retry;
 27                 c = ctl.get();  // Re-read ctl
 28                 if (runStateOf(c) != rs)
 29                     //说明有人抢了,
 30                     continue retry;
 31                 // else CAS failed due to workerCount change; retry inner loop
 32                 // 或者 CAS 失败是因为 workerCount 改变,继续loop
 33             }
 34         }
 35
 36         //下面是增加一个线程的操作,创建 worker ,上锁,再次判断,创建成功后线程开始执行。
 37         boolean workerStarted = false;
 38         boolean workerAdded = false;
 39         Worker w = null;
 40         try {
 41             w = new Worker(firstTask);
 42             final Thread t = w.thread;
 43             if (t != null) {
 44                 final ReentrantLock mainLock = this.mainLock;
 45                 mainLock.lock();
 46                 try {
 47                     // Recheck while holding lock.
 48                     // Back out on ThreadFactory failure or if
 49                     // shut down before lock acquired.
 50                     // recheck 当获得锁的时候 ,退出因为 ThreadFactory 失败或是 在获得锁之前 线程池 shut down
 51                     int rs = runStateOf(ctl.get());
 52
 53                     if (rs < SHUTDOWN ||
 54                         (rs == SHUTDOWN && firstTask == null)) {
 55                         if (t.isAlive()) // precheck that t is startable
 56                             throw new IllegalThreadStateException();
 57                         workers.add(w);
 58                         int s = workers.size();
 59                         if (s > largestPoolSize)
 60                             largestPoolSize = s;
 61                         workerAdded = true;
 62                     }
 63                 } finally {
 64                     mainLock.unlock();
 65                 }
 66                 if (workerAdded) {
 67                     //走到这里线程肯定是创建了,并且线程池一定是正常的。
 68                     t.start();
 69                     workerStarted = true;
 70                 }
 71             }
 72         } finally {
 73             if (! workerStarted)
 74                 addWorkerFailed(w);
 75         }
 76         return workerStarted;
 77     }

addWoker 中创建了一个Worker,我们先看一下构造方法,再慢慢分析它。

  1         Worker(Runnable firstTask) {
  2             setState(-1); // inhibit interrupts until runWorker
  3             this.firstTask = firstTask;
  4             this.thread = getThreadFactory().newThread(this);
  5         }
  1 public interface ThreadFactory {
  2
  3     /**
  4      * Constructs a new {@code Thread}.  Implementations may also initialize
  5      * priority, name, daemon status, {@code ThreadGroup}, etc.
  6      *
  7      * @param r a runnable to be executed by new thread instance
  8      * @return constructed thread, or {@code null} if the request to
  9      *         create a thread is rejected
 10      */
 11     Thread newThread(Runnable r);
 12 }

可以看到 ThreadFactory 实际就是创建线程的方法,同时传入一个 Runnable , worker 里面传入了一个this ,我们赶紧看一下worker的定义。

  1     private final class Worker  extends AbstractQueuedSynchronizer  implements Runnable

意图很明显,就是worker 本身带有一个任务(可以为NULL),让刚创建的线程去执行这个任务。下面看一下它到底执行了什么?

  1         /** Delegates main run loop to outer runWorker  */
  2         public void run() {
  3             runWorker(this);
  4         }
  5
  6
  7
  8         /** Delegates main run loop to outer runWorker  */
  9     public void run() {
 10         runWorker(this);
 11     }
 12
 13
 14      /**
 15      * Main worker run loop.  Repeatedly gets tasks from queue and
 16      * executes them, while coping with a number of issues:
 17      *
 18      * 1. We may start out with an initial task, in which case we
 19      * don‘t need to get the first one. Otherwise, as long as pool is
 20      * running, we get tasks from getTask. If it returns null then the
 21      * worker exits due to changed pool state or configuration
 22      * parameters.  Other exits result from exception throws in
 23      * external code, in which case completedAbruptly holds, which
 24      * usually leads processWorkerExit to replace this thread.
 25      *
 26      * 2. Before running any task, the lock is acquired to prevent
 27      * other pool interrupts while the task is executing, and then we
 28      * ensure that unless pool is stopping, this thread does not have
 29      * its interrupt set.
 30      *
 31      * 3. Each task run is preceded by a call to beforeExecute, which
 32      * might throw an exception, in which case we cause thread to die
 33      * (breaking loop with completedAbruptly true) without processing
 34      * the task.
 35      *
 36      * 4. Assuming beforeExecute completes normally, we run the task,
 37      * gathering any of its thrown exceptions to send to afterExecute.
 38      * We separately handle RuntimeException, Error (both of which the
 39      * specs guarantee that we trap) and arbitrary Throwables.
 40      * Because we cannot rethrow Throwables within Runnable.run, we
 41      * wrap them within Errors on the way out (to the thread‘s
 42      * UncaughtExceptionHandler).  Any thrown exception also
 43      * conservatively causes thread to die.
 44      *
 45      * 5. After task.run completes, we call afterExecute, which may
 46      * also throw an exception, which will also cause thread to
 47      * die. According to JLS Sec 14.20, this exception is the one that
 48      * will be in effect even if task.run throws.
 49      *
 50      * The net effect of the exception mechanics is that afterExecute
 51      * and the thread‘s UncaughtExceptionHandler have as accurate
 52      * information as we can provide about any problems encountered by
 53      * user code.
 54      *
 55      * @param w the worker
 56      *
 57      *
 58      *  这里可以看到执行完任务后,就会阻塞在 getTask ,而线程没有被回收
 59      *  除非getTask 返回 null ,所有我们利用 一些调用满足 getTask 返回 null
 60      *  例如 : 超时设置
 61      *
 62      *
 63      **/
 64     final void runWorker(Worker w) {
 65         Thread wt = Thread.currentThread();
 66         Runnable task = w.firstTask;
 67         w.firstTask = null;
 68         w.unlock(); // allow interrupts  允许中断,释放锁(为了下面抢任务)
 69         boolean completedAbruptly = true;
 70         try {
 71             //抢任务,抢到就加锁,即是 firstTask 为null 时才会去 getTask
 72             while (task != null || (task = getTask()) != null) {
 73                 // 加锁,防止线程被其他线程中断
 74                 w.lock();
 75                 // If pool is stopping, ensure thread is interrupted;
 76                 // if not, ensure thread is not interrupted.  This
 77                 // requires a recheck in second case to deal with
 78                 // shutdownNow race while clearing interrupt
 79                 if ((runStateAtLeast(ctl.get(), STOP) ||
 80                      (Thread.interrupted() &&
 81                       runStateAtLeast(ctl.get(), STOP))) &&
 82                     !wt.isInterrupted())
 83                     wt.interrupt();
 84                 try {
 85                     //子类实现
 86                     beforeExecute(wt, task);
 87                     Throwable thrown = null;
 88                     try {
 89                         task.run();
 90                     } catch (RuntimeException x) {
 91                         thrown = x; throw x;
 92                     } catch (Error x) {
 93                         thrown = x; throw x;
 94                     } catch (Throwable x) {
 95                         thrown = x; throw new Error(x);
 96                     } finally {
 97                         //抛出异常后,依旧执行这里
 98                         afterExecute(task, thrown);
 99                     }
100                 } finally {
101                     task = null;
102                     w.completedTasks++;
103                     w.unlock();
104                 }
105             }
106             //来到这里,1.task == null
107             completedAbruptly = false;
108         } finally {
109             processWorkerExit(w, completedAbruptly);
110         }
111     }
112
113
114
115
116
117     /**
118      * Performs blocking or timed wait for a task, depending on
119      * current configuration settings, or returns null if this worker
120      * must exit because of any of:
121      * 1. There are more than maximumPoolSize workers (due to
122      *    a call to setMaximumPoolSize).
123      * 2. The pool is stopped.
124      * 3. The pool is shutdown and the queue is empty.
125      * 4. This worker timed out waiting for a task, and timed-out
126      *    workers are subject to termination (that is,
127      *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
128      *    both before and after the timed wait, and if the queue is
129      *    non-empty, this worker is not the last thread in the pool.
130      *
131      * @return task, or null if the worker must exit, in which case
132      *         workerCount is decremented
133      *
134      *
135      *
136      * 上面的注解是 4 种返回 null 的情况, 其中第一种的原因有没有可能是因为其他线程创建线程导致的呢?
137      *  worker 必须退出,顺便数量在这里减少一
138      *
139      */
140     private Runnable getTask() {
141         boolean timedOut = false; // Did the last poll() time out?
142
143         for (;;) {
144             int c = ctl.get();
145             int rs = runStateOf(c);
146
147             // Check if queue empty only if necessary.
148             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
149                 decrementWorkerCount();
150                 return null;
151             }
152
153             int wc = workerCountOf(c);
154
155             // Are workers subject to culling?
156             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
157
158             //自旋失败后继续 loop 直到 成功
159             if ((wc > maximumPoolSize || (timed && timedOut))
160                 && (wc > 1 || workQueue.isEmpty())) {
161                 //这里自旋
162                 if (compareAndDecrementWorkerCount(c))
163                     return null;
164                 continue;
165             }
166
167             try {
168                 //前面  boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
169                 //可以知道 keepAliveTime 区别于 上面两个条件  1. allowCoreThreadTimeOut  2.wc > corePoolSize
170                 Runnable r = timed ?
171                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
172                     workQueue.take();
173                 if (r != null)
174                     return r;
175                 timedOut = true;
176             // 在这里会捕获中断异常!!这里很重要,提供了假如是调用了 shutDown() 方法,线程可以退出的出口
177             } catch (InterruptedException retry) {
178                 timedOut = false;
179             }
180         }
181     }

思路就是从任务队列中不断拿取任务(无任务状态下,线程阻塞),然后执行该任务。有个很奇怪的地方,runworker 中在获取任务前释放了锁,在获取到任务后再次获取锁。为什么呢?在 worker 这个类前有注解。

  1    /**
  2      * Class Worker mainly maintains interrupt control state for
  3      * threads running tasks, along with other minor bookkeeping.
  4      * This class opportunistically extends AbstractQueuedSynchronizer
  5      * to simplify acquiring and releasing a lock surrounding each
  6      * task execution.  This protects against interrupts that are
  7      * intended to wake up a worker thread waiting for a task from
  8      * instead interrupting a task being run.  We implement a simple
  9      * non-reentrant mutual exclusion lock rather than use
 10      * ReentrantLock because we do not want worker tasks to be able to
 11      * reacquire the lock when they invoke pool control methods like
 12      * setCorePoolSize.  Additionally, to suppress interrupts until
 13      * the thread actually starts running tasks, we initialize lock
 14      * state to a negative value, and clear it upon start (in
 15      * runWorker).
 16      */

worker 使用了“中断控制状态”来维护线程运行,该类继承 AbstractQueueSynchronizer ,它的作用是当这个线程在执行任务时不被其他线程中断,而是让其他线程等待被唤醒。同时,该类使用无重入的独占互斥锁而不是 ReentrantLock ,因为我们不想在调用setCorePoolSize 重入该锁。

现在线程池就在愉快地执行任务了,假如我这时候停止线程池。

  1     public void shutdown() {
  2         final ReentrantLock mainLock = this.mainLock;
  3         mainLock.lock();
  4         try {
  5             //检查是否可以 shutdown
  6             checkShutdownAccess();
  7             //设置为 SHUTDOWN
  8             advanceRunState(SHUTDOWN);
  9             //中断所有worker
 10             interruptIdleWorkers();
 11             onShutdown(); // hook for ScheduledThreadPoolExecutor
 12         } finally {
 13             mainLock.unlock();
 14         }
 15         //终结这个线程池
 16         tryTerminate();
 17     }
 18
 19
 20
 21     /**
 22      * If there is a security manager, makes sure caller has
 23      * permission to shut down threads in general (see shutdownPerm).
 24      * If this passes, additionally makes sure the caller is allowed
 25      * to interrupt each worker thread. This might not be true even if
 26      * first check passed, if the SecurityManager treats some threads
 27      * specially.
 28      */
 29     private void checkShutdownAccess() {
 30         SecurityManager security = System.getSecurityManager();
 31         if (security != null) {
 32             security.checkPermission(shutdownPerm);
 33             final ReentrantLock mainLock = this.mainLock;
 34             mainLock.lock();
 35             try {
 36                 for (Worker w : workers)
 37                     security.checkAccess(w.thread);
 38             } finally {
 39                 mainLock.unlock();
 40             }
 41         }
 42     }
 43
 44
 45
 46     /**
 47      * Transitions runState to given target, or leaves it alone if
 48      * already at least the given target.
 49      *
 50      * @param targetState the desired state, either SHUTDOWN or STOP
 51      *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
 52      */
 53     private void advanceRunState(int targetState) {
 54         for (;;) {
 55             int c = ctl.get();
 56             if (runStateAtLeast(c, targetState) ||
 57                 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
 58                 break;
 59         }
 60     }
 61
 62
 63     private void interruptIdleWorkers() {
 64         interruptIdleWorkers(false);
 65     }
 66
 67
 68
 69     /**
 70      * Interrupts threads that might be waiting for tasks (as
 71      * indicated by not being locked) so they can check for
 72      * termination or configuration changes. Ignores
 73      * SecurityExceptions (in which case some threads may remain
 74      * uninterrupted).
 75      *
 76      * @param onlyOne If true, interrupt at most one worker. This is
 77      * called only from tryTerminate when termination is otherwise
 78      * enabled but there are still other workers.  In this case, at
 79      * most one waiting worker is interrupted to propagate shutdown
 80      * signals in case all threads are currently waiting.
 81      * Interrupting any arbitrary thread ensures that newly arriving
 82      * workers since shutdown began will also eventually exit.
 83      * To guarantee eventual termination, it suffices to always
 84      * interrupt only one idle worker, but shutdown() interrupts all
 85      * idle workers so that redundant workers exit promptly, not
 86      * waiting for a straggler task to finish.
 87      *
 88      *
 89      *  从方法名可以看出 : 中断空闲 workers
 90      *  从下面的代码也可以看到要是传过来的参数是 false
 91      *  那么所有线程将被中断  (shutDown()方法有运用到)
 92      *
 93      */
 94     private void interruptIdleWorkers(boolean onlyOne) {
 95         final ReentrantLock mainLock = this.mainLock;
 96         mainLock.lock();
 97         try {
 98
 99             for (Worker w : workers) {
100                 Thread t = w.thread;
101                 //这里 worker 的 tryLock 需要注意一下,这里要是 worker 正在执行任务(这就解释了为什么在runWorker 方法中,worker要加锁了),
102                 // 那么 tryLock 返回 false,
103                 if (!t.isInterrupted() && w.tryLock()) {
104                     try {
105                         t.interrupt();
106                     } catch (SecurityException ignore) {
107                         //注意这里忽略了这个 exception
108                         //所以文档中指出了有可能某些线程依旧会保持为非中断状态
109                     } finally {
110                         w.unlock();
111                     }
112                 }
113                 if (onlyOne)
114                     break;
115             }
116         } finally {
117             mainLock.unlock();
118         }
119     }
120
121
122     /**
123      * Performs any further cleanup following run state transition on
124      * invocation of shutdown.  A no-op here, but used by
125      * ScheduledThreadPoolExecutor to cancel delayed tasks.
126      */
127     void onShutdown() {
128     }
129
130
131
132
133     /**
134      * Attempts to stop all actively executing tasks, halts the
135      * processing of waiting tasks, and returns a list of the tasks
136      * that were awaiting execution. These tasks are drained (removed)
137      * from the task queue upon return from this method.
138      *
139      * <p>This method does not wait for actively executing tasks to
140      * terminate.  Use {@link #awaitTermination awaitTermination} to
141      * do that.
142      *
143      * <p>There are no guarantees beyond best-effort attempts to stop
144      * processing actively executing tasks.  This implementation
145      * cancels tasks via {@link Thread#interrupt}, so any task that
146      * fails to respond to interrupts may never terminate.
147      *
148      * @throws SecurityException {@inheritDoc}
149      */
150     public List<Runnable> shutdownNow() {
151         List<Runnable> tasks;
152         final ReentrantLock mainLock = this.mainLock;
153         mainLock.lock();
154         try {
155             checkShutdownAccess();
156             // STOP 终于在这里发挥了作用!!S:D
157             advanceRunState(STOP);
158             interruptWorkers();
159             //remove task 从队列中
160             tasks = drainQueue();
161         } finally {
162             mainLock.unlock();
163         }
164         tryTerminate();
165         return tasks;
166     }
167 

看一下 tryTerminate 方法。

  1     /**
  2      * Transitions to TERMINATED state if either (SHUTDOWN and pool
  3      * and queue empty) or (STOP and pool empty).  If otherwise
  4      * eligible to terminate but workerCount is nonzero, interrupts an
  5      * idle worker to ensure that shutdown signals propagate. This
  6      * method must be called following any action that might make
  7      * termination possible -- reducing worker count or removing tasks
  8      * from the queue during shutdown. The method is non-private to
  9      * allow access from ScheduledThreadPoolExecutor.
 10      *
 11      *
 12      * 如果满足其中一个条件 :
 13      *    1. SHUTDOWN 并且 workerCount为 0 并且 队列为空
 14      *    2. STOP 并且 workerCount为 0
 15      *    那么将状态转化为 TERMINATED ;
 16      *
 17      *  如果  workerCount(c)!=0 ,那么调用  interruptIdleWorkers(true); 然后就return
 18      *
 19      *
 20      *  所以我们可以知道在线程池正常的状态的下必定直接 return
 21      *
 22      */
 23     final void tryTerminate() {
 24         for (;;) {
 25             int c = ctl.get();
 26             if (isRunning(c) ||
 27                 runStateAtLeast(c, TIDYING) ||
 28                 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
 29                 return;
 30             //具备条件时执行下面
 31             if (workerCountOf(c) != 0) { // Eligible to terminate
 32                 // 传入一个参数  true ,表示只中断一个,这是因为,每个线程当自己没任务时,肯定
 33                 interruptIdleWorkers(ONLY_ONE);
 34                 return;
 35             }
 36
 37             final ReentrantLock mainLock = this.mainLock;
 38             mainLock.lock();
 39             try {
 40                 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
 41                     try {
 42                         //看到了吗,介绍线程池状态时,讲到要是是 TIDYING 状态时,会调用这个钩子方法
 43                         terminated();
 44                     } finally {
 45                         ctl.set(ctlOf(TERMINATED, 0));
 46                         termination.signalAll();
 47                     }
 48                     return;
 49                 }
 50             } finally {
 51                 mainLock.unlock();
 52             }
 53             // else retry on failed CAS
 54         }
 55     }

从上面我们看到要是调用了 shutDown() 或是 shutDownNow ()那么我们正阻塞在 getTask()方法的线程就会收到中断异常,于是就会getTask就会返回 null 。那么我们继续看一下线程继续向下执行的逻辑。

  1     /**
  2      * Performs cleanup and bookkeeping for a dying worker. Called
  3      * only from worker threads. Unless completedAbruptly is set,
  4      * assumes that workerCount has already been adjusted to account
  5      * for exit.  This method removes thread from worker set, and
  6      * possibly terminates the pool or replaces the worker if either
  7      * it exited due to user task exception or if fewer than
  8      * corePoolSize workers are running or queue is non-empty but
  9      * there are no workers.
 10      *
 11      * @param w the worker
 12      * @param completedAbruptly if the worker died due to user exception
 13      */
 14     private void processWorkerExit(Worker w, boolean completedAbruptly) {
 15         if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted
 16             decrementWorkerCount();
 17
 18         //加锁执行
 19         final ReentrantLock mainLock = this.mainLock;
 20         mainLock.lock();
 21         try {
 22             completedTaskCount += w.completedTasks;
 23             //移除 worker
 24             workers.remove(w);
 25         } finally {
 26             mainLock.unlock();
 27         }
 28
 29         tryTerminate();
 30
 31         int c = ctl.get();
 32         // replacement 的意思在这里,是上面已经移除了一个 worker , 这里调用 addWorker 再补充
 33         if (runStateLessThan(c, STOP)) {
 34             if (!completedAbruptly) {
 35                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
 36                 if (min == 0 && ! workQueue.isEmpty())
 37                     min = 1;
 38                 if (workerCountOf(c) >= min)
 39                     return; // replacement not needed
 40             }
 41             addWorker(null, false);
 42         }
 43     }
 44 

下一篇我们我们讲TLE 具体衍生的不同类型的线程池。

参考资料 :

原文地址:https://www.cnblogs.com/Benjious/p/10200248.html

时间: 2024-10-09 18:34:09

java 线程池(1)的相关文章

Java线程池应用

Executors工具类用于创建Java线程池和定时器. newFixedThreadPool:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程.在任意点,在大多数 nThreads 线程会处于处理任务的活动状态.如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待.如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要).在某个线程被显式地关闭之前,池中的线程将一直存在. 创建一个固定大小的线程池来执

java线程池

1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? 1 new Thread(new Runnable() { 2 3 @Override 4 public void run() { 5 // TODO Auto-generated method stub 6 } 7 }).start(); 那你就out太多了,new Thread的弊端如下: a. 每次new Thread新建对象性能差. b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过

Java线程池使用说明

Java线程池使用说明 一 简介 线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的.在jdk1.5之后这一情况有了很大的改观.Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用.为我们在开发中处理线程的问题提供了非常大的帮助. 二:线程池 线程池的作用: 线程池作用就是限制系统中执行线程的数量.     根 据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果:少

Java 线程池的原理与实现

最近在学习线程池.内存控制等关于提高程序运行性能方面的编程技术,在网上看到有一哥们写得不错,故和大家一起分享. [分享]Java 线程池的原理与实现 这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧.线程池就是其中之一,一提到线程,我们会想到以前<操作系统>的生产者与消费者,信号量,同步控制等等.一提到池,我们会想到数据库连接池,但是线程池又如何呢? 建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字的用

Java 线程池学习

Reference: <创建Java线程池>[1],<Java线程:新特征-线程池>[2], <Java线程池学习>[3],<线程池ThreadPoolExecutor使用简介>[4],<Java5中的线程池实例讲解>[5],<ThreadPoolExecutor使用和思考>[6] [1]中博主自己通过ThreadGroup实现一个线程池(挺方便理解的),使用的是jdk1.4版本,Jdk1.5版本以上提供了现成的线程池. [2]中介绍

JAVA线程池的分析和使用

http://www.infoq.com/cn/articles/java-threadPool/ 1. 引言 合理利用线程池能够带来三个好处.第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗.第二:提高响应速度.当任务到达时,任务可以不需要等到线程创建就能立即执行.第三:提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控.但是要做到合理的利用线程池,必须对其原理了如指掌. 2. 线程池

java线程池分析和应用

比较 在前面的一些文章里,我们已经讨论了手工创建和管理线程.在实际应用中我们有的时候也会经常听到线程池这个概念.在这里,我们可以先针对手工创建管理线程和通过线程池来管理做一个比较.通常,我们如果手工创建线程,需要定义线程执行对象,它实现的接口.然后再创建一个线程对象,将我们定义好的对象执行部分装载到线程中.对于线程的创建.结束和结果的获取都需要我们来考虑.如果我们需要用到很多的线程时,对线程的管理就会变得比较困难.我们手工定义线程的方式在时间和空间效率方面会存在着一些不足.比如说我们定义好的线程

Java线程池:ExecutorService,Executors

简单的Java线程池可以从Executors.newFixedThreadPool( int n)获得.此方法返回一个线程容量为n的线程池.然后ExecutorService的execute执行之. 现给出一个示例. package zhangphil.executorservice; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ZhangPhil

JAVA线程池ThreadPoolExecutor与阻塞队列BlockingQueue .

从Java5开始,Java提供了自己的线程池.每次只执行指定数量的线程,java.util.concurrent.ThreadPoolExecutor 就是这样的线程池.以下是我的学习过程. 首先是构造函数签名如下: [java] view plain copy print ? public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<

Java线程池使用和分析(二) - execute()原理

相关文章目录: Java线程池使用和分析(一) Java线程池使用和分析(二) - execute()原理 execute()是 java.util.concurrent.Executor接口中唯一的方法,JDK注释中的描述是“在未来的某一时刻执行命令command”,即向线程池中提交任务,在未来某个时刻执行,提交的任务必须实现Runnable接口,该提交方式不能获取返回值.下面是对execute()方法内部原理的分析,分析前先简单介绍线程池有哪些状态,在一系列执行过程中涉及线程池状态相关的判断