ThreadPoolExecutor参数解析

ThreadPoolExecutor是一个非常重要的类,用来构建带有线程池的任务执行器,通过配置不同的参数来构造具有不同规格线程池的任务执行器

写在前面的是:

线程池任务执行器,线程池的定义比较直接,可以看做多个线程的集合。而任务执行器的概念比较的具有针对性,它用来执行任务,通过对线程池的管理实现多任务的并发,是线程池的载体。

线程和任务的区别,线程不是任务,线程是用来执行任务的。

队列是用来存放任务的,不是用来存放线程的。

主要的几个参数解析:

  • 核心线程数(core pool sizes)和最大线程数(maxmum  pool sizes)

一开始两者的存在很让人摸不着头脑,简单的想法是用一个线程数(pool size)表示线程池的大小不就完了吗,不到规定的线程数就创建新的线程来执行新的任务,到了规定的线程数就等待其他线程处理完成,怎么还出现两个控制线程数的参数?

那这两个参数是什么意思干什么用的?

核心线程数:这个数与上面那个简单想法中的数有一个共同点,就是如果当前线程数达不到核心线程数时,不会使用已有的空闲的线程(如果有的话),来了新任务就会创建新的线程。

如果当前线程数达到核心线程数,而且没有空闲线程,那么来了新任务是否要创建新的线程呢?这取决于两点:

  1. 当前的任务队列是否已满。
  2. 线程池的最大线程数。

通过这个问题可以引出最大线程数的概念

最大线程数 : 最大线程数是和任务队列匹配使用的,确切的说是和有长度限制的任务队列(即有界任务队列)匹配使用的。

补充回答上面的问题,ThreadPoolExecutor的线程池拥有一个任务队列,这个任务队列只有在当前线程数>核心线程数的时候才开始使用,如果该线程池使用的任务队列是有界队列,比如10,那么当该队列被新任务填满时也就是说队列中有10个新任务时ThreadPoolExecutor才会创建一个新的线程来执行队列中的一个任务,如果再发生队列被填满,而且依旧没有空闲线程时ThreadPoolExecutor再次创建新的线程,一旦线程的数量等于最大线程数就不再创建新的线程了,如果此时队列中还有10个任务,那么新来的任务就会被拒绝(reject)。

上述是针对有界队列,如果这个任务执行器的队列是无界队列呢?

由于无界队列不会被填满,所以永远不能达到创建新线程所需要的条件,所以也就不会有新线程被创建,所以最大线程数在这种情况下也就失去了其存在的意义。

  • 线程空闲存活时间(keepAliveTime)

在介绍上面的核心线程数和最大线程数时有提到空闲的线程,所谓空闲的线程就是执行完任务之后闲着的线程。

超过这个时间会使得那么核心线程之外的空闲线程被杀死,如果想把这个时间也作用在核心线程上需要设置allowCoreThreadTimeOut(boolean)为true

这里有必要说一下的是,任务执行器如何实现线程的重复利用,当任务执行器执行execute(task)的时候会创建一个worker,它是一个Runnable类,可以看做task的载体,worker包含一个thread对象,这个thread启动的时候执行worker本身的run方法,这样worker和线程就融为一体。当worker的thread start的时候,就会执行worker的run方法,而worker的run会调用任务执行器的runWorker(worker),并将自身传递过去,意思是任务执行器启动了一个worker,而线程重复利用关键就在runWorker中,在启动了一个worker后,worker会从任务执行器中寻找可以运行的任务,而一开始创建worker使用的task就是它的第一个任务。

下面是jdk1.7的源码

//执行一个任务 task

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) // 将task装配到一个worker中
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

 

添加一个worker 

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {//如果worker创建成功,就启动它的对应的thread
                    t.start(); //worker中的tread启动
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

运行worker

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//这里是关键,使用一个while来寻找任务执行器中(主要还是从任务队列中获取)还未执行的task。
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

  

 

时间: 2024-10-12 15:33:55

ThreadPoolExecutor参数解析的相关文章

百度URL参数解析

百度URL参数解析 在用Python爬取百度搜索的内容时,发现百度搜索的url非常的长,往往会跟一大段的参数,但其实很多参数都是没有必要的,如同样是搜索java关键字,可以通过 http://www.baidu.com/s?wd=java 如此简单的URL来获取,而下面这个超级复杂的URL也是同样进行了关键字java的搜索: http://www.baidu.com/s?wd=java&rsv_spt=1&rsv_iqid=0xd3c8c51900052eb3&issp=1&

第5章4节《MonkeyRunner源码剖析》Monkey原理分析-启动运行: 命令行参数解析(原创)

天地会珠海分舵注:本来这一系列是准备出一本书的,详情请见早前博文"寻求合作伙伴编写<深入理解 MonkeyRunner>书籍".但因为诸多原因,没有如愿.所以这里把草稿分享出来,所以错误在所难免.有需要的就参考下吧,转发的话还请保留每篇文章结尾的出处等信息. 设置好Monkey的CLASSPATH环境变量以指定"/system/framework /framework/monkey.jar"后,/system/bin/monkey这个shell脚本就会通

把URL参数解析成一个Json对象

问题:请编写一个JavaScript函数parseQueryString,它的用途是把URL参数解析为一个对象.           eg:var obj=parseQueryString(url); 创建对象的三种形式:        一:                var Person=new Object();                Person.name="Sun";                Person.age=24;        二:         

QEMU 2: 参数解析

一.使用gdb分析QEMU代码 使用gdb不仅可以很好地调试代码,也可以利用它来动态地分析代码.使用gdb调试QEMU需要做一些准备工作: 1, 编译QEMU时需要在执行configure脚本时的参数中加入–enable-debug. 2, 从QEMU官方网站上下载一个精简的镜像--linux-0.2.img.linux-0.2.img只有8MB大小,启动后包含一些常用的shell命令,用于QEMU的测试. $wget http://wiki.qemu.org/download/linux-0.

四.jQuery源码解析之jQuery.fn.init()的参数解析

从return new jQuery.fn.init( selector, context, rootjQuery )中可以看出 参数selector和context是来自我们在调用jQuery方法时传过来的.那么selector和context都有哪些可能. 对于表格中的4~9行中的可能做具体分析. 如果selector是字符串,则首先检测是html代码还是#id. 126行的if语句:以"<"开头,以">"结尾,且长度>=3.则假设额这个是HT

document.execCommand()函数可用参数解析

隐藏在暗处的方法-execCommand() 关键字: javascript document document.execCommand()方法可用来执行很多我们无法实现的操作. execCommand方法是执行一个对当前文档,当前选择或者给出范围的命令. document.execCommand()方法处理Html数据时常用语法格式如下: 复制内容到剪贴板 代码: document.execCommand(sCommand[,交互方式, 动态参数]) 其中:sCommand为指令参数(如下例中

PHP 命令行参数解析工具类

<?php /** * 命令行参数解析工具类 * @author guolinchao */ class CommandLine { // 临时记录短选项的选项值 private static $shortOptVal = null; // options value private static $optsArr = array(); // command args private static $argsArr = array(); // 是否已解析过命令行参数 private static

CImageList类Create函数参数解析

前面提到了CImageList类的Create(...)函数,虽然MSDN上已经有所解释,但仍有网友问到参数的具体含义,下面就我的理解,对参数进行一次轻量级的剖析 函数原型(其他重载函数请参看msdn):   BOOL Create( int cx, int cy, UINT nFlags, int nInitial, int nGrow ); cx ,cy: 图片的实际像素宽与高,没有问题 nFlags:创建图像列表的类型,包括4/8/16/24/32/位色, nInitial : 创建Ima

java笔试之参数解析(正则匹配)

在命令行输入如下命令: xcopy /s c:\ d:\, 各个参数如下: 参数1:命令字xcopy 参数2:字符串/s 参数3:字符串c:\ 参数4: 字符串d:\ 请编写一个参数解析程序,实现将命令行各个参数解析出来. 解析规则: 1.参数分隔符为空格 2.对于用""包含起来的参数,如果中间有空格,不能解析为多个参数.比如在命令行输入xcopy /s "C:\program files" "d:\"时,参数仍然是4个,第3个参数应该是字符串C