不惑JAVA之JAVA基础 - 线程池

简单原理

一个比较简单的线程池至少应包含线程池管理器、工作线程、任务队列、任务接口等部分。

  • 线程池管理器(ThreadPool Manager)的作用是创建、销毁并管理线程池,将工作线程放入线程池中;
  • 工作线程是一个可以循环执行任务的线程,在没有任务时进行等待;
  • 任务队列的作用是提供一种缓冲机制,将没有处理的任务放在任务队列中;
  • 任务接口是每个任务必须实现的接口,主要用来规定任务的入口、任务执行完后的收尾工作、任务的执行状态等,工作线程通过该接口调度任务的执行。

线程池的优点

  1. 降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
  2. 提高响应速度,当任务到达时,任务可以不需要的等到线程创建就能立即执行;
  3. 提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

例子

之前项目中用到的代码

public Integer repayUnprocessBorrowRepayTaskIds(boolean isMultiThread) {

        int updateOverdueBadloanThreadPoolSize = BORROW_REPAY_THREAD_POOL_SIZE;
        // 吐槽一下,对Redis的滥用啊,写到配置文件中不是更好么?
        String updateOverdueBadloanThreadPoolSize_s = redisService.get(Constants.BORROW_REPAY_THREAD_POOL_SIZE);
        if (StringUtil.isNotEmpty(updateOverdueBadloanThreadPoolSize_s)) {
            updateOverdueBadloanThreadPoolSize = Integer.parseInt(updateOverdueBadloanThreadPoolSize_s);
        }
        log.info("BORROW_REPAY_THREAD_POOL_SIZE:" + updateOverdueBadloanThreadPoolSize);
        // ------1 线程池初始化代码
        ExecutorService executorService = Executors.newFixedThreadPool(updateOverdueBadloanThreadPoolSize);
        int _counter = 0;
        List<Integer> borrowRepayTaskIds = borrowRepayTaskService.getUnprocessBorrowRepayTaskIds(updateOverdueBadloanThreadPoolSize);
        if (StringUtil.isNotEmpty(borrowRepayTaskIds)) {
            log.info("repayUnprocessBorrowRepayTaskIds ready to process, count:" + borrowRepayTaskIds.size());
            for (Integer borrowRepayTaskId : borrowRepayTaskIds) {
                _counter++;
                if (isMultiThread && updateOverdueBadloanThreadPoolSize > 1) { // 多线程
                // --------2 线程池执行代码
                    executorService.execute(new BorrowRepayTaskThread(this, borrowRepayTaskId));
                } else { // 单线程
                    repayByTask(borrowRepayTaskId);
                }
            }
        } else {
            log.info("repayUnprocessBorrowRepayTaskIds ready to process, count:0");
        }
        executorService.shutdown();
        try {
            while (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                log.info("Waiting repayUnprocessBorrowRepayTaskIds thread pool close..."); // Waiting thread pool close...
            }
        } catch (InterruptedException ex) { // should not reach here
            ex.printStackTrace();
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        } catch (Exception ex) { // should not reach here
            ex.printStackTrace();
            executorService.shutdownNow();
        }
        log.info("repayUnprocessBorrowRepayTaskIds done, count:" + _counter);
        return _counter;
    }
  1. Executors类中提供的几个静态方法来创建线程池;
  2. 这里使用了newFixedThreadPool线程池。newFixedThreadPool的作用是创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程;
  3. ExecutorService是线程池接口

几种常用线程池

newSingleThreadExecutor

创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

newFixedThreadPool(常用)

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

newCachedThreadPool

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

newScheduledThreadPool

创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

newFixedThreadPool底层实现

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

this是调用的
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

参数解释:

  • corePoolSize - 池中所保存的线程数,包括空闲线程。 (通过应用参数传过来的)
  • maximumPoolSize - 池中允许的最大线程数。 (与corePoolSize数量一致)
  • keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 (这里是0L)
  • unit - keepAliveTime 参数的时间单位。
  • workQueue - 执行前用于保持任务的队列。此队列仅由保持 execute 方法提交的 Runnable 任务。

    (newFixedThreadPool使用的是LinkedBlockingQueue线程安全阻塞队列)

  • handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

    (newFixedThreadPool使用的是RejectedExecutionHandler)

详细说一下corePoolSize

核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

线程池的主要工作流程

可以看到使用ThreadPoolExecutor方法实现的,深入研究一下了解具体的实现原理。

  1. 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
  2. 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
  3. 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。

线程池执行任务方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
        // 如果线程数小于基本线程数,则创建线程并执行当前任务
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        // 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

工作线程

线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会无限循环获取工作队列里的任务来执行。我们可以从Worker的run方法里看到这点。

public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //当任务队列中没有任务时,进行清理工作
        }
    }
}

线程池配置与监控

下面这些方法可以了解一下,我觉得最好的方法就是通过压力测试,观察核心功能的处理能力,服务器CPU、IO和JVM的具体情况,进行调试是最好的。

合理配置线程池

要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:

1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
2. 任务的优先级:高,中和低。
3. 任务的执行时间:长,中和短。
4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。
5. 任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。
6. 建议使用有界队列。

线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

* taskCount:线程池需要执行的任务数量。
* completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
* largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
* getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
* getActiveCount:获取活动的线程数。

通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。

参考

http://www.cnblogs.com/dolphin0520/p/3932921.html

http://ifeve.com/java-threadpool/

http://my.oschina.net/jielucky/blog/157250?fromerr=imwl7YeR 这个解释的比较详细。

时间: 2024-11-05 21:02:51

不惑JAVA之JAVA基础 - 线程池的相关文章

Java多线程-新特性-线程池

Sun在Java5中,对Java线程的类库做了大量的扩展,其中线程池就是Java5的新特征之一,除了线程池之外,还有很多多线程相关的内容,为多线程的编程带来了极大便利.为了编写高效稳定可靠的多线程程序,线程部分的新增内容显得尤为重要. 有关Java5线程新特征的内容全部在java.util.concurrent下面,里面包含数目众多的接口和类,熟悉这部分API特征是一项艰难的学习过程.目前有关这方面的资料和书籍都少之又少,大部分介绍线程方面书籍还停留在java5之前的知识层面上. 在Java5之

Java并发编程:线程池的使用

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,

【转】线程池体系介绍及从阿里Java开发手册学习线程池的正确创建方法

jdk1.7中java.util.concurrent.Executor线程池体系介绍 java.util.concurrent.Executor : 负责线程的使用与调度的根接口  |–ExecutorService:Executor的子接口,线程池的主要接口  |–ThreadPoolExecutor:ExecutorService的实现类  |–ScheduledExecutorService:ExecutorService的子接口,负责线程的调度  |–ScheduledThreadPo

【转】Java并发编程:线程池的使用

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

Java并发编程:线程池的使用(转)

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

Java自带的线程池ThreadPoolExecutor

一.线程池引入 Java的线程池是Java5.0以后的新功能,它让开发者更易开发高效的多线程程序,也让多线程程序的性能大大提高.Java提供的关于线程池的API是基于原有线程API的,只是用另外一种方式来使用Java的多线程编程功能. 简单来说,线程池就是一个或者多个线程的集合.一般而言,线程池有一下几个部分. 完成任务的一个或者多个线程 用于调度管理的管理线程 要求执行的任务队列 那么为什么要使用线程池呢?当短时间内需要处理的任务数量很多时,使用线程池的好处体现在两点, 减少了创建和销毁线程的

运用JAVA的concurrent.ExecutorService线程池实现socket的TCP和UDP连接

运用JAVA的concurrent.ExecutorService线程池实现socket的TCP和UDP连接 最近在项目中可能要用到socket相关的东西来发送消息,所以初步研究了下socket的TCP和UDP实现方式,并且结合java1.5的concurrent.ExecutorService类来实现多线程. 具体实现方式见代码: 一.TCP方式: 1.服务端实现方式: TCP的服务端实现方式主要用到ServerSocket类,接收等待客户端连接的方法是accept(); 代码如下:类Sock

【转】Java学习---深入理解线程池

[原文]https://www.toutiao.com/i6566022142666736131/ 我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Ja

java所提供的线程池有几种之线程池总结

今天给大家带来一个关于java线程池的资料,主要是因为在早些时候去面试的时候就被问到了线程池的问题,回答的不尽人意,今天突然有人问我一个同样的问题,我觉的我还是总结一下,看起来也方便. Java提供的几种线程池线程池,顾名思义,放线程的池子嘛,这个池子可以存放多少线程取决于你自己采用什么样的线程池,你的硬件资源,以及并发线程的数量.JDK提供了下面的四种线程池: 固定线程数的线程池# 最简单的 在Java中创建一个线程池,这很简单,只需要两行代码. CopyExecutorService exe

0726------Linux基础----------线程池

#ifndef __DEF_H__ #define __DEF_H__ #include <stddef.h> #include <pthread.h> #include <stdio.h> #define TRUE 1 #define FALSE 0 //任务结构体 typedef struct{ void (*thread_function_ptr_) (void*); void *arg_; }task_t; //队列结构体 typedef struct node