private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
Worker 是ThreadpoolExecutor的内部类
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
继承了aqs,实现了runnable接口,
aqs是一个同步队列,是reentrantlock的实际实现者,里面有一个 volatile 的 state属性,用cas操作来保证同步
runnable可以执行run方法,作为线程的逻辑实现。
言归正传,c = ctl.get(); ctl是一个原子类,
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
初始化的时候被赋值:RUNNING 是-1左移29位,也就是前三位是1,其他都是0
ctlof是把两个参数或操作,和0或不变
c= ctl.get()方法,因为c开始是负值,所以如果c自增,那么,前三位不变,从后面开始加
runstateof方法,
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static int runStateOf(int c) { return c & ~CAPACITY; }
1左移29位到30位,减一,后面29位都为1,再取反,前面3位为1,后面29位为0,这个和-1左移29位一样。
c和 这个数相与,前三位不变,后面的都变为0,最后得到一个负数,所以一般和shutdown也就是0相比较,是不会大于等于0的。
一般第一个if条件进不去(如果线程增加的很多,导致负数变为0或者正数,如果正好为0,那么 要么firsttalk不为空,要么workqueue为空,才能返回false)
workerCountOf()方法, 返回除了 前三位 之后的位数,也就是当前线程数大小,如果大于等于 2的29次方,或者大于设定的核心线程或者最大线程数,表明已经不能再增加线程了,返回false
如果
if (compareAndIncrementWorkerCount(c))
break retry;
break retry 是跳出所有循环,向下走,如果不用retry,只能跳出一层循环。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
如果cas失败,说明有并发操作已经更改了workercount,那么再获取一次
如果runstate已经变了,用continue retry,说明,加入的线程数量已经使得ctl从负数变为0了,那么重新判断一次shutdown,
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
获取aqs的worker里面,取当前属性中的线程,在new worker的时候,会
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
new一个线程,引用付给thread,然后state设为-1.
所以后面是能取到thread的,然后加锁,如果rs小于0,说明线程并没有大到限值,(或者等于0,但是传入的task为空?)
如果当前线程已经是启动了start方法了,那么说明有异常,否则,将worker加入workers,然后启动线程。