Java线程池源码阅读

简单介绍

  线程池是池化技术的一种,对线程复用、资源回收、多任务执行有不错的实践。阅读源码,可以学习jdk的大师对于线程并发是怎么池化的,还有一些设计模式。同时,它也能给我们在使用它的时候多一种感知,出了什么问题可以马上意识到哪里的问题。

使用范例

  我们使用一个线程池,直接通过jdk提供的工具类直接创建。使用如下api创建一个固定线程数的线程池。

ExecutorService pool = Executors.newFixedThreadPool(5);

  使用如下api创建一个会不断增长线程的线程池。

 ExecutorService pool = Executors.newCachedThreadPool();

  使用如下api创建定时任务线程池。

ExecutorService pool = Executors.newScheduledThreadPool(1);

  使用如下api创建单线程的线程池。

ExecutorService pool = Executors.newSingleThreadExecutor();

  

  执行线程任务的代码示例。

        pool.execute(()->{
            System.out.println(Thread.currentThread().getName()+" execute");
        });

        pool.shutdown();

  或者使用Future模式。

        Future<String> future = pool.submit(() -> {
            System.out.println(Thread.currentThread().getName()+" execute !");
            return "SUCCESS";
        });

        String s = future.get();

        System.out.println(s);

  

  处理线程的异常,使用如下几种方式。重写ThreadFactory的未捕获异常的handler,处理异常。

        ExecutorService pool = Executors.newFixedThreadPool(5,r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((t1,e)->{
                System.out.println("线程 :"+t1.getName()+",抛出异常:"+e.getMessage());
                e.printStackTrace();
            });
            return t;
        });

  继承线程池,重写afterHandler方法。

class ExecutorsExtend extends ThreadPoolExecutor{
    public ExecutorsExtend(int nThreads) {
        super(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        System.out.println("after handler invoke!!");
        try{
            if(r instanceof Future<?>){
                Object res = ((Future) r).get();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
        if(t != null){
            t.printStackTrace();
        }
    }
}

  使用future方式捕获异常。

        Future future = pool.submit(()->{
            System.out.println(Thread.currentThread().getName()+" executes !");
            Object obj = null;
            System.out.println(obj.toString());
        });

        try{
            future.get();
        }catch (Exception e){
            System.out.println("future 方式捕获异常");
            e.printStackTrace();
        }

内部机制

  线程池的内部机制是在控制线程数以及阻塞队列的关系实现的,它的核心参数:核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、存活时间(keepAliveTime)、阻塞队列(workQueue)。

  核心线程数,表示任务数只要少于这个值就一股脑儿地创建线程;最大线程数限制在阻塞队列无法存储任务的时候可以扩容的线城最大数目;存活时间就是一个线程能在空闲的时候存活多久的意思;阻塞队列,是存储任务的一个任务队列。

  它的构造方法有很详细的说明。

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

  我们着重看它的execute方法。

  这里它执行每个关键步骤的时候都会去校验线程池的状态,是否关闭,线程数是否超过最大的限制等等。

  第一个if这里判断的是,线程数是否达到核心线程数,如果没达到就会去创建一个新的线程接受任务。

  第二个if这里判断的是,是否能放入阻塞队列,如果放入之后,其他已创建的线程执行完当前它的任务之后会去拉取阻塞队列中的任务继续执行;在成功放入阻塞队列中之后,还会继续校验线程池是否关闭,如果关闭则执行拒绝策略;接下来校验正在运行的线程数,是否为0,如果为0则增加一个没有当前任务的线程,去清空阻塞队列中的任务。

  最后一个条件这里,是尝试添加一个非核心线程,携带的是当前提交的任务,这是在阻塞队列添加失败的情况下才会出现;如果最后这个条件失败的话,就会触发拒绝策略拒绝接受任务。

  

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn‘t, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

  所以我们得出一个简单的结论,在添加线程少于核心线程数的时候会一直创建线程接受任务,大于核心线程数之后,就会添加到阻塞队列中缓存任务,一旦阻塞队列满了,就会创建非核心线程接受任务;如果都满了,则会拒绝接受任务。

  接下来我们看addWorker方法。它分为两部分,因为中间含有很多状态判断,所以变得比较难看懂。大体的意思是,先判断是否合规,合规之后创建线程并加入线程集合,最后跑线程。

    private boolean addWorker(Runnable firstTask, boolean core) {     // 这里主要判断是否达到核心线程数或最大线程数
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
     // 这里主要是添加Worker,也就是新建线程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }          // 在这里运行线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

  Worker这个对象是继承自AQS和线程任务接口Runnable的一个内部类。它执行任务的方法就是这个runWorker方法。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

  这个runWorker方法,就是一个自旋跑任务的线程,跑完当前任务之后就会去getTask,从任务队列中获取任务。我们之前重写的afterExecute方法在这里可以看到了。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

  退出线程的方法,这个方法会对缩容的时候有影响,如果阻塞队列还有任务它将缩小到1,如果没有就缩小到0。无论是否核心线程都会被释放掉。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

  

  这边有个小插曲,就是所有的状态它都定义在一个整型数的最高的三位。而线程池的最大线程数为Integer.SIZE-3,通过一堆位运算判断线程池的状态。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

  

 

原文地址:https://www.cnblogs.com/chentingk/p/12183102.html

时间: 2024-07-30 00:46:10

Java线程池源码阅读的相关文章

java线程池源码解读

java线程池的顶级类是Executors 内置了几种线程池 1.newFixedThreadPool  并且重载了两个此方法  有固定线程数的线程池 当达到设置的线程数时 多余的任务会排队,当处理完一个马上就会去接着处理排队中的任务 源码如下 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit

java线程池源码的简单分析

工作中用过线程池来进行多线程的操作,但是也仅仅是停留在使用方面,没有深入研究,现在通过源码来仔细研究下java的线程池.关于线程池的优缺点就不研究了,直接通过一个源码来看看java中线程池的原理. 使用ThreadPoolExecutor来创建一个线程池 public class MultipleThread { public static void main(String[] args) { /** * 通过ThreadPoolExecutor来创建一个线程池 * * 我们创建的线程池 核心线

java线程池源码的理解

线程池 所谓线程池,就是有一个池子,里面存放着已经创建好的线程,当有任务提交到线程池执行时,池子中的某个线程会主动执行该任务. 新建线程和切换线程的开销太大了,使用线程池可以节省系统资源. 线程池的关键类:ThreadPoolExecutor 主要流程 execute() –> addWorker() –>runWorker() -> getTask() 重要参数及变量 控制状态的变量 ctl: ctl是一个AtomicInteger原子操作类,能够保证线程安全. ctl变量定义如下:

面试官:你分析过线程池源码吗?

线程池源码也是面试经常被提问到的点,我会将全局源码做一分析,然后告诉你面试考啥,怎么答. 为什么要用线程池? 简洁的答两点就行. 降低系统资源消耗. 提高线程可控性. 如何创建使用线程池? JDK8提供了五种创建线程池的方法: 1.创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待. 1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(

Java集合框架源码阅读之AbstractCollection

AbstractCollection是集合实现类的根抽象实现类,它实现了Collection接口,集合中的三个分支Set.List.Queue都是继承此类之后再进行各自实现的扩展,分别是AbstractSet.AbstractList.AbstractQueue.这三个分支有一些共同之处,需要用一些共同的方法,因此出现了AbstractCollection类,它包含了一些这三个分支都会用到的常用方法.而这三个分支也各有抽象类,因为这三个分支下面的一些具体实现也会有一些当前分支通用的方法,因此也给

【转】Java开源项目源码阅读方法及二次开发方法

一直以来,都想要阅读某些Java开源项目的源代码,甚至想要修改某些代码,实现对开源项目进行二次开发的目的.但总是不知从何入手,直接将开源项目的源代码导入Eclipse,总是会报很多错误,而无法编译.可以直接通过Eclipse打开开源项目的源代码,至少能够达到可视化源码阅读.源码导航的目的,还是能在一定程度上解决源码阅读不爽的问题,因为直接打开并没有改变源文件项目的目录结果,对于修改过后的代码,可以通过命令行找到源文件项目目录,并使用mvn或者ant对项目进行编译,再查看修改后的项目是否正确. 由

Java多线程——ReentrantReadWriteLock源码阅读

之前讲了<AQS源码阅读>和<ReentrantLock源码阅读>,本次将延续阅读下ReentrantReadWriteLock,建议没看过之前两篇文章的,先大概了解下,有些内容会基于之前的基础上阅读.这个并不是ReentrantLock简单的升级,而是落地场景的优化,我们来详细了解下吧. 背景 JUC包里面已经有一个ReentrantLock了,为何还需要一个ReentrantReadWriteLock呢?看看头注解找点线索.它是ReadWriteLock接口的实现.那看看这个接

Java并发编程中线程池源码分析及使用

当Java处理高并发的时候,线程数量特别的多的时候,而且每个线程都是执行很短的时间就结束了,频繁创建线程和销毁线程需要占用很多系统的资源和时间,会降低系统的工作效率. 参考http://www.cnblogs.com/dolphin0520/p/3932921.html 由于原文作者使用的API 是1.6 版本的,参考他的文章,做了一些修改成 jdk 1.8版本的方法,涉及到的内容比较多,可能有少许错误. API : jdk1.8.0_144 ThreadPoolExecutor类 Java中线

手撕ThreadPoolExecutor线程池源码

这篇文章对ThreadPoolExecutor创建的线程池如何操作线程的生命周期通过源码的方式进行详细解析.通过对execute方法.addWorker方法.Worker类.runWorker方法.getTask方法.processWorkerExit从源码角度详细阐述,文末有彩蛋. exexcte方法 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); in