线程池本质是一个生产者-消费者模式,一边维护一些线程执行任务,一边由主线程添加一些任务。现在我们抛弃源码中一些繁杂的状态判断,自己写一个线程池。
/**
 * A minimal fixed-size thread pool built on a blocking queue (producer/consumer).
 *
 * <p>Callers submit tasks with {@link #addWork(Runnable)}; a fixed set of worker
 * threads takes them from a shared FIFO queue and runs them. {@link #close()}
 * lets every task queued before it finish and then stops the workers.
 *
 * <p>Tasks added after {@code close()} has been called are never executed.
 */
public class poolT {
    /** Stop marker queued by close(), one per worker; a worker that dequeues it exits. */
    private static final Runnable STOP = new Runnable() {
        @Override
        public void run() {
            // Never executed: workers recognise this instance by identity.
        }
    };

    // Tasks are added and removed frequently, so a linked queue is a good fit.
    private final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    private final HashSet<Work> workers = new HashSet<Work>();
    // Requested worker count for this pool. This used to be a static field that
    // every constructor call overwrote for all instances.
    private final int num;
    // True once close() has queued the stop markers; guarded by "this".
    private boolean closed = false;

    /**
     * Creates the pool and immediately starts its worker threads.
     *
     * @param num number of worker threads; a value below 1 starts no workers,
     *            in which case queued tasks are never executed
     */
    public poolT(int num) {
        this.num = num;
        for (int i = 0; i < num; i++) {
            Work w = new Work();
            workers.add(w);
            w.start();
        }
    }

    /**
     * Queues a task for execution by one of the workers.
     *
     * @param r the task to run
     * @throws NullPointerException if {@code r} is null (the queue rejects null elements)
     */
    public void addWork(Runnable r) {
        workQueue.add(r);
    }

    /**
     * Shuts the pool down gracefully: every task queued before this call is run
     * to completion, then all workers terminate and their final state is printed.
     *
     * <p>One stop marker per worker is appended to the queue. Because the queue
     * is FIFO, the markers are only reached after the pending tasks, and each
     * worker consumes exactly one marker. No worker is interrupted, so running
     * tasks are never disturbed, and no fixed sleep is needed.
     *
     * @throws Exception if the calling thread is interrupted while waiting for
     *                   the workers to finish
     */
    public void close() throws Exception {
        boolean firstCall;
        synchronized (this) {
            firstCall = !closed;
            closed = true;
        }
        if (firstCall) {
            for (int i = 0; i < workers.size(); i++) {
                workQueue.add(STOP);
            }
        }
        for (Work work : workers) {
            // A task that calls close() runs on a worker; joining itself would deadlock.
            if (work != Thread.currentThread()) {
                work.join();
            }
        }
        for (Work work : workers) {
            System.out.println(work.getName() + "状态:" + work.getState());
        }
    }

    /** Worker thread: takes tasks from the shared queue until it dequeues the stop marker. */
    private class Work extends Thread {
        @Override
        public void run() {
            while (true) {
                Runnable task;
                try {
                    // Blocks while the queue is empty.
                    task = workQueue.take();
                } catch (InterruptedException e) {
                    // The pool never interrupts its workers, so this came from
                    // outside: keep the interrupt status and stop this worker.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (task == STOP) {
                    return;
                }
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // A failing task must not kill the worker; otherwise the
                    // remaining queued tasks would never be executed.
                    e.printStackTrace();
                }
            }
        }
    }

    /** Demo: runs two one-second tasks on a pool of four workers, then closes the pool. */
    public static void main(String[] args) throws Exception {
        poolT p = new poolT(4);
        for (int i = 0; i < 2; i++) {
            Runnable newRun = new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        System.out.println(Thread.currentThread().getName() + "运行任务;");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            p.addWork(newRun);
        }
        p.close();
        System.out.println("主程序完毕");
    }
}
这里我使用了一个阻塞队列:空闲的工作线程都阻塞在 take() 上,当有任务添加时,其中一个等待的线程会被唤醒并取走任务(具体是哪一个线程并不确定);没有任务时,工作线程保持阻塞。
当然也可以不用阻塞队列,不过需要自己进行同步(例如使用 synchronized 配合 wait/notify):
/**
 * A minimal fixed-size thread pool that does its own synchronization with
 * {@code synchronized} / {@code wait} / {@code notify} instead of a BlockingQueue.
 *
 * <p>Tasks submitted through {@link #execute(Runnable)} are run in FIFO order by
 * a fixed set of worker threads. {@link #destroy()} lets the queued tasks finish
 * and then stops the workers. Tasks submitted after {@code destroy()} has
 * returned are never executed.
 */
public class MyThreadPool {
    // Pending tasks in FIFO order. This list is also the monitor that guards
    // both the queue contents and the shutdown flag.
    final List<Runnable> taskList = new LinkedList<Runnable>();
    private final List<MyThread> threadList = new LinkedList<MyThread>();
    // Set by destroy(); read and written only while holding the taskList monitor.
    private boolean shutdown = false;

    /**
     * Creates the pool and starts its worker threads.
     *
     * @param num number of worker threads; a value below 1 starts no workers
     */
    public MyThreadPool(int num) {
        for (int i = 0; i < num; i++) {
            threadList.add(new MyThread());
        }
        for (MyThread thread : threadList) {
            thread.start();
        }
    }

    /**
     * Shuts the pool down gracefully: workers finish every task that is already
     * queued, then terminate; this method returns once they have all exited.
     *
     * <p>If the calling thread is interrupted while waiting, it keeps waiting and
     * restores its interrupt status before returning.
     */
    public void destroy() {
        synchronized (taskList) {
            shutdown = true;
            // Wake every idle worker so it can observe the flag and exit.
            taskList.notifyAll();
        }
        boolean interrupted = false;
        for (MyThread thread : threadList) {
            // A task that calls destroy() runs on a worker; joining itself would deadlock.
            if (thread == Thread.currentThread()) {
                continue;
            }
            boolean joined = false;
            while (!joined) {
                try {
                    thread.join();
                    joined = true;
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queues a task and wakes one idle worker to run it.
     *
     * @param run the task to run; a null task is accepted and skipped by the worker
     */
    public void execute(Runnable run) {
        synchronized (taskList) {
            taskList.add(run);
            taskList.notify();
        }
    }

    /** Worker thread: runs queued tasks until the pool is shut down and the queue is empty. */
    private class MyThread extends Thread {
        @Override
        public void run() {
            while (true) {
                System.out.println(Thread.currentThread().getName() + "is running");
                Runnable task;
                synchronized (taskList) {
                    // Guarded wait: re-check the condition after every wake-up.
                    while (taskList.isEmpty() && !shutdown) {
                        try {
                            taskList.wait();
                        } catch (InterruptedException ignored) {
                            // The pool is stopped only through destroy(); an
                            // interrupt simply re-checks the condition.
                        }
                    }
                    if (taskList.isEmpty()) {
                        // Shut down and fully drained. Checking emptiness here
                        // ensures remove(0) is never called on an empty list.
                        return;
                    }
                    task = taskList.remove(0);
                }
                if (task != null) {
                    try {
                        // Run outside the monitor so other workers can fetch tasks.
                        task.run();
                    } catch (RuntimeException e) {
                        // A failing task must not kill the worker.
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    /** Demo: submits two half-second tasks to a pool of four workers, then destroys the pool. */
    public static void main(String[] args) throws Exception {
        MyThreadPool pool = new MyThreadPool(4);
        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                    System.out.println("任务一");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(500);
                    System.out.println("任务贰");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("End");
        pool.destroy();
    }
}
参考:http://blog.csdn.net/hsuxu/article/details/8985931
时间: 2024-11-07 14:05:00