Core Java 谈谈 ThreadPoolExecutor

  说起Java 7的Executors框架的线程池,同学们能想到有几种线程池,它们分别是什么?

  一共有四个,它们分别是Executors的 newSingleThreadPool(), newCachedThreadPool(), newFixedThreadPool(),newScheduledThread(),四个静态方法,当然在java 8中还有一个newWorkStealingThreadPool()。

  但今天这些这些不是咱们今天要说的重点,今天要说的重点是里边所使用的ThreadPoolExecutor, 这个是咱们要说的重点,咱们打开newSingleThreadPool()的源码,这个定一个线程的线程池。

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * <tt>newFixedThreadPool(1)</tt> the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

  重点来了,里边有5个参数,分别是核心线程数,也就是启动的时候线程池里需要有的线程数,如果设置allowsCoreThreadTimeOut,那么核心线程数就会在KeepAliveTime之后核心线程空闲的也会停止掉;第二个maximumPoolSize是最大线程数,注意,最大线程数是包括核心线程数的;KeepAliveTime是那些超过了核心线程的其他线程在单位时间内停止掉;TimeUnit是时间单位;BlockingQueue是阻塞队列,用于存放那些超过了核心执行线程之外的任务。现在有一个问题,如果任务数超过了核心线程数,那么多余的任务是进入maximumPoolSize,还是进入组则队列呢?

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

  我写了一个测试的例子,让大家更好的理解它的原理。线程池满了之后会有一个rejection的策略,所以我提前写了简单的一个。

package com.hqs.core;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectedFullHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " I am rejected!");
    }

}

  然后写线程池测试类,为了大家的阅读方便,我添加了一些注释。

package com.hqs.core;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorTest implements Runnable {
    private int i;
    private CountDownLatch cdl;
    public ThreadPoolExecutorTest(int i, CountDownLatch cdl) {
        this.i = i;
        this.cdl = cdl;
    }
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(i + " is running");
        cdl.countDown();
    }
    @Override
    public String toString() {
        return "i:" + i;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        BlockingQueue queue = new ArrayBlockingQueue<>(2); //定义一个阻塞队列,长度为2
        CountDownLatch cdl = new CountDownLatch(5); //设置一个减门闩,用于递减动作
     //定义了2个核心线程,3个最大线程,空闲线程持续5秒钟,阻塞队列,拒绝处理执行类      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 3, 5, TimeUnit.SECONDS, queue, new RejectedFullHandler()); 
     
        //threadPool.allowCoreThreadTimeOut(true); //设置是否将空闲的核心线程超时后停止

        for(int i = 1; i <= 6; i++ ) {
            ThreadPoolExecutorTest t1 = new ThreadPoolExecutorTest(i, cdl);
            threadPool.execute(t1);
//            FutureTask future = (FutureTask)threadPool.submit(t1); //此处注释,因为submit传的参数是Runnable而不是Callable,所以返回结果为null
            System.out.println("queue content:" + queue.toString()); //将queue的内容打印出来

        }
        try {
            cdl.await(); //所有线程执行完之后,主线程继续往下进行。
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        try {
            TimeUnit.SECONDS.sleep(6);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("poosize :" + threadPool.getPoolSize()); //输出PoolSize
        threadPool.shutdown();  //注意,所以线程池的最后都必须要显示的关闭

    }
}

queue content:[]
queue content:[]
queue content:[i:3]
queue content:[i:3, i:4]
queue content:[i:3, i:4]
i:6 I am rejected!
queue content:[i:3, i:4]
1 is running
2 is running
5 is running
3 is running
4 is running
poosize :2

  我简单解释一下这个输出结果,首先线程池初始化的时候启动两个线程,这时1,2进入到池子进行执行,但是1,2程序还没有执行完,紧接着3,4被放到了Queue队列里边,程序发现最大线程池是3个,目前2个核心线程池正在执行,还没有达到最大值,所以启用一个线程,执行5。当6进来的发现池子已经满了,队列也满了,此时只能reject掉了,所以会走reject的异常处理。

  在此,程序执行的顺序为 corePoolSize -> workQueue -> maximumPoolSize-corePoolSize -> RejectExecutionHandler (如果池子满了)

  还有一个比较重要的点,也是同学们想知道的点,那就是execute()和submit()的区别是什么? 

  1. execute方法是void的,没有返回值,而submit()方法返回的是Future对象。当然还有一些抛出的异常会不一致,这里就不详述了。

  2. execute方法是在Executor的service中定义的,而submit方法是在ExecutorService中定义的。

  3. execute方法只支持Runnable参数,而submit方法除了支持Runnable外还支持Callable参数,也就意味着通过submit执行的结果是可以返回的,通过Future.get()方法,也就意味着并发计算的结果是可以返回的(这就是两者为什么区别的根本原因)

  那么很多同学也会问,两个方法都是异步执行的,那什么情况下用execute或submmit方法呢?

  比如多线程并行执行,不需要执行结果返回的时候一般使用execute方法,如果需要多线程并行计算,并且都需要返回结果的时候,需要submmit方法,当然submmit一般情况下是blocking的,如果想在制定的时间取回结果,如果取不到就会抛异常是通过get()方法设置参数,一般同时处理大量文件的时候,将开启多个线程对文件内容分别处理并将结果返回的再统计或汇总的时候比较方便。

  

p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 10.0px Helvetica }

时间: 2024-10-18 10:29:41

Core Java 谈谈 ThreadPoolExecutor的相关文章

java多线程之路之同步器—Core Java学习

今天为大家介绍几种java内置的同步器. CountDownLatch:倒计数门栓 CountDownLatch让一个线程集等待直到计数变为0.该Latch为一次性的,一旦计数为0,就不能再使用了. Sample 1: public class Driver { public static void main(String[] args) throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1

Core Java (十一) Java 继承,类,超类和子类

Core Java (十一) Java 继承,类,超类和子类 标签: javaJavaJAVA 2013-01-22 17:08 1274人阅读 评论(0) 收藏 举报  分类: java(58) 读书笔记(46)  版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 继承关系 两个类之间存在三种关系: 依赖,uses-a,如果一个类的方法操纵另一个对象,我们就说一个类依赖于另一个类. 聚合(关联),has-a,一个对象包含另外一个对象,聚合关系意味着类A的对象包含类B的对象

Core Java 学习笔记——1.术语 环境配置/Eclipse汉化字体快捷键/API文档

今天起开始学习Java,学习用书为Core Java.之前有过C的经验.准备把自己学习这一本书时的各种想法,不易理解的,重要的都记录下来.希望以后回顾起来能温故知新吧.也希望自己能够坚持把自己学习这本书的整个过程记录下来. I want to put a ding in the universe. 基本术语:       Object Oriented Programming——OOP——面向对象编程 Application Programming Interface——API——应用程序编程接

Core Java 笔记

Core Java 的读书笔记,持续更新中... Core Java笔记 1.对象与类 Core Java笔记 2.继承 Core Java笔记 3.反射

Core Java的那点事儿之ArrayList

Core Java的那点事儿之ArrayList 万丈高楼平地起,Java基础要拿起.今天就从我看的Core Java里找了些小基础点来分享一下. 首先隆重介绍一下专业级龙套演员---Employee类(PS:我可是专注龙套30年),下面会有多次出场,因此先在此介绍一下: 1 class Employee{ 2 private String name; 3 private double salary; 4 private int id; 5 6 //下面是set.get方法 7 } ArrayL

Java中ThreadPoolExecutor的使用规则

public ThreadPoolExecutor(                 int corePoolSize,                 int maximumPoolSize,                 long keepAliveTime,                 TimeUnit unit,                 BlockingQueue workQueue) corePoolSize 指的是保留的线程池大小. maximumPoolSize 指的

java学习笔记(Core Java)1-3

要准备学习下java了,按着<core java>的内容,简单的做了一下笔记.这本书有很多地方对C++和java的语法作了对比,所以对于从C++向java方向转的人来说,非常有利! javac xxx.java java xxx java applet: appletview xxx.html (浏览器加载) 第三章 基本类型1) 对大小写敏感 强调main方法时公有的 2)java没有无符号类型3)float后面必须有F 标记,double 也可以加上D4) 错误溢出:正无穷 负无穷 NaN

Top 10 tough core Java interview questions answers programming

Tough core Java interview questions and answersWhat is tough core java interview question ? Why do people look for tough Java questions before going for interview? well I don't thing I need to answer these tough questions because its pretty natural t

Core Java 简单谈谈HashSet

同学们在看这个问题的时候,我先提出者两个问题,然后大家带着问题看这个文章会理解的更好. HashSet为什么添加元素时不能添加重复元素? HashSet是否添加null元素? 打开源码, 我们看到如下代码,我们看到HashSet也有一个HashMap做为属性,HashSet()的构造方法就是将这个map实例化.如果大家对HashMap还不了解话,可以看我的这篇博文.还要注意有一个静态final的对象PRESENT,这个是干什么用的,咱们继续往下看. private transient HashM