在没有看不论什么代码之前首先想一下线程池应该有哪几部分:
- 任务队列
- 线程
任务队列非常好办。直接用堵塞队列就能够了:BlockingQueue<Runnable> workQueue。而线程是用来运行任务的,那么理所当然应该是不断地从任务队列中取出任务来运行,我们来看ThreadPoolExecutor中的Worker的实现:
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; volatile long completedTasks; Thread thread; // 运行task private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { // 检查线程池的状态,推断是否中断 if (runState < STOP && Thread.interrupted() && runState >= STOP) thread.interrupt(); boolean ran = false; beforeExecute(thread, task);// 运行任务前的操作 try { task.run(); // 任务開始运行 ran = true; afterExecute(task, null);// 运行任务后的操作 ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex);// 运行任务后的操作 throw ex; } } finally { runLock.unlock(); } } // 线程启动之后開始运行 public void run() { try { Runnable task = firstTask; firstTask = null; // 从线程池中取出Runnable然后运行 while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } }
通过推断runLock是不是锁上的状态就能够推断Worker是否在运行任务了。当然getTask失败的时候,Worker的任务也就结束了,我们有时候会希望线程去等一段时间,假设这段时间里面没有任务到达线程才退出。
这个在堵塞队列中有线程的方法。在getTask能够看到,例如以下:
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
在拿到任务的时候,假设如今线程池还非常小,那么如今不须要将任务放到堵塞队列里面去,直接创建一个线程执行就能够了。在这条路走不通的时候才会“绕弯路”来做。由于堵塞队列也是有容量限制的,那么在尝试将任务放进去的时候可能会失败。
假设让我写的话就不会尝试了,而是直接调用堵塞方法让主线程堵塞在这里,这样事实上并不好:
- 不够灵活
- 调用的线程被堵塞了,浪费资源,事实上它是能够用来运行任务的
假设自己要实现reject策略的话实现以下接口就能够了:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
有可能出现一种情况:插入任务之后线程池的状态改变了。那么也要保证该任务可以被处理(并不一定是任务被完毕。也可能是拒绝),那么以下来看提交任务的完整逻辑:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 尝试创建线程并运行任务 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { // 尝试将线程放入堵塞队列 if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) // 确保任务会被处理 ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command))// 假设如今能创建一个线程运行该任务的话,就不要拒绝它了。。 。 reject(command); } }
能够发如今线程池执行期间改动各种阀值都是能够起到作用的。假设在spring中想用简单的线程池的话没有必要自己写一个,直接用现成的配置一个就能够了,比方:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="5" /> <property name="maxPoolSize" value="10" /> <property name="queueCapacity" value="25" /> </bean>
---------- ---------- ---------- ---------- END ---------- ---------- ---------- ----------
时间: 2024-10-10 22:36:07