线程池原理以及自定义线程池

第一部分:对线程池的需求分析

/*
    8.1 线程池原理

    一个完整的线程池应该具备如下要素:
        1.任务队列:用于缓存提交的任务
        2.线程数量管理功能:可通个三个参数实现:
            init:创建时初始的线程数量
            max:线程池自动扩充时最大的线程数量
            core:空闲时但是需要释放线程,但是也要维护一定数量的活跃线程
        3.任务拒绝策略:
        4.线程工程:主要用于个性化定制线程,比如设置守护线程、设置线程名称等
        5.QueueSize:任务队列主要存放Runnable,防止内存溢出,需要有limit数量限制
        6.keepedalive时间:该时间主要决定线程各个重要参数自动维护的时间间隔

*/


第二部分:对线程池运行过程的简单分析

/*
    main:-------P----------------------------------------------
                |
    pool:       |-----A----B---C-------------------------------
                      |    |   |
       A:             |----|---|-------------------------------
                           |   |
       B:                  |---|-------------------------------
                               |
       C:                      |-------------------------------
 */


第三部分:接口定义

  ThreadPool接口:

public interface ThreadPool {

    //提交任务到线程池
    void execute(Runnable runnable);

    //关闭线程池
    void shutdown();

    //获取线程池的初始化大小
    int getInitSize();

    //获取线程池最大的线程数
    int getMaxSize();

    //获取线程池的核心线程数量
    int getCoreSize();

    //获取线程池中活跃线程数量
    int getActiveCount();

    //获取线程池中用于缓存任务队列的大小
    int getQueueSize();

    //查看线程池是否已经别shutdown
    boolean isShutdown();

}

  RunnableQueue接口:

//任务多列,主要用于缓存提交到线程池中的任务
public interface RunnableQueue{

    //当有新的任务捡来时首先会offer到队列中
    void offer(Runnable runnable);

    //工作线程通过take方法获取Runnable
    Runnable take() throws InterruptedException;

    //获取任务队列中任务的数量
    int size();

}

  DenyPolicy接口:

@FunctionalInterface
public interface DenyPolicy{

    void reject(Runnable runnable,ThreadPool threadPool);
}


第四部分:定义异常

  RunnableDenyException:

public class RunnableDenyException extends RuntimeException{
    public RunnableDenyException(String msg){
        super(msg);
    }
}


第五部分:实现DenyPolicy

  ——实际上采用的是在接口中直接实现的

@FunctionalInterface
public interface DenyPolicy{
    void reject(Runnable runnable,ThreadPool threadPool);

    //该策略会直接将任务丢弃
    class DiscardDenyPolicy implements DenyPolicy{

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            //do nothing
        }
    }

    //该策略会向任务提交这抛出异常
    class AbortDenyPolicy implements DenyPolicy{

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            throw new RunnableDenyException("The runnable "+runnable+" will be abort.");
        }
    }

    //该策略会使任务在提交者所在的线程中执行任务
    class RunnerDenyPlolicy implements DenyPolicy{

        @Override
        public void reject(Runnable runnable, ThreadPool threadPool) {
            runnable.run();
        }
    }
}


第六部分:实现InternalTask

public class InternalTask implements Runnable{

    private final RunnableQueue runnableQueue;
    private volatile boolean running = true;

    public InternalTask(RunnableQueue runnableQueue){
        this.runnableQueue=runnableQueue;
    }

    /*
        对run方法的分析:
            如果当前任务为running且没有被中断,则将不断地从queue中获取Runnable
            然后执行run方法。让这个任务停止的方法是:
                1.在pool线程中,调用该线程对象的interrupt方法
                2.在pool线程中,调用该InternalTask对象的stop方法
                    ——这也就解释了为什么在BasicThreadPool中
                        将InternalTask对象和执行InternalTask对象
                        的线程组合在一起保存。
     */
    @Override
    public void run() {
        while(running&&!Thread.currentThread().isInterrupted()){
            try {
                Runnable task = runnableQueue.take();
                task.run();
            } catch (InterruptedException e) {
                running = false;
                break;
            }
        }
    }

    public void stop(){
        this.running = false;
    }
}


第七部分:实现RunableQueue

import java.util.LinkedList;

public class LinkedRunnableQueue implements RunnableQueue{

    private final int limit;
    private final DenyPolicy denyPolicy;
    private final ThreadPool threadPool;

    /*
        这个地方有个很LinkedList的用法经验:
            addLast():从队列的尾部添加一个元素
            removeFirst():从队列的头部拿出一个元素
     */
    private final LinkedList<Runnable> runnableList = new LinkedList<>();

    public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {
        this.limit = limit;
        this.denyPolicy = denyPolicy;
        this.threadPool = threadPool;
    }

    @Override
    public void offer(Runnable runnable) {
        synchronized (runnableList){
            if(runnableList.size()>=limit){
                denyPolicy.reject(runnable,threadPool);
            }else {
                runnableList.addLast(runnable);
                runnableList.notifyAll();
            }
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        synchronized (runnableList) {
            while (runnableList.isEmpty()) {
                try {
                    /*
                        如果任务队列中没有可执行的任务,则当前线程将会被挂起,
                        所以在offer中,当队列中添加成员后,需要调用notifyAll
                     */
                    runnableList.wait();
                } catch (InterruptedException e) {
                    /*
                        写法分析:
                            其实这个地方可以不用这么写,我就写一个wait,然后在方法上将其抛出去,
                            这也是允许的。
                     */
                    throw e;
                }
            }
            return runnableList.removeFirst();
        }
    }

    @Override
    public int size() {
        synchronized (runnableList){
            return runnableList.size();
        }
    }
}


第八部分:实现ThreadPool

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

public class BasicThreadPool extends Thread implements ThreadPool {

    private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();
    private final static ThreadFactory DEFAULT_THREAD_FACTORY = new ThreadFactory.DefaultThreadFactory();

    private int initSize;
    private int maxSize;
    private int coreSize;
    private int activeCount;

    private final ThreadFactory threadFactory;
    private final RunnableQueue runnableQueue;
    private final long keepAliveTime;
    private final TimeUnit timeUnit;

    private volatile boolean isShutdown = false;

    /*
        稍微注意下这些容器的写法
     */
    private final Queue<ThreadTask> threadQueue=new ArrayDeque<>();

    public BasicThreadPool(int initSize, int maxSize, int coreSize,int queueSize){
        this(initSize,maxSize,coreSize,DEFAULT_THREAD_FACTORY,
                queueSize,DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);
    }

    public BasicThreadPool(int initSize, int maxSize, int coreSize,
                           ThreadFactory threadFactory, int queueSize,
                           DenyPolicy denyPolicy, long keepAliveTime,
                           TimeUnit timeUnit) {
        this.initSize = initSize;
        this.maxSize = maxSize;
        this.coreSize = coreSize;

        this.threadFactory = threadFactory;

        this.runnableQueue=new LinkedRunnableQueue(queueSize,denyPolicy,this);

        this.keepAliveTime = keepAliveTime;
        this.timeUnit = timeUnit;

        this.init();
    }

    //创建这个线程池的线程,执行这个方法
    private void init(){
        start();
        for (int i = 0; i < initSize; i++) {
            newThread();
        }
    }

    private void newThread() {
        /*
            如果Pool本身只保存internalTask,则InternalTask需要集成许多对
            任务的管理,但是它终究只是一个Runnable,所以并不现实
         */
        InternalTask internalTask = new InternalTask(runnableQueue);
        Thread thread = this.threadFactory.createThread(internalTask);
        ThreadTask threadTask = new ThreadTask(thread,internalTask);
        threadQueue.offer(threadTask);
        this.activeCount++;
        thread.start();
    }

    //从线程池中移除某个线程
    private void removeThread(){

        ThreadTask threadTask = threadQueue.remove();
        /*
            当调用stop方法是,循环将会不被执行,线程也就自然的结束了
            生命周期。
         */
        threadTask.internalTask.stop();
        this.activeCount--;
    }

    //维护线程数量,比如扩容、回收等工作
    public void run(){
        /*
            只有调用了Pool的shutdown方法,或者对其中断了,才后导致其
            退出循环。
                ——很奇怪的一点哦,我直接把Pool线程给退出了,Pool
                    线程创建的线程我不进行管理了么。
         */
        while(!isShutdown&&!isInterrupted()){
            try{
                timeUnit.sleep(keepAliveTime);
            } catch (InterruptedException e) {
                isShutdown=true;
                break;
            }
            synchronized (this){
                //多次确认,是否关闭这个线程了。。。
                if(isShutdown)
                    break;

                //当线程中有任务尚未处理,并且activeCount<coreSize则继续扩容
                if(runnableQueue.size()>0&&activeCount<coreSize){
                    for (int i = initSize; i < coreSize; i++) {
                        newThread();
                    }
                    /*
                        书中说continue的目的是,不想让线程的扩容直接到达maxsize。
                        这个continue会导致while循环重新判断,从而导致该线程睡眠
                        keepAliveTime时间。
                     */
                    continue;
                }

                //当前队列中有任务尚未处理,并且activeCount<maxSize则kuorong
                if (runnableQueue.size() > 0 && activeCount < maxSize) {
                    for (int i = coreSize; i < maxSize; i++) {
                        newThread();
                    }
                }

                //如果任务队列中没有任务,则需要回收,回收至coreSize即可
                if(runnableQueue.size()==0&&activeCount>coreSize){
                    for (int i = coreSize; i < activeCount; i++) {
                        removeThread();
                    }
                }
            }
        }
    }

    @Override
    public void execute(Runnable runnable) {
        if(this.isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        this.runnableQueue.offer(runnable);
    }

    /*
        问题还是存在着,而且现在更尴尬了,如果pool被interrupt了,那么
        shutdown清理线程部分就永远都不会执行了。
     */
    @Override
    public void shutdown() {
        if(isShutdown)
            return;

        isShutdown=true;

        threadQueue.forEach(threadTask -> {
            threadTask.internalTask.stop();
            threadTask.thread.interrupt();
        });
    }

    @Override
    public int getInitSize() {
        if(this.isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return initSize;
    }

    @Override
    public int getMaxSize() {
        if(this.isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return maxSize;
    }

    @Override
    public int getCoreSize() {
        if(this.isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return coreSize;
    }

    @Override
    public int getActiveCount() {
        synchronized (this){
            return this.activeCount;
        }
    }

    @Override
    public int getQueueSize() {
        if(this.isShutdown){
            throw new IllegalStateException("The thread pool is destory");
        }
        return runnableQueue.size();
    }

    @Override
    public boolean isShutdown() {
        return this.isShutdown;
    }

    private static class ThreadTask{
        Thread thread;
        InternalTask internalTask;

        public ThreadTask(Thread thread, InternalTask internalTask) {
            this.thread = thread;
            this.internalTask = internalTask;
        }
    }
}


第九部分:测试

import java.util.concurrent.TimeUnit;

public class Test {
    public static void main(String[] args){
        ThreadPoolTest.test();
    }
}

class ThreadPoolTest{
    public static void test(){
        final ThreadPool threadPool = new BasicThreadPool(2,6,4,1000);

        for (int i = 0; i < 20; i++) {
            threadPool.execute(()->{
                try{
                    TimeUnit.SECONDS.sleep(10);
                    System.out.println(Thread.currentThread().getName()+
                        " is running and done.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        while (true) {
            System.out.println("getActiveCount:"+threadPool.getActiveCount());
            System.out.println("getQueueSize:"+threadPool.getQueueSize());
            System.out.println("getCoreSize:"+threadPool.getCoreSize());
            System.out.println("getMaxSize:"+threadPool.getMaxSize());
            System.out.println("=======================================");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

原文地址:https://www.cnblogs.com/junjie2019/p/10591541.html

时间: 2024-10-09 18:54:48

线程池原理以及自定义线程池的相关文章

池化技术——自定义线程池

目录 池化技术--自定义线程池 1.为什么要使用线程池? 1.1.池化技术的特点: 1.2.线程池的好处: 1.3.如何自定义一个线程池 2.三大方法 2.1.单个线程的线程池方法 2.2.固定的线程池的大小的方法 2.3.可伸缩的线程池的方法 2.4.完整的测试代码为: 3.为什么要自定义线程池?三大方法创建线程池的弊端分析 4.七大参数 5.如何手动的去创建一个线程池 6.四种拒绝策略 6.1.会抛出异常的拒绝策略 6.2.哪来的去哪里拒绝策略 6.3.丢掉任务拒绝策略 6.4.尝试竞争拒绝

Android线程管理之ThreadPoolExecutor自定义线程池(三)

前言: 上篇主要介绍了使用线程池的好处以及ExecutorService接口,然后学习了通过Executors工厂类生成满足不同需求的简单线程池,但是有时候我们需要相对复杂的线程池的时候就需要我们自己来自定义一个线程池,今天来学习一下ThreadPoolExecutor,然后结合使用场景定义一个按照线程优先级来执行的任务的线程池. ThreadPoolExecutor ThreadPoolExecutor线程池用于管理线程任务队列.若干个线程. 1.)ThreadPoolExecutor构造函数

使用wait()和notifyAll()方法自定义线程池

声明: 1.该篇只是提供一种自定义线程池的实现方式,可能性能.安全等方面需要优化: 2.该篇自定义线程池使用的是wait()和notifyAll()方法,也可以使用Lock结合Condition来实现: 3.该篇力求使用简单的方式呈现,如有错误之处,欢迎指正,在此表示感谢. 概述 自定义线程池三要素包括: 1.存储线程的容器(或叫线程池).该容器可使用数组或链表,容器中存放执行线程,本篇使用链表实现. 2.执行线程(或叫执行器).具体执行的线程. 3.执行任务.执行线程需要执行的具体任务. 代码

12.ThreadPoolExecutor线程池原理及其execute方法

jdk1.7.0_79  对于线程池大部分人可能会用,也知道为什么用.无非就是任务需要异步执行,再者就是线程需要统一管理起来.对于从线程池中获取线程,大部分人可能只知道,我现在需要一个线程来执行一个任务,那我就把任务丢到线程池里,线程池里有空闲的线程就执行,没有空闲的线程就等待.实际上对于线程池的执行原理远远不止这么简单. 在Java并发包中提供了线程池类——ThreadPoolExecutor,实际上更多的我们可能用到的是Executors工厂类为我们提供的线程池:newFixedThread

【线程池原理】线程池的原理及实现

前言 线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配.调优和监控,有如下好处 1.降低资源消耗 2.提高响应速度 3.提高线程的可管理性 java1.5中引入的Executor框架把任务的提交和执行进行解耦,只需要定义好任务,然后提交给线程池,而不用关心该任务是如何执行.被哪个线程执行,以及什么时候执行. 一.ThreadPoolExecutor ThreadPoolExecutor是线程池的工厂类,通过它可以快速初始化一个符合业

JAVA线程池原理与源码分析

1.线程池常用接口介绍 1.1.Executor public interface Executor { void execute(Runnable command); } 执行提交的Runnable任务.其中的execute方法在将来的某个时候执行给定的任务,该任务可以在新线程.池化线程或调用线程中执行,具体由Executor的实现者决定. 1.2.ExecutorService ExecutorService继承自Executor,下面挑几个方法介绍: 1.2.1.shutdown() vo

Java线程池之线程池原理

1 /** 2 * 线程池原理 3 */ 4 public class ThreadPoolExecutor { 5 6 //大部分线程池都试调用的ThreadPoolExecutor这个类 7 //如果你想自定义线程池,创建一个ThreadPoolExecutor对象,传入参数即可 8 9 } 原文地址:https://www.cnblogs.com/mxh-java/p/12246501.html

java线程池原理及实现方式

线程池的定义 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程 为什么要使用线程池 1.减少在创建和销毁线程上所花的时间以及系统资源的开销 2.在一个 JVM 里创建太多的线程可能会导致系统由于过度消耗内存而用完内存或"切换过度".为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目. 线程池组成部分 1.线程池管理器(ThreadPoolManager):用于创建并管理线程池,包括 创建线程池,销

java多线程(四)-自定义线程池

当我们使用 线程池的时候,可以使用 newCachedThreadPool()或者 newFixedThreadPool(int)等方法,其实我们深入到这些方法里面,就可以看到它们的是实现方式是这样的. 1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new Synchro