跟我学Java多线程——线程池与阻塞队列

前言

上一篇文章中我们将ThreadPoolExecutor进行了深入的学习和介绍,实际上我们在项目中应用的时候很少有直接应用ThreadPoolExecutor来创建线程池的,在jdk的api中有这么一句话“但是,强烈建议程序员使用较为方便的
Executors 工厂方法Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预定义了设置。”所以这篇文章我们继续学习其它几种线程池。

线程池分类

newCachedThreadPool()

创建一个可缓存的线程池,即这个线程池是无界线程池,无界指工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger.MAX_VALUE),这样可以灵活的往线程池中添加数据;可以进行自动线程回收指的是如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间,则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。

我们一般使用如下代码进行创建:

ExecutorServiceservice = Executors.newCachedThreadPool();

我们点击代码进入源码:

   /**
     * Creates a thread pool that creates newthreads as needed, but
     * will reuse previously constructedthreads when they are
     * available.  These pools will typically improve theperformance
     * of programs that execute manyshort-lived asynchronous tasks.
     * Calls to {@code execute} will reusepreviously constructed
     * threads if available. If no existingthread is available, a new
     * thread will be created and added to thepool. Threads that have
     * not been used for sixty seconds areterminated and removed from
     * the cache. Thus, a pool that remainsidle for long enough will
     * not consume any resources. Note thatpools with similar
     * properties but different details (forexample, timeout parameters)
     * may be created using {@linkThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorServicenewCachedThreadPool() {
        return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
                                      60L,TimeUnit.SECONDS,
                                      newSynchronousQueue<Runnable>());
    }
 

看到代码有没有很熟悉,调用的上我们上一篇文章中的ThreadPoolExecutor类的构造方法,只不过核心线程数为0,同时指定一个最大线程数。

newFixedThreadPool(int)

固定大小线程池这个很好理解,就是创建一个指定工作线程数量的线程池,如果线程达到设置的最大数,就将提交的任务放到线程池的队列中。一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。

一般创建:

ExecutorServicenewFixedThreadPool=Executors.newFixedThreadPool(5);

点击进入源码:

    /**
     * Creates a thread pool that reuses afixed number of threads
     * operating off a shared unboundedqueue.  At any point, at most
     * {@code nThreads} threads will be activeprocessing tasks.
     * If additional tasks are submitted whenall threads are active,
     * they will wait in the queue until athread is available.
     * If any thread terminates due to afailure during execution
     * prior to shutdown, a new one will takeits place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@linkExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads inthe pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if{@code nThreads <= 0}
     */
    public static ExecutorServicenewFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads,nThreads,
                                      0L,TimeUnit.MILLISECONDS,
                                      newLinkedBlockingQueue<Runnable>());
    }

调用的依然是我们上一篇文章中的ThreadPoolExecutor类的构造方法,只不过核心线程数为和最大线程数一样都是我们人为指定的。

newSingleThreadExecutor()

单线程线程池,只创建唯一的线程来执行任务,如果这个线程异常结束,会有另一个取代它,保证顺序执行。

一般创建方法:

ExecutorServicenewSingleThreadExecutor = Executors.newSingleThreadExecutor();

点击进入源码:

    /**
     * Creates an Executor that uses a singleworker thread operating
     * off an unbounded queue. (Note howeverthat if this single
     * thread terminates due to a failureduring execution prior to
     * shutdown, a new one will take its placeif needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one taskwill be active at any
     * given time. Unlike the otherwiseequivalent
     * {@code newFixedThreadPool(1)} thereturned executor is
     * guaranteed not to be reconfigurable touse additional threads.
     *
     * @return the newly createdsingle-threaded Executor
     */
    public static ExecutorServicenewSingleThreadExecutor() {
        return newFinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L,TimeUnit.MILLISECONDS,
                                    newLinkedBlockingQueue<Runnable>()));
    }
 

调用的依然是我们上一篇文章中的ThreadPoolExecutor类的构造方法,只不过核心线程数为和最大线程数一样都是1。

介绍到这里我们发现这三个线程池调用的都是ThreadPoolExecutor的构造函数,这三个线程的区别除了核心线程数和最大线程数参数不一样外,最重要的是传入的最后一个参数即workQueue是不一样的。

newCachedThreadPool的参数为SynchronousQueue,newFixedThreadPool和newSingleThreadExecutor的参数都为LinkedBlockingQueue,其实这一种排队策略也叫阻塞队列,那接下来我们就来介绍一下常见的阻塞队列。

阻塞队列BlockingQueue

阻塞队列顾名思义首先它是一个队列,常见的队列有“后进先出”的栈和“先进先出”的队列。多线程环境中,通过队列可以很容易实现数据共享,最经典的就是“生产者”和“消费者”模型,这就是一个典型的阻塞队列,比如生产者生产到一定程度必须停一下,让生产者线程挂起,这就是阻塞。

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)

java.util.concurrent包中的BlockingQueue就是阻塞队列的接口,作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了,并且它还是线程安全的。那我们现在来看下BlockingQueue接口的源码:

public interfaceBlockingQueue<E> extends Queue<E> {

    boolean add(E e);

    boolean offer(E e);

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnitunit)
        throws InterruptedException;

    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c,int maxElements);
}

上面就是接口的所有方法,现在我们就介绍下这个接口中的核心方法:

放入数据:

    boolean add(E e);

这个方法将将泛型对象加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)

    boolean offer(E e);

这个方法将将泛型对象加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)

    boolean offer(E e, long timeout, TimeUnitunit)throws InterruptedException;

这个方法可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。(本方法不阻塞当前执行方法的线程)

    void put(E e) throws InterruptedException;

这个方法把泛型对象放到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.(本方法有阻塞的功能)

移除数据:

    boolean remove(Object o);

这个方法从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。(本方法不阻塞当前执行方法的线程)

    E poll(long timeout, TimeUnit unit) throws InterruptedException;

这个方法从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。(本方法不阻塞当前执行方法的线程)

     E take() throws InterruptedException;

这个方法是取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;(本方法有阻塞的功能)

    int drainTo(Collection<? super E> c);

这个方法是取走BlockingQueue里排在首位的对象,取不到时返回null;(本方法不阻塞当前执行方法的线程)

    int drainTo(Collection<? super E> c,int maxElements);

这个方法是取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;(本方法不阻塞当前执行方法的线程)

总结一下BlockingQueue接口中的方法,这些方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null或 false,具体取决于操作),第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。


抛出异常


特殊值


阻塞


超时


插入


add(e)


offer(e)


put(e)


offer(e,time,unit)


移除


remove()


poll()


take()


poll(time,unit)


检查


element()


peek()


不可用


不可用

BlockingQueue实现类

1)ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小,以便缓存队列中数据对象。并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。其所含的对象是以FIFO(先入先出)顺序排序的.

2)LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。其所含的对象是以FIFO(先入先出)顺序排序的

3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限(通过源码就可以知道,它没有容器满的信号标志),前面2种都是有界队列。

4)DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

5)SynchronousQueue:一种无缓冲的等待队列,类似于无中介的直接交易,

其中LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.

总结

我们这篇文章延续了上一篇文章中关于ThreadPoolExecutor线程池的一些内容,分别是newCachedThreadPool、newFixedThreadPool、newSingleThreadExecutor,同时根据这些线程池与ThreadPoolExecutor的关系,进而引出了阻塞队列BlockingQueue,于是我们详细介绍了接口BlockingQueue和接口中的方法,最后又介绍了接口BlockingQueue的实现类。

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2025-01-21 22:04:48

跟我学Java多线程——线程池与阻塞队列的相关文章

跟我学Java多线程——线程池与堵塞队列

前言 上一篇文章中我们将ThreadPoolExecutor进行了深入的学习和介绍,实际上我们在项目中应用的时候非常少有直接应用ThreadPoolExecutor来创建线程池的.在jdk的api中有这么一句话"可是,强烈建议程序猿使用较为方便的 Executors 工厂方法Executors.newCachedThreadPool()(无界线程池,能够进行自己主动线程回收).Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSing

Java多线程——线程池

系统启动一个新线程的成本是比较高的,因为它涉及到与操作系统的交互.在这种情况下,使用线程池可以很好的提供性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池. 与数据库连接池类似的是,线程池在系统启动时即创建大量空闲的线程,程序将一个Runnable对象传给线程池,线程池就会启动一条线程来执行该对象的run方法,当run方法执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个Runnable对象的run方法. 除此之外,使用线程池可以有效地控制系统

java多线程--线程池的使用

程序启动一个新线程的成本是很高的,因为涉及到要和操作系统进行交互,而使用线程池可以很好的提高性能,尤其是程序中当需要创建大量生存期很短的线程时,应该优先考虑使用线程池. 线程池的每一个线程执行完毕后,并不会死亡,会再次回到线程池中变成空闲状态,等待下一个对象来调用,类比于数据库连接池.JDK1.5以后,java内置线程池. JDK5新增了一个Executors工厂类来产生线程池,通过查文档我们发现,产生线程池很多方法,常用的有以下几个方法: public static ExecutorServi

java多线程 -- 线程池

第四种获取线程的方法:线程池,一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置. 线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法.每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数. 为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook).

Java多线程-线程池ThreadPoolExecutor构造方法和规则

为什么用线程池 博客地址 http://blog.csdn.net/qq_25806863 原文地址 http://blog.csdn.net/qq_25806863/article/details/71126867 有时候,系统需要处理非常多的执行时间很短的请求,如果每一个请求都开启一个新线程的话,系统就要不断的进行线程的创建和销毁,有时花在创建和销毁线程上的时间会比线程真正执行的时间还长.而且当线程数量太多时,系统不一定能受得了. 使用线程池主要为了解决一下几个问题: 通过重用线程池中的线程

Java多线程-----线程池详解

1. 线程池的实现原理 提交一个任务到线程池中,线程池的处理流程如下: 判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建)则创建一个新的工作线程来执行任务.如果核心线程都在执行任务,则进入下个流程 线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里.如果工作队列满了,则进入下个流程 判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务.如果已经满了,则交给饱和策略来处理这个任务    2. 线程

java多线程(九)阻塞队列

转载请注明出处:http://blog.csdn.net/xingjiarong/article/details/48005091 前边的博客中我们介绍了如果用对象锁和条件锁以及更加方便的synchronized关键字来实现多线程的同步和互斥,也许你会觉得使用synchronized关键字已经非常方便了,但是使用者必须真正的理解synchronized的用法,而且要有一定的多线程的编程的经验,否则很难做到全面的考虑问题而造成意想不到的问题.其实在java中还有比synchronized关键字更加

JAVA多线程提高十二:阻塞队列应用

一.类相关属性 接口BlockingQueue<E>定义: public interface BlockingQueue<E> extends Queue<E> { boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedExcepti

Java多线程——线程池使用示例

示例代码: import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadPoolTest { public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(2); String[] nameArr = new String[] {