多线程(十七、深入了解线程池-ThreadPoolExecutor)

ThreadPoolExecutor原理

ThreadPoolExecutor构造函数参数

/**
 * 使用给定的参数创建ThreadPoolExecutor.
 *
 * @param corePoolSize    核心线程池中的最大线程数
 * @param maximumPoolSize 总线程池中的最大线程数
 * @param keepAliveTime   空闲线程的存活时间
 * @param unit            keepAliveTime的单位
 * @param workQueue       任务队列, 保存已经提交但尚未被执行的任务
 * @param threadFactory   线程工厂(用于指定如果创建一个线程)
 * @param handler         拒绝策略 (当任务太多导致工作队列满时的处理策略)
 */
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);   // 使用纳秒保存存活时间
    this.threadFactory = threadFactory;
    this.handler = handler;
}

1、ThreadPoolExecutor在逻辑上将自身管理的线程池划分为两部分:核心线程池(大小对应为corePoolSize)、非核心线程池(大小对应为maximumPoolSize-corePoolSize)。
2、ThreadPoolExecutor中只有一种类型的线程,名叫Worker,它是ThreadPoolExecutor内部类,封装着Runnable任务和执行该任务的Thread对象,我们称它为【工作线程】,它也是ThreadPoolExecutor唯一需要进行维护的线程;
3、【核心线程池】【非核心线程池】都是逻辑上的概念,ThreadPoolExecutor在任务调度过程中会根据corePoolSize和maximumPoolSize的大小,判断如何执行任务。

线程池状态和管理

1、ThreadPoolExecutor内部定义了一个AtomicInteger变量—ctl,通过按位划分的方式,在一个变量中记录线程池状态和工作线程数量

1、低29位保存线程数
2、高3位保存线程池状态

//保存线程池状态和工作线程数:低29位: 工作线程数,高3位 : 线程池状态
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大线程数: 2^29-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程池状态
    private static final int RUNNING    = -1 << COUNT_BITS; //接受新任务, 且处理已经进入阻塞队列的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS; //不接受新任务, 但处理已经进入阻塞队列的任务
    private static final int STOP       =  1 << COUNT_BITS; //不接受新任务, 且不处理已经进入阻塞队列的任务, 同时中断正在运行的任务
    private static final int TIDYING    =  2 << COUNT_BITS; //所有任务都已终止, 工作线程数为0, 线程转化为TIDYING状态并准备调用terminated方法
    private static final int TERMINATED =  3 << COUNT_BITS; //terminated方法已经执行完成

工作线程

工作线程(Worker),Worker内部类,实现了AQS框架,ThreadPoolExecutor通过一个HashSet来保存工作线程:

Worker定义

/**
 * Worker表示线程池中的一个工作线程, 可以与任务相关联.
 * 由于实现了AQS框架, 其同步状态值的定义如下:
 * -1: 初始状态
 * 0:  无锁状态
 * 1:  加锁状态
 */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    /**
     * 与该Worker关联的线程.
     */
    final Thread thread;
    /**
     *初始化任务,可以为空,为空的时候则去任务队列workQueue里获取
     */
    Runnable firstTask;
    /**
     * 当前工作线程处理完成的任务数
     */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // 初始的同步状态值
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /**
     * 执行任务
     */
    public void run() {
        runWorker(this);
    }

    /**
     * 是否加锁
     */
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    /**
     * 尝试获取锁
     */
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    /**
     * 尝试释放锁
     */
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock() {
        acquire(1);
    }

    public boolean tryLock() {
        return tryAcquire(1);
    }

    public void unlock() {
        release(1);
    }

    public boolean isLocked() {
        return isHeldExclusively();
    }

    /**
     * 中断线程(仅任务非初始状态)
     */
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

线程池执行execute

execute代码逻辑

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取线程池状态和工作线程数量
        int c = ctl.get();
        //如果工作线程数 < 核心线程数,则创建工作线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) //addWorker创建工作线程
                return;
            c = ctl.get();
        }
        //工作线程创建失败,或者,工作线程 >= 核心线程数,任务插入任务队列workQueue
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次检查线程池状态,如果不是运行状态,移除任务,并拒绝
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果工作线程为0,则创建一个不带任务的线程,线程自动去任务队列获取任务执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //任务插入队列失败,创建非核心工作线程,如果失败,则说明工作线程 > 总线程数量,则执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

执行流程图

说明:
1、如果工作线程数小于核心线程池上限(CorePoolSize),则直接新建一个工作线程并执行任务;
2、如果工作线程数大于等于CorePoolSize,则尝试将任务加入到队列等待以后执行。如果队列已满,则在总线程池未满的情况下(CorePoolSize ≤ 工作线程数 < maximumPoolSize)新建一个工作线程立即执行任务,否则执行拒绝策略。

创建工作线程addWorker

/**
     * 添加工作线程并执行任务
     *
     * @param firstTask 如果指定了该参数, 表示将立即执行该firstTask任务; 否则从工作队列中获取任务并执行
     * @param core      执行任务的工作线程归属于哪个线程池:  true-核心线程池  false-非核心线程池
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            /**
             * 这个if主要是判断哪些情况下, 线程池不再接受新任务执行, 而是直接返回.总结下, 有以下几种情况:
             * 1. 线程池状态为 STOP 或 TIDYING 或 TERMINATED: 线程池状态为上述任一一种时, 都不会再接受任务,所以直接返回
             * 2. 线程池状态≥ SHUTDOWN 且 firstTask != null: 因为当线程池状态≥ SHUTDOWN时, 不再接受新任务的提交,所以直接返回
             * 3. 线程池状态≥ SHUTDOWN 且 队列为空: 队列中已经没有任务了, 所以也就不需要执行任何任务了,可以直接返回
             */
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //获取工作线程数
                int wc = workerCountOf(c);
                /**
                 * 这个if主要是判断工作线程数是否超限, 以下任一情况属于超限, 直接返回:
                 * 1. 工作线程数超过最大工作线程数(2^29-1)
                 * 2. 工作线程数超过核心线程池上限(入参core为true, 表示归属核心线程池)
                 * 3. 工作线程数超过总线程池上限(入参core为false, 表示归属非核心线程池)
                 */
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //增加工作线程数
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();
                //CAS失败,自旋重新操作
                if (runStateOf(c) != rs)
                    continue retry;

            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //将任务包装成工作线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //重新检查线程池状态
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //工作线程加入集合
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //启动线程,这个很关键,因为t在new的时候是用Work作为任务的,
                    // Work实现了Runnale接口,所以t.start就是执行Work的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

特别注意:
启动线程,这个很关键,因为thread在new的时候是用Work(this)作为任务的, Work实现了Runnale接口,所以t.start就是执行Work的run方法。

工作线程的执行runWorker

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 任务, 如果是null则从队列取任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 允许执行线程被中断
        boolean completedAbruptly = true;
        try {
            // 当task==null时会通过getTask从队列取任务
            while (task != null || (task = getTask()) != null) {
                w.lock();

                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 钩子方法,由子类自定义实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 钩子方法,由子类自定义实现
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;  // 完成任务数+1
                    w.unlock();
                }
            }
            //说明该工作线程自身既没有携带任务, 也没从任务队列中获取到任务
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

获取任务方法getTask

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            /**
             * 以下IF用于判断哪些情况下不允许再从队列获取任务:
             * 1. 线程池进入停止状态(STOP/TIDYING/TERMINATED), 此时即使队列中还有任务未执行, 也不再执行
             * 2. 线程池非RUNNING状态, 且队列为空
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            /**
             * timed变量用于判断是否需要进行超时控制:
             * 对于核心线程池中的工作线程, 除非设置了allowCoreThreadTimeOut==true, 否则不会超时回收;
             * 对于非核心线程池中的工作线程, 都需要超时控制
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 这里主要是当外部通过setMaximumPoolSize方法重新设置了最大线程数时
            // 需要回收多出的工作线程
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //区分超时操作还是非超时获取
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

拒绝策略

所谓拒绝策略,就是在构造ThreadPoolExecutor时,传入的RejectedExecutionHandler对象,一共4种

1.AbortPolicy(默认),抛出一个RejectedExecutionException异常

2、DiscardPolicy,什么都不做,等任务自己被回收

3、DiscardOldestPolicy,丢弃任务队列中的最近一个任务,并执行当前任务

4、CallerRunsPolicy,以自身线程来执行任务,减缓新任务提交的速度

线程池关闭

1、shutdown方法将线程池切换到SHUTDOWN状态(如果已经停止,则不用切换),并调用interruptIdleWorkers方法中断所有空闲的工作线程,最后调用tryTerminate尝试结束线程池

2、shutdownNow方法的主要不同之处就是,它会将线程池的状态至少置为STOP,同时中断所有工作线程(无论该线程是空闲还是运行中),同时返回任务队列中的所有任务

原文地址:https://blog.51cto.com/janephp/2416396

时间: 2024-10-11 19:56:11

多线程(十七、深入了解线程池-ThreadPoolExecutor)的相关文章

java线程API学习 线程池ThreadPoolExecutor(转)

线程池ThreadPoolExecutor继承自ExecutorService.是jdk1.5加入的新特性,将提交执行的任务在内部线程池中的可用线程中执行. 构造函数 ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, Rejected

线程池(ThreadPoolExecutor JDK1.7)

平常我们经常都会使用到线程池,但是有没考虑过为什么需要使用线程池呢?下面我列举一下问题,大家可以思考一下 1.当前服务器的硬件环境是多少核的CPU,它和线程的关系又是什么? 2.jvm能创建多少个线程? 3.多线程主要解决什么问题? 4.你使用线程池的目的是什么? 以上几个问题都是帮助你更好的使用java的线程(还可以衍生更多的小问题,如:jvm维护线程的消耗,cpu调度线程的消耗,应该使用多少个线程才能最大化利用多核CPU..).答案需要自己去百度,我这也讲不好,反而误导大家. 线程池顾名思义

java面试总躲不过的并发(一): 线程池ThreadPoolExecutor基础梳理

本文核心:线程池ThreadPoolExecutor基础梳理 一.实现多线程的方式 1.继承Thread类,重写其run方法 2.实现Runnable接口,实现run方法 3.实现Callable接口,实现call方法 由于Java的设计,只支持单继承,但是支持多实现形式,所以一般面向接口开发,Runnable接口与Callable接口的区别在于Callable接口中的call方法是带返回值的,其返回一个Future的异步类,我们可以通过Future的get方法获取结果,如果线程还没有执行完,g

C#多线程之旅(3)——线程池

v博客前言 先交代下背景,写<C#多线程之旅>这个系列文章主要是因为以下几个原因:1.多线程在C/S和B/S架构中用得是非常多的;2.而且多线程的使用是非常复杂的,如果没有用好,容易造成很多问题. v写在前面 多线程,有利也有弊,使用需谨慎. v正文开始 原文地址:C#多线程之旅(3)——线程池 C#多线程之旅目录: C#多线程之旅(1)——介绍和基本概念 C#多线程之旅(2)——创建和开始线程 C#多线程之旅(3)——线程池 C#多线程之旅(4)——同步本质 ...... 一.介绍 无论你什

《Java源码分析》:线程池 ThreadPoolExecutor

<Java源码分析>:线程池 ThreadPoolExecutor ThreadPoolExecutor是ExecutorService的一张实现,但是是间接实现. ThreadPoolExecutor是继承AbstractExecutorService.而AbstractExecutorService实现了ExecutorService接口. 在介绍细节的之前,先介绍下ThreadPoolExecutor的结构 1.线程池需要支持多个线程并发执行,因此有一个线程集合Collection来执行

线程池ThreadPoolExecutor使用简介(转)

一.简介 线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) corePoolSize: 线程池维护线程的最少数量 maximumPool

JAVA程序设计(18.1)----- 1多线程轮流打印 线程调度 线程池 synchronized wait notify 内部类

1.两个线程 一个打印A 一个打印B 另两个线程轮流进行打印工作 多线程初级应用 线程调度  线程池(预先建立N个线程,需要的程序直接调用,执行完毕后归还回线程池,典型的以空间换时间 synchronized wait notify  内部类使用 package com.lovo; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 类:测试 wait notify 用

[转载]线程池ThreadPoolExecutor使用简介

一.简介 线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) corePoolSize: 线程池维护线程的最少数量

C# 多线程的自动管理(线程池) 基于Task的方式

C# 多线程的自动管理(线程池) 在多线程的程序中,经常会出现两种情况:    1. 应用程序中线程把大部分的时间花费在等待状态,等待某个事件发生,然后给予响应.这一般使用 ThreadPool(线程池)来解决.     2. 线程平时都处于休眠状态,只是周期性地被唤醒.这一般使用 Timer(定时器)来解决. ThreadPool 类提供一个由系统维护的线程池(可以看作一个线程的容器),该容器需要 Windows 2000 以上系统支持,因为其中某些方法调用了只有高版本的Windows 才有的

【转载】5天不再惧怕多线程——第五天 线程池

说到多线程,不可不说线程池,C#中关于池的概念很多,今天来整理下ThreadPool的使用. 是的,如果你很懒,如果你的执行任务比较短,如果你不想对线程做更精细的控制,那么把这些繁琐的东西丢给线程池吧. 一:ThreadPool 好了,下面看看TheadPool下有哪些常用的方法. 1:GetMaxThreads,GetMinThreads 首先我们肯定好奇线程池到底给我们如何控制线程数,下面就具体的看一看. 1 class Program 2 { 3 static void Main(stri