Java并发编程总结5——ThreadPoolExecutor

一、ThreadPoolExecutor介绍

在jdk1.8中,构造函数有4个。以

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)为例:

1、corePoolSize: 核心线程池大小

2、maximumPoolSize: 最大线程池大小

3、keepAliveTime: 当线程池中的线程数大于corePoolSize时, 多余空闲线程等待新任务的最长时间, 超过这个时间后多余线程终止

4、unit: 时间单位, 比如毫秒, 纳秒等

5、workQueue: 阻塞队列

6、threadFactory: 创建线程工厂, 可以方便的创建线程, 可以自定义; 默认为Executors.DefaultThreadFactory

7、handler: 饱和策略, 默认为AbortPolicy

定义一个完整的线程池可以这么写:

private ThreadPoolExecutor defaultThreadPool = new ThreadPoolExecutor(10, 100, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1000), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());

线程池的主要处理流程如下:

1、提交任务到线程池,判断核心线程池(corePoolSize)是否已满? 没满,创建工作线程执行任务;满了,进入下个流程。

2、线程池判断工作队列(workQueue)是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。

3、判断整个线程池(maximumPoolSize)是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略(RejectedExecutionHandler)来处理这个任务。

二、饱和策略 RejectedExecutionHandler

在ThreadPoolExecutor类中定义了4种RejectedExecutionHandler类型:

1、AbortPolicy: 默认策略,直接抛出RejectedExecutionException异常。

2、CallerRunsPolicy:只用调用者所在线程来运行任务。

3、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

4、DiscardPolicy:不处理,丢弃掉。

也可以自定义饱和策略,比如将无法处理的新任务加入日志等,只需要实现RejectedExecutionHandler接口即可。

三、阻塞队列BlockingQueue

    /**
     * ArrayBlockingQueue: 由数组结构组成的有界阻塞队列, 队列遵循FIFO
     */
    private BlockingQueue<String> abq = new ArrayBlockingQueue<String>(10);

    /**
     * LinkedBlockingQueue: 由链表结构组成的阻塞队列, FIFO
     * LinkedBlockingDeque: 由链表结构组成的双向阻塞队列, 构造方法和LinkedBlockingQueue类似
     * public LinkedBlockingQueue(int capacity)     //有界阻塞队列,队列最大值为capacity
     * public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }        //无界阻塞队列,队列最大值为Integer.MAX_VALUE
     * 通过Executors.newFixedThreadPool(int nThreads)创建的线程池中采用的是无界LinkedBlockingQueue
     * 对于put和take操作, 内部采用了不同的锁: putLock, takeLock, 而ArrayBlockingQueue内部只有一把锁
     */
    private BlockingQueue<Thread> lbq = new LinkedBlockingQueue<Thread>(10);
    private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
    private BlockingQueue<String> lbd = new LinkedBlockingDeque<String>();

    /**
     * PriorityBlockingQueue: 支持优先级排序的有界阻塞队列
     * public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)
     */
    private BlockingQueue<String> pbq = new PriorityBlockingQueue(100, new Comparator<String>() {
        public int compare(String o1, String o2) {
            return o1.compareTo(o2);        //升序排列
        }
    });

    /**
     * DelayQueue: 一个使用优先级队列实现的无界阻塞队列, 支持延时获取元素, 适用于缓存系统的设计以及定时任务调度。
     * 内部队列采用PriorityQueue, private final PriorityQueue<E> q = new PriorityQueue<E>();
     * 队列元素需要实现Delayed接口, class DelayQueue<E extends Delayed> extends AbstractQueue<E>
     */

    /**
     * SynchronousQueue: 不存储元素的阻塞队列
     */

    /**
     * LinkedTransferQueue: 由链表结构组成的实现了TransferQueue接口的无界阻塞队列
     * transfer方法: 如果当前消费者正在等待接收元素(take()或poll(long timeout, TimeUnit unit))方法, 生产者传入的元素可以直接传递给消费者, 而不放入队列
     */
    BlockingQueue<String> ltq = new LinkedTransferQueue<String>();

四、通过Executors创建线程池

       /**
         * 创建单个线程, 适用于需要保证顺序执行任务的场景
         * return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
         * BlockingQueue采用无界LinkedBlockingQueue, 因此maximumPoolSize、keepAliveTime参数无意义
         */
        Executors.newSingleThreadExecutor();
        /**
         * 创建固定线程, 适用于需要限制当前线程数量, 负载比较重的服务器
         * return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
         * 采用无界LinkedBlockingQueue, 因此maximumPoolSize、keepAliveTime参数无意义
         */
        Executors.newFixedThreadPool(10);
        /**
         * 根据需要创建线程, 适用于执行很多的短期异步任务的小程序, 或者负载较轻的服务器
         * return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
         * 初始线程为0, 采用SynchronousQueue阻塞队列, 如果生产任务的速度低于消费的速度, 空闲60s的线程会被终止; 如果生产任务的速度持续高于消费速度, 则会不断创建新线程
         */
        Executors.newCachedThreadPool();
        /**
         * 在给定的延迟之后运行任务, 或者定期执行任务
         * super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
         * 采用无界DelayQueue队列, 因此maximumPoolSize、keepAliveTime参数无意义
         */
        Executors.newScheduledThreadPool(10);        
时间: 2024-10-11 00:42:03

Java并发编程总结5——ThreadPoolExecutor的相关文章

Java并发编程笔记 并发概览

并发概览 >>同步 如何同步多个线程对共享资源的访问是多线程编程中最基本的问题之一.当多个线程并发访问共享数据时会出现数据处于计算中间状态或者不一致的问题,从而影响到程序的正确运行.我们通常把这种情况叫做竞争条件(race condition),把并发访问共享数据的代码叫做关键区域(critical section).同步就是使得多个线程顺序进入关键区域从而避免竞争条件的发生. >>线程安全性 编写线程安全的代码的核心是要对状态访问操作进行管理,尤其是对共享的和可变的状态访问. 线

【转】Java并发编程:线程池的使用

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

Java 并发编程之任务取消(八)

处理非正常的线程中止 当单线程的控制台程序由于 发生了一个未捕获的异常而终止时,程序将停止运行,并产生与程序正常输出非常不同的栈追踪信息,这种情况是很容易理解的.然而,如果并发程序中的某个线程发生故障,那么通常不会如此明显.在控制台中可能会输出栈追踪信息,但没有人会观察控制台.此外,当线程发生故障时,应用程序可能看起来仍然 在工作,所以这个失败很可能被忽略.下面要讲的问题就是监测并防止在程序中"遗漏"线程的方法 . 导致线程提前死亡的最主要原因就是RuntimeException. 我

《Java并发编程实战》第八章 线程池的使用 读书笔记

一.在任务与执行策略之间的隐性解耦 有些类型的任务需要明确地指定执行策略,包括: . 依赖性任务.依赖关系对执行策略造成约束,需要注意活跃性问题.要求线程池足够大,确保任务都能放入. . 使用线程封闭机制的任务.需要串行执行. . 对响应时间敏感的任务. . 使用ThreadLocal的任务. 1. 线程饥饿死锁 线程池中如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,这种现象称为线程饥饿死锁. 2. 运行时间较长的任务 Java提供了限时版本与无限时版本.例如Thread

Java并发编程-非阻塞同步方式原子类(Atomic)的使用

非阻塞同步 在大多数情况下,我们为了实现线程安全都会使用Synchronized或lock来加锁进行线程的互斥同步,但互斥同步的最主要的问题就是进行线程的阻塞和唤醒所带来的性能问题,因此这种阻塞也称作阻塞同步.从处理问题的方式上说,互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题,无论共享数据是否真的会出现竞争,它都会进行加锁.用户态核心态转换.维护锁的计数器和检查是否有被阻塞的线程需要被唤醒等操作. 随着硬件指令集的发展,我们有了另一个选择:基于冲突检测的乐

Java并发编程:线程池的使用(转)

Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPool

Java并发编程(六)阻塞队列

相关文章 Java并发编程(一)线程定义.状态和属性 Java并发编程(二)同步 Java并发编程(三)volatile域 Java并发编程(四)Java内存模型 Java并发编程(五)ConcurrentHashMap的实现原理和源码分析 前言 在Android多线程(一)线程池这篇文章时,当我们要创建ThreadPoolExecutor的时候需要传进来一个类型为BlockingQueue的参数,它就是阻塞队列,在这一篇文章里我们会介绍阻塞队列的定义.种类.实现原理以及应用. 1.什么是阻塞队

Java并发编程之线程池

一.概述 在执行并发任务时,我们可以把任务传递给一个线程池,来替代为每个并发执行的任务都启动一个新的线程,只要池里有空闲的线程,任务就会分配一个线程执行.在线程池的内部,任务被插入一个阻塞队列(BlockingQueue),线程池里的线程会去取这个队列里的任务. 利用线程池有三个好处: 降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗 提高响应速度.当任务到达时,任务可以不需要的等到线程创建就能立即执行 提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,

Java并发编程深入学习

基本概念 在实践中,为了更好的利用资源提高系统整体的吞吐量,会选择并发编程.但由于上下文切换和死锁等问题,并发编程不一定能提高性能,因此如何合理的进行并发编程时本文的重点,接下来介绍关于锁最基本的一些知识(选学). volatile:轻量,保证共享变量的可见性,使得多个线程对共享变量的变更都能及时获取到.其包括两个子过程,将当前处理器缓存行的数据写回到系统内存,之后会使其他CPU里缓存了该内存地址的数据无效. synchronized:相对重量,其包含3种形式,针对普通同步方法,锁是当前实例对象