聊聊高并发(四十四)解析java.util.concurrent各个组件(二十) Executors工厂类

Executor框架为了更方便使用,提供了Executors这个工厂类,通过一系列的静态工厂方法,可以快速地创建相应的Executor实例。

只有一个nThreads参数的newFixedThreadPool方法会创建一个ThreadPoolExecutor,corePoolSize和maximumPoolSize都是nThreads,并且keepAliveTime为0表示不会设置过期时间,采用LinkedBlockingQueue作为工作队列

这个方法创建的ThreadPoolExecutor采用固定线程数nThreads,当线程少于nThreads时会为新的任务创建新的Worker工作线程,直到线程数达到nThreads。线程达到nThreads后不会回收。后续新建的任务会进入工作队列,工作队列是无界的。当任务量过大时,可能会因为无界的工作队列造成OOM的问题。

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

这个方法和上面的方法基本一致,只是多了一个ThreadFactory,可以自定义创建的线程属性。

 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

这个newSingleThreadExecutor是上面的方法基本一致,只是创建了单线程的线程池

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

这个方法和上面的方法基本一致,只是多了一个ThreadFactory,可以自定义创建的线程属性。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

这个newCachedThreadPool返回一个ThreadPoolExecutor,corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,表示的意思是线程数没有限制。KeepAliveTime为60秒,表示的意思是当线程空闲时间超过60秒才会回收线程。这个就是所谓的Cache。空闲的意思之前说了,表示Worker在工作队列中取任务时,如果超过60秒没取到任务,这个线程就超时,要被回收。采用了SynchronousQueue同步队列作为工作队列,意思是来一个新任务就把任务交给Worker工作线程,不入队列。如果没有可用的工作线程,就创建新的工作线程。这个方法的问题是当任务量大时,会消耗太多的CPU资源,创建太多线程,增大线程上线文切换等消耗。

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

这个方法和上面的方法基本一致,只是多了一个ThreadFactory,可以自定义创建的线程属性。

 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

这个方法创建单线程的ScheduledThreadPoolExecutor。DelegatedScheduleExecutorService是个包装类,将ScheduledThreadPoolExecutor的对外接口缩小

 public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

这个方法和上面的方法基本一致,只是多了一个ThreadFactory,可以自定义创建的线程属性。

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

这个方法创建corePoolSize个线程的ScheduledThreadPoolExecutor。其他特性和newFixedThreadPool(nThreads)一致

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

这个方法和上面的方法基本一致,只是多了一个ThreadFactory,可以自定义创建的线程属性。

public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

这个方法把Runnable接口适配成Callable接口

 public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-08-14 11:25:19

聊聊高并发(四十四)解析java.util.concurrent各个组件(二十) Executors工厂类的相关文章

聊聊高并发(三十)解析java.util.concurrent各个组件(十二) 理解CyclicBarrier栅栏

这篇讲讲CyclicBarrier栅栏,从它的名字可以看出,它是可循环使用的.它的功能和CountDownLatch类似,也是让一组线程等待,然后一起开始往下执行.但是两者还是有几个区别 1. 等待的对象不同.CountDownLatch的一组线程等待的是一个事件,或者说是一个计数器归0的事件.而CyclicBarrier等待的对象是线程,只有线程都到齐了才往下执行 2. 使用方式不同,这个也是由等待的对象不同引起的,CountDownLatch需要调用await()来让线程等待,调用count

谈论高并发(三十)解析java.util.concurrent各种组件(十二) 认识CyclicBarrier栅栏

这次谈话CyclicBarrier栅栏,如可以从它的名字可以看出,它是可重复使用. 它的功能和CountDownLatch类别似,也让一组线程等待,然后开始往下跑起来.但也有在两者之间有一些差别 1. 不同的对象等.CountDownLatch组线程等待的是一个事件.或者说是一个计数器归0的事件.而CyclicBarrier等待的对象是线程,仅仅有线程都到齐了才往下运行 2. 使用方式不同,这个也是由等待的对象不同引起的,CountDownLatch须要调用await()来让线程等待.调用cou

聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源代码分析

ThreadPoolExecutor是Executor运行框架最重要的一个实现类.提供了线程池管理和任务管理是两个最主要的能力.这篇通过分析ThreadPoolExecutor的源代码来看看怎样设计和实现一个基于生产者消费者模型的运行器. 生产者消费者模型 生产者消费者模型包括三个角色:生产者,工作队列,消费者.对于ThreadPoolExecutor来说, 1. 生产者是任务的提交者,是外部调用ThreadPoolExecutor的线程 2. 工作队列是一个堵塞队列的接口,详细的实现类能够有非

聊聊高并发(四十一)解析java.util.concurrent各个组件(十七) 任务的异步执行和状态控制

聊聊高并发(三十九)解析java.util.concurrent各个组件(十五) 理解ExecutorService接口的设计这篇说了ExecutorService接口扩展了Executor接口,在执行任务的基础上,提供了执行框架生命周期的管理,任务的异步执行,批量任务的执行的能力.AbstractExecutorService抽象类实现了ExecutorService接口,提供了任务异步执行和批量执行的默认实现.这篇说说任务的异步执行和状态控制 说明一点,使用Executor框架执行任务的方式

聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类

这篇说说java.util.concurrent.atomic包里的类,总共12个.网上有非常多文章解析这几个类.这里挑些重点说说. 这12个类能够分为三组: 1. 普通类型的原子变量 2. 数组类型的原子变量 3. 域更新器 普通类型的原子变量的6个, 1. 当中AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference分别相应boolean, int,  long, object完毕主要的原子操作 2. AtomicMarkableRe

聊聊高并发(三十九)解析java.util.concurrent各个组件(十五) 理解ExecutorService接口的设计

上一篇讲了Executor接口的设计,目的是将任务的运行和任务的提交解耦.能够隐藏任务的运行策略.这篇说说ExecutorService接口.它扩展了Executor接口,对Executor的生命周期进行管理.并进行了进一步的扩展. Executor负责执行任务. 它的生命周期有3个:执行,关闭和已终止. 在执行的不论什么时刻,有些 任务可能已经完毕,有些可能正在执行,有些可能正在队列中等待执行.所以假设要关闭Executor的话.就有多种方式,比方优雅平滑的关闭,当执行关闭时就不在接受新的任务

聊聊高并发(二十九)解析java.util.concurrent各个组件(十一) 再看看ReentrantReadWriteLock可重入读-写锁

上一篇聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁 讲了可重入读写锁的基本情况和主要的方法,显示了如何实现的锁降级.但是下面几个问题没说清楚,这篇补充一下 1. 释放锁时的优先级问题,是让写锁先获得还是先让读锁先获得 2. 是否允许读线程插队 3. 是否允许写线程插队,因为读写锁一般用在大量读,少量写的情况,如果写线程没有优先级,那么可能造成写线程的饥饿 关于释放锁后是让写锁先获得还是让读锁先获得,

聊聊高并发(二十五)解析java.util.concurrent各个组件(七) 理解Semaphore

前几篇分析了一下AQS的原理和实现.这篇拿Semaphore信号量做样例看看AQS实际是怎样使用的. Semaphore表示了一种能够同一时候有多个线程进入临界区的同步器,它维护了一个状态表示可用的票据,仅仅有拿到了票据的线程尽能够进入临界区,否则就等待.直到获得释放出的票据. Semaphore经常使用在资源池中来管理资源.当状态仅仅有1个0两个值时,它退化成了一个相互排斥的同步器.类似锁. 以下来看看Semaphore的代码. 它维护了一个内部类Sync来继承AQS,定制tryXXX方法来使

聊聊高并发(二十六)解析java.util.concurrent各个组件(八) 理解CountDownLatch闭锁

CountDownLatch闭锁也是基于AQS实现的一种同步器,它表示了"所有线程都等待,直到锁打开才继续执行"的含义.它和Semaphore的语意不同, Semaphore的获取和释放操作都会修改状态,都可能让自己或者其他线程立刻拿到锁.而闭锁的获取操作只判断状态是否为0,不修改状态本身,闭锁的释放操作会修改状态,每次递减1,直到状态为0. 所以正常情况下,闭锁的获取操作只是等待,不会立刻让自己获得锁,直到释放操作把状态变为0. 闭锁可以用来实现很多场景,比如: 1. 某个服务依赖于

聊聊高并发(二十七)解析java.util.concurrent各个组件(九) 理解ReentrantLock可重入锁

这篇讲讲ReentrantLock可重入锁,JUC里提供的可重入锁是基于AQS实现的阻塞式可重入锁.这篇 聊聊高并发(十六)实现一个简单的可重入锁 模拟了可重入锁的实现.可重入锁的特点是: 1. 是互斥锁,基于AQS的互斥模式实现,也就是说同时只有一个线程进入临界区,唤醒下一个线程时也只能释放一个等待线程 2. 可重入,通过设置了一个字段exclusiveOwnerThread来标示当前获得锁的线程.获取锁操作是,如果当前线程是已经获得锁的线程,那么获取操作成功.把当前状态作为获得锁次数的计数器