5、java的concurrent包下的线程池

1、java.util.concurrent

Class Executurs

public class Executors extends Object

其中提供了返回类型为Executor、ExecutorService、ScheduledExecutorService、ThreadFactory和Callable的方法。

public static ExecutorService newFixedThreadPool(int nThreads)

创建一个固定线程个数的线程池。当所有的线程都处于活跃状态,此时有一个任务提交到线程池,那么任务将会处于等待状态,直到线程池当中有可用的线程。当线程在执行任务时间

由于某些原因而消亡,那么一个新的线程将会替换它来完成后续的任务。这些线程将会一直存在于线程池除非被显性的shutdown。

public static ExecutorService newSingleThreadExecutor()

创建一个单一的线程。如果线程在处理任务时,由于某种原因消亡,那么会有一个新的线程来替换它继续执行后续的任务。

public static ExecutorService newCachedThreadPool()

按需创建线程的线程池,但是当线程池当中的线程处于可用状态时,它们能够复用。这个池能够改善执行许多生命周期短的异步任务的性能。线程如果在60s之内没有被利用,就会被中止,

并且移除线程池。

public static ScheduleExecutorService newSingleThreadScheduleExecutor()

创建一个单一的线程执行器(executor),它能够在给定的延迟时间或者以一定的频率来调度任务。

public static ScheduleExecutorService newScheduledThreadPool(int corePoolSize)

public static Callable<Object> callable(Runnable task)

运行一个给定的任务,并且返回一个Callable对象

2、层级关系

interface Executor;

interface ExecutorService implements Executor;

abstract class AbstractExecutorService implements ExecutorService

class ThreadPoolExecutor extends AbstractExecutorService;

 1     /**
 2      * @param corePoolSize 保持在池中的线程数,即使它们处于空闲状态,除非allowCoreThreadTimeOut设置了
 3      * @param maximumPoolSize 允许在池中的最大线程数
 4      * @param keepAliveTime 当池中的线程数大于核心线程数时,多于核心线程数的那部分线程等待新任务的最大时间为keepAliveTime,如果这段时间内没有新任务,那么这些线程将会终止。
 5      * @param unit 为keepAliveTime设置时间单元
 6      * @param workQueue 在任务被执行前,它们都存储在workQueue当中。这个队列指挥保留通过execute方法提交的Runnable任务。
 7      * @param threadFactory 创建新线程将用到的线程工厂
 8      * @param handler 由于线程大小数和队列容量已经达到边界,执行就会被阻塞,此时会执行handler。即,提供了在到达界限时,希望做的事。
 9      */
10     public ThreadPoolExecutor(int corePoolSize,
11                               int maximumPoolSize,
12                               long keepAliveTime,
13                               TimeUnit unit,
14                               BlockingQueue<Runnable> workQueue,
15                               ThreadFactory threadFactory,
16                               RejectedExecutionHandler handler) {
17         if (corePoolSize < 0 ||
18             maximumPoolSize <= 0 ||
19             maximumPoolSize < corePoolSize ||
20             keepAliveTime < 0)
21             throw new IllegalArgumentException();
22         if (workQueue == null || threadFactory == null || handler == null)
23             throw new NullPointerException();
24         this.corePoolSize = corePoolSize;
25         this.maximumPoolSize = maximumPoolSize;
26         this.workQueue = workQueue;
27         this.keepAliveTime = unit.toNanos(keepAliveTime);
28         this.threadFactory = threadFactory;
29         this.handler = handler;
30     }

线程池有它自己的生命周期,在这个生命周期中有五个状态,然后线程池还有一个“线程数”的属性。这六个属性线程池是用一个变量来表示的,这就是ctl控制状态符。

它是一个原子态的整型(AtomicInteger),它表示了概念上的两个属性:

wokerCount:指示了有效的线程数

runState:指示了running、shutting down等等这些状态。

为了将这些打包进一个int类型的数据,于是限制了wokerCount数目最多为(2^9-1,大约500million,5亿)个线程而不是(2^31-1,2billion,20亿)。当然如果将来因此

出现问题,就会将变量改为AtomicLong或者在常量下面进行移位来调节。但是在这之前都会用int,这样代码将会快一点而且简单一点。

wokerCount是一个被允许启动但是不允许停止的工作者线程数。这个值也许会短暂的与真实存活的线程数不同,比如当ThreadPool要求创建一个线程失败时,这时由存在

的线程来执行记录直到终止。

runState提供了主要的声明周期状态

RUNNING:接收新的任务并且处理排队中的任务                                                                             11100000000000000000000000000000

SHUTDOWN:不再接收新的任务,但是处理已经进入队列当中的任务                                                   00000000000000000000000000000000

STOP:不再接收新的任务,并且也不再处理队列中的任务,而且正在被执行的任务也会被中断(interrupt)       00100000000000000000000000000000

TIDYING(使整洁,有条理):所有的任务已经完成,workCount为0,线程变成TIDYING.                         01000000000000000000000000000000

TERMINAL:当terminated()方法完成时                                                                                     01100000000000000000000000000000

CAPACITY                                                                                                                           00011111111111111111111111111111

判断现在的工作者线程数:

private static int workerCountOf(int c)  { return c & CAPACITY; }

执行3部曲:

1、如果少于corePoolSize的线程数正在运行,那么新任务尝试启动新的线程。调用addWorker会检测runState和workerCount

2、如果一个任务能够成功入队,那么我们仍需要再次确认是否应该添加线程,因为在最后一次检查完成后,存在的某个线程已经死亡活着当进入这个方法后,

线程池被shutdown。于是我们需要冲洗检测状态。

3、如果我们不能够是任务进入队列,我们尝试新的线程。如果失败,我们知道我们已经shutdown或者已经饱和(saturated),于是我们拒绝任务。

 1  public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4
 5         int c = ctl.get();
 6         if (workerCountOf(c) < corePoolSize) {
 7             if (addWorker(command, true))
 8                 return;
 9             c = ctl.get();
10         }
11         if (isRunning(c) && workQueue.offer(command)) {
12             int recheck = ctl.get();
13             if (! isRunning(recheck) && remove(command))
14                 reject(command);
15             else if (workerCountOf(recheck) == 0)
16                 addWorker(null, false);
17         }
18         else if (!addWorker(command, false))
19             reject(command);
20     }

判断运行状态:

private static int runStateOf(int c)     { return c & ~CAPACITY; }

 1  private boolean addWorker(Runnable firstTask, boolean core) {
 2         retry:
 3         for (;;) {
 4             int c = ctl.get();//得到运行状态和线程数的控制符
 5             int rs = runStateOf(c);//用整数的高3位来呈现5种运行状态
 6
 7             // 在必要是检查队列是否为空
 8             if (rs >= SHUTDOWN &&//如果此时处于关闭、完成或者
 9                 ! (rs == SHUTDOWN &&//所加入的任务为空,那么就不入
10                    firstTask == null &&//队列
11                    ! workQueue.isEmpty()))
12                 return false;
13
14             for (;;) {
15                 int wc = workerCountOf(c);//得到工作者线程数
16 /*如果此时工作者线程数已经超过了最大容量或者是超过了核心容器的线程数,那么就不能再如队列了。已经饱和。*/
17                 if (wc >= CAPACITY ||
18                     wc >= (core ? corePoolSize : maximumPoolSize))
19                     return false;
20                 if (compareAndIncrementWorkerCount(c))
21                     break retry;
22                 c = ctl.get();  // Re-read ctl
23                 if (runStateOf(c) != rs)
24                     continue retry;
25                 // else CAS failed due to workerCount change; retry inner loop
26             }
27         }
28
29         boolean workerStarted = false;
30         boolean workerAdded = false;
31         Worker w = null;
32         try {
33             final ReentrantLock mainLock = this.mainLock;
34             w = new Worker(firstTask);
35             final Thread t = w.thread;
36             if (t != null) {
37                 mainLock.lock();
38                 try {
39                     // Recheck while holding lock.
40                     // Back out on ThreadFactory failure or if
41                     // shut down before lock acquired.
42                     int c = ctl.get();
43                     int rs = runStateOf(c);
44
45                     if (rs < SHUTDOWN ||
46                         (rs == SHUTDOWN && firstTask == null)) {
47                         if (t.isAlive()) // precheck that t is startable
48                             throw new IllegalThreadStateException();
49                         workers.add(w);
50                         int s = workers.size();
51                         if (s > largestPoolSize)
52                             largestPoolSize = s;
53                         workerAdded = true;
54                     }
55                 } finally {
56                     mainLock.unlock();
57                 }
58                 if (workerAdded) {
59                     t.start();
60                     workerStarted = true;
61                 }
62             }
63         } finally {
64             if (! workerStarted)
65                 addWorkerFailed(w);
66         }
67         return workerStarted;
68     }
时间: 2024-10-11 14:11:41

5、java的concurrent包下的线程池的相关文章

java.util.concurrent包下的几个常用类

1.Callable<V> Callable<V>与Runnable类似,理解Callable<V>可以从比较其与Runnable的区别开始: 1)从使用上:实现的Callable<V>的类需要实现call()方法,此方法有返回对象V:而Runnable的子类需要实现run()方法,但没有返回值: 2)如果直接调用Callable<V>的子类的call()方法,代码是同步顺序执行的:而Runnable的子类是线程,是代码异步执行. 3)将Call

Java基础知识-java.util.concurrent包下常见类的使用

一,Condition 一个场景,两个线程数数,同时启动两个线程,线程A数1.2.3,然后线程B数4.5.6,最后线程A数7.8.9,程序结束,这涉及到线程之间的通信. public class ConditionTest { static class NumberWrapper { public int value = 1; } public static void main(String[] args) { //初始化可重入锁 final Lock lock = new ReentrantL

concurrent包下常用的类

转自 http://www.importnew.com/21889.html 在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法. 以下是本文目录大纲: 一.CountDownLatch用法 二.CyclicBarrier用法 三.Semaphore用法 一.CountDownLatch用法 CountDownLatch类位于java.util.concu

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService接口. public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService ScheduledThreadPoolExecutor的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行.

《java.util.concurrent 包源码阅读》10 线程池系列之AbstractExecutorService

AbstractExecutorService对ExecutorService的执行任务类型的方法提供了一个默认实现.这些方法包括submit,invokeAny和InvokeAll. 注意的是来自Executor接口的execute方法是未被实现,execute方法是整个体系的核心,所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略.这个在后面分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出来. 首先来看su

《java.util.concurrent 包源码阅读》11 线程池系列之ThreadPoolExecutor 第一部分

先来看ThreadPoolExecutor的execute方法,这个方法能体现出一个Task被加入到线程池之后都发生了什么: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* 如果运行中的worker线程数少于设定的常驻线程数,增加worker线程,把task分配给新建的worker线程 */ int c = ctl.get(); if (worker

《java.util.concurrent 包源码阅读》09 线程池系列之介绍篇

concurrent包中Executor接口的主要类的关系图如下: Executor接口非常单一,就是执行一个Runnable的命令. public interface Executor { void execute(Runnable command); } ExecutorService接口扩展了Executor接口,增加状态控制,执行多个任务返回Future. 关于状态控制的方法: // 发出关闭信号,不会等到现有任务执行完成再返回,但是现有任务还是会继续执行, // 可以调用awaitTe

java中concurrent包内容

有BlockingQueue及其相关的类,跟阻塞队列有关系. ConcurrentHashMap,ConcurrentLinkedQueue等,这些是相关集合的线程同步版本. CopyOnWriteArrayList,也是一种并发用的容器,当我们改变这个数组的时候,先复制一个副本,修改这个副本,再复制回去.这样就实现了读写分离,适用于读多写少的并发场景. CountDownLatch,这个类适用于这种情况:多个线程同时工作,然后其中几个可以随意并发执行,但有一个线程需要等其他线程工作结束后,才能