Java线程池实现原理与技术

本文将通过实现一个简易的线程池理解线程池的原理,以及介绍JDK中自带的线程池ThreadPoolExecutor和Executor框架。

1.无限制线程的缺陷

多线程的软件设计方法确实可以最大限度地发挥多核处理器的计算能力,提高生产系统的吞吐量和性能。但是,若不加控制和管理的随意使用线程,对系统的性能反而会产生不利的影响。

一种最为简单的线程创建和回收的方法类似如下:

        new Thread(new Runnable() {
            @Override
            public void run() {
                //do sth
            }
        }).start();

以上代码创建了一条线程,并在run()方法结束后,自动回收该线程。在简单的应用系统中,这段代码并没有太多问题。但是在真实的生产环境中,系统由于真实环境的需要,可能会开启很多线程来支撑其应用。而当线程数量过大时,反而会耗尽CPU和内存资源。

首先,虽然与进程相比,线程是一种轻量级的工具,但其创建和关闭依然需要花费时间,如果为每一个小的任务都创建一个线程,很有可能出现创建和销毁线程所占用的时间大于该线程真实工作所消耗的时间,反而会得不偿失。

其次,线程本身也是要占用内存空间的,大量的线程会抢占宝贵的内部资源。

因此,在实际生产环境中,线程的数量必须得到控制。盲目地大量创建线程对系统性能是有伤害的。

2.简单的线程池实现

下面给出一个最简单的线程池,该线程池不是一个完善的线程池,但已经实现了一个基本线程池的核心功能,有助于快速理解线程池的实现。

1.线程池的实现

public class ThreadPool {
    private static ThreadPool instance = null;

    //空闲的线程队列
    private List<PThread> idleThreads;
    //已有的线程总数
    private int threadCounter;
    private boolean isShutDown = false;

    private ThreadPool() {
        this.idleThreads = new Vector<>(5);
        threadCounter = 0;
    }

    public int getCreatedThreadCounter() {
        return threadCounter;
    }

    //取得线程池的实例
    public synchronized static ThreadPool getInstance() {
        if (instance == null) {
            instance = new ThreadPool();
        }
        return instance;
    }

    //将线程池放入池中
    protected synchronized void repool(PThread repoolingThread) {
        if (!isShutDown) {
            idleThreads.add(repoolingThread);
        } else {
            repoolingThread.shutDown();
        }
    }

    //停止池中所有线程
    public synchronized void shutDown() {
        isShutDown = true;
        for (int threadIndex = 0; threadIndex < idleThreads.size(); threadIndex++) {
            PThread pThread = idleThreads.get(threadIndex);
            pThread.shutDown();
        }
    }

    //执行任务
    public synchronized void start(Runnable target) {
        PThread thread = null;
        //如果有空闲线程,则直接使用
        if (idleThreads.size() > 0) {
            int lastIndex = idleThreads.size() - 1;
            thread = idleThreads.get(lastIndex);
            idleThreads.remove(thread);
            //立即执行这个任务
            thread.setTarget(target);
        }//没有空闲线程,则创建线程
        else {
            threadCounter++;
            //创建新线程
            thread = new PThread(target, "PThread #" + threadCounter, this);
            //启动这个线程
            thread.start();
        }
    }

}

2.要实现上面的线程池,就需要一个永不退出的线程与之配合。PThread就是一个这样的线程。它的主体部分是一个无限循环,该线程在手动关闭前永不结束,并一直等待新的任务到达。

public class PThread extends Thread {
    //线程池
    private ThreadPool pool;
    //任务
    private Runnable target;
    private boolean isShutDown = false;
    private boolean isIdle = false; //是否闲置
    //构造函数
    public PThread(Runnable target,String name, ThreadPool pool){
        super(name);
        this.pool = pool;
        this.target = target;
    }

    public Runnable getTarget(){
        return target;
    }

    public boolean isIdle() {
        return isIdle;
    }

    @Override
    public void run() {
        //只要没有关闭,则一直不结束该线程
        while (!isShutDown){
            isIdle =  false;
            if (target != null){
                //运行任务
                target.run();
            }
            try {
                //任务结束了,到闲置状态
                isIdle = true;
                pool.repool(this);
                synchronized (this){
                    //线程空闲,等待新的任务到来
                    wait();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            isIdle = false;
        }
    }

    public synchronized void setTarget(Runnable newTarget){
        target = newTarget;
        //设置了任务之后,通知run方法,开始执行这个任务
        notifyAll();
    }

    //关闭线程
    public synchronized void shutDown(){
        isShutDown = true;
        notifyAll();
    }

}

3.测试Main方法

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 1000; i++) {
            ThreadPool.getInstance().start(new Runnable() {
                @Override
                public void run() {
                    try {
                        //休眠100ms
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

3.ThreadPoolExecutor

为了能够更好地控制多线程,JDK提供了一套Executor框架,帮助开发人员有效地进行线程控制。Executor框架无论是newFixedThreadPool()方法、newSingleThreadExecutor()方法还是ewCachedThreadPool()方法,其内部实现均使用了 ThreadPoolExecutor:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

由以上线程池的实现代码可以知道,它们只是对 ThreadPoolExecutor 类的封装。为何 ThreadPoolExecutor 类有如此强大的功能?来看一下 ThreadPoolExecutor 最重要的构造方法。

4.1 构造方法

ThreadPoolExecutor最重要的构造方法如下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

方法参数如下:

参数 说明
corePoolSize 指定了线程池中的线程数量
maximumPoolSize 指定了线程池中最大的线程数量
keepAliveTime 当线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间。
即,超过corePoolSize的空闲线程,在多长时间内会被销毁
unit keepAliveTime 的单位,如:TimeUnit.SECONDS
workQueue 任务队列,被提交但尚未被执行的任务。
threadFactory 线程工厂,用于创建线程,一般用默认的即可。
handler 拒绝策略。当任务太多来不及处理,如何拒绝任务。

ThreadPoolExecutor的使用示例,通过execute()方法提交任务。

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 5, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 10; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
        executor.shutdown();
    }

或者通过submit()方法提交任务

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 5, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        List<Future> futureList = new Vector<>();
        //在其它线程中执行100次下列方法
        for (int i = 0; i < 100; i++) {
            futureList.add(executor.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return Thread.currentThread().getName();
                }
            }));
        }
        for (int i = 0;i<futureList.size();i++){
            Object o = futureList.get(i).get();
            System.out.println(o.toString());
        }
        executor.shutdown();
    }

运行结果:

...
pool-1-thread-4
pool-1-thread-3
pool-1-thread-2

下面主要讲解ThreadPoolExecutor的构造方法中workQueue和RejectedExecutionHandler参数,其它参数都很简单。

4.2 workQueue任务队列

用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。

  • ArrayBlockingQueue: 是一个基于数组结构的有界阻塞队列,按FIFO原则进行排序
  • LinkedBlockingQueue: 一个基于链表结构的阻塞队列,吞吐量高于ArrayBlockingQueue。静态工厂方法Excutors.newFixedThreadPool()使用了这个队列
  • SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量高于LinkedBlockingQueue,静态工厂方法Excutors.newCachedThreadPool()使用了这个队列
  • PriorityBlockingQueue: 一个具有优先级的无限阻塞队列。

4.3 RejectedExecutionHandler饱和策略

当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略还处理新提交的任务。它可以有如下四个选项:

  • AbortPolicy : 直接抛出异常,默认情况下采用这种策略
  • CallerRunsPolicy : 只用调用者所在线程来运行任务
  • DiscardOldestPolicy : 丢弃队列里最近的一个任务,并执行当前任务
  • DiscardPolicy : 不处理,丢弃掉

更多的时候,我们应该通过实现RejectedExecutionHandler 接口来自定义策略,比如记录日志或持久化存储等。

4.2 submit()与execute()

可以使用execute和submit两个方法向线程池提交任务。

  1. execute方法用于提交不需要返回值的任务,利用这种方式提交的任务无法得知是否正常执行
  2. submit方法用于提交一个任务并带有返回值,这个方法将返回一个Future类型对象。可以通过这个返回对象判断任务是否执行成功,并且可以通过future.get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成。

4.3 shutdown()与shutdownNow()

可以通过调用 shutdown()shutdownNow() 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法停止。

这俩方法的区别是,shutdownNow() 首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown() 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法的任意一个,isShutdown 方法就会返回 true。当所有的任务都已关闭了,才表示线程池关闭成功,这时调用 isTerminaced 方法会返回 true。

通常调用 shutdown() 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow() 方法。

4.4 合理配置线程池

要想合理地配置线程池,首先要分析任务特性

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
  • 任务的优先级:高、中和低。
  • 任务的执行时间:长、中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。

CPU密集型任务应该配置尽可能少的线程,如配置N+1个线程,N位CPU的个数。

而IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*N。

混合型任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行的时间相差很大,则没有必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。

4.5 线程池的监控

由于大量的使用线程池,所以很有必要对其进行监控。可以通过继承线程池来自定义线程池,重写线程池的beforeExecute、afterExecute 和 terminated 方法,也可以在任务执行前,执行后和线程池关闭前执行一些代码来进行监控。在监控线程池的时候可以使用一下属性:

(1) taskCount:线程池需要执行的任务数量

(2) completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount

(3) largestPoolSize: 线程池里曾经创建过最大的线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池最大大小,则表示线程池曾经满过。

(4) getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。

(5) getActiveCount:获取活动的线程数

3.Executor多线程框架

ThreadPoolExecutor 表示一个线程池,Executors 类则扮演着线程池工厂的角色,通过 Executors 可以取得一个特定功能的线程池。

使用 Executors 框架实现上节中的例子,其代码如下:

    public static void main(String[] args) {
        //新建一个线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        //在其它线程中执行100次下列方法
        for (int i = 0; i < 100; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
        //执行完关闭
        executor.shutdown();
    }

3.1 Executors框架的结构

  1. 任务

    包括被执行任务需要实现的接口:Runnable 接口或 Callable 接口。

  2. 任务的执行

    包括任务执行机制的核心接口 Executor,以及继承自 Executor 的ExecutorService 接口。Executor框架有两个关键类实现了 ExecutorService 接口(ThreadPoolExecutor 和 ScheduledThreadPoolExecutor)。

  3. 异步计算的结果

    包括接口 Future 和实现Future接口的FutureTask类。

3.2 Executors工厂方法

Executors工厂类的主要方法:

public static ExecutorService newFixedThreadPool(int nThreads)
  • 该方法返回一个固定线程数量的线程池,该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
public static ExecutorService newSingleThreadExecutor()
  • 该方法返回一个只有一个线程的线程池。若多余一个任务被提交到线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
public static ExecutorService newCachedThreadPool()
  • 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。但所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() 
  • 该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能,如在某个固定的延时之后执行,或者周期性执行某个任务。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
  • 该方法也返回一个 ScheduledExecutorService 对象,但该线程池可以指定线程数量。

3.3 ThreadPoolExecutor与ScheduledThreadPoolExecutor

在前面提到了Executors 类扮演着线程池工厂的角色,通过 Executors 可以取得一个特定功能的线程池。Executors 工厂类的主要方法可以创建 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 线程池。

关于ThreadPoolExecutor ,前面第3节已经详细叙述。ScheduledThreadPoolExecutor 也是ExecutorService接口的实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor 比 Timer 更灵活,功能更强大。

3.3 Future与FutureTask

上面的示例中使用 execute() 方法提交任务,用于提交不需要返回值的任务。如果我们需要获取执行任务之后的返回值,可以使用submit()方法。

示例代码:

   public static void main(String[] args) throws InterruptedException, ExecutionException {
        //新建一个线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        List<Future> futureList = new Vector<>();
        //在其它线程中执行100次下列方法
        for (int i = 0; i < 100; i++) {
            futureList.add(executor.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return Thread.currentThread().getName()+" "+System.currentTimeMillis()+" ";
                }
            }));
        }
        for (int i = 0;i<futureList.size();i++){
            Object o = futureList.get(i).get();
            System.out.println(o.toString()+i);
        }
        executor.shutdown();
    }

运行结果:

...
pool-1-thread-11 1537872778612 96
pool-1-thread-11 1537872778613 97
pool-1-thread-10 1537872778613 98
pool-1-thread-10 1537872778613 99

到这里,就不得不提Future接口与FutureTask实现类,它们代表异步计算的结果。

Future<T> submit(Callable<T> task)
Future<?> submit(Runnable task);
Future<T> submit(Runnable task, T result);

当我们submit()提交后,会返回一个Future对象,到JDK1.8,返回的实际是FutureTask对象。

Runnable 接口 和Callable 接口的区别就是 Runnable 不会返回结果,Callable 会返回结果。

主线程可以执行 futureTask.get() 方法来阻塞当前线程直到任务执行完成,任务完成后返回任务执行的结果。

futureTask.get(long timeout, TimeUnit unit) 方法则会阻塞当前线程一段时间立即返回,这时候有可能任务没有执行完。

主线程也可以执行 futureTask.cancel(boolean mayInterruptIfRunning) 来取消此任务的执行。

如果没有什么特殊要求,可以直接使用JDK中的内置线程池,来改善系统的性能。

参考

《Java程序性能优化》葛一鸣著

《Java并发编程的艺术》方、魏、程著

原文地址:https://www.cnblogs.com/yueshutong/p/9706199.html

时间: 2024-08-05 22:35:32

Java线程池实现原理与技术的相关文章

Java 线程池的原理与实现

最近在学习线程池.内存控制等关于提高程序运行性能方面的编程技术,在网上看到有一哥们写得不错,故和大家一起分享. [分享]Java 线程池的原理与实现 这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧.线程池就是其中之一,一提到线程,我们会想到以前<操作系统>的生产者与消费者,信号量,同步控制等等.一提到池,我们会想到数据库连接池,但是线程池又如何呢? 建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字的用

Java 线程池的原理与实现 (转)

  最近在学习线程池.内存控制等关于提高程序运行性能方面的编程技术,在网上看到有一哥们写得不错,故和大家一起分享. [分享]Java 线程池的原理与实现 这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧.线程池就是其中之一,一提到线程,我们会想到以前<操作系统>的生产者与消费者,信号量,同步控制等等.一提到池,我们会想到数据库连接池,但是线程池又如何呢? 建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字

Java线程池的原理及几类线程池的介绍

刚刚研究了一下线程池,如果有不足之处,请大家不吝赐教,大家共同学习.共同交流. 在什么情况下使用线程池? 单个任务处理的时间比较短 将需处理的任务的数量大 使用线程池的好处: 减少在创建和销毁线程上所花的时间以及系统资源的开销 如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存以及"过度切换". 线程池工作原理: 为什么要用线程池? 诸如 Web 服务器.数据库服务器.文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务.请求以某种方式到

我眼中的java线程池实现原理

最近在看java线程池实现方面的源码,在此做个小结,因为网上关于线程池源码分析的博客挺多的,我也不打算重复造轮子啦,仅仅用纯语言描述的方式做做总结啦! 个人认为要想理解清楚java线程池实现原理,明白下面几个问题就可以了: (1):线程池存在哪些状态,这些状态之间是如何进行切换的呢? (2):线程池的种类有哪些? (3):创建线程池需要哪些参数,这些参数的具体含义是什么? (4):将任务添加到线程池之后运行流程? (5):线程池是怎么做到重用线程的呢? (6):线程池的关闭 首先回答第一个问题:

11 java 线程池 实现原理

一 关键类的实现 1 ThreadPoolExecutor类 java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类. 下面我们来看一下ThreadPoolExecutor类的具体实现源码. 在ThreadPoolExecutor类中提供了四个构造方法: 1 public class ThreadPoolExecutor extends AbstractExecutorService {

深入浅出JAVA线程池使用原理1

前言: Java中的线程池是并发框架中运用最多的,几乎所有需要异步或并发执行任务的程序都可以使用线程池,线程池主要有三个好处: 1.降低资源消耗:可以重复使用已经创建的线程降低线程创建和销毁带来的消耗 2.提高响应速度:执行任务时,不需要等待线程的创建就可以直接执行任务 3.提高线程的可管理性:线程是稀缺资源,如果无限制地创建不仅会消耗系统资源,还会降低系统的稳定性,线程池可以对线程进行统一分配.调优和监控 一.线程池的实现原理 在了解线程池实现原理之前,先了解线程池的一些元素 1.核心线程池

Java 线程池的原理与实现学习(一)

线程池:    多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力.        假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间.        如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能.                一个线程池包括以下四个基本组成部分:                1.线程池管理器(ThreadPool):用于创建并管理线程

深入浅出JAVA线程池使用原理2

一.Executor框架介绍 Executor框架将Java多线程程序分解成若干个任务,将这些任务分配给若干个线程来处理,并得到任务的结果 1.1.Executor框架组成 任务:被执行任务需要实现的接口:Runnable接口或Callable接口 任务的执行:任务执行的核心接口Executor以及其子类ExecutorService接口 任务的结果:包括Future接口以及Future接口的实现类FutureTask类 Executor接口是Executor框架的基础,将任务的提交与执行分离开

Java 线程池的原理与实现学习(二)

摘自:http://www.cnblogs.com/lxzh/archive/2013/01/20/2868736.html execute(Runnable command):履行Ruannable类型的任务 submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象 invokeAll(collection of tasks):执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表. shutdown():在完成已提