自实现简单线程池

  线程池在现在的系统和框架中十分常见。明白线程池的思想原理,不仅对学习线程只是有很大的帮助。对理解一些系统的线程池实现也有很大的帮助。下面是我自己简单实现的一个线程池。用以对线程的简单理解。

  线程的实现原理很简单:

    线程池对象包含以下组件:工作者队列,Job队列;

    用户通过线程池对象添加删除工作者,线程池对象维持工作者对象这个池和工作者的实际工作;

    工作者池中的线程在用户没用明确关闭前不断的从Job队列拿取job执行job。

  好了,一切看代码:

  1.以接口编程,首先创建ThreadPool接口:

    

/**
 * 线程池接口
 * @author yum
 *
 */
public interface ThreadPool<Job extends Runnable> {
    //执行一个Job,Job必须实现Runnable接口
    void execute(Job job);
    //关闭线程池
    void shutdown();
    //添加工作者线程
    void addWorkers(int num);
    //减少工作者线程
    void removeWorkers(int num);
    //得到真在等待执行的任务的数量
    int getJobSize();
}

  2.实现ThreadPool接口:

/**
 * 自定义默认线程池
 * @author yum
 *
 * @param <Job>
 */
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job>{

    //线程最大限制数
    private static final int MAX_WORKER_NUMBER = 10;
    //线程池默认数量
    private static final int DEFAULT_WORKER_NUMBER = 5;
    //线程池最小的数量
    private static final int MIN_WORKER_NUMBER = 1;
    //工作列表,可以向其中添加工作数
    private final LinkedList<Job> jobs = new LinkedList<>();
    //工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    //工作者线程的数量
    private int workerNum = 5;
    //线程编号
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool() {
        initWorkers(DEFAULT_WORKER_NUMBER);
    }

    public DefaultThreadPool(int num){
        workerNum = num > MAX_WORKER_NUMBER?MAX_WORKER_NUMBER:num < MIN_WORKER_NUMBER?MIN_WORKER_NUMBER:num;
        initWorkers(workerNum);
    }

    @Override
    public void execute(Job job) {
        if(job==null)
            throw new NullPointerException();
        synchronized (jobs) {
            jobs.addLast(job);
            jobs.notify();
        }
    }

    @Override
    public void shutdown() {
        for (int i = 0; i < workers.size(); i++) {
            Worker worker = workers.get(i);
            worker.shutdown();
        }
    }

    @Override
    public void addWorkers(int num) {
        synchronized (jobs) {
            if(num+this.workerNum>MAX_WORKER_NUMBER)
                num = MAX_WORKER_NUMBER - workerNum;
            initWorkers(num);
            workerNum+=num;
        }
    }

    @Override
    public void removeWorkers(int num) {
        synchronized (jobs) {
            if(num>=workerNum)
                throw new IllegalArgumentException("beyond worknum");
            //按照给定的数量关闭worker
            int count = 0;
            while(count<num){
                Worker worker = workers.get(count);
                if(workers.remove(worker)){
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum-=num;
        }
    }

    @Override
    public int getJobSize() {
        return this.workerNum;
    }

    //初始化线程工作者
    private void initWorkers(int num){
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-Worker-"+threadNum.getAndIncrement());
            thread.start();
        }
    }

    //工作者线程
    class Worker implements Runnable{
        //是否工作
        private volatile boolean running = true;
        @Override
        public void run() {
            while(running){
                Job job = null;
                synchronized (jobs) {
                    while(jobs.isEmpty()){
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            //感知到外部WorkerThread的中断,返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    //取出一个Job
                    job = jobs.removeFirst();
                }
                if(job!=null){
                    try {
                        job.run();
                    } catch (Exception e) {
                        //忽略Job中的异常
                    }
                }
            }
        }

        public void shutdown(){
            running = false;
        }

    }
}

  3.测试代码如下:

public class ThisTest {
    public static void main(String[] args) {

        ThreadPool<Runnable> pool = new DefaultThreadPool<>(2);
        for (int i = 0; i < 100; i++) {
            pool.execute(new countThread());
        }
    }

    static class countThread implements Runnable{
        private static volatile Integer count  = 0;
        private static Object object = new Object();

        @Override
        public void run() {
            synchronized(object){
                count++;
                System.err.println(count);
            }
        }

    }
}

  

    

时间: 2024-10-03 18:25:11

自实现简单线程池的相关文章

Linux多线程实践(9) --简单线程池的设计与实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

Linux下简单线程池的实现

线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收.所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务(不止一个不同的任务)就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因.比如大家所熟悉的数据库连接池正是遵循这一思想而产生的,本文将介绍的线程池技术同

LINUX下的简单线程池

前言 任何一种设计方式的引入都会带来额外的开支,是否使用,取决于能带来多大的好处和能带来多大的坏处,好处与坏处包括程序的性能.代码的可读性.代码的可维护性.程序的开发效率等. 线程池适用场合:任务比较多,需要拉起大量线程来处理:任务的处理时间相对比较短,按照线程的周期T1(创建阶段).T2(执行阶段).T3(销毁阶段)来算,执行阶段仅占用较少时间. 简单的线程池通常有以下功能:预创建一定数量的线程:管理线程任务,当工作线程没有事情可做时休眠自己:销毁线程池. 复杂一些的线程池有额外的调节功能:管

C++版简单线程池

需求 之前写过一个C#版本的简单线程池http://blog.csdn.net/ylbs110/article/details/51224979 由于刚刚学习了C++11新特性中的future,于是想到用它来实现一个线程池. 实现 思路基本和C#版本的一样,主要区别是委托的实现,线程句柄的不同和线程锁: 本来C++有function模板,但是实现起来比较麻烦,这里主要是实现线程池,所以动态参数的委托就不实现了,直接使用typedef void(*Func)();来实现一个无参数无返回值的函数指针

简单线程池原理和代码

线程池就是,预先创建一定数量的线程,然后当需要异步任务时,只要把任务放入队列中,线程池自动在队列中取任务,每执行完一个任务就自动取下一个任务 本文提供的是一个简单的线程池,所以并不提供线程的自动增减的功能,以比较简单的代码来理解其原理 代码只有一个文件,算上注释才勉强200行,由于代码较长就不全部贴在这里了. 线程池代码见Github[点击] 由于代码使用了一些c++11的东西,所以先需要复习一下以下几个东西:(不要被吓怕,就算不会其实也能懂下面的讲解,具体语法所表达的意思我会说明) std::

简单线程池的实现

1. 什么是线程池 线程池是线程的集合,拥有若干个线程,线程池中的线程一般用于执行大量的且相对短暂的任务.如果一个任务执行的时间很长,那么就不适合放在线程池中处理,比如说一个任务的执行时间跟进程的生命周期是一致的,那么这个线程的处理就没有必要放到线程池中调度,用一个普通线程即可. 线程池中线程的个数太少的话会降低系统的并发量,太多的话又会增加系统的开销.一般而言,线程池中线程的个数与线程的类型有关系,线程的类型分为 1.     计算密集型任务: 2.     I/O密集型任务. 计算密集型任务

自己实现一个简单线程池

先上原理图: 上代码之前,要先补充一下线程池构造的核心几个点 线程池里的核心线程数与最大线程数 线程池里真正工作的线程worker 线程池里用来存取任务的队列BlockingQueue 线程中的任务task 本例实现简化了一些,只实现了BlockingQueue存放任务,然后每个worker取任务并执行,下面看代码首先定义一个线程池ThreadExcutor class ThreadExcutor{ //创建 private volatile boolean RUNNING = true; //

简单线程池实现 (C版本)

1. 概述 在谈到线程池之前,我们看看并发还有哪几种方式. 多进程,包括一次启动多个进程,然后在进程间传递描述符,或者连接来了以后fork子进程处理. 多线程,同理也可以像多进程一样,要么建立一堆放在那里,要么就有连接以后"即时建立,即时销毁" 我们看上面的方法并没有什么不妥,特殊的场景总是能够派上用场的. 比如一个并发量只有两位数甚至个位数,那么上面的这些方式应付起来也很轻松.但是如果要举一个极端的例子,十万的并发,首先不可能建立十万个进程等着这个并发,也不可能即时创建即时销毁.这些

c++简单线程池实现

线程池,简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态,当有新的任务进来,从线程池中取出一个空闲的线程处理任务,然后当任务处理完成之后,该线程被重新放回到线程池中,供其他的任务使用,当线程池中的线程都在处理任务时,就没有空闲线程供使用,此时,若有新的任务产生,只能等待线程池中有线程结束任务空闲才能执行,下面是线程池的工作原理图: 我们为什么要使用线程池呢? 简单来说就是线程本身存在开销,我们利用多线程来进行任务处理,单线程也不能滥用,无止禁的开新线程会给系统产生大量