一个简单的线程池



/**
 *
 * @author hc
 * @version 1.0
 *
 * @param <Job>
 */
public interface ThreadPool<Job extends Runnable>{
	//执行一个job
	void execute(Job job);
	//关闭线程
	void shutdown();
	//增加工作者线程
	void addWorkers(int num);
	//减少工作者线程
	void removeWorkers(int num);
	//正在等待执行的线程数量
	int getJobSize();

}


import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 *
 * @author hc
 * @version 1.0
 * @param <Job>
 */
public class ThreadPoolImpl<Job extends Runnable> implements ThreadPool<Job> {
    //最大的线程池数量
    private static final int Max_Worker_Numbers=10;
    //最少工作线程数量
    private static final int Min_Worker_Numbers=1;
    //默认的工作线程数量
    private static final int Default_Worker_Numbers=5;
    //工作列表
    private final LinkedList<Job> jobs=new LinkedList<Job>();
    //工作的执行者列表
    private final List<ThreadPoolImpl<Job>.Worker>workers=Collections.synchronizedList(new ArrayList<ThreadPoolImpl<Job>.Worker>());
    //工作线程数量
    private int workerNum=Default_Worker_Numbers;
    //线程编号生成
    private AtomicLong threadNum=new AtomicLong();
    public ThreadPoolImpl(){
        initializeWorkers(Default_Worker_Numbers);
    }
    /**
     *
     * @param num 初始化的工作线程数量
     */
    public ThreadPoolImpl(int num){
        if(num>Max_Worker_Numbers){
            initializeWorkers(Max_Worker_Numbers);
            workerNum=Max_Worker_Numbers;
        }else if(num<Min_Worker_Numbers){
            initializeWorkers(Min_Worker_Numbers);
            workerNum=Min_Worker_Numbers;
        }else{
            initializeWorkers(num);
            workerNum=num;
        }
    }
    //添加需要执行的任务
    @Override
    public void execute(Job job) {
        // TODO Auto-generated method stub
        if(job!=null){
            synchronized (jobs) {
                jobs.addLast(job);
                jobs.notify();
            }
        }
    }
   /**
    * 停止执行
    */
    @Override
    public void shutdown() {
        // TODO Auto-generated method stub
        for(ThreadPoolImpl.Worker worker: workers){
            worker.shutdown();
        }
    }
    /**
     * 增加工作线程
     */
    @Override
    public void addWorkers(int num) {
        // TODO Auto-generated method stub
        synchronized (jobs) {
            if(workers.size()+num>Max_Worker_Numbers){
                num=Max_Worker_Numbers-workers.size();
            }
            initializeWorkers(num);
            workerNum+=num;
        }
    }
   /**
    * 减少工作线程数量
    */
    @Override
    public void removeWorkers(int num) {
        // TODO Auto-generated method stub
        synchronized (jobs) {
            if(num>this.workerNum){
                throw new IllegalArgumentException("超过实际工作线程");
            }
            int count=0;
            while(count<num){
                Worker worker= workers.get(count);
                if(workers.remove(worker)){
                    worker.shutdown();
                    count++;
                }
                workerNum--;
            }
        }
    }

    @Override
    public int getJobSize() {
        // TODO Auto-generated method stub
        return jobs.size();
    }
    //工作者初始化
    private void initializeWorkers(int num){
        for(int i=0;i<num;i++){
            Worker worker=new Worker();
            workers.add(worker);
            Thread thread=new Thread(worker,"Thread-Worker-"+threadNum.incrementAndGet());
            thread.start();
        }
    }
    //工作的执行者,内部类
      class Worker implements Runnable{
        private volatile boolean isRunning=true;
        @Override
        public void run() {
            while(isRunning){
                Job job=null;
                synchronized (jobs) {
                    while(jobs.isEmpty()){
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    //获取到需要执行的工作
                    job=jobs.removeFirst();
                }
                if(job!=null){
                    try {
                        job.run();
                    } catch (Exception e) {
                        // TODO: handle exception
                    }
                }
            }
        }
        public void shutdown(){
            isRunning=false;
        }

    }
}
 
时间: 2024-07-30 13:12:20

一个简单的线程池的相关文章

一个简单的线程池程序设计(消费者和生产者)

最近在学习linux下的编程,刚开始接触感觉有点复杂,今天把线程里比较重要的线程池程序重新理解梳理一下. 实现功能:创建一个线程池,该线程池包含若干个线程,以及一个任务队列,当有新的任务出现时,如果任务队列不满,则把该任务加入到任务队列中去,并且向线程发送一个信号,调用某个线程为任务队列中的任务服务.如果线程池中的线程都在忙,那么任务队列中的任务则等待.本程序较为简单,把任务定义为了两个数相加,输出它们的和. 采用自顶向下的设计方法,先把整体框架构建出来,然后再慢慢把细节,小模块补全. 1.在l

Linux C 一个简单的线程池程序设计

实现功能:创建一个线程池,该线程池包含若干个线程,以及一个任务队列,当有新的任务出现时,如果任务队列不满,则把该任务加入到任务队列中去,并且向线程发送一个信号,调用某个线程为任务队列中的任务服务.如果线程池中的线程都在忙,那么任务队列中的任务则等待.本程序较为简单,把任务定义为了两个数相加,输出它们的和. 采用自顶向下的设计方法,先把整体框架构建出来,然后再慢慢把细节,小模块补全. 1.在linux环境下构建三个文件夹(include,src,bin) include:包含该程序所需要的头文件.

Linux C 实现一个简单的线程池

线程池的定义 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程.每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中.如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙.如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值.超过最大值的线程可以排队,但他们要等到其他线程完成后才启动. 什么时

一个简单的线程池实现

前段时间学习了线程方面的知识,看了关于线程池的教程,自己也试着实现一个.跟大家分享,同时也整理整理思路.   对线程池的要求: 1.用于处理大量短暂的任务. 2.动态增加线程,直到达到最大允许的线程数量. 3.动态销毁线程.   线程池的实现类似于"消费者--生产者"模型: 用一个队列存放任务(仓库,缓存) 主线程添加任务(生产者生产任务) 新建线程函数执行任务(消费者执行任务) 由于任务队列是全部线程共享的,就涉及到同步问题.这里采用条件变量和互斥锁来实现. ------------

一个最简单的线程池

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /**  * 一个最简单的线程池,这个模型很简单,但是很有用  *  * @author leizhimin 2014/8/22 20:21  */ public class Test3 {     private static final ExecutorService threadPool = Executors.newFix

自己动手实现简单的线程池

为了节省系统在多线程并发情况下不断的创建新和销毁线程所带来的性能浪费,就需要引入线程池. 线程池的基本功能就是线程复用.每当系统提交一个任务时,会尝试从线程池内取出空闲线程来执行它.如果没有空闲线程,这时候再创建新的线程.任务执行完毕,线程也不会立即销毁,而是加入到线程池中以便下次复用. Java提供了多种线程池的实现,以满足不同业务的需求.为了理解它们,下面给出一个最简单的线程池的实现. 线程池主要分为两大部分,线程池和一些永不退出的线程 首先是线程池部分: package optimisti

一个简单的线程锁------pthread和win32的临界区(Critical Section)

临界区: 临界区是指一个小代码段,在代码能够执行前,它必须独占对某些资源的访问权.这是让若干代码能够"以原子操作方式"来使用资源的一种方法. 所谓原子(atomic)操作方式,是指这段代码知道没有别的线程要访问这个资源. 说明: 1.  MacOSX,Windows有自己的线程模型, pthread可以说是跨平台的线程编程模型解决方案,当然对pthread不熟悉的也可以使用本地线程模型, 其实pthread的win32版本也是基于本地线程模型的, pthread-win32的mutex

一个Windows下线程池的实现(C++)

前言 本文配套代码:https://github.com/TTGuoying/ThreadPool 先看看几个概念: 线程:进程中负责执行的执行单元.一个进程中至少有一个线程. 多线程:一个进程中有多个线程同时运行,根据cpu切换轮流工作,在多核cpu上可以几个线程同时在不同的核心上同时运行. 线程池:基本思想还是一种对象池思想,开辟一块内存空间,里面存放一些休眠(挂起Suspend)的线程.当有任务要执行时,从池中取一个空闲的线程执行任务,执行完成后线程休眠放回池中.这样可以避免反复创建线程对

Python简单的线程池

class ThreadPool(object): def __init__(self, max_num=20): # 创建一个队列,队列里最多只能有10个数据 self.queue = queue.Queue(max_num) # 在队列里填充线程类 # [线程类.线程类.线程类.线程类.线程类.线程类.线程类] for i in range(max_num): self.queue.put(threading.Thread) def get_thread(self): # 去队列里去数据,