第一部分:对线程池的需求分析
/* 8.1 线程池原理 一个完整的线程池应该具备如下要素: 1.任务队列:用于缓存提交的任务 2.线程数量管理功能:可通个三个参数实现: init:创建时初始的线程数量 max:线程池自动扩充时最大的线程数量 core:空闲时但是需要释放线程,但是也要维护一定数量的活跃线程 3.任务拒绝策略: 4.线程工程:主要用于个性化定制线程,比如设置守护线程、设置线程名称等 5.QueueSize:任务队列主要存放Runnable,防止内存溢出,需要有limit数量限制 6.keepedalive时间:该时间主要决定线程各个重要参数自动维护的时间间隔 */
第二部分:对线程池运行过程的简单分析
/* main:-------P---------------------------------------------- | pool: |-----A----B---C------------------------------- | | | A: |----|---|------------------------------- | | B: |---|------------------------------- | C: |------------------------------- */
第三部分:接口定义
ThreadPool接口:
public interface ThreadPool { //提交任务到线程池 void execute(Runnable runnable); //关闭线程池 void shutdown(); //获取线程池的初始化大小 int getInitSize(); //获取线程池最大的线程数 int getMaxSize(); //获取线程池的核心线程数量 int getCoreSize(); //获取线程池中活跃线程数量 int getActiveCount(); //获取线程池中用于缓存任务队列的大小 int getQueueSize(); //查看线程池是否已经别shutdown boolean isShutdown(); }
RunnableQueue接口:
//任务多列,主要用于缓存提交到线程池中的任务 public interface RunnableQueue{ //当有新的任务捡来时首先会offer到队列中 void offer(Runnable runnable); //工作线程通过take方法获取Runnable Runnable take() throws InterruptedException; //获取任务队列中任务的数量 int size(); }
DenyPolicy接口:
@FunctionalInterface public interface DenyPolicy{ void reject(Runnable runnable,ThreadPool threadPool); }
第四部分:定义异常
RunnableDenyException:
public class RunnableDenyException extends RuntimeException{ public RunnableDenyException(String msg){ super(msg); } }
第五部分:实现DenyPolicy
——实际上采用的是在接口中直接实现的
@FunctionalInterface public interface DenyPolicy{ void reject(Runnable runnable,ThreadPool threadPool); //该策略会直接将任务丢弃 class DiscardDenyPolicy implements DenyPolicy{ @Override public void reject(Runnable runnable, ThreadPool threadPool) { //do nothing } } //该策略会向任务提交这抛出异常 class AbortDenyPolicy implements DenyPolicy{ @Override public void reject(Runnable runnable, ThreadPool threadPool) { throw new RunnableDenyException("The runnable "+runnable+" will be abort."); } } //该策略会使任务在提交者所在的线程中执行任务 class RunnerDenyPlolicy implements DenyPolicy{ @Override public void reject(Runnable runnable, ThreadPool threadPool) { runnable.run(); } } }
第六部分:实现InternalTask
public class InternalTask implements Runnable{ private final RunnableQueue runnableQueue; private volatile boolean running = true; public InternalTask(RunnableQueue runnableQueue){ this.runnableQueue=runnableQueue; } /* 对run方法的分析: 如果当前任务为running且没有被中断,则将不断地从queue中获取Runnable 然后执行run方法。让这个任务停止的方法是: 1.在pool线程中,调用该线程对象的interrupt方法 2.在pool线程中,调用该InternalTask对象的stop方法 ——这也就解释了为什么在BasicThreadPool中 将InternalTask对象和执行InternalTask对象 的线程组合在一起保存。 */ @Override public void run() { while(running&&!Thread.currentThread().isInterrupted()){ try { Runnable task = runnableQueue.take(); task.run(); } catch (InterruptedException e) { running = false; break; } } } public void stop(){ this.running = false; } }
第七部分:实现RunableQueue
import java.util.LinkedList; public class LinkedRunnableQueue implements RunnableQueue{ private final int limit; private final DenyPolicy denyPolicy; private final ThreadPool threadPool; /* 这个地方有个很LinkedList的用法经验: addLast():从队列的尾部添加一个元素 removeFirst():从队列的头部拿出一个元素 */ private final LinkedList<Runnable> runnableList = new LinkedList<>(); public LinkedRunnableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) { this.limit = limit; this.denyPolicy = denyPolicy; this.threadPool = threadPool; } @Override public void offer(Runnable runnable) { synchronized (runnableList){ if(runnableList.size()>=limit){ denyPolicy.reject(runnable,threadPool); }else { runnableList.addLast(runnable); runnableList.notifyAll(); } } } @Override public Runnable take() throws InterruptedException { synchronized (runnableList) { while (runnableList.isEmpty()) { try { /* 如果任务队列中没有可执行的任务,则当前线程将会被挂起, 所以在offer中,当队列中添加成员后,需要调用notifyAll */ runnableList.wait(); } catch (InterruptedException e) { /* 写法分析: 其实这个地方可以不用这么写,我就写一个wait,然后在方法上将其抛出去, 这也是允许的。 */ throw e; } } return runnableList.removeFirst(); } } @Override public int size() { synchronized (runnableList){ return runnableList.size(); } } }
第八部分:实现ThreadPool
import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.TimeUnit; public class BasicThreadPool extends Thread implements ThreadPool { private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy(); private final static ThreadFactory DEFAULT_THREAD_FACTORY = new ThreadFactory.DefaultThreadFactory(); private int initSize; private int maxSize; private int coreSize; private int activeCount; private final ThreadFactory threadFactory; private final RunnableQueue runnableQueue; private final long keepAliveTime; private final TimeUnit timeUnit; private volatile boolean isShutdown = false; /* 稍微注意下这些容器的写法 */ private final Queue<ThreadTask> threadQueue=new ArrayDeque<>(); public BasicThreadPool(int initSize, int maxSize, int coreSize,int queueSize){ this(initSize,maxSize,coreSize,DEFAULT_THREAD_FACTORY, queueSize,DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS); } public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize, DenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) { this.initSize = initSize; this.maxSize = maxSize; this.coreSize = coreSize; this.threadFactory = threadFactory; this.runnableQueue=new LinkedRunnableQueue(queueSize,denyPolicy,this); this.keepAliveTime = keepAliveTime; this.timeUnit = timeUnit; this.init(); } //创建这个线程池的线程,执行这个方法 private void init(){ start(); for (int i = 0; i < initSize; i++) { newThread(); } } private void newThread() { /* 如果Pool本身只保存internalTask,则InternalTask需要集成许多对 任务的管理,但是它终究只是一个Runnable,所以并不现实 */ InternalTask internalTask = new InternalTask(runnableQueue); Thread thread = this.threadFactory.createThread(internalTask); ThreadTask threadTask = new ThreadTask(thread,internalTask); threadQueue.offer(threadTask); this.activeCount++; thread.start(); } //从线程池中移除某个线程 private void removeThread(){ ThreadTask threadTask = threadQueue.remove(); /* 当调用stop方法是,循环将会不被执行,线程也就自然的结束了 生命周期。 */ threadTask.internalTask.stop(); this.activeCount--; } //维护线程数量,比如扩容、回收等工作 public void run(){ /* 只有调用了Pool的shutdown方法,或者对其中断了,才后导致其 退出循环。 ——很奇怪的一点哦,我直接把Pool线程给退出了,Pool 线程创建的线程我不进行管理了么。 */ while(!isShutdown&&!isInterrupted()){ try{ timeUnit.sleep(keepAliveTime); } catch (InterruptedException e) { isShutdown=true; break; } synchronized (this){ //多次确认,是否关闭这个线程了。。。 if(isShutdown) break; //当线程中有任务尚未处理,并且activeCount<coreSize则继续扩容 if(runnableQueue.size()>0&&activeCount<coreSize){ for (int i = initSize; i < coreSize; i++) { newThread(); } /* 书中说continue的目的是,不想让线程的扩容直接到达maxsize。 这个continue会导致while循环重新判断,从而导致该线程睡眠 keepAliveTime时间。 */ continue; } //当前队列中有任务尚未处理,并且activeCount<maxSize则kuorong if (runnableQueue.size() > 0 && activeCount < maxSize) { for (int i = coreSize; i < maxSize; i++) { newThread(); } } //如果任务队列中没有任务,则需要回收,回收至coreSize即可 if(runnableQueue.size()==0&&activeCount>coreSize){ for (int i = coreSize; i < activeCount; i++) { removeThread(); } } } } } @Override public void execute(Runnable runnable) { if(this.isShutdown){ throw new IllegalStateException("The thread pool is destory"); } this.runnableQueue.offer(runnable); } /* 问题还是存在着,而且现在更尴尬了,如果pool被interrupt了,那么 shutdown清理线程部分就永远都不会执行了。 */ @Override public void shutdown() { if(isShutdown) return; isShutdown=true; threadQueue.forEach(threadTask -> { threadTask.internalTask.stop(); threadTask.thread.interrupt(); }); } @Override public int getInitSize() { if(this.isShutdown){ throw new IllegalStateException("The thread pool is destory"); } return initSize; } @Override public int getMaxSize() { if(this.isShutdown){ throw new IllegalStateException("The thread pool is destory"); } return maxSize; } @Override public int getCoreSize() { if(this.isShutdown){ throw new IllegalStateException("The thread pool is destory"); } return coreSize; } @Override public int getActiveCount() { synchronized (this){ return this.activeCount; } } @Override public int getQueueSize() { if(this.isShutdown){ throw new IllegalStateException("The thread pool is destory"); } return runnableQueue.size(); } @Override public boolean isShutdown() { return this.isShutdown; } private static class ThreadTask{ Thread thread; InternalTask internalTask; public ThreadTask(Thread thread, InternalTask internalTask) { this.thread = thread; this.internalTask = internalTask; } } }
第九部分:测试
import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args){ ThreadPoolTest.test(); } } class ThreadPoolTest{ public static void test(){ final ThreadPool threadPool = new BasicThreadPool(2,6,4,1000); for (int i = 0; i < 20; i++) { threadPool.execute(()->{ try{ TimeUnit.SECONDS.sleep(10); System.out.println(Thread.currentThread().getName()+ " is running and done."); } catch (InterruptedException e) { e.printStackTrace(); } }); } while (true) { System.out.println("getActiveCount:"+threadPool.getActiveCount()); System.out.println("getQueueSize:"+threadPool.getQueueSize()); System.out.println("getCoreSize:"+threadPool.getCoreSize()); System.out.println("getMaxSize:"+threadPool.getMaxSize()); System.out.println("======================================="); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } } }
原文地址:https://www.cnblogs.com/junjie2019/p/10591541.html
时间: 2024-10-09 18:54:48