JAVA进阶----ThreadPoolExecutor机制(转)

ThreadPoolExecutor机制 
一、概述 
1、ThreadPoolExecutor作为java.util.concurrent包对外提供基础实现,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务; 
2、Executors方法提供的线程服务,都是通过参数设置来实现不同的线程池机制。 
3、先来了解其线程池管理的机制,有助于正确使用,避免错误使用导致严重故障。同时可以根据自己的需求实现自己的线程池

二、核心构造方法讲解 
下面是ThreadPoolExecutor最核心的构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

构造方法参数讲解

参数名 作用
corePoolSize 核心线程池大小
maximumPoolSize 最大线程池大小
keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间;可以allowCoreThreadTimeOut(true)使得核心线程有效时间
TimeUnit keepAliveTime时间单位
workQueue 阻塞任务队列
threadFactory 新建线程工厂
RejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理

重点讲解: 
其中比较容易让人误解的是:corePoolSize,maximumPoolSize,workQueue之间关系。

1.当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。 
2.当线程池达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 
3.当workQueue已满,且maximumPoolSize>corePoolSize时,新提交任务会创建新线程执行任务 
4.当提交任务数超过maximumPoolSize时,新提交任务由RejectedExecutionHandler处理 
5.当线程池中超过corePoolSize线程,空闲时间达到keepAliveTime时,关闭空闲线程 
6.当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭

线程管理机制图示: 

三、Executors提供的线程池配置方案

1、构造一个固定线程数目的线程池,配置的corePoolSize与maximumPoolSize大小相同,同时使用了一个无界LinkedBlockingQueue存放阻塞任务,因此多余的任务将存在再阻塞队列,不会由RejectedExecutionHandler处理

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

2、构造一个缓冲功能的线程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,keepAliveTime=60s,以及一个无容量的阻塞队列 SynchronousQueue,因此任务提交之后,将会创建新的线程执行;线程空闲超过60s将会销毁

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

3、构造一个只支持一个线程的线程池,配置corePoolSize=maximumPoolSize=1,无界阻塞队列LinkedBlockingQueue;保证任务由一个线程串行执行

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

4、构造有定时功能的线程池,配置corePoolSize,无界延迟阻塞队列DelayedWorkQueue;有意思的是:maximumPoolSize=Integer.MAX_VALUE,由于DelayedWorkQueue是无界队列,所以这个值是没有意义的

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

public ScheduledThreadPoolExecutor(int corePoolSize,
                             ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

四、定制属于自己的线程池

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomThreadPoolExecutor {

    private ThreadPoolExecutor pool = null;

    /**
     * 线程池初始化方法
     *
     * corePoolSize 核心线程池大小----10
     * maximumPoolSize 最大线程池大小----30
     * keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30+单位TimeUnit
     * TimeUnit keepAliveTime时间单位----TimeUnit.MINUTES
     * workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(10)====10容量的阻塞队列
     * threadFactory 新建线程工厂----new CustomThreadFactory()====定制的线程工厂
     * rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和时,
     *                             即当提交第41个任务时(前面线程都没有执行完,此测试方法中用sleep(100)),
     *                                   任务会交给RejectedExecutionHandler来处理
     */
    public void init() {
        pool = new ThreadPoolExecutor(
                10,
                30,
                30,
                TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(10),
                new CustomThreadFactory(),
                new CustomRejectedExecutionHandler());
    }

    public void destory() {
        if(pool != null) {
            pool.shutdownNow();
        }
    }

    public ExecutorService getCustomThreadPoolExecutor() {
        return this.pool;
    }

    private class CustomThreadFactory implements ThreadFactory {

        private AtomicInteger count = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
            System.out.println(threadName);
            t.setName(threadName);
            return t;
        }
    }

    private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // 记录异常
            // 报警处理等
            System.out.println("error.............");
        }
    }

    // 测试构造的线程池
    public static void main(String[] args) {
        CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();
        // 1.初始化
        exec.init();

        ExecutorService pool = exec.getCustomThreadPoolExecutor();
        for(int i=1; i<100; i++) {
            System.out.println("提交第" + i + "个任务!");
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("running=====");
                }
            });
        }

        // 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
        // exec.destory();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

方法中建立一个核心线程数为30个,缓冲队列有10个的线程池。每个线程任务,执行时会先睡眠0.1秒,保证提交40个任务时没有任务被执行完,这样提交第41个任务是,会交给CustomRejectedExecutionHandler 类来处理。

http://825635381.iteye.com/blog/2184680

时间: 2024-11-19 08:40:10

JAVA进阶----ThreadPoolExecutor机制(转)的相关文章

JAVA进阶----ThreadPoolExecutor机制(转)

http://825635381.iteye.com/blog/2184680 ThreadPoolExecutor机制 一.概述 1.ThreadPoolExecutor作为java.util.concurrent包对外提供基础实现,以内部线程池的形式对外提供管理任务执行,线程调度,线程池管理等等服务: 2.Executors方法提供的线程服务,都是通过参数设置来实现不同的线程池机制. 3.先来了解其线程池管理的机制,有助于正确使用,避免错误使用导致严重故障.同时可以根据自己的需求实现自己的线

Java进阶之reflection(反射机制)——反射概念与基础

反射机制是Java动态性之一,而说到动态性首先得了解动态语言.那么何为动态语言? 一.动态语言 动态语言,是指程序在运行时可以改变其结构:新的函数可以引进,已有的函数可以被删除等结构上的变化.比如常见的JavaScript就是动态语言,除此之外Ruby,Python等也属于动态语言,而C.C++则不属于动态语言. 二.Java是动态语言吗? 从动态语言能在运行时改变程序结构结构或则变量类型上看,Java和C.C++一样都不属于动态语言. 但是JAVA却又一个非常突出的与动态相关的机制:反射机制.

Java进阶 六 Java反射机制可恶问题NoSuchFieldException

作为一种重要特性,Java反射机制在很多地方会用到.在此做一小结,供朋友们参考. 首先从一个问题开始着手. 可恶的问题又来了,NoSuchFieldException,如下图所示: 完全不知道这个question是从哪里来的.以前也遇到过这样的问题,后来解决了,但是没有写文档,再次相遇这样的问题,傻了. 经过上网一番查找,发现遇到这个问题的小盆友还真不少,这个问题是属于java反射机制里的. 这是一个反射对象时候的异常,已经被捕获了的.这个报错代码是混淆了的,是不是这个question对象被混淆

Java进阶书籍推荐

学习Java,书籍是必不可少的学习工具之一,尤其是对于自学者而言.废话不多说,下边就给广大程序猿们推荐一些Java进阶的好书. 第一部分:Java语言篇 1.<Java编程规范> 适合对象:初级.中级 介绍:这本书的作者是被誉为Java之父的James Gosling,入门者推荐阅读,对基础的讲解很不错. 2.<Java编程思想> 适合对象:初级.中级 介绍:豆瓣给出了9.1的评分,全球程序员广泛赞誉.有人说这本书不适合初学者,不过小编认为作者并没有对读者已有的知识经验有过多要求,

Java 垃圾回收机制概述

摘要: Java技术体系中所提倡的 自动内存管理 最终可以归结为自动化地解决了两个问题:给对象分配内存 以及 回收分配给对象的内存,而且这两个问题针对的内存区域就是Java内存模型中的 堆区.关于对象分配内存问题,笔者的博文<JVM 内存模型概述>已经阐述了 如何划分可用空间及其涉及到的线程安全问题,本文将结合垃圾回收策略进一步给出内存分配规则.垃圾回收机制的引入可以有效的防止内存泄露.保证内存的有效使用,也大大解放了Java程序员的双手,使得他们在编写程序的时候不再需要考虑内存管理.本文着重

JAVA进阶之旅(二)——认识Class类,反射的概念,Constructor,Fiald,Method,反射Main方法,数组的反射和实践

JAVA进阶之旅(二)--认识Class类,反射的概念,Constructor,Fiald,Method,反射Main方法,数组的反射和实践 我们继续聊JAVA,这次比较有意思,那就是反射了 一.认识Class类 想要反射,你就必须要了解一个类--Class,我们知道,java程序中的各个java类都属于同一事物,我们通常用Classliability描述对吧,反射这个概念从JDK1.2就出来了,历史算是比较悠久了,这个Class可不是关键字哦,这个是一个类,他代表的是一类事物: 我们归根结底就

Java进阶(二十五)Java连接mysql数据库(底层实现)

Java进阶(二十五)Java连接mysql数据库(底层实现) 前言 很长时间没有系统的使用java做项目了.现在需要使用java完成一个实验,其中涉及到java连接数据库.让自己来写,记忆中已无从搜索.特将之前使用的方法做一简单的总结.也能够在底层理解一下连接数据库的具体步骤. 实现 首先需要导入相关的jar包,我使用的为:mysql-connector-java-5.1.7-bin.jar. 下面来看一下我所使用的数据库连接方法类: MysqlUtil.java package cn.edu

java进阶学习计划

断断续续使用java也已经有两年了,算是最熟悉的开发工具了.但写的代码都是以项目为导向,追求work around,还需要打好基础才能长远发展. 大致的进阶学习计划, 阶段1:深究java语法,阅读常用库的jdk源码,了解jvm机制; 阶段2:阅读基于java的开源框架源码,各种framework,container. 希望可以坚持下来,经常更新技术博客. java进阶学习计划

Java进阶之路

Java进阶之路--从初级程序员到架构师,从小工到专家. 怎样学习才能从一名Java初级程序员成长为一名合格的架构师,或者说一名合格的架构师应该有怎样的技术知识体系,这是不仅一个刚刚踏入职场的初级程序员也是工作三五年之后开始迷茫的老程序员经常会问到的问题.希望这篇文章会是你看到过的最全面最权威的回答. 一: 编程基础 不管是C还是C++,不管是Java还是PHP,想成为一名合格的程序员,基本的数据结构和算法基础还是要有的.下面几篇文章从思想到实现,为你梳理出常用的数据结构和经典算法. 1-1 常