自己实现一个简单线程池

先上原理图:

上代码之前,要先补充一下线程池构造的核心几个点

  1. 线程池里的核心线程数与最大线程数
  2. 线程池里真正工作的线程worker
  3. 线程池里用来存取任务的队列BlockingQueue
  4. 线程中的任务task

本例实现简化了一些,只实现了BlockingQueue存放任务,然后每个worker取任务并执行,下面看代码
首先定义一个线程池ThreadExcutor

class ThreadExcutor{

    //创建
    private volatile boolean RUNNING = true;

    //所有任务都放队列中,让工作线程来消费
    private static BlockingQueue<Runnable> queue = null;

    private final HashSet<Worker> workers = new HashSet<Worker>();

    private final List<Thread> threadList = new ArrayList<Thread>();

    int poolSize = 0;     //池子大小

    int currentSize = 0;  //创建了多少个线程

    boolean shutdown = false;

    public ThreadExcutor(int poolSize){
        this.poolSize = poolSize;
        queue = new LinkedBlockingQueue<Runnable>(poolSize);
    }

    //提交任务给线程池
    public void exec(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        if(currentSize < poolSize){     //当前工作线程数小于池子大小
            addThread(runnable);        //创建一个线程来执行任务
        }else{
            //System.out.println("offer" +  runnable.toString() + "   " + queue.size());
            try {
                queue.put(runnable);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void addThread(Runnable runnable){
        currentSize ++;
        Worker worker = new Worker(runnable);        //实例化一个Worker的同时,将runnable任务添加进队列
        workers.add(worker);
        Thread t = new Thread(worker);               //创建一个线程,worker本身实现了runnable接口
        threadList.add(t);
        try {
            t.start();                           //开启线程 执行worker的run方法
        }catch (Exception e){
            e.printStackTrace();
        }

    }

    public void shutdown() {
        RUNNING = false;
        if(!workers.isEmpty()){
            for (Worker worker : workers){
                worker.interruptIfIdle();              //调用worker成员方法,中断自身所在线程
            }
        }
        shutdown = true;
        Thread.currentThread().interrupt();
    }
   //这里留个位置放内部类Worker
    。。
 }
 

然后定义一个内部类Worker,这个内部类Worker是用来执行每个任务的,在创建线程池后,往线程里添加任务,每个任务都是由Worker一个一个来启动的。

//工作线程worker
class  Worker implements Runnable{

        public Worker(Runnable runnable){
            queue.offer(runnable);
        }

        // worker内部类实现了runnable接口,run方法循环从queue队列里取任务执行,直到池子shutdown
        @Override
        public void run() {
            while (RUNNING){
                Runnable task = null;
                if(shutdown == false){
                    try {
                        task = getTask();
                        task.run();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }

        public Runnable getTask() throws InterruptedException {
            return queue.take();
        }

        public void interruptIfIdle() {                            //idle 空闲的
            for (Thread thread :threadList) {
                System.out.println(thread.getName() + " interrupt");
                thread.interrupt();
            }
        }
    }

首先注意的一点,这个Worker是个内部类,是在线程池内声明的。

exec方法

Worker怎么工作

这个工作线程实例化的时候就先加入一个任务到队列中,也就是说在实例化这个工作线程时,这个工作线程也是一个任务被加入到线程池中。然后就是run方法,这个run方法是线程调start方法生成的线程,而Worker调的run方法并没有生成新的线程。就是一个循环,一直在不停的从队列中取任务,然后执行。可以看到,取队列的方法是take(),这个方法意思如果队列为空了,取不到数据时就阻塞队列。

然后看shutdown()

你每天辛勤的劳动着,突然接收到上面的命令,说活暂时不要接了,先停下来,当你还没搞清楚状况时,接着你的领导又把你开除了,说公司要倒了,你先下岗吧,一会我也得下岗了。这就是shutdown做的事,shutdown必须是主线程才能停止工作线程。
shutdown方法并不是用线程那种强制停止的搞法,而是先用一个标识符告诉工作线程,不要再接任务了。然后通知工作线程,你可以interrupt()了,当所有的线程停止后记得要把主线程也停掉,这样,一个简单任务的线程池就完成了。
让我们来测试一下:

public class Main{
     public static void main(String[] args){
         ThreadExcutor excutor = new ThreadExcutor(3);
         for (int i = 0; i < 3; i++) {
             excutor.exec(new Runnable() {
                 @Override
                 public void run() {
                     System.out.println("线程 " + Thread.currentThread().getName() + " 在帮我干活");
                 }
             });
         }
        excutor.shutdown();

     }
 }


import java.util.*;
import java.util.concurrent.*;

public class Main{
     public static void main(String[] args){
         ThreadExcutor excutor = new ThreadExcutor(3);
         for (int i = 0; i < 1000; i++) {
             excutor.exec(new Runnable() {
                 @Override
                 public void run() {
                     System.out.println("线程 " + Thread.currentThread().getName() + " 在帮我干活");
                 }
             });
         }
        excutor.shutdown();

     }
 }

class ThreadExcutor{

    //创建
    private volatile boolean RUNNING = true;

    //所有任务都放队列中,让工作线程来消费
    private static BlockingQueue<Runnable> queue = null;

    private final HashSet<Worker> workers = new HashSet<Worker>();

    private final List<Thread> threadList = new ArrayList<Thread>();

    int poolSize = 0;     //池子大小

    int currentSize = 0;  //创建了多少个线程

    boolean shutdown = false;

    public ThreadExcutor(int poolSize){
        this.poolSize = poolSize;
        queue = new LinkedBlockingQueue<Runnable>(poolSize);
    }

    //提交任务给线程池
    public void exec(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        if(currentSize < poolSize){     //当前工作线程数小于池子大小
            addThread(runnable);        //创建一个线程来执行任务
        }else{
            //System.out.println("offer" +  runnable.toString() + "   " + queue.size());
            try {
                queue.put(runnable);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void addThread(Runnable runnable){
        currentSize ++;
        Worker worker = new Worker(runnable);        //实例化一个Worker的同时,将runnable任务添加进队列
        workers.add(worker);
        Thread t = new Thread(worker);               //创建一个线程,worker本身实现了runnable接口
        threadList.add(t);
        try {
            t.start();                           //开启线程 执行worker的run方法
        }catch (Exception e){
            e.printStackTrace();
        }

    }

    public void shutdown() {
        RUNNING = false;
        if(!workers.isEmpty()){
            for (Worker worker : workers){
                worker.interruptIfIdle();              //调用worker成员方法,中断自身所在线程
            }
        }
        shutdown = true;
        Thread.currentThread().interrupt();
    }
   //这里留个位置放内部类Worker
    class  Worker implements Runnable{

        public Worker(Runnable runnable){
            queue.offer(runnable);
        }

        // worker内部类实现了runnable接口,run方法循环从queue队列里取任务执行,直到池子shutdown
        @Override
        public void run() {
            while (RUNNING){
                Runnable task = null;
                if(shutdown == false){
                    try {
                        task = getTask();
                        task.run();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }

        public Runnable getTask() throws InterruptedException {
            return queue.take();
        }

        public void interruptIfIdle() {                            //idle 空闲的
            for (Thread thread :threadList) {
                System.out.println(thread.getName() + " interrupt");
                thread.interrupt();
            }
        }
    }
 }

http://www.cnblogs.com/wxwall/p/7050698.html

时间: 2024-11-09 02:28:06

自己实现一个简单线程池的相关文章

一个简单线程池的实现---需进一步完善

1.定义一个任务结构体,和一个线程池结构体 struct task{ void *(*p)(void*);//需要实现的函数: void *arg;//函数所带的参数 struct task *next;};struct pthread_pool{ pthread_mutex_t mutex;//线程锁 pthread_cond_t cond;//条件变量 pthread_t *tids;//线程id int thread_nums;//需创建的线程的数量 struct task *head;任

一个简单线程池例子

// Test.cpp : 定义控制台应用程序的入口点. // #include "stdafx.h" #include <windows.h> #include <process.h> #include <vector> typedef unsigned (__stdcall*LP_THREAD_FUN)(void*); class Thread{ public: Thread(){ m_fun = NULL; m_param = NULL; m_

Linux多线程实践(9) --简单线程池的设计与实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

简单线程池的实现

1. 什么是线程池 线程池是线程的集合,拥有若干个线程,线程池中的线程一般用于执行大量的且相对短暂的任务.如果一个任务执行的时间很长,那么就不适合放在线程池中处理,比如说一个任务的执行时间跟进程的生命周期是一致的,那么这个线程的处理就没有必要放到线程池中调度,用一个普通线程即可. 线程池中线程的个数太少的话会降低系统的并发量,太多的话又会增加系统的开销.一般而言,线程池中线程的个数与线程的类型有关系,线程的类型分为 1.     计算密集型任务: 2.     I/O密集型任务. 计算密集型任务

Linux下简单线程池的实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

LINUX下的简单线程池

前言 任何一种设计方式的引入都会带来额外的开支,是否使用,取决于能带来多大的好处和能带来多大的坏处,好处与坏处包括程序的性能.代码的可读性.代码的可维护性.程序的开发效率等. 线程池适用场合:任务比较多,需要拉起大量线程来处理:任务的处理时间相对比较短,按照线程的周期T1(创建阶段).T2(执行阶段).T3(销毁阶段)来算,执行阶段仅占用较少时间. 简单的线程池通常有以下功能:预创建一定数量的线程:管理线程任务,当工作线程没有事情可做时休眠自己:销毁线程池. 复杂一些的线程池有额外的调节功能:管

C++版简单线程池

需求 之前写过一个C#版本的简单线程池http://blog.csdn.net/ylbs110/article/details/51224979 由于刚刚学习了C++11新特性中的future,于是想到用它来实现一个线程池. 实现 思路基本和C#版本的一样,主要区别是委托的实现,线程句柄的不同和线程锁: 本来C++有function模板,但是实现起来比较麻烦,这里主要是实现线程池,所以动态参数的委托就不实现了,直接使用typedef void(*Func)();来实现一个无参数无返回值的函数指针

发一个可伸缩线程池大小的python线程池。已通过测试。

发一个可伸缩线程池大小的线程池. 当任务不多时候,不开那么多线程,当任务多的时候开更多线程.当长时间没任务时候,将线程数量减小到一定数量. """ 可自动实时调节线程数量的线程池. """ import atexit import queue import sys import threading import time import weakref from app.utils_ydf import LoggerMixin, nb_print,

自实现简单线程池

线程池在现在的系统和框架中十分常见.明白线程池的思想原理,不仅对学习线程只是有很大的帮助.对理解一些系统的线程池实现也有很大的帮助.下面是我自己简单实现的一个线程池.用以对线程的简单理解. 线程的实现原理很简单: 线程池对象包含以下组件:工作者队列,Job队列: 用户通过线程池对象添加删除工作者,线程池对象维持工作者对象这个池和工作者的实际工作: 工作者池中的线程在用户没用明确关闭前不断的从Job队列拿取job执行job. 好了,一切看代码: 1.以接口编程,首先创建ThreadPool接口: