死磕 java线程系列之自己动手写一个线程池

欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

(手机横屏看源码更方便)


问题

(1)自己动手写一个线程池需要考虑哪些因素?

(2)自己动手写的线程池如何测试?

简介

线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?本文彤哥将手把手带你写一个可用的线程池。

属性分析

线程池,顾名思义它首先是一个“池”,这个池里面放的是线程,线程是用来执行任务的。

首先,线程池中的线程应该是有类别的,有的是核心线程,有的是非核心线程,所以我们需要两个变量标识核心线程数量coreSize和最大线程数量maxSize。

为什么要区分是否为核心线程呢?这是为了控制系统中线程的数量。

当线程池中线程数未达到核心线程数coreSize时,来一个任务加一个线程是可以的,也可以提高任务执行的效率。

当线程池中线程数达到核心线程数后,得控制一下线程的数量,来任务了先进队列,如果任务执行足够快,这些核心线程很快就能把队列中的任务执行完毕,完全没有新增线程的必要。

当队列中任务也满了,这时候光靠核心线程就无法及时处理任务了,所以这时候就需要增加新的线程了,但是线程也不能无限制地增加,所以需要控制其最大线程数量maxSize。

其次,我们需要一个任务队列来存放任务,这个队列必须是线程安全的,我们一般使用BlockingQueue阻塞队列来充当,当然使用ConcurrentLinkedQueue也是可以的(注意ConcurrentLinkedQueue不是阻塞队列,不能运用在jdk的线程池中)。

最后,当任务越来越多而线程处理却不及时,迟早会达到一种状态,队列满了,线程数也达到最大线程数了,这时候怎么办呢?这时候就需要走拒绝策略了,也就是这些无法及时处理的任务怎么办的一种策略,常用的策略有丢弃当前任务、丢弃最老的任务、调用者自己处理、抛出异常等。

根据上面的描述,我们定义一个线程池一共需要这么四个变量:核心线程数coreSize、最大线程数maxSize、阻塞队列BlockingQueue、拒绝策略RejectPolicy。

另外,为了便于给线程池一个名称,我们再加一个变量:线程池的名称name。

所以我们得出了线程池的属性及构造方法大概如下:

public class MyThreadPoolExecutor implements Executor {
    /**
     * 线程池的名称
     */
    private String name;
    /**
     * 核心线程数
     */
    private int coreSize;
    /**
     * 最大线程数
     */
    private int maxSize;
    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> taskQueue;
    /**
     * 拒绝策略
     */
    private RejectPolicy rejectPolicy;

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }
}

任务流向分析

根据上面的属性分析,基本上我们已经得到了任务流向的完整逻辑:

首先,如果运行的线程数小于核心线程数,直接创建一个新的核心线程来运行新的任务。

其次,如果运行的线程数达到了核心线程数,则把新任务入队列。

然后,如果队列也满了,则创建新的非核心线程来运行新的任务。

最后,如果非核心线程数也达到最大了,那就执行拒绝策略。

代码逻辑大致如下:

    @Override
    public void execute(Runnable task) {
        // 正在运行的线程数
        int count = runningCount.get();
        // 如果正在运行的线程数小于核心线程数,直接加一个线程
        if (count < coreSize) {
            // 注意,这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小
            if (addWorker(task, true)) {
                return;
            }
            // 如果添加核心线程失败,进入下面的逻辑
        }

        // 如果达到了核心线程数,先尝试让任务入队
        // 这里之所以使用offer(),是因为如果队列满了offer()会立即返回false
        if (taskQueue.offer(task)) {
            // do nothing,为了逻辑清晰这里留个空if
            // 【本篇文章由公众号“彤哥读源码”原创】
        } else {
            // 如果入队失败,说明队列满了,那就添加一个非核心线程
            if (!addWorker(task, false)) {
                // 如果添加非核心线程失败了,那就执行拒绝策略
                rejectPolicy.reject(task, this);
            }
        }
    }

创建线程逻辑分析

首先,创建线程的依据是正在运行的线程数量有没有达到核心线程数或者最大线程数,所以我们还需要一个变量runningCount用来记录正在运行的线程数。

其次,这个变量runningCount需要在并发环境下加加减减,所以这里需要使用到Unsafe的CAS指令来控制其值的修改,用了CAS就要给这个变量加上volatile修饰,为了方便我们这里直接使用AtomicInteger来作为这个变量的类型。

然后,因为是并发环境中,所以需要判断runningCount < coreSize(或maxSize)(条件一)的同时修改runningCount CAS加一(条件二)成功了才表示可以增加一个线程,如果条件一失败则表示不能再增加线程了直接返回false,如果条件二失败则表示其它线程先修改了runningCount的值,则重试。

最后,创建一个线程并运行新任务,且不断从队列中拿任务来运行【本篇文章由公众号“彤哥读源码”原创】。

代码逻辑如下:

    private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判断是不是真的可以创建一个线程
        for (; ; ) {
            // 正在运行的线程数
            int count = runningCount.get();
            // 核心线程还是非核心线程
            int max = core ? coreSize : maxSize;
            // 不满足创建线程的条件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,可以创建线程
            if (runningCount.compareAndSet(count, count + 1)) {
                // 线程的名字
                String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                // 创建线程并启动
                new Thread(() -> {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    // 运行的任务
                    Runnable task = newTask;
                    // 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            // 执行任务
                            task.run();
                        } finally {
                            // 任务执行完成,置为空
                            task = null;
                        }
                    }
                }, threadName).start();

                break;
            }
        }

        return true;
    }

取任务逻辑分析

从队列中取任务应该使用take()方法,这个方法会一直阻塞直至取到任务或者中断,如果中断了就返回null,这样当前线程也就可以安静地结束了,另外还要注意中断了记得把runningCount减一。

    private Runnable getTask() {
        try {
            // take()方法会一直阻塞直到取到任务为止
            return taskQueue.take();
        } catch (InterruptedException e) {
            // 线程中断了,返回null可以结束当前线程
            // 当前线程都要结束了,理应要把runningCount的数量减一
            runningCount.decrementAndGet();
            return null;
        }
    }

好了,到这里我们自己的线程池就写完了,下面我们一起来想想怎么测试呢?

测试逻辑分析

我们再来回顾下自己的写的线程池的构造方法:

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

name,这个随便传;

coreSize,我们假设为5;

maxSize,我们假设为10;

taskQueue,任务队列,既然我们设置的是有边界的,我们就用最简单的ArrayBlockingQueue好吧,容量设置为15,这样里面最多可以存储15条任务;

rejectPolicy,拒绝策略,我们假设使用丢弃当前任务的策略,OK,我们来实现一个。

/**
 * 丢弃当前任务
 */
public class DiscardRejectPolicy implements RejectPolicy {
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        // do nothing
        System.out.println("discard one task");
    }
}

OK,这样一个线程池就创建完成了,下面就是执行任务了,我们假设通过for循环连续不断地添加100个任务好不好。

public class MyThreadPoolExecutorTest {
    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

我们分析下这段程序:

(1)先连续创建了5个核心线程,并执行了新任务;

(2)后面的15个任务进了队列;

(3)队列满了,又连续创建了5个线程,并执行了新任务;

(4)后面的任务就没得执行了,全部走了丢弃策略;

(5)所以真正执行成功的任务应该是 5 + 15 + 5 = 25 条任务;

运行之:

thread name: core_test2
thread name: core_test5
thread name: core_test3
thread name: core_test4
thread name: core_test1
thread name: test6
thread name: test7
thread name: test8
thread name: test9
discard one task
thread name: test10
discard one task
...省略被拒绝的任务
【本篇文章由公众号“彤哥读源码”原创】
discard one task
running: 1570546871851: 2
running: 1570546871851: 8
running: 1570546871851: 7
running: 1570546871851: 6
running: 1570546871851: 5
running: 1570546871851: 3
running: 1570546871851: 4
running: 1570546871851: 1
running: 1570546871851: 10
running: 1570546871851: 9
running: 1570546872852: 14
running: 1570546872852: 20
running: 1570546872852: 19
running: 1570546872852: 17
running: 1570546872852: 18
running: 1570546872852: 16
running: 1570546872852: 15
running: 1570546872852: 12
running: 1570546872852: 13
running: 1570546872852: 11
running: 1570546873852: 21
running: 1570546873852: 24
running: 1570546873852: 23
running: 1570546873852: 25
running: 1570546873852: 22

可以看到,创建了5个核心线程、5个非核心线程,成功执行了25条任务,完成没问题,完美^^。

总结

(1)自己动手写一个线程池需要考虑的因素主要有:核心线程数、最大线程数、任务队列、拒绝策略。

(2)创建线程的时候要时刻警惕并发的陷阱;

彩蛋

我们知道,jdk自带的线程池还有两个参数:keepAliveTime、unit,它们是干什么的呢?

答:它们是用来控制何时销毁非核心线程的,当然也可以销毁核心线程,具体的分析请期待下一章吧。

完整源码

Executor接口

public interface Executor {
    void execute(Runnable command);
}

MyThreadPoolExecutor线程池实现类

/**
 * 自动动手写一个线程池
 */
public class MyThreadPoolExecutor implements Executor {

    /**
     * 线程池的名称
     */
    private String name;
    /**
     * 线程序列号
     */
    private AtomicInteger sequence = new AtomicInteger(0);
    /**
     * 核心线程数
     */
    private int coreSize;
    /**
     * 最大线程数
     */
    private int maxSize;
    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> taskQueue;
    /**
     * 拒绝策略
     */
    private RejectPolicy rejectPolicy;
    /**
     * 当前正在运行的线程数【本篇文章由公众号“彤哥读源码”原创】
     * 需要修改时线程间立即感知,所以使用AtomicInteger
     * 或者也可以使用volatile并结合Unsafe做CAS操作(参考Unsafe篇章讲解)
     */
    private AtomicInteger runningCount = new AtomicInteger(0);

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        this.name = name;
        this.coreSize = coreSize;
        this.maxSize = maxSize;
        this.taskQueue = taskQueue;
        this.rejectPolicy = rejectPolicy;
    }

    @Override
    public void execute(Runnable task) {
        // 正在运行的线程数
        int count = runningCount.get();
        // 如果正在运行的线程数小于核心线程数,直接加一个线程
        if (count < coreSize) {
            // 注意,这里不一定添加成功,addWorker()方法里面还要判断一次是不是确实小
            if (addWorker(task, true)) {
                return;
            }
            // 如果添加核心线程失败,进入下面的逻辑
        }

        // 如果达到了核心线程数,先尝试让任务入队
        // 这里之所以使用offer(),是因为如果队列满了offer()会立即返回false
        if (taskQueue.offer(task)) {
            // do nothing,为了逻辑清晰这里留个空if
        } else {
            // 如果入队失败,说明队列满了,那就添加一个非核心线程
            if (!addWorker(task, false)) {
                // 如果添加非核心线程失败了,那就执行拒绝策略
                rejectPolicy.reject(task, this);
            }
        }
    }

    private boolean addWorker(Runnable newTask, boolean core) {
        // 自旋判断是不是真的可以创建一个线程
        for (; ; ) {
            // 正在运行的线程数
            int count = runningCount.get();
            // 核心线程还是非核心线程
            int max = core ? coreSize : maxSize;
            // 不满足创建线程的条件,直接返回false
            if (count >= max) {
                return false;
            }
            // 修改runningCount成功,可以创建线程
            if (runningCount.compareAndSet(count, count + 1)) {
                // 线程的名字
                String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet();
                // 创建线程并启动
                new Thread(() -> {
                    System.out.println("thread name: " + Thread.currentThread().getName());
                    // 运行的任务【本篇文章由公众号“彤哥读源码”原创】
                    Runnable task = newTask;
                    // 不断从任务队列中取任务执行,如果取出来的任务为null,则跳出循环,线程也就结束了
                    while (task != null || (task = getTask()) != null) {
                        try {
                            // 执行任务
                            task.run();
                        } finally {
                            // 任务执行完成,置为空
                            task = null;
                        }
                    }
                }, threadName).start();

                break;
            }
        }

        return true;
    }

    private Runnable getTask() {
        try {
            // take()方法会一直阻塞直到取到任务为止
            return taskQueue.take();
        } catch (InterruptedException e) {
            // 线程中断了,返回null可以结束当前线程
            // 当前线程都要结束了,理应要把runningCount的数量减一
            runningCount.decrementAndGet();
            return null;
        }
    }

}

RejectPolicy拒绝策略接口

public interface RejectPolicy {
    void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor);
}

DiscardRejectPolicy丢弃策略实现类

/**
 * 丢弃当前任务
 */
public class DiscardRejectPolicy implements RejectPolicy {
    @Override
    public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) {
        // do nothing
        System.out.println("discard one task");
    }
}

测试类

public class MyThreadPoolExecutorTest {
    public static void main(String[] args) {
        Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy());
        AtomicInteger num = new AtomicInteger(0);

        for (int i = 0; i < 100; i++) {
            threadPool.execute(()->{
                try {
                    Thread.sleep(1000);
                    System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

    }
}


欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

原文地址:https://www.cnblogs.com/tong-yuan/p/11639269.html

时间: 2024-12-12 13:20:46

死磕 java线程系列之自己动手写一个线程池的相关文章

死磕 java线程系列之自己动手写一个线程池(续)

(手机横屏看源码更方便) 问题 (1)自己动手写的线程池如何支持带返回值的任务呢? (2)如果任务执行的过程中抛出异常了该怎么处理呢? 简介 上一章我们自己动手写了一个线程池,但是它是不支持带返回值的任务的,那么,我们自己能否实现呢?必须可以,今天我们就一起来实现带返回值任务的线程池. 前情回顾 首先,让我们先回顾一下上一章写的线程池: (1)它包含四个要素:核心线程数.最大线程数.任务队列.拒绝策略: (2)它具有执行无返回值任务的能力: (3)它无法处理有返回值的任务: (4)它无法处理任务

死磕 java同步系列之ReentrantReadWriteLock源码解析

问题 (1)读写锁是什么? (2)读写锁具有哪些特性? (3)ReentrantReadWriteLock是怎么实现读写锁的? (4)如何使用ReentrantReadWriteLock实现高效安全的TreeMap? 简介 读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量. 特性 读写锁具有以下特性: 是否互斥 读 写 读 否 是 写 是 是 可以看到,读写锁除了读读

死磕 java同步系列之Semaphore源码解析

问题 (1)Semaphore是什么? (2)Semaphore具有哪些特性? (3)Semaphore通常使用在什么场景中? (4)Semaphore的许可次数是否可以动态增减? (5)Semaphore如何实现限流? 简介 Semaphore,信号量,它保存了一系列的许可(permits),每次调用acquire()都将消耗一个许可,每次调用release()都将归还一个许可. 特性 Semaphore通常用于限制同一时间对共享资源的访问次数上,也就是常说的限流. 下面我们一起来学习Java

死磕 java同步系列之CountDownLatch源码解析

??欢迎关注我的公众号"彤哥读源码",查看更多源码系列文章, 与彤哥一起畅游源码的海洋. (手机横屏看源码更方便) 问题 (1)CountDownLatch是什么? (2)CountDownLatch具有哪些特性? (3)CountDownLatch通常运用在什么场景中? (4)CountDownLatch的初始次数是否可以调整? 简介 CountDownLatch,可以翻译为倒计时器,但是似乎不太准确,它的含义是允许一个或多个线程等待其它线程的操作执行完毕后再执行后续的操作. Cou

死磕 java同步系列之AQS终篇(面试)

问题 (1)AQS的定位? (2)AQS的重要组成部分? (3)AQS运用的设计模式? (4)AQS的总体流程? 简介 AQS的全称是AbstractQueuedSynchronizer,它的定位是为Java中几乎所有的锁和同步器提供一个基础框架. 在之前的章节中,我们一起学习了ReentrantLock.ReentrantReadWriteLock.Semaphore.CountDownLatch的源码,今天我们一起来对AQS做个总结. 状态变量state AQS中定义了一个状态变量state

死磕 java同步系列之StampedLock源码解析

问题 (1)StampedLock是什么? (2)StampedLock具有什么特性? (3)StampedLock是否支持可重入? (4)StampedLock与ReentrantReadWriteLock的对比? 简介 StampedLock是java8中新增的类,它是一个更加高效的读写锁的实现,而且它不是基于AQS来实现的,它的内部自成一片逻辑,让我们一起来学习吧. StampedLock具有三种模式:写模式.读模式.乐观读模式. ReentrantReadWriteLock中的读和写都是

死磕 java同步系列之CyclicBarrier源码解析——有图有真相

问题 (1)CyclicBarrier是什么? (2)CyclicBarrier具有什么特性? (3)CyclicBarrier与CountDownLatch的对比? 简介 CyclicBarrier,回环栅栏,它会阻塞一组线程直到这些线程同时达到某个条件才继续执行.它与CountDownLatch很类似,但又不同,CountDownLatch需要调用countDown()方法触发事件,而CyclicBarrier不需要,它就像一个栅栏一样,当一组线程都到达了栅栏处才继续往下走. 使用方法 pu

死磕 java同步系列之Phaser源码解析

问题 (1)Phaser是什么? (2)Phaser具有哪些特性? (3)Phaser相对于CyclicBarrier和CountDownLatch的优势? 简介 Phaser,翻译为阶段,它适用于这样一种场景,一个大任务可以分为多个阶段完成,且每个阶段的任务可以多个线程并发执行,但是必须上一个阶段的任务都完成了才可以执行下一个阶段的任务. 这种场景虽然使用CyclicBarrier或者CountryDownLatch也可以实现,但是要复杂的多.首先,具体需要多少个阶段是可能会变的,其次,每个阶

死磕 java同步系列之zookeeper分布式锁

问题 (1)zookeeper如何实现分布式锁? (2)zookeeper分布式锁有哪些优点? (3)zookeeper分布式锁有哪些缺点? 简介 zooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它可以为分布式应用提供一致性服务,它是Hadoop和Hbase的重要组件,同时也可以作为配置中心.注册中心运用在微服务体系中. 本章我们将介绍zookeeper如何实现分布式锁运用在分布式系统中. 基础知识 什么是znode? zooKeeper操作和维护的为一个个数据节点,称为 z