线程池的原理与实现

  java中的线程池框架为Executors,但是这里我们将自己实现简单的线程池,主要目的是理解它的原理。

线程池主要由两个部分组成:

(1)线程数组,用于执行任务。

(2)任务队列。

下面的两个实现都是按照这种思路来做的。

一.简单的线程池,有点问题

package com.chuiyuan.utils;

import java.util.LinkedList;

/**
 * Created by chuiyuan on 2/21/16.
 * start:
 * create a new thread and run the job in run()
 * run:
 * not create any new thread, just run job in current thread
 *
 * TO DO:
 * (1)how to stop the thread ?
 */
public class MyWorkQueue {
    //Executors
    /**
     * thread pool size
     */
    private final int n ;
    /**
     * Runnable job queue,LinkedList
     */
    private final LinkedList queue ;
    /**
     *
     */
    private final PoolWorker [] threads ;
    /**
     * init n threads for workers,
     * start all work threads
     * @param n  thread pool size
     */
    public MyWorkQueue(int n){
        this.n = n;
        queue = new LinkedList();
        threads = new PoolWorker[n];

        for (int i=0;i<n ;i++){
            threads[i] = new PoolWorker();//if no,NullPointerException
            threads[i].start();//start all work threads
        }
    }

    /**
     * work thread
     */
    private class PoolWorker extends Thread{
        public void run(){
            Runnable r =null ;\\if not null, may has not initialized error
            while(true){
                synchronized (queue){
                    //if queue is empty, wait
                    while (queue.isEmpty()){
                        try {
                            queue.wait(10);
                        }catch (InterruptedException e){

                        }
                    }
                    //if queue not empty,get job from queue and do it
                    //if change LinkedList to blocking queue?
                    //this must in synchronized
                    if (!queue.isEmpty()) {
                        r = (Runnable) queue.removeFirst();
                    }
                }
                //out of synchronized
                try {
                    if (r!= null) r.run();
                }catch (RuntimeException e){}

            }
        }
    }

    /**
     * add and execute Runnable job
     * @param r
     */
    public void execute(Runnable r ){
        synchronized (queue){
            queue.addLast(r);
            queue.notify();
        }
    }

}

存在的问题:

没有进行线程关闭。

值得注意的是工作线程中的run方法逻辑。在从任务队列中取任务的时候,要对队列进行加锁,但是run的时候是不加锁的。

二.简单的线程池改进

  这里我们使用了单例模式,可以指定线程池中线程数,且在添加时,可以以单任务,任务List,任务数组的方式添加。

package com.chuiyuan.utils;

import java.util.LinkedList;
import java.util.List;

/**
 * Created by chuiyuan on 2/21/16.
 * Main:
 * WorkThread [] workThreads
 * List<Runnable> taskQueue
 */
public final class ThreadPool {
    //default 5 thread
    private static int worker_num =5;
    //worker thread
    private WorkThread [] workThreads;
    //tasks done
    private static volatile int finished_task=0;
    //task queue, as a buffer, List not thread safe
    private List<Runnable> taskQueue = new LinkedList<Runnable>() ;
    private static ThreadPool threadPool ;

    private ThreadPool(){
        this(5);
    }

    private ThreadPool(int worker_num){
        this.worker_num = worker_num;
        workThreads = new WorkThread[worker_num];
        for (int i=0;i<worker_num;i++){
            workThreads[i] = new WorkThread();
            workThreads[i].start();//start thread in pool
        }
    }
    /**
     * singleton
     */
    public static ThreadPool getThreadPool(){
        return getThreadPool(worker_num);
    }

    public static ThreadPool getThreadPool(int worker_num1){
        if (worker_num1<=0){
            worker_num1 = ThreadPool.worker_num;
        }
        if (threadPool ==null){
            threadPool = new ThreadPool(worker_num1);
        }
        return threadPool;
    }

    /**
     * Just add task to TaskQueue, when to start task is
     * decided by ThreadPool
     * @param task
     */
    public void execute(Runnable task){
        synchronized (taskQueue){
            taskQueue.add(task);
            taskQueue.notify();
        }
    }

    /**
     * Add task to TaskQueue in batch
     * @param tasks
     */
    public void execute(Runnable [] tasks){
        synchronized (taskQueue){
            for (Runnable task :tasks){
                taskQueue.add(task);
            }
            taskQueue.notify();
        }
    }

    /**
     * Add task in List
     * @param tasks
     */
    public void execute(List<Runnable> tasks){
        synchronized (taskQueue){
            for (Runnable task: tasks){
                taskQueue.add(task);
            }
            taskQueue.notify();
        }
    }

    /**
     * shutdown all the thread if all tasks are done,if not,
     * wait till all done
     */
    public void shutdown(){
        //if not done ,sleep for a while??
        while (!taskQueue.isEmpty()){
            try {          System.out.println("Thread "+Thread.cuurentThread().getName()+"want to shudown ThreadPool");
                Thread.sleep(10);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        for (int i=0;i<worker_num;i++){
            workThreads[i].stopWorker();
            workThreads[i] = null;
        }
        threadPool = null ;
        taskQueue.clear();//clear taskQueue
    }

    public int getWorkThreadNumber(){
        return worker_num;
    }

    /**
     * tasks pop out of TaskQueue, it may haven‘t
     * done
     * @return
     */
    public int getFinishedTaskNumber(){
        return  finished_task;
    }

    /**
     * tasks left in TaskQueue
     * @return
     */
    public int getWaitTaskNumber(){
        return taskQueue.size();
    }

    @Override
    public String toString(){
        return "WorkThreadNumber:"+getWorkThreadNumber()+",FinishedTaskNumber"+
                getFinishedTaskNumber()+",WaitTaskNumber:"+getWaitTaskNumber();
    }

    private class WorkThread extends Thread{
        //use to stop thread
        private boolean isRunning = true ;

        /**
         * key in run()
         * if TaskQueue is not null, get task and run.
         * if TaskQueue is null, wait
         */
        @Override
        public void run(){
            Runnable r = null;
            //inner class has a reference of outclass.this
            //so it can read outclass.this.taskQueue
            while(isRunning){
                synchronized (taskQueue){
                    while(isRunning&& taskQueue.isEmpty()){
                        try {
                            taskQueue.wait(20);
                        }catch (InterruptedException e){
                            e.printStackTrace();
                        }
                    }
                    if (!taskQueue.isEmpty()){
                        r = taskQueue.remove(0);//get out task
                    }
                }
                if (r!=null){
                    r.run();//run task in this thread
                }
                finished_task++;
                r = null ;
            }
        }

        public void stopWorker(){
            isRunning = false;
        }
    }
}

值得注意的地方有:

(1)对象数组的初始化

对于基本数据类型的数组,在new时,同时也对元素进行了初始化。

对于数组对象,使用new只是对数组本身分配空间,但是数组元素并没有初始化,也就是数组元素都为空。

还要对数组的每个元素进行初始化。

(2)单例模式的使用

  将构造函数私有化,再通过公共的静态getThreadPool对私有的构造函数进行调用。

(3)execute方法

  execute方法只是添加了Runnable任务,但是任务的调度则是由ThreadPool进行的。在添加任务的时候,要对TaskQueue进行加锁,添加完成后要notify()。

notify(),notifyAll(),wait()为Object方法,只能在synchronized块中使用,而sleep()为Thread()方法,可以在非同步块中使用。

(4)shutdown方法

  里面的意思是,当外部线程(一般是主线程)想关闭ThreadPool,如果任务队列中还有任务没有执行,则主线程sleep(10),线程池则接着工作,过10后再看任务队列是否为空,如此循环。

wait():causes the current thread to wait until either another thread invokes notify() or notifyAll(), or the specified time has passed. The current thread must own this object‘s monitor.

(5)工作线程

  核心。

时间: 2024-12-25 05:40:54

线程池的原理与实现的相关文章

Java 线程池的原理与实现

最近在学习线程池.内存控制等关于提高程序运行性能方面的编程技术,在网上看到有一哥们写得不错,故和大家一起分享. [分享]Java 线程池的原理与实现 这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧.线程池就是其中之一,一提到线程,我们会想到以前<操作系统>的生产者与消费者,信号量,同步控制等等.一提到池,我们会想到数据库连接池,但是线程池又如何呢? 建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字的用

Java 多线程:线程池实现原理

前言 我们都知道,所谓线程池,那么就是相当于有一个池子,线程就放在这个池子中进行重复利用,能够减去了线程的创建和销毁所带来的代价.但是这样并不能很好的解释线程池的原理,下面从代码的角度分析一下线程池的实现. 线程池的相关类 对于原理,在 Java 中,有几个接口,类 值得我们关注: Executor ExecutorService AbstractExecutorService ThreadPoolExecutor Executor public interface Executor {    

Java线程池的原理及几类线程池的介绍

刚刚研究了一下线程池,如果有不足之处,请大家不吝赐教,大家共同学习.共同交流. 在什么情况下使用线程池? 单个任务处理的时间比较短 将需处理的任务的数量大 使用线程池的好处: 减少在创建和销毁线程上所花的时间以及系统资源的开销 如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存以及"过度切换". 线程池工作原理: 为什么要用线程池? 诸如 Web 服务器.数据库服务器.文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务.请求以某种方式到

Java 线程池的原理与实现 (转)

  最近在学习线程池.内存控制等关于提高程序运行性能方面的编程技术,在网上看到有一哥们写得不错,故和大家一起分享. [分享]Java 线程池的原理与实现 这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧.线程池就是其中之一,一提到线程,我们会想到以前<操作系统>的生产者与消费者,信号量,同步控制等等.一提到池,我们会想到数据库连接池,但是线程池又如何呢? 建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字

我眼中的java线程池实现原理

最近在看java线程池实现方面的源码,在此做个小结,因为网上关于线程池源码分析的博客挺多的,我也不打算重复造轮子啦,仅仅用纯语言描述的方式做做总结啦! 个人认为要想理解清楚java线程池实现原理,明白下面几个问题就可以了: (1):线程池存在哪些状态,这些状态之间是如何进行切换的呢? (2):线程池的种类有哪些? (3):创建线程池需要哪些参数,这些参数的具体含义是什么? (4):将任务添加到线程池之后运行流程? (5):线程池是怎么做到重用线程的呢? (6):线程池的关闭 首先回答第一个问题:

11 java 线程池 实现原理

一 关键类的实现 1 ThreadPoolExecutor类 java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类. 下面我们来看一下ThreadPoolExecutor类的具体实现源码. 在ThreadPoolExecutor类中提供了四个构造方法: 1 public class ThreadPoolExecutor extends AbstractExecutorService {

Java的Executor框架和线程池实现原理

一,Java的Executor框架 1,Executor接口 public interface Executor { void execute(Runnable command); } Executor接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法,它没有实现类只有另一个重要的子接口ExecutorService 2,ExecutorService接口 //继承自Executor接口 public interface ExecutorServic

并发编程 15—— 线程池 之 原理二

Java并发编程实践 目录 并发编程 01—— ConcurrentHashMap 并发编程 02—— 阻塞队列和生产者-消费者模式 并发编程 03—— 闭锁CountDownLatch 与 栅栏CyclicBarrier 并发编程 04—— Callable和Future 并发编程 05—— CompletionService : Executor 和 BlockingQueue 并发编程 06—— 任务取消 并发编程 07—— 任务取消 之 中断 并发编程 08—— 任务取消 之 停止基于线

并发编程 14—— 线程池 之 原理一

Java并发编程实践 目录 并发编程 01—— ConcurrentHashMap 并发编程 02—— 阻塞队列和生产者-消费者模式 并发编程 03—— 闭锁CountDownLatch 与 栅栏CyclicBarrier 并发编程 04—— Callable和Future 并发编程 05—— CompletionService : Executor 和 BlockingQueue 并发编程 06—— 任务取消 并发编程 07—— 任务取消 之 中断 并发编程 08—— 任务取消 之 停止基于线

Java线程池实现原理与技术

本文将通过实现一个简易的线程池理解线程池的原理,以及介绍JDK中自带的线程池ThreadPoolExecutor和Executor框架. 1.无限制线程的缺陷 多线程的软件设计方法确实可以最大限度地发挥多核处理器的计算能力,提高生产系统的吞吐量和性能.但是,若不加控制和管理的随意使用线程,对系统的性能反而会产生不利的影响. 一种最为简单的线程创建和回收的方法类似如下: new Thread(new Runnable() { @Override public void run() { //do s