ThreadPoolExecutor 的 addworker方法

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

Worker 是ThreadpoolExecutor的内部类

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

继承了aqs,实现了runnable接口,

aqs是一个同步队列,是reentrantlock的实际实现者,里面有一个 volatile 的 state属性,用cas操作来保证同步

runnable可以执行run方法,作为线程的逻辑实现。

言归正传,c = ctl.get(); ctl是一个原子类,

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

初始化的时候被赋值:RUNNING 是-1左移29位,也就是前三位是1,其他都是0

ctlof是把两个参数或操作,和0或不变

c= ctl.get()方法,因为c开始是负值,所以如果c自增,那么,前三位不变,从后面开始加

runstateof方法,

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

private static int runStateOf(int c)     { return c & ~CAPACITY; }

1左移29位到30位,减一,后面29位都为1,再取反,前面3位为1,后面29位为0,这个和-1左移29位一样。

c和 这个数相与,前三位不变,后面的都变为0,最后得到一个负数,所以一般和shutdown也就是0相比较,是不会大于等于0的。

一般第一个if条件进不去(如果线程增加的很多,导致负数变为0或者正数,如果正好为0,那么 要么firsttalk不为空,要么workqueue为空,才能返回false)

workerCountOf()方法, 返回除了 前三位 之后的位数,也就是当前线程数大小,如果大于等于 2的29次方,或者大于设定的核心线程或者最大线程数,表明已经不能再增加线程了,返回false

如果

if (compareAndIncrementWorkerCount(c))
break retry;

break retry 是跳出所有循环,向下走,如果不用retry,只能跳出一层循环。

c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)

如果cas失败,说明有并发操作已经更改了workercount,那么再获取一次

如果runstate已经变了,用continue retry,说明,加入的线程数量已经使得ctl从负数变为0了,那么重新判断一次shutdown,

  boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

获取aqs的worker里面,取当前属性中的线程,在new worker的时候,会

 Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

new一个线程,引用付给thread,然后state设为-1.

所以后面是能取到thread的,然后加锁,如果rs小于0,说明线程并没有大到限值,(或者等于0,但是传入的task为空?)

如果当前线程已经是启动了start方法了,那么说明有异常,否则,将worker加入workers,然后启动线程。

时间: 2024-11-26 12:49:19

ThreadPoolExecutor 的 addworker方法的相关文章

concurrent包分析之Executor框架

文章目录 线程生命周期的开销:线程比较少的情况使用new Thread(task)无多大影响,但是如果涉及到线程比较多的情况,应用的性能就会受到影响,如果jdbc创建连接一样,new Thead创建线程也会耗资源.耗时间的. 资源的消耗量:活动线程会消耗系统性能,如果运行的线程数量多余可用的处理器数,那么就会有大量空闲的线程占用内存,会给垃圾收集器带来压力,如果有cpu资源竞争,还会有其他性能开销. 限定创建线程的数目:如果不设定创建线程的数量,一个任务一个线程无限创建线程,高负载情况下就有可能

12.ThreadPoolExecutor线程池原理及其execute方法

jdk1.7.0_79  对于线程池大部分人可能会用,也知道为什么用.无非就是任务需要异步执行,再者就是线程需要统一管理起来.对于从线程池中获取线程,大部分人可能只知道,我现在需要一个线程来执行一个任务,那我就把任务丢到线程池里,线程池里有空闲的线程就执行,没有空闲的线程就等待.实际上对于线程池的执行原理远远不止这么简单. 在Java并发包中提供了线程池类——ThreadPoolExecutor,实际上更多的我们可能用到的是Executors工厂类为我们提供的线程池:newFixedThread

ThreadPoolExecutor机制探索-我们到底能走多远系列(41)

我们到底能走多远系列(41) 扯淡: 这一年过的不匆忙,也颇多感受,成长的路上难免弯路,这个世界上没人关心你有没有变强,只有自己时刻提醒自己,不要忘记最初出发的原因. 其实这个世界上比我们聪明的人无数,很多人都比我们努力,当我门奇怪为什么他们可以如此轻松的时候,是不会问他们付出过什么.怨天尤人是无用的,使自己变好,哪怕是变好一点点,我觉得生活着就是有意义的. 未来,太远.唯有不停的积累,不要着急,抓得住的才能叫机会. 羊年,一定要不做被动的人.大家加油! 目录留白: 主题: 直接进ThreadP

ThreadPoolExecutor的一点理解

整个ThreadPoolExecutor的任务处理有4步操作: 第一步,初始的poolSize < corePoolSize,提交的runnable任务,会直接做为new一个Thread的参数,立马执行 第二步,当提交的任务数超过了corePoolSize,就进入了第二步操作.会将当前的runable提交到一个block queue中 第三步,如果block queue是个有界队列,当队列满了之后就进入了第三步.如果poolSize < maximumPoolsize时,会尝试new 一个Th

J.U.C ThreadPoolExecutor解析

Java里面线程池顶级接口是Executor,但严格意义上讲Executor并不是一个线程池,而是一个线程执行工具,真正的线程池接口是ExecutorService.关系类图如下: 首先Executor的execute方法只是执行一个Runnable任务而已,当然从某种角度上讲最后的实现类也是在线程中启动该任务. void execute(Runnable command); ExecutorService在Executor的基础上增加了一些任务提交方法: <T> Future<T>

ThreadPoolExecutor 线程池的实现

---恢复内容开始--- ThreadPoolExecutor继承自 AbstractExecutorService.AbstractExecutorService实现了 ExecutorService 接口. 首先是ThreadPoolExecutor的构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Ru

Java并发包源码学习之线程池(一)ThreadPoolExecutor源码分析

Java中使用线程池技术一般都是使用Executors这个工厂类,它提供了非常简单方法来创建各种类型的线程池: public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newSingleThreadExecutor() public static ExecutorService newCachedThreadPool() public static Scheduled

ThreadPoolExecutor源码解析

java doc中的解释是: An ExecutorService that executes each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods. 一个使用线程池来执行提交的任务的ExecutorService子类,正常通过Executors工具类中的工厂方法进行配置. 那我们就先看一下比较熟悉的Executor

Java并发---- Executor并发框架--ThreadToolExecutor类详解(execute方法)

1.构造方法 请参考上篇文章:http://blog.csdn.net/ochangwen/article/details/53044733 2.源码详解 线程池内部有一些状态,先来了解下这些状态的机制.以下用代码注释的方式来解释其中的含义. /* 这个是用一个int来表示workerCount和runState的,其中runState占int的高3位, 其它29位为workerCount的值. workerCount:当前活动的线程数: runState:线程池的当前状态. 用AtomicIn