之所以今天讨论它,因为在motan的的NettyServer中利用它这个线程池可以作为业务线程池,它定制了一个自己的线程池。当然还是基于jdk中的ThreadExecutor中的构造方法和execute方法,然后在外边包装一层。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 /* 5 * Proceed in 3 steps: 6 * 7 * 1. If fewer than corePoolSize threads are running, try to 8 * start a new thread with the given command as its first 9 * task. The call to addWorker atomically checks runState and 10 * workerCount, and so prevents false alarms that would add 11 * threads when it shouldn‘t, by returning false. 12 * 13 * 2. If a task can be successfully queued, then we still need 14 * to double-check whether we should have added a thread 15 * (because existing ones died since last checking) or that 16 * the pool shut down since entry into this method. So we 17 * recheck state and if necessary roll back the enqueuing if 18 * stopped, or start a new thread if there are none. 19 * 20 * 3. If we cannot queue task, then we try to add a new 21 * thread. If it fails, we know we are shut down or saturated 22 * and so reject the task. 23 */ 24 int c = ctl.get(); 25 if (workerCountOf(c) < corePoolSize) { 26 if (addWorker(command, true)) 27 return; 28 c = ctl.get(); 29 } 30 if (isRunning(c) && workQueue.offer(command)) { 31 int recheck = ctl.get(); 32 if (! isRunning(recheck) && remove(command)) 33 reject(command); 34 else if (workerCountOf(recheck) == 0) 35 addWorker(null, false); 36 } 37 else if (!addWorker(command, false)) 38 reject(command); 39 }
ThreadExecutor中的execute方法
从execute 方法的注释清晰得知,传统线程加入线程池执行过程分3步
小于等于Coresize: 创建线程之行
大于CoreSize 加入队列
队列满且小于maxSize 有空闲线程使用空闲线程执行,没有的话,创建线程执行
大于maxSize 拒绝策略执行
现在需要按照如下方式改造
1 /** 2 * <pre> 3 * 4 * 代码和思路主要来自于: 5 * 6 * tomcat : 7 * org.apache.catalina.core.StandardThreadExecutor 8 * 9 * java.util.concurrent 10 * threadPoolExecutor execute执行策略: 优先offer到queue,queue满后再扩充线程到maxThread,如果已经到了maxThread就reject 11 * 比较适合于CPU密集型应用(比如runnable内部执行的操作都在JVM内部,memory copy, or compute等等) 12 * 13 * StandardThreadExecutor execute执行策略: 优先扩充线程到maxThread,再offer到queue,如果满了就reject 14 * 比较适合于业务处理需要远程资源的场景 15 * 16 * </pre>
具体的方式自己改造一个队列,在队列入队的方式下功夫。
时间: 2024-10-11 05:24:48