Java 线程池的原理与实现学习(一)

线程池:
    多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
   
    假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
   
    如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
                一个线程池包括以下四个基本组成部分:
                1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
                2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
                3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
                4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
               
    线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:

假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

/**
 * @作者 whs
 * @创建日期 2015年2月5日
 * @版本 V 1.0
 */
package thread.pool1;

import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;

import org.apache.log4j.Logger;

/**
 * 线程池
 * 创建线程池,销毁线程池,添加新任务
 */
public final class ThreadPool {
    private static Logger logger = Logger.getLogger(ThreadPool.class.getName());
    private static Logger taskLogger = Logger.getLogger("TaskLogger");

    private static boolean debug = taskLogger.isDebugEnabled();
    // private static boolean debug = taskLogger.isInfoEnabled();
    /* 单例  ,饿汉式 */
    private static ThreadPool instance = new ThreadPool();
    public static final int SYSTEM_BUSY_TASK_COUNT = 150;
    /* 默认池中线程数 */
    public static int worker_num = 5;
    /* 已经处理的任务数 */
    private static int taskCounter = 0;
    public static boolean systemIsBusy = false;

    private static List<Task> taskQueue = Collections.synchronizedList(new LinkedList<Task>());
    /* 池中的所有线程 */
    public PoolWorker[] workers;
    //默认构造函数 初始化 线程池 数目 5个
    private ThreadPool() {
        workers = new PoolWorker[5];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new PoolWorker(i);
        }
    }
    //传参时 构造函数 初始化 线程池 数目 参数pool_worker_num个
    private ThreadPool(int pool_worker_num) {
        worker_num = pool_worker_num;
        workers = new PoolWorker[worker_num];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new PoolWorker(i);
        }
    }

    public static synchronized ThreadPool getInstance() {
        return instance;
    }
    /**
     * 增加新的任务
     * 每增加一个新任务,都要唤醒任务队列
     * @param newTask
     */
    public void addTask(Task newTask) {
        synchronized (taskQueue) {
            newTask.setTaskId(++taskCounter);
            newTask.setSubmitTime(new Date());
            taskQueue.add(newTask);
            /* 唤醒队列, 开始执行 */
            taskQueue.notifyAll();
        }
        logger.info("Submit Task<" + newTask.getTaskId() + ">: " + newTask.info());
    }
    /**
     * 批量增加新任务
     * @param taskes
     */
    public void batchAddTask(Task[] taskes) {
        if (taskes == null || taskes.length == 0) {
            return;
        }
        synchronized (taskQueue) {
            for (int i = 0; i < taskes.length; i++) {
                if (taskes[i] == null) {
                    continue;
                }
                taskes[i].setTaskId(++taskCounter);
                taskes[i].setSubmitTime(new Date());
                taskQueue.add(taskes[i]);
            }
            /* 唤醒队列, 开始执行 */
            taskQueue.notifyAll();
        }
        for (int i = 0; i < taskes.length; i++) {
            if (taskes[i] == null) {
                continue;
            }
            logger.info("Submit Task<" + taskes[i].getTaskId() + ">: " + taskes[i].info());
        }
    }
    /**
     * 线程池信息
     * @return
     */
    public String getInfo() {
        StringBuffer sb = new StringBuffer();
        sb.append("\nTask Queue Size:" + taskQueue.size());
        for (int i = 0; i < workers.length; i++) {
            sb.append("\nWorker " + i + " is " + ((workers[i].isWaiting()) ? "Waiting." : "Running."));
        }
        return sb.toString();
    }
    /**
      * 销毁线程池
      */
    public synchronized void destroy() {
        for (int i = 0; i < worker_num; i++) {
            workers[i].stopWorker();
            workers[i] = null;
        }
        taskQueue.clear();
    }

    /**
     * 池中工作线程
     */
     private class PoolWorker extends Thread {
         private int index = -1;
         /* 该工作线程是否有效 */
         private boolean isRunning = true;
         /* 该工作线程是否可以执行新任务 */
         private boolean isWaiting = true;

         public PoolWorker(int index) {
             this.index = index;
             start();
         }

         public void stopWorker() {
             this.isRunning = false;
         }

         public boolean isWaiting() {
             return this.isWaiting;
         }
         /**
          * 循环执行任务
          * 这也许是线程池的关键所在
          */
         public void run() {
             while (isRunning) {
                 Task r = null;
                 synchronized (taskQueue) {
                     while (taskQueue.isEmpty()) {
                         try {
                             /* 任务队列为空,则等待有新任务加入从而被唤醒 */
                             taskQueue.wait(20);
                         } catch (InterruptedException ie) {
                             logger.error(ie);
                         }
                     }
                     /* 取出任务执行 */
                     r = (Task) taskQueue.remove(0);
                 }
                 if (r != null) {
                     isWaiting = false;
                     try {
                         if (debug) {
                             r.setBeginExceuteTime(new Date());
                             taskLogger.debug("Worker<" + index
                                     + "> start execute Task<" + r.getTaskId() + ">");
                             if (r.getBeginExceuteTime().getTime()
                                     - r.getSubmitTime().getTime() > 1000)
                                 taskLogger.debug("longer waiting time. "
                                         + r.info() + ",<" + index + ">,time:"
                                         + (r.getFinishTime().getTime() - r
                                                 .getBeginExceuteTime().getTime()));
                         }
                         /* 该任务是否需要立即执行 */
                         if (r.needExecuteImmediate()) {
                             new Thread(r).start();
                         } else {
                             r.run();
                         }
                         if (debug) {
                             r.setFinishTime(new Date());
                             taskLogger.debug("Worker<" + index
                                     + "> finish task<" + r.getTaskId() + ">");
                             if (r.getFinishTime().getTime()
                                     - r.getBeginExceuteTime().getTime() > 1000)
                                 taskLogger.debug("longer execution time. "
                                         + r.info() + ",<" + index + ">,time:"
                                         + (r.getFinishTime().getTime() - r
                                                 .getBeginExceuteTime().getTime()));
                         }
                     } catch (Exception e) {
                         e.printStackTrace();
                         logger.error(e);
                     }
                     isWaiting = true;
                     r = null;
                 }
             }
         }
     }
 }
/**
 * @作者 whs
 * @创建日期 2015年2月5日
 * @版本 V 1.0
 */
package thread.pool1;
/*
 * 所有任务接口
 * 其他任务必须继承访类
 */
import java.util.Date;

public abstract class Task implements Runnable {
	//private static Logger logger = Logger.getLogger(Task.class);
    /* 产生时间 */
    private Date generateTime = null;
    /* 提交执行时间 */
    private Date submitTime = null;
    /* 开始执行时间 */
    private Date beginExceuteTime = null;
    /* 执行完成时间 */
    private Date finishTime = null;
    private long taskId;
    public Task() {
        this.generateTime = new Date();
    }
    /**
      * 任务执行入口
      */
    @Override
    public void run() {
    	System.out.println("--------------------");
        /**
        * 相关执行代码
        * beginTransaction();
        * 执行过程中可能产生新的任务 subtask = taskCore();
        * commitTransaction();
        * 增加新产生的任务 ThreadPool.getInstance().batchAddTask(taskCore());
        */
    }

    /**
    * 所有任务的核心 所以特别的业务逻辑执行之处
    * @throws Exception
    */
    public abstract Task[] taskCore() throws Exception;

    /**
    * 是否用到数据库
    * @return
    */
    protected abstract boolean useDb();

    /**
    * 是否需要立即执行
    * @return
    */
    protected abstract boolean needExecuteImmediate();

    /**
    * 任务信息
    * @return String
    */
    public abstract String info();

    public Date getGenerateTime() {
        return generateTime;
    }

    public Date getBeginExceuteTime() {
        return beginExceuteTime;
    }

    public void setBeginExceuteTime(Date beginExceuteTime) {
        this.beginExceuteTime = beginExceuteTime;
    }

    public Date getFinishTime() {
        return finishTime;
    }

    public void setFinishTime(Date finishTime) {
        this.finishTime = finishTime;
    }

    public Date getSubmitTime() {
        return submitTime;
    }

    public void setSubmitTime(Date submitTime) {
        this.submitTime = submitTime;
    }

    public long getTaskId() {
        return taskId;
    }

    public void setTaskId(long taskId) {
        this.taskId = taskId;
    }
}
时间: 2024-11-11 16:32:10

Java 线程池的原理与实现学习(一)的相关文章

Java 线程池的原理与实现学习(二)

摘自:http://www.cnblogs.com/lxzh/archive/2013/01/20/2868736.html execute(Runnable command):履行Ruannable类型的任务 submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future对象 invokeAll(collection of tasks):执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表. shutdown():在完成已提

Java 线程池的原理与实现

最近在学习线程池.内存控制等关于提高程序运行性能方面的编程技术,在网上看到有一哥们写得不错,故和大家一起分享. [分享]Java 线程池的原理与实现 这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧.线程池就是其中之一,一提到线程,我们会想到以前<操作系统>的生产者与消费者,信号量,同步控制等等.一提到池,我们会想到数据库连接池,但是线程池又如何呢? 建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字的用

Java线程池的原理及几类线程池的介绍

刚刚研究了一下线程池,如果有不足之处,请大家不吝赐教,大家共同学习.共同交流. 在什么情况下使用线程池? 单个任务处理的时间比较短 将需处理的任务的数量大 使用线程池的好处: 减少在创建和销毁线程上所花的时间以及系统资源的开销 如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存以及"过度切换". 线程池工作原理: 为什么要用线程池? 诸如 Web 服务器.数据库服务器.文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务.请求以某种方式到

Java 线程池的原理与实现 (转)

  最近在学习线程池.内存控制等关于提高程序运行性能方面的编程技术,在网上看到有一哥们写得不错,故和大家一起分享. [分享]Java 线程池的原理与实现 这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧.线程池就是其中之一,一提到线程,我们会想到以前<操作系统>的生产者与消费者,信号量,同步控制等等.一提到池,我们会想到数据库连接池,但是线程池又如何呢? 建议:在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字

我眼中的java线程池实现原理

最近在看java线程池实现方面的源码,在此做个小结,因为网上关于线程池源码分析的博客挺多的,我也不打算重复造轮子啦,仅仅用纯语言描述的方式做做总结啦! 个人认为要想理解清楚java线程池实现原理,明白下面几个问题就可以了: (1):线程池存在哪些状态,这些状态之间是如何进行切换的呢? (2):线程池的种类有哪些? (3):创建线程池需要哪些参数,这些参数的具体含义是什么? (4):将任务添加到线程池之后运行流程? (5):线程池是怎么做到重用线程的呢? (6):线程池的关闭 首先回答第一个问题:

11 java 线程池 实现原理

一 关键类的实现 1 ThreadPoolExecutor类 java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类. 下面我们来看一下ThreadPoolExecutor类的具体实现源码. 在ThreadPoolExecutor类中提供了四个构造方法: 1 public class ThreadPoolExecutor extends AbstractExecutorService {

Java线程池实现原理与技术

本文将通过实现一个简易的线程池理解线程池的原理,以及介绍JDK中自带的线程池ThreadPoolExecutor和Executor框架. 1.无限制线程的缺陷 多线程的软件设计方法确实可以最大限度地发挥多核处理器的计算能力,提高生产系统的吞吐量和性能.但是,若不加控制和管理的随意使用线程,对系统的性能反而会产生不利的影响. 一种最为简单的线程创建和回收的方法类似如下: new Thread(new Runnable() { @Override public void run() { //do s

深入浅出JAVA线程池使用原理1

前言: Java中的线程池是并发框架中运用最多的,几乎所有需要异步或并发执行任务的程序都可以使用线程池,线程池主要有三个好处: 1.降低资源消耗:可以重复使用已经创建的线程降低线程创建和销毁带来的消耗 2.提高响应速度:执行任务时,不需要等待线程的创建就可以直接执行任务 3.提高线程的可管理性:线程是稀缺资源,如果无限制地创建不仅会消耗系统资源,还会降低系统的稳定性,线程池可以对线程进行统一分配.调优和监控 一.线程池的实现原理 在了解线程池实现原理之前,先了解线程池的一些元素 1.核心线程池

深入浅出JAVA线程池使用原理2

一.Executor框架介绍 Executor框架将Java多线程程序分解成若干个任务,将这些任务分配给若干个线程来处理,并得到任务的结果 1.1.Executor框架组成 任务:被执行任务需要实现的接口:Runnable接口或Callable接口 任务的执行:任务执行的核心接口Executor以及其子类ExecutorService接口 任务的结果:包括Future接口以及Future接口的实现类FutureTask类 Executor接口是Executor框架的基础,将任务的提交与执行分离开