先上原理图:
上代码之前,要先补充一下线程池构造的核心几个点
- 线程池里的核心线程数与最大线程数
- 线程池里真正工作的线程
worker
- 线程池里用来存取任务的队列
BlockingQueue
- 线程中的任务
task
本例实现简化了一些,只实现了BlockingQueue存放任务,然后每个worker取任务并执行,下面看代码
首先定义一个线程池ThreadExcutor
class ThreadExcutor{ //创建 private volatile boolean RUNNING = true; //所有任务都放队列中,让工作线程来消费 private static BlockingQueue<Runnable> queue = null; private final HashSet<Worker> workers = new HashSet<Worker>(); private final List<Thread> threadList = new ArrayList<Thread>(); int poolSize = 0; //池子大小 int currentSize = 0; //创建了多少个线程 boolean shutdown = false; public ThreadExcutor(int poolSize){ this.poolSize = poolSize; queue = new LinkedBlockingQueue<Runnable>(poolSize); } //提交任务给线程池 public void exec(Runnable runnable) { if (runnable == null) throw new NullPointerException(); if(currentSize < poolSize){ //当前工作线程数小于池子大小 addThread(runnable); //创建一个线程来执行任务 }else{ //System.out.println("offer" + runnable.toString() + " " + queue.size()); try { queue.put(runnable); } catch (InterruptedException e) { e.printStackTrace(); } } } public void addThread(Runnable runnable){ currentSize ++; Worker worker = new Worker(runnable); //实例化一个Worker的同时,将runnable任务添加进队列 workers.add(worker); Thread t = new Thread(worker); //创建一个线程,worker本身实现了runnable接口 threadList.add(t); try { t.start(); //开启线程 执行worker的run方法 }catch (Exception e){ e.printStackTrace(); } } public void shutdown() { RUNNING = false; if(!workers.isEmpty()){ for (Worker worker : workers){ worker.interruptIfIdle(); //调用worker成员方法,中断自身所在线程 } } shutdown = true; Thread.currentThread().interrupt(); } //这里留个位置放内部类Worker 。。 }
然后定义一个内部类Worker,这个内部类Worker是用来执行每个任务的,在创建线程池后,往线程里添加任务,每个任务都是由Worker一个一个来启动的。
//工作线程worker class Worker implements Runnable{ public Worker(Runnable runnable){ queue.offer(runnable); } // worker内部类实现了runnable接口,run方法循环从queue队列里取任务执行,直到池子shutdown @Override public void run() { while (RUNNING){ Runnable task = null; if(shutdown == false){ try { task = getTask(); task.run(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public Runnable getTask() throws InterruptedException { return queue.take(); } public void interruptIfIdle() { //idle 空闲的 for (Thread thread :threadList) { System.out.println(thread.getName() + " interrupt"); thread.interrupt(); } } }
首先注意的一点,这个Worker是个内部类,是在线程池内声明的。
exec方法
Worker怎么工作
这个工作线程实例化的时候就先加入一个任务到队列中,也就是说在实例化这个工作线程时,这个工作线程也是一个任务被加入到线程池中。然后就是run方法,这个run方法是线程调start方法生成的线程,而Worker调的run方法并没有生成新的线程。就是一个循环,一直在不停的从队列中取任务,然后执行。可以看到,取队列的方法是take(),这个方法意思如果队列为空了,取不到数据时就阻塞队列。
然后看shutdown()
你每天辛勤的劳动着,突然接收到上面的命令,说活暂时不要接了,先停下来,当你还没搞清楚状况时,接着你的领导又把你开除了,说公司要倒了,你先下岗吧,一会我也得下岗了。这就是shutdown做的事,shutdown必须是主线程才能停止工作线程。
shutdown方法并不是用线程那种强制停止的搞法,而是先用一个标识符告诉工作线程,不要再接任务了。然后通知工作线程,你可以interrupt()
了,当所有的线程停止后记得要把主线程也停掉,这样,一个简单任务的线程池就完成了。
让我们来测试一下:
public class Main{ public static void main(String[] args){ ThreadExcutor excutor = new ThreadExcutor(3); for (int i = 0; i < 3; i++) { excutor.exec(new Runnable() { @Override public void run() { System.out.println("线程 " + Thread.currentThread().getName() + " 在帮我干活"); } }); } excutor.shutdown(); } }
import java.util.*; import java.util.concurrent.*; public class Main{ public static void main(String[] args){ ThreadExcutor excutor = new ThreadExcutor(3); for (int i = 0; i < 1000; i++) { excutor.exec(new Runnable() { @Override public void run() { System.out.println("线程 " + Thread.currentThread().getName() + " 在帮我干活"); } }); } excutor.shutdown(); } } class ThreadExcutor{ //创建 private volatile boolean RUNNING = true; //所有任务都放队列中,让工作线程来消费 private static BlockingQueue<Runnable> queue = null; private final HashSet<Worker> workers = new HashSet<Worker>(); private final List<Thread> threadList = new ArrayList<Thread>(); int poolSize = 0; //池子大小 int currentSize = 0; //创建了多少个线程 boolean shutdown = false; public ThreadExcutor(int poolSize){ this.poolSize = poolSize; queue = new LinkedBlockingQueue<Runnable>(poolSize); } //提交任务给线程池 public void exec(Runnable runnable) { if (runnable == null) throw new NullPointerException(); if(currentSize < poolSize){ //当前工作线程数小于池子大小 addThread(runnable); //创建一个线程来执行任务 }else{ //System.out.println("offer" + runnable.toString() + " " + queue.size()); try { queue.put(runnable); } catch (InterruptedException e) { e.printStackTrace(); } } } public void addThread(Runnable runnable){ currentSize ++; Worker worker = new Worker(runnable); //实例化一个Worker的同时,将runnable任务添加进队列 workers.add(worker); Thread t = new Thread(worker); //创建一个线程,worker本身实现了runnable接口 threadList.add(t); try { t.start(); //开启线程 执行worker的run方法 }catch (Exception e){ e.printStackTrace(); } } public void shutdown() { RUNNING = false; if(!workers.isEmpty()){ for (Worker worker : workers){ worker.interruptIfIdle(); //调用worker成员方法,中断自身所在线程 } } shutdown = true; Thread.currentThread().interrupt(); } //这里留个位置放内部类Worker class Worker implements Runnable{ public Worker(Runnable runnable){ queue.offer(runnable); } // worker内部类实现了runnable接口,run方法循环从queue队列里取任务执行,直到池子shutdown @Override public void run() { while (RUNNING){ Runnable task = null; if(shutdown == false){ try { task = getTask(); task.run(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public Runnable getTask() throws InterruptedException { return queue.take(); } public void interruptIfIdle() { //idle 空闲的 for (Thread thread :threadList) { System.out.println(thread.getName() + " interrupt"); thread.interrupt(); } } } }
http://www.cnblogs.com/wxwall/p/7050698.html
时间: 2024-11-09 02:28:06