死磕 java线程系列之自己动手写一个线程池(续)

(手机横屏看源码更方便)


问题

(1)自己动手写的线程池如何支持带返回值的任务呢?

(2)如果任务执行的过程中抛出异常了该怎么处理呢?

简介

上一章我们自己动手写了一个线程池,但是它是不支持带返回值的任务的,那么,我们自己能否实现呢?必须可以,今天我们就一起来实现带返回值任务的线程池。

前情回顾

首先,让我们先回顾一下上一章写的线程池:

(1)它包含四个要素:核心线程数、最大线程数、任务队列、拒绝策略;

(2)它具有执行无返回值任务的能力;

(3)它无法处理有返回值的任务;

(4)它无法处理任务执行的异常(线程中的异常不会抛出到线程外);

那么,我们能不能在现有的基础上实现其下面两项能力呢?让我们一起来试一试吧!

有返回值和无返回值的任务到底有何不同?

答案很明显,就是一个有返回值,一个无返回值,用伪代码来表示就是下面这样:

    // 无返回值
    threadPool.execute(()->{
        System.out.println(1);
    });
    // 有返回值,分两步走
    // 1. 提交任务到线程池中
    SomeClass result = threadPool.execute(()->{
        System.out.println(1);
        return 1;
    });
    // 2. 等待任务的结果返回
    Object value = result.get();
    

无返回值的任务提交了就完事,主线程并不Care它到底有没有执行完,并不关心它是不是抛出异常,主线程Just提交线程到线程池中,其余什么都不管。

有返回值的任务就不一样了,主线程首先要提交任务到线程池中,它需要使用到任务执行的结果,所以它必须等待任务执行完毕才能拿到任务执行的结果。

那么,为什么不直接在execute的时候就等待任务执行完毕呢?这样的话那不就跟串行没啥区别了,还不如直接在主线程执行任务呢,还少了线程切换的资源消耗。

之所以要分成两步,是因为主线程并不一定需要立即获取返回值,在需要用到返回值的时候才去get,这样就可以在提交任务和获取返回值之间干些其它的事情,提高效率。

所以,提交任务的时候不需要阻塞,get返回值的时候才可能需要阻塞,如果get的时候任务已经执行完毕了,这时候也不需要阻塞,如果get的时候任务还未执行完毕,那就要阻塞等待任务执行完毕才能获取到返回值。

实现分析

首先,无返回值的任务我们直接使用的Runnable函数式接口,有返回值的任务有没有现成的接口呢?还真有,那就是Callable接口,它有个返回值。

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

其次,提交任务的时候需要有个返回值,它是在将来用来获取任务执行结果的,实际上它也是新任务的一种能力,可以使用它对任务进行包装,使其具有返回值的能力。

public interface Future<T> {
    T get();
}

再次,我们需要给现有的线程池增加一种新的能力,根据单一职责原则,我们定义一个新的接口来承载这种能力。

public interface FutureExecutor extends Executor {
    <T> Future<T> submit(Callable<T> command);
}

然后,我们需要一种新的任务,它既具有旧任务的执行能力(run()方法),又具有新任务的返回值能力(get()方法),所以我们造一个“将来的任务”对提交的任务进行包装,使其具有返回值的能力。

public class FutureTask<T> implements Runnable, Future<T> {

    /**
     * 真正的任务
     */
    private Callable<T> task;

    public FutureTask(Callable<T> task) {
        this.task = task;
    }

    @Override
    public void run() {
        // 具体实现...
    }

    @Override
    public T get() {
        // 具体实现...
    }
}

最后,我们只要对原有的线程池进行扩展,将提交的任务包装成“将来获取返回值的任务”,还是使用原来的方法去执行,然后返回这个将来的任务即可。

根据开闭原则,【本篇文章由公众号“彤哥读源码”原创】原来的代码我们不做任何修改,扩展新的子类来实现新的能力。

public class MyThreadPoolFutureExecutor extends MyThreadPoolExecutor implements FutureExecutor, Executor {

    public MyThreadPoolFutureExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        super(name, coreSize, maxSize, taskQueue, rejectPolicy);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        // 包装成将来获取返回值的任务
        FutureTask<T> futureTask = new FutureTask<>(task);
        // 还是使用原来的执行能力
        execute(futureTask);
        // 返回将来的任务,只需要返回其get返回值的能力即可
        // 所以这里返回的是Future而不是FutureTask类型
        return futureTask;
    }
}

好了,到这里整体的逻辑我们就已经比较清晰地实现完了,还剩下最关键的部分,这个“将来的任务”的两个能力要如何实现。

将来的任务

将来的任务,具有两个能力:一是执行真正任务的能力,二是将来获取返回值的能力。

public class FutureTask<T> implements Runnable, Future<T> {
    @Override
    public void run() {
        // 具体实现...
    }

    @Override
    public T get() {
        // 具体实现...
    }
}

首先,我们要明确一件事,任务的执行是线程池中,获取返回值是在主线程中,它们是在两个线程中执行的,而且谁先谁后我们无法确定。

其次,如果run()在get()之前执行,我们需要告诉get()任务已经执行完毕了,所以需要一个状态来通知这个事,还需要一个变量来承载任务执行的返回值。

    /**
     * 任务执行的状态,0未开始,1正常完成,2异常完成
     * 也可以使用volatile+Unsafe实现CAS操作
     */
    private AtomicInteger state = new AtomicInteger(NEW);
    private static final int NEW = 0;
    private static final int FINISHED = 1;
    private static final int EXCEPTION = 2;
    /**
     * 任务执行的结果【本篇文章由公众号“彤哥读源码”原创】
     * 如果执行正常,返回结果为T
     * 如果执行异常,返回结果为Exception
     */
    private Object result;

再次,如果get()在run()之前执行,那就需要阻塞等待run()执行完毕才能拿到返回值,所以需要保存调用者(主线程),get()的时候park阻塞住,run()完成了unpark唤醒它来拿返回值。

    /**
     * 调用者线程
     * 也可以使用volatile+Unsafe实现CAS操作
     */
    private AtomicReference<Thread> caller = new AtomicReference<>();

然后,我们先来看看run()方法的逻辑,它其实就是先执行真正的任务,然后修改状态为完成,并保存任务的返回值,如果保存了主线程,还要唤醒它。

    @Override
    public void run() {
        // 如果状态不是NEW,说明执行过了,直接返回
        if (state.get() != NEW) {
            return;
        }
        try {
            // 执行任务【本篇文章由公众号“彤哥读源码”原创】
            T r = task.call();
            // CAS更新state的值为FINISHED
            // 如果更新成功,就把r赋值给result
            // 如果更新失败,说明state的值不为NEW了,也就是任务已经执行过了
            if (state.compareAndSet(NEW, FINISHED)) {
                this.result = r;
                // finish()必须放在修改state里面,见下面的分析
                finish();
            }
        } catch (Exception e) {
            // 如果CAS更新state的值为EXCEPTION成功,就把e赋值给result
            // 如果CAS更新失败,说明state的值不为NEW了,也就是任务已经执行过了
            if (state.compareAndSet(NEW, EXCEPTION)) {
                this.result = e;
                // finish()必须放在修改state里面,见下面的分析
                finish();
            }
        }
    }

    private void finish() {
        // 检查调用者是否为空,如果不为空,唤醒它
        // 调用者在调用get()方法的进入阻塞状态
        for (Thread c; (c = caller.get()) != null;) {
            if (caller.compareAndSet(c, null)) {
                LockSupport.unpark(c);
            }
        }
    }

最后,我们再看看get()方法,如果任务还未执行,就阻塞等待任务的执行;如果任务已经执行完毕了,直接拿返回值即可;但是,还有一种情况,get()方法执行的过程中run()方法也在执行,所以get()方法中的每一步都要检查状态的值有没有变化。

@Override
    public T get() {
        int s = state.get();
        // 如果任务还未执行完成,判断当前线程是否要进入阻塞状态
        if (s == NEW) {
            // 标识调用者线程是否被标记过
            boolean marked = false;
            for (;;) {
                // 重新获取state的值
                s = state.get();
                // 如果state大于NEW说明完成了,跳出循环
                if (s > NEW) {
                    break;
                    // 此处必须把caller的CAS更新和park()方法分成两步处理,不能把park()放在CAS里面
                } else if (!marked) {
                    // 尝试更新调用者线程
                    // 试想断点停在此处【本篇文章由公众号“彤哥读源码”原创】
                    // 此时state为NEW,让run()方法执行到底,它不会执行finish()中的unpark()方法
                    // 这时打开断点,这里会更新caller成功,但是循环从头再执行一遍发现state已经变了,
                    // 直接在上面的if(s>NEW)处跳出循环了,因为finish()在修改state内部
                    marked = caller.compareAndSet(null, Thread.currentThread());
                } else {
                    // 调用者线程更新之后park当前线程
                    // 试想断点停在此处
                    // 此时state为NEW,让run()方法执行到底,因为上面的caller已经设置值了,
                    // 所以会执行finish()方法中的unpark()方法,
                    // 这时再打开断点,这里不会park信
                    // 见unpark()方法的注释,上面写得很清楚:
                    // 如果线程执行了park()方法,那么执行unpark()方法会唤醒那个线程
                    // 如果先执行了unpark()方法,那么线程下一次执行park()方法将不会阻塞
                    LockSupport.park();
                }
            }
        }

        if (s == FINISHED) {
            return (T) result;
        }
        throw new RuntimeException((Throwable) result);
    }

在我们的实现中,如果任务执行的过程抛出异常了,也是通过result返回给主线程,这样主线程就拿到了这个异常,它就可以做相应的处理了。

好了,完整的实现到此结束,不知道你领悟了没有。

测试用例

最后奉上测试代码:

public class MyThreadPoolFutureExecutorTest {
    public static void main(String[] args) {
        FutureExecutor threadPool = new MyThreadPoolFutureExecutor("test", 2, 4, new ArrayBlockingQueue<>(6), new DiscardRejectPolicy());
        List<Future<Integer>> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            int num = i;
            Future<Integer> future = threadPool.submit(() -> {
                Thread.sleep(1000);
                System.out.println("running: " + num);
                return num;
            });
            list.add(future);
        }

        for (Future<Integer> future : list) {
            System.out.println("runned: " + future.get());
        }
    }
}

运行结果:

thread name: core_test2
thread name: test4
thread name: test3
discard one task
thread name: core_test1
discard one task
...省略被拒绝的任务
【本篇文章由公众号“彤哥读源码”原创】
discard one task
running: 0
running: 1
running: 8
running: 9
runned: 0
runned: 1
running: 4
running: 2
running: 3
running: 5
runned: 2
runned: 3
runned: 4
runned: 5
running: 6
running: 7
runned: 6
runned: 7
runned: 8
runned: 9

总结

(1)有返回值的任务是通过包装成将来的任务来实现的,这个任务既具有基本的执行能力,又具有将来获取返回值的能力;

(2)任务执行的异常跟任务正常的返回值是通过同一个返回值返回到主线程的,主线程根据状态判断是异常还是正常值;

(3)我们的实现中运用了单一职责原则、开闭原则等设计原则,对原有代码没有造成任何的入侵;

彩蛋

手写线程池目前只打算写这两章,后面开始进入jdk原生线程池的源码分析,敬请期待。

另外,需要手写线程池完整源码的同学请关注我的公众号“彤哥读源码”,在后台回复“MyThreadPool”(不带引号)即可领取手写线程池完整源码,注意大小写不要弄错哦,否则彤哥是不会给你的哈。



欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

原文地址:https://www.cnblogs.com/tong-yuan/p/11651370.html

时间: 2024-10-06 07:31:04

死磕 java线程系列之自己动手写一个线程池(续)的相关文章

死磕 java线程系列之自己动手写一个线程池

欢迎关注我的公众号"彤哥读源码",查看更多源码系列文章, 与彤哥一起畅游源码的海洋. (手机横屏看源码更方便) 问题 (1)自己动手写一个线程池需要考虑哪些因素? (2)自己动手写的线程池如何测试? 简介 线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?本文彤哥将手把手带你写一个可用的线程池. 属性分析 线程池,顾名思义它首先是一个"池",这个池里面放的是线程,线程是用来执行任务的. 首先,线程池中的线程应该是有类别的,有的是核心线程,有

死磕 java同步系列之ReentrantReadWriteLock源码解析

问题 (1)读写锁是什么? (2)读写锁具有哪些特性? (3)ReentrantReadWriteLock是怎么实现读写锁的? (4)如何使用ReentrantReadWriteLock实现高效安全的TreeMap? 简介 读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量. 特性 读写锁具有以下特性: 是否互斥 读 写 读 否 是 写 是 是 可以看到,读写锁除了读读

死磕 java同步系列之Semaphore源码解析

问题 (1)Semaphore是什么? (2)Semaphore具有哪些特性? (3)Semaphore通常使用在什么场景中? (4)Semaphore的许可次数是否可以动态增减? (5)Semaphore如何实现限流? 简介 Semaphore,信号量,它保存了一系列的许可(permits),每次调用acquire()都将消耗一个许可,每次调用release()都将归还一个许可. 特性 Semaphore通常用于限制同一时间对共享资源的访问次数上,也就是常说的限流. 下面我们一起来学习Java

死磕 java同步系列之CountDownLatch源码解析

??欢迎关注我的公众号"彤哥读源码",查看更多源码系列文章, 与彤哥一起畅游源码的海洋. (手机横屏看源码更方便) 问题 (1)CountDownLatch是什么? (2)CountDownLatch具有哪些特性? (3)CountDownLatch通常运用在什么场景中? (4)CountDownLatch的初始次数是否可以调整? 简介 CountDownLatch,可以翻译为倒计时器,但是似乎不太准确,它的含义是允许一个或多个线程等待其它线程的操作执行完毕后再执行后续的操作. Cou

死磕 java同步系列之AQS终篇(面试)

问题 (1)AQS的定位? (2)AQS的重要组成部分? (3)AQS运用的设计模式? (4)AQS的总体流程? 简介 AQS的全称是AbstractQueuedSynchronizer,它的定位是为Java中几乎所有的锁和同步器提供一个基础框架. 在之前的章节中,我们一起学习了ReentrantLock.ReentrantReadWriteLock.Semaphore.CountDownLatch的源码,今天我们一起来对AQS做个总结. 状态变量state AQS中定义了一个状态变量state

死磕 java同步系列之StampedLock源码解析

问题 (1)StampedLock是什么? (2)StampedLock具有什么特性? (3)StampedLock是否支持可重入? (4)StampedLock与ReentrantReadWriteLock的对比? 简介 StampedLock是java8中新增的类,它是一个更加高效的读写锁的实现,而且它不是基于AQS来实现的,它的内部自成一片逻辑,让我们一起来学习吧. StampedLock具有三种模式:写模式.读模式.乐观读模式. ReentrantReadWriteLock中的读和写都是

死磕 java同步系列之CyclicBarrier源码解析——有图有真相

问题 (1)CyclicBarrier是什么? (2)CyclicBarrier具有什么特性? (3)CyclicBarrier与CountDownLatch的对比? 简介 CyclicBarrier,回环栅栏,它会阻塞一组线程直到这些线程同时达到某个条件才继续执行.它与CountDownLatch很类似,但又不同,CountDownLatch需要调用countDown()方法触发事件,而CyclicBarrier不需要,它就像一个栅栏一样,当一组线程都到达了栅栏处才继续往下走. 使用方法 pu

死磕 java同步系列之Phaser源码解析

问题 (1)Phaser是什么? (2)Phaser具有哪些特性? (3)Phaser相对于CyclicBarrier和CountDownLatch的优势? 简介 Phaser,翻译为阶段,它适用于这样一种场景,一个大任务可以分为多个阶段完成,且每个阶段的任务可以多个线程并发执行,但是必须上一个阶段的任务都完成了才可以执行下一个阶段的任务. 这种场景虽然使用CyclicBarrier或者CountryDownLatch也可以实现,但是要复杂的多.首先,具体需要多少个阶段是可能会变的,其次,每个阶

死磕 java同步系列之zookeeper分布式锁

问题 (1)zookeeper如何实现分布式锁? (2)zookeeper分布式锁有哪些优点? (3)zookeeper分布式锁有哪些缺点? 简介 zooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它可以为分布式应用提供一致性服务,它是Hadoop和Hbase的重要组件,同时也可以作为配置中心.注册中心运用在微服务体系中. 本章我们将介绍zookeeper如何实现分布式锁运用在分布式系统中. 基础知识 什么是znode? zooKeeper操作和维护的为一个个数据节点,称为 z