第十四章 Executors源码解析

前边两章介绍了基础线程池ThreadPoolExecutor的使用方式、工作机理、参数详细介绍以及核心源码解析。

具体的介绍请参照:

第十二章 ThreadPoolExecutor使用与工作机理

第十三章 ThreadPoolExecutor源码解析

1、Executors与ThreadPoolExecutor

  • ThreadPoolExecutor

    • 可以灵活的自定义的创建线程池,可定制性很高
    • 想创建好一个合适的线程池比较难
    • 使用稍微麻烦一些
    • 实际中很少使用
  • Executors
    • 可以创建4种线程池,这四种线程池基本上已经包含了所有需求,将来根据业务特点选用就好
    • 使用非常简单
    • 实际中很常用

使用方法:

package com.collection.test;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class ThreadPoolExecutorTest {
    //private static ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
    //private static Executor executor = Executors.newFixedThreadPool(5);
    //private static Executor executor = Executors.newSingleThreadExecutor();
    //private static Executor executor = Executors.newCachedThreadPool();
    private static Executor executor = Executors.newScheduledThreadPool(5);

    public void executeTask(){
        Task1 task1 = new Task1();//构建任务1
        Task2 task2 = new Task2();//构建任务2
        executor.execute(task1);//执行任务1
        executor.execute(task2);//执行任务2
    }

    /*
     * 基本任务2
     */
    class Task1 implements Runnable{
        public void run() {
            //具体任务的业务
            for(int i=0;i<1000;i++){
                System.out.println("hello xxx!!!");
            }
        }
    }

    /*
     * 基本任务2
     */
    class Task2 implements Runnable{
        public void run() {
            //具体任务的业务
            for(int i=0;i<5;i++){
                System.out.println("hello world2!!!");
            }
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
        test.executeTask();
    }
}

2、Executors可以创建的几种线程池简介

  • newFixedThreadPool(int corePoolSize)

    • 创建一个线程数固定(corePoolSize==maximumPoolSize)的线程池
    • 核心线程会一直运行
    • 无界队列LinkedBlockingQueue
  • newSingleThreadExecutor
    • 创建一个线程数固定(corePoolSize==maximumPoolSize==1)的线程池
    • 核心线程会一直运行
    • 无界队列LinkedBlockingQueue
    • 所有task都是串行执行的(即同一时刻只有一个任务在执行)
  • newCachedThreadPool
    • corePoolSize==0
    • maximumPoolSize==Integer.MAX_VALUE
    • 队列:SynchronousQueue
    • 创建一个线程池:当池中的线程都处于忙碌状态时,会立即新建一个线程来处理新来的任务
    • 这种池将会在执行许多耗时短的异步任务的时候提高程序的性能
    • 6秒钟内没有使用的线程将会被中止,并且从线程池中移除,因此几乎不必担心耗费资源
  • newScheduledThreadPool(int corePoolSize)
    • 用于执行定时或延迟执行的任务,最典型的:异步操作时的超时回调

注意:对于定时任务的执行,在实际使用中,会去使用spring定时器,非常方便

3、newFixedThreadPool(int corePoolSize)

源代码:

    /**
     * 1、创建一个线程数固定(corePoolSize==maximumPoolSize)的线程池,
     * 2、核心线程会一直运行
     * 3、无界队列LinkedBlockingQueue
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

说明:execute()的源代码查看第十三章 ThreadPoolExecutor源码解析

4、newSingleThreadExecutor()

源代码:

    /**
     * 1、创建一个线程数固定(corePoolSize==maximumPoolSize==1)的线程池
     * 2、核心线程会一直运行
     * 3、无界队列LinkedBlockingQueue
     * 注意:所有task都是串行执行的
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

说明:execute()的源代码查看第十三章 ThreadPoolExecutor源码解析

5、newCachedThreadPool()

源代码:

    /**
     * 1、创建一个线程池:当池中的线程都处于忙碌状态时,会立即新建一个线程来处理新来的任务
     * 2、这种池将会在执行许多耗时短的异步任务的时候提高程序的性能。
     * 3、6秒钟内没有使用的线程将会被中止,并且从线程池中移除,因此几乎不必担心耗费资源
     * 4、队列:SynchronousQueue
     * 5、maximumPoolSize为Integer.MAX_VALUE
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

说明:execute()的源代码查看第十三章 ThreadPoolExecutor源码解析

6、newScheduledThreadPool(int corePoolSize)

源代码:

Executors:newScheduledThreadPool(int corePoolSize)

    /**
     * 创建一个线程池:该线程池可以用于执行延时任务或者定时任务
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor(int corePoolSize)

    /**
     * 创建一个线程池:
     * corePoolSize==我们指定
     * maximumPoolSize==Integer.MAX_VALUE
     * keepAliveTime==0纳秒(即不回收闲置线程)
     * 队列: DelayedWorkQueue
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

说明:ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,其中调用的super构造器就是ThreadPoolExecutor的构造器。

ScheduledThreadPoolExecutor:execute(Runnable command)

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        schedule(command, 0, TimeUnit.NANOSECONDS);
    }

ScheduledThreadPoolExecutor:schedule(Runnable command, long delay, TimeUnit unit)

    /**
     * 这个方法:其实就是将task封装一下,然后加入到DelayedWorkQueue中
     * 1、DelayedWorkQueue其实就是一个DelayQueue
     * 2、当有新的task加入时,DelayQueue会将其加入内部的数组对象中,并对其进行排序,在这里,排序的规则就是执行的时间,执行时间越近的排在越前
     * 3、线程池中的线程在执行task时,获取最近要执行的task,然后唤醒所有等待available条件的线程来执行该任务
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
                                                    new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));

        delayedExecute(t);
        return t;
    }

注意:这里的注释就是整个ScheduledThreadPoolExecutor的执行机理。

下面说一下其中调用到的一些方法。

第一部分:封装ScheduledFutureTask任务

ScheduledThreadPoolExecutor:triggerTime(long delay, TimeUnit unit)

    /**
     * 返回一个delayed action(延时任务)的触发时间
     */
    private long triggerTime(long delay, TimeUnit unit) {
         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

    /**
     * Returns the trigger time of a delayed action.
     */
    long triggerTime(long delay) {
         return now() +
             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

说明:用于计算延时任务的触发时间。

注意:在上边的execute()方法中传递的delay是0,根据上边的代码,计算出触发时间就是now()。

ScheduledThreadPoolExecutor:内部类ScheduledFutureTask

    private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        private final long sequenceNumber;//用于打破FIFO关系的序列号
        private long time;//任务执行的触发时间
        /**
         * 一个用于重复执行的任务的时间段(单位:纳秒)
         * 0-->不重复执行的任务
         * 正值:fixed-rate执行
         * 负值:fixed-delay执行
         */
        private final long period;

        /**
         * 创建一个一次性的action并且指定触发时间
         */
        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

说明:ScheduledFutureTask是FutureTask的子类,上边的构造器中的super(r, result)代码如下:

FutureTask:FutureTask(Runnable runnable, V result)

    private final Sync sync;//控制FutureTask的同步器

    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }

Executors:callable(Runnable task, T result)

    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

Executors:内部类RunnableAdapter

    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable  task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();//这里是真正的task运行的地方
            return result;
        }
    }

注意:这里才是task真正去运行的地方。-->task.run()

至此,ScheduledFutureTask任务封装完成。

第二部分:修饰任务

ScheduledThreadPoolExecutor:RunnableScheduledFuture

    protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable,
                                                          RunnableScheduledFuture<V> task) {
        return task;
    }

说明:这里其实就是直接返回了刚刚封装好的任务

第三部分:将延时任务加入阻塞队列

ScheduledThreadPoolExecutor:delayedExecute(Runnable command)

    private void delayedExecute(Runnable command) {
        if (isShutdown()) {//return runState != RUNNING;线程池状态不是RUNNING
            reject(command);//回绝任务
            return;
        }

        if (getPoolSize() < getCorePoolSize())//当前线程池数量少于核心线程数
            prestartCoreThread();//创建并启动一个核心线程

        super.getQueue().add(command);//获取阻塞队列,并将command加入队列
    }

说明:这样之后,之前封装好的任务就加入了延时队列DelayQueue(阻塞队列的一个子类)

DelayQueue:add(E e)

    public boolean add(E e) {
        return offer(e);
    }

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();//获取队列头部节点但不删除
            q.offer(e);//将e放到q的尾部
            //如果队列中只有e或者e的触发时间小于队头结点
            if (first == null || e.compareTo(first) < 0)
                available.signalAll();
            return true;
        } finally {
            lock.unlock();
        }
    }

说明:在该方法中,将上边封装好的任务就加入了DelayQueue,并将该任务置于了队头,然后唤醒所有等待available条件的线程来执行该任务。

总结:

  • 四种线程池最常用的就是newCachedThreadPool和newFixedThreadPool(int corePoolSize)
  • 对于newScheduledThreadPool(int corePoolSize)使用比较少,因为在现代开发中,如果用于去开发定时任务程序的话,用spring定时器会非常简单
时间: 2024-10-06 16:54:18

第十四章 Executors源码解析的相关文章

第四章 CopyOnWriteArraySet源码解析

注:在看这篇文章之前,如果对CopyOnWriteArrayList底层不清楚的话,建议先去看看CopyOnWriteArrayList源码解析. http://www.cnblogs.com/java-zhao/p/5121944.html 1.对于CopyOnWriteArraySet需要掌握以下几点 创建:CopyOnWriteArraySet() 添加元素:即add(E)方法 删除对象:即remove(E)方法 遍历所有对象:即iterator(),在实际中更常用的是增强型的for循环去

第六章 ReentrantLock源码解析2--释放锁unlock()

最常用的方式: int a = 12; //注意:通常情况下,这个会设置成一个类变量,比如说Segement中的段锁与copyOnWriteArrayList中的全局锁 final ReentrantLock lock = new ReentrantLock(); lock.lock();//获取锁 try { a++;//业务逻辑 } catch (Exception e) { }finally{ lock.unlock();//释放锁 } 注:关于lock()方法的源码解析,请参照"第五章

第六章 HashSet源码解析

6.1.对于HashSet需要掌握以下几点 HashSet的创建:HashSet() 往HashSet中添加单个对象:即add(E)方法 删除HashSet中的对象:即remove(Object key)方法 判断对象是否存在于HashSet中:containsKey(Object key)  注:HashSet没有获取单个对象的方法,需要使用iterator 6.2.构建HashSet 源代码: //HashSet底层数据结构:通过hashmap的key不可重复的原则,使得存放入HashSet

第三章 CopyOnWriteArrayList源码解析

注:在看这篇文章之前,如果对ArrayList底层不清楚的话,建议先去看看ArrayList源码解析. http://www.cnblogs.com/java-zhao/p/5102342.html 1.对于CopyOnWriteArrayList需要掌握以下几点 创建:CopyOnWriteArrayList() 添加元素:即add(E)方法 获取单个对象:即get(int)方法 删除对象:即remove(E)方法 遍历所有对象:即iterator(),在实际中更常用的是增强型的for循环去做

Android进阶:四、RxJava2 源码解析 1

本文适合使用过Rxjava2或者了解Rxjava2的基本用法的同学阅读 一.Rxjava是什么Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库). 通俗来说,Rxjava是一个采用了观察者模式设计处理异

Android网络编程(四)从源码解析volley

相关文章 Android网络编程(一)HTTP协议原理 Android网络编程(二)HttpClient与HttpURLConnection Android网络编程(三)Volley用法全解析 1.volley结构图 从上图可以看到volley分为三个线程,分别是主线程.缓存调度线程.和网络调度线程,首先请求会加入缓存队列,如果发现可以找到相应的缓存结果就直接读取缓存并解析,然后回调给主线程:如果在缓存中没有找到结果,则将这条请求加入到网络队列中,然后发送HTTP请求,解析响应并写入缓存,并回调

第二章 ArrayList源码解析

一.对于ArrayList需要掌握的七点内容 ArrayList的创建:即构造器 往ArrayList中添加对象:即add(E)方法 获取ArrayList中的单个对象:即get(int index)方法 删除ArrayList中的对象:即remove(E)方法 遍历ArrayList中的对象:即iterator,在实际中更常用的是增强型的for循环去做遍历 判断对象是否存在于ArrayList中:contain(E) ArrayList中对象的排序:主要取决于所采取的排序算法(以后讲) 二.源

Netty(四):AbstractChannel源码解析

首先我们通过一张继承关系的图来认识下AbstractChannel在Netty中的位置. 除了Comaprable接口来自java自带的包,其他都是Netty包中提供的. Comparable接口定义了Channel是可以比较的. AttributeMap接口为Channel提供了绑定其他属性的能力. 这两个接口我们先不去深入了解,主要看ChannelOutboundInvoker. ChannelOutboundInvoker接口主要提供了与网络链路相关的一些操作以及读写相关的操作,并统一返回

第十一章 AtomicInteger源码解析

1.原子类 可以实现一些原子操作 基于CAS 下面就以AtomicInteger为例. 2.AtomicInteger 在没有AtomicInteger之前,对于一个Integer的线程安全操作,是需要使用同步锁来实现的,当然现在也可以通过ReentrantLock来实现,但是最好最方便的实现方式是采用AtomicInteger. 具体示例: package com.collection.test; import java.util.concurrent.atomic.AtomicInteger