concurrent包分析之Executor框架

文章目录

  1. 线程生命周期的开销:线程比较少的情况使用new Thread(task)无多大影响,但是如果涉及到线程比较多的情况,应用的性能就会受到影响,如果jdbc创建连接一样,new Thead创建线程也会耗资源、耗时间的。
  2. 资源的消耗量:活动线程会消耗系统性能,如果运行的线程数量多余可用的处理器数,那么就会有大量空闲的线程占用内存,会给垃圾收集器带来压力,如果有cpu资源竞争,还会有其他性能开销。
  3. 限定创建线程的数目:如果不设定创建线程的数量,一个任务一个线程无限创建线程,高负载情况下就有可能造成OutOfMemoryError错误。所以像tomcat这种servlet容器的线程池都设置了最大线程数量的。

Executor框架组成

Eexecutor接口:包含Eexecutor、ExecutorService、ScheduledExecutorService
ThreadPool线程池:包含ThreadPoolExecutor、ScheduledThreadPoolExecutor
Fork/Join框架:JDK1.7新增
类之间的关系如下:

Executor框架将线程的创建与执行解耦,可以异步调用,让任务相互独立,用阻塞队列管理任务,直接在当前线程中消费队列,可以减少线程之间进行资源竞争,也可以减少线程的创建和系统的开销,要廉价多了。这种设计就是经典并发模式Active Object Models(也称Actor Models)的实现,如下图:

可以参考execute方法,就按照上面的模式来的。另外Executors工厂类创建了不同的连接池,为任务的执行分配了不同执行策略。

线程池

concurrent包里面主要包含ThreadPoolExecutor、ScheduledThreadPoolExecutor两种线程池,Executors类提供了很多创建线程池的方法

,newFixedThreadPool创建定长的线程池、newWorkStealingPool创建ForkJoinPool(jdk1.8新增)、newCachedThreadPool创建缓存线程池(可以回收空闲的线程)、newSingleThreadExecutor创建当个线程池(保证FIFOLIFO优先级),newScheduledThreadPool创建定时器线程池,如果有特殊处理的,也可以根据自己的需求来创建连接池,如tomcat也是基于ThreadPoolExecutor实现了自己的连接池。

ThreadPoolExecutor

public (int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
     Executors.defaultThreadFactory(), defaultHandler);
}

从ThreadPoolExecutor最简单的构造函数可以看出,线程池总是依赖阻塞队列而工作的,newFixedThreadPool与newSingleThreadExecutor使用的是LinkedBlockingQueue无界队列,newCachedThreadPool使用SynchronousQueue,ThreadPoolExecutor还定义了一个Worker执行任务线程,除此之外,还有个非常重要的变量ctl(线程池控制状态)由执行器状态和工作线程的数量组成,在控制执行的时候都是围绕这个变量来判断。

处理任务

参考前面的Active Object Models图,可以将线程池执行任务主要分为3个步骤:
1、如果任务少于线程池大小时,就作为firstTask分别创建工作线程执行任务
2、如果第n个任务超出了线程池大小,就加入到阻塞队列,并从新检测执行器状态状态以及工作线程数量(有可能线程发生RuntimeException挂掉,最后可工作线程数量变为0),如果工作线程挂完了就重新启动一个线程(解决线程泄露问题)。阻塞队列的任务执行就在第一步所创建的线程执行,参考代码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //判断当前任务或者检测阻塞队列
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
            大专栏  concurrent包分析之Executor框架     Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

3、如果任务入队列失败了,就重新开启一个线程去执行,如果还是失败了,就可以确定是执行器关闭了或者线程池已经达到饱和状态了。

线程池原理

从上面的执行步骤来就可以看出线程池依赖于阻塞队列,然后基于生产者消费者模式实现的,execute()方法一直添加任务(生产者),当任务数量超出线程池的最大长度就添加到阻塞队列等待排队执行,而第一次创建的所有工作线程(最大数量为线程池的最大长度)就会一直判断线程池里面是否有任务执行,如果有就执行任务(消费者)。这样做就可以重用线程了,不用每次去创建线程,性能肯定比一个任务一个线程好多了。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor由ScheduledFutureTask、DelayedWorkQueue组成,实现任务执行还是用的父类ThreadPoolExecutor的工作线程。jdk1.5之前使用定时任务都用的Timer,但是与ScheduledThreadPoolExecutor有着明显区别:

  1. Timer使用的System.currentTimeMillis()毫秒来控制时间,ScheduledThreadPoolExecutor使用System.nanoTime()纳秒控制更加精准,并且可以使用TimeUnit进行时间的跨单元转换。
  2. Timer只有单个工作线程,ScheduledThreadPoolExecutor可以配置多个工作线程。
  3. 如果工资线程发生异常,Timer会造成线程泄露没有重启的线程,ScheduledThreadPoolExecutor会一直检测工作线程的数量,如果没有工作线程了,会一直添加数量小于线程池的工作线程(使用父类ThreadPoolExecutor的addaddWorker方法)
    从以上比较可以看出,ScheduledThreadPoolExecutor就是取代Timer的。

还是从一个简单的例子看ScheduledThreadPoolExecutor是如何工作的。addWork参考领导/跟随者模式

@Test
public void testScheduleAtFixedRate() throws Exception {
    ScheduledThreadPoolExecutor p = new ScheduledThreadPoolExecutor(1);
    CountDownLatch latch = new CountDownLatch(3);
    p.scheduleAtFixedRate(new Task(latch), 0, 1000, TimeUnit.MILLISECONDS);
    latch.await();
}

线程池都是基于生产者消费者模式的,所以ScheduledThreadPoolExecutor也不例外,执行任务的时候会一直像队列里面添加任务:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        //添加到队列,让工作线程去消费
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            //启动线程
            ensurePrestart();
    }
}

ScheduledThreadPoolExecutor第一次执行用的父类ThreadPoolExecutor的addWorker方法添加工作线程并启动它,然后每个工作线程会检查队列里面是否有消费的线程,参考ThreadPoolExecutor的runWorker方法,与ThreadPoolExecutor有点不同的是,ScheduledThreadPoolExecutor使用的是延时阻塞队列DelayedWorkQueue。每次消费就进行take操作:

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    //返回队列中的执行任务之前,要先执行finally模块中的唤醒操作
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    //领导线程初始化为null,当前线程为线程池中的线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //当前线程等待延迟时间到期
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

参考资料:

原文地址:https://www.cnblogs.com/lijianming180/p/12230943.html

时间: 2024-08-03 10:38:39

concurrent包分析之Executor框架的相关文章

jdk8中java.util.concurrent包分析

并发框架分类 1. Executor相关类 2.future相关类 3.Queue相关类 4. atomic相关类 5. lock相关类

JDK源码分析之concurrent包(一) -- Executor架构

Java5新出的concurrent包中的API,是一些并发编程中实用的的工具类.在高并发场景下的使用非常广泛.笔者在这做了一个针对concurrent包中部分常用类的源码分析系列.本系列针对的读者是已经对并发包中的Executor框架和工具类有所了解并懂得如何使用的人群,如果对并发包还不了解的朋友,请先做些了解.网上对这方面的讲述有丰富的资源. 本篇博文是第一期,首先对Executor架构做一个概述.这里只简单介绍接口和类的继承.使用关系. 盗用一张类图来描述结构: 解析: Executor是

SSH框架jar包分析

Hibernate jar包 ================= 必要的包: hibernate3.jar,  这个是hibernate3.0的核心jar包,必须的,呵呵,没的选,像我们常用的Session,Query,Transaction都位于这个jar文件中,必要. cglib-2.1.3.jar, CGLIB库,Hibernate用它来实现PO字节码的动态生成,非常核心的库,必要. asm.jar      ASM字节码库    如果使用"cglib"则必要,必要 asm-at

java.util.concurrent包详细分析

java.util.concurrent包详细分析 java.util.concurrent 包含许多线程安全.测试良好.高性能的并发构建块.不客气地说,创建java.util.concurrent 的目的就是要实现 Collection 框架对数据结构所执行的并发操作.通过提供一组可靠的.高性能并发构建块,开发人员可以提高并发类的线程安全.可伸缩性.性能.可读性和可靠性.

JDK源码分析之concurrent包(三) -- Future方式的实现

上一篇我们基于JDK的源码对线程池ThreadPoolExecutor的实现做了分析,本篇来对Executor框架中另一种典型用法Future方式做源码解读.我们知道Future方式实现了带有返回值的程序的异步调用,关于异步调用的场景大家可以自行脑补Ajax的应用(获取返回结果的方式不同,Future是主动询问获取,Ajax是回调函数),这里不做过多说明. 在进入源码前,首先来看下Future方式相关的API: 接口Callable:有返回结果并且可能抛出异常的任务: 接口Future:表示异步

Concurrent包总结——线程任务执行框架

一 Executor接口 Executor接口的对象是一种执行任务的方式.它能够使提交的任务和执行任务的线程的管理解耦.我们通常用Executor来代替new一个Thread对象来执行任务.这样可以省略底层线程的管理细节. 例如: executor.excute(new RunnableTask()); concurrent包中提供了比较常用的Executor的实现,这些实现类都实现了一个更加灵活的类ExecutorService.而ThreadPoolExecutor则提供了一个可扩展的线程池

JDK源码分析之concurrent包(二) -- 线程池ThreadPoolExecutor

上一篇我们简单描述了Executor框架的结构,本篇正式开始并发包中部分源码的解读. 我们知道,目前主流的商用虚拟机在线程的实现上可能会有所差别.但不管如何实现,在开启和关闭线程时一定会耗费很多CPU资源,甚至在线程的挂起和恢复JDK1.6都做了自旋锁的优化.所以,使用线程池来管理和执行多线程任务会大大提高程序执行效率.关于使用线程池的优点这里不做过多说明,我们直接进入Java5并发包中ThreadPoolExecutor的实现的源码. 在解读源码前,我们先来看看创建线程池的一般做法和线程池的几

java 并发(concurrent)包源码分析

参考连接: http://www.cnblogs.com/luoxn28/p/6059881.html http://www.cnblogs.com/java-zhao/p/5140158.html 持续更新中..... 并发是一种能并行运行多个程序或并行运行一个程序中多个部分的能力.如果程序中一个耗时的任务能以异步或并行的方式运行,那么整个程序的吞吐量和可交互性将大大改善.现代的PC都有多个CPU或一个CPU中有多个核,是否能合理运用多核的能力将成为一个大规模应用程序的关键. Java多线程相

Java Executor 框架

Java Executor 框架 Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor.Executors. ExecutorService.CompletionService.Future.Callable等.(图片引用自 http://www.javaclubcn.com/a/jichuzhishi/2012/1116/170.html) 本篇博文分析Executor中几个比较重要的接口和类. Executor 1 public int