线程池简单实现

实现了一个简化版的线程池。

实现线程池的关键有两个:一是阻塞队列,用于任务的存取,二是内部的线程对象如何持续性的执行任务,并在空闲时被回收。

线程池代码:

  1 package learnConcurrent;
  2
  3 import java.util.ArrayList;
  4 import java.util.Collection;
  5 import java.util.LinkedList;
  6 import java.util.List;
  7 import java.util.concurrent.ArrayBlockingQueue;
  8 import java.util.concurrent.BlockingQueue;
  9 import java.util.concurrent.Callable;
 10 import java.util.concurrent.ExecutionException;
 11 import java.util.concurrent.ExecutorService;
 12 import java.util.concurrent.Future;
 13 import java.util.concurrent.TimeUnit;
 14 import java.util.concurrent.TimeoutException;
 15 import java.util.concurrent.atomic.AtomicBoolean;
 16 import java.util.concurrent.atomic.AtomicInteger;
 17 import java.util.concurrent.locks.ReentrantLock;
 18
 19 public class MyThreadPool implements ExecutorService{
 20     //线程队列
 21     private List<Worker> workers;
 22     //任务队列
 23     private BlockingQueue<Runnable> rQueue;
 24     //线程池核心大小
 25     private int corePoolSize;
 26     //线程池最大大小
 27     private int maxPoolSize;
 28     //空闲线程最长存活时间
 29     private int keepAliveTime = 5;
 30
 31     private static final int ALIVE = 0;
 32
 33     private static final int SHUTDOMN = 1;
 34
 35     private AtomicInteger state = new AtomicInteger(ALIVE);
 36
 37     private ReentrantLock lock = new ReentrantLock();
 38
 39     public MyThreadPool(int corePoolSize, int maxPoolSize){
 40         this.corePoolSize = corePoolSize;
 41         this.maxPoolSize = maxPoolSize;
 42
 43         this.workers = new LinkedList<Worker>();
 44         //阻塞队列,最大容量为maxPoolSize
 45         this.rQueue = new ArrayBlockingQueue<Runnable>(maxPoolSize, true);
 46     }
 47
 48     @Override
 49     public void execute(Runnable command) {
 50         //FIXME size在获取时和判断时 可能发生改变
 51         lock.lock();
 52         int size = workers.size();
 53         if(size < corePoolSize){//当线程池线程数小于核心数量时,增加线程
 54             addWorker();
 55         }else if(size < maxPoolSize && !rQueue.isEmpty()){//当线程大于核心数量且任务队列中任务排队时,增加线程
 56             addWorker();
 57         }
 58         lock.unlock();
 59
 60         if(!isShutdown()){
 61             rQueue.offer(command);
 62         }
 63     }
 64
 65     @Override
 66     public void shutdown() {
 67         //关闭线程池的简单实现,设置状态让任务队列不在接受任务,线程也会因为超时被回收
 68         //缺点时空闲的线程资源得不到立即释放
 69         state.set(SHUTDOMN);
 70     }
 71
 72     /**
 73      * 立即停止线程池,试图停止正在活动的线程,返回还在等待的任务列表
 74      */
 75     @Override
 76     public List<Runnable> shutdownNow() {
 77         if(isShutdown())
 78             return null;
 79         state.set(SHUTDOMN);
 80         lock.lock();
 81         List<Runnable> restRunnable = new ArrayList<Runnable>();
 82         while(!rQueue.isEmpty()){
 83             restRunnable.add(rQueue.poll());
 84         }
 85         for(Worker w : workers){
 86             w.interrupt();
 87         }
 88         lock.unlock();
 89         return restRunnable;
 90     }
 91
 92     @Override
 93     public boolean isShutdown() {
 94         return state.get() == ALIVE;
 95     }
 96
 97     @Override
 98     public boolean isTerminated() {
 99         return isShutdown() && rQueue.isEmpty();
100     }
101
102     @Override
103     public boolean awaitTermination(long timeout, TimeUnit unit)
104             throws InterruptedException {
105         // TODO Auto-generated method stub
106         return false;
107     }
108
109     @Override
110     public <T> Future<T> submit(Callable<T> task) {
111         // TODO Auto-generated method stub
112         return null;
113     }
114
115     @Override
116     public <T> Future<T> submit(Runnable task, T result) {
117         // TODO Auto-generated method stub
118         return null;
119     }
120
121     @Override
122     public Future<?> submit(Runnable task) {
123         return null;
124     }
125
126     @Override
127     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
128             throws InterruptedException {
129         return null;
130     }
131
132     @Override
133     public <T> List<Future<T>> invokeAll(
134             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
135             throws InterruptedException {
136         return null;
137     }
138
139     @Override
140     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
141             throws InterruptedException, ExecutionException {
142         return null;
143     }
144
145     @Override
146     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
147             long timeout, TimeUnit unit) throws InterruptedException,
148             ExecutionException, TimeoutException {
149         return null;
150     }
151
152     private Runnable getTask(){
153         Runnable r = null;
154         try {
155             r = rQueue.poll(keepAliveTime, TimeUnit.SECONDS);
156         } catch (InterruptedException e) {
157             // TODO Auto-generated catch block
158             e.printStackTrace();
159         }
160         return r;
161     }
162
163     private void addWorker(){
164         Worker w = new Worker();
165         w.start();
166         lock.lock();
167         workers.add(w);
168         lock.unlock();
169     }
170
171     private void removeWorker(Worker w){
172         lock.lock();
173         workers.remove(w);
174         lock.unlock();
175     }
176
177     class Worker extends Thread{
178
179         private AtomicBoolean isAlive = new AtomicBoolean(true);
180
181         private Runnable task;
182
183
184         @Override
185         public void run() {
186             while(isAlive.get()){
187                 //阻塞一定时间,超时则回收该线程
188                 task = getTask();
189                 if(task != null){
190                     task.run();
191                 }else{
192                     isAlive.set(false);
193
194                 }
195                 task = null;
196             }
197             System.out.println("remove worker");
198             removeWorker(this);
199         }
200
201     }
202
203
204 }

测试代码:

 1 package learnConcurrent;
 2
 3
 4 public class ThreadPoolTest {
 5     static int taskNo = 0;
 6     public static void main(String[] args) throws InterruptedException {
 7         MyThreadPool pool = new MyThreadPool(2, 5);
 8
 9         for(int i=0; i< 50; i++){
10             Task task = new Task(taskNo++);
11             pool.execute(task);
12             Thread.sleep((int)(Math.random() * 1000));
13         }
14
15     }
16
17 }
18
19 class Task implements Runnable{
20     String str;
21     public Task(int taskNo){
22         str = "TaskNo:" + taskNo;
23     }
24     @Override
25     public void run() {
26         System.out.println(str + " start work ");
27         //DO SOMETHING
28         try {
29             Thread.sleep((int)(Math.random() * 1000));
30         } catch (InterruptedException e) {
31             // TODO Auto-generated catch block
32             e.printStackTrace();
33         }
34
35         System.out.println(str + " done ");
36     }
37
38 }

虽然是继承了ExecutorService对象,但是只实现了几个接口,设计上也可能有未考虑到的问题。

测试代码也很简陋,仅供参考。

时间: 2024-08-27 07:20:50

线程池简单实现的相关文章

基于ThreadPoolExecutor,自定义线程池简单实现

一.线程池作用 在上一篇随笔中有提到多线程具有同一时刻处理多个任务的特点,即并行工作,因此多线程的用途非常广泛,特别在性能优化上显得尤为重要.然而,多线程处理消耗的时间包括创建线程时间T1.工作时间T2.销毁线程时间T3,创建和销毁线程需要消耗一定的时间和资源,如果能够减少这部分的时间消耗,性能将会进一步提高,线程池就能够很好解决问题.线程池在初始化时会创建一定数量的线程,当需要线程执行任务时,从线程池取出线程,当任务执行完成后,线程置回线程池成为空闲线程,等待下一次任务.JDK1.5提供了一个

java threadPool 线程池简单分析

java 1.5 concurrent 工具包中提供了五类线程池的创建: ExecutorService executor=Executors.newCachedThreadPool(); ExecutorService cacheExecutor=Executors.newCachedThreadPool(new TestThreadFactory()); ExecutorService fixExecutor=Executors.newFixedThreadPool(10); Executo

java线程及操作实例,线程池简单例子

java io.集合.线程.字符串.gc.jvm可谓是java中的最基本的知识,尤其是线程操作复杂,相应难懂,要想java基础知识扎实,上面提到的几个方面的知识点都要精通,这样方可以称自己掌握java方面基础知识. 总结一下java线程知识,平时接触过线程,尤其是在android开发中,线程可谓是无处不在,稍有不注意就会报错.在java中线程也是无处不在,main就是一个线程,只不过被包装好了,一般接触不到. 我的无数次的复习经历告诉我,学习知识最快,最深刻的方法就是从解题开始,不要先看概念,遇

用阻塞队列和线程池简单实现生产者和消费者场景

本例子仅仅是博主学习阻塞队列和后的一些小实践,并不是真正的应用场景! 生产者消费者场景是我们应用中最常见的场景,我们可以通过ReentrantLock的Condition和对线程进行wait,notify同通信来实现生产者和消费者场景,前者可以实现多生产者和多消费者模式,后者仅可以实现一生产者,一消费者模式. 今天我们就利用阻塞队列来实现下生产者和消费者模式(里面还利用了线程池). 看过我关于阻塞队列博文的朋友已经知道,阻塞队列其实就是由ReentrantLock实现的! 场景就不描述了,为简单

JAVA 线程与线程池简单小结

JAVA线程创建方式: 1.继承Thread类创建线程类 继承Thread类并重写该类的run方法,该un方法代表了线程要完成的任务. 2.通过Runnable接口创建线程类 实现runnable接口,重写该接口的run()方法,该run()方法的方法体同样是该线程的线程执行体.将Runnable实现类实例作为Thread的target来创建Thread对象,该Thread对象才是真正的线程对象. 3.通过Callable和Future创建线程 (1)实现Callable接口,重写call()方

线程池简单代码

condition.h #ifndef _CONDITION_H_#define _CONDITION_H_ #include <pthread.h> //封装一个互斥量和条件变量作为状态typedef struct condition{ pthread_mutex_t pmutex; pthread_cond_t pcond;}condition_t; //对状态的操作函数int condition_init(condition_t *cond);int condition_lock(con

java 线程池简单例子

package com.hra.riskprice; import com.hra.riskprice.SysEnum.Factor_Type; import com.hra.riskprice.pojo.RskFactor; import com.hra.riskprice.service.impl.RskFactorBulkMapper; import org.springframework.boot.SpringApplication; import org.springframework

最简单的C线程池简单Linux C

 http://weheartit.com/chengzhidun/collections/82197144 2015-01-14 http://weheartit.com/youdongfan/collections/82197139 2015-01-14 http://weheartit.com/meishici/collections/82197151 2015-01-14 http://weheartit.com/dengpanmeng/collections/82197156 20

python学习第十课续 :线程池

线程分步走 t=threading.Thread(target=fun,args=()) t.start() 执行流程:         threading.Thread(target=fun,args=()) à           self.__target = target          self.__name = str(name or _newname())          self.__args = args         t.start  à           _star