阻塞队列模型介绍
阻塞队列模型和线程池息息相关,因此本篇博客先介绍阻塞队列的相关知识。如下图所示:
首先我们来说,什么是Queue,然后在谈什么是BlockingQueue。
那么什么是Queue呢?一句话,就是一端进,另一端出,这样就形成了First In , First Out,即先进先出。而BlockingQueue只不过是在Queue的基础上进行了2个附加操作而已:如果Queue空,那么Out线程阻塞,如果Queue满,那么In线程阻塞。
理解了上面的Queue/BlockingQueue,那么就好理解Deque/BlockingDeque了。
Deque,就是双端队列,其实就是说在2端,都可以进行IN/OUT,当然如果我们只在同一端进行IN/OUT,那么自然形成了栈结构(先进后出)。
其次,我们先来看看java.util.concurrent.BlockingQueue的API列表:
其实从这里可以看出JAVA API的一个思路,往往一个操作提供多种选择:
如果队列是空的,那么消费线程该如何处理呢?
可以立刻抛出异常,也可以返回false/NULL,也可以过一段时间在TRY...
好了,到这里,我们只看到了接口API,如果要你来实现,你会怎么做呢?又会有哪些疑问呢?
思考:
是否应该提供一个存储,来放置队列中的元素呢?
这个存储应该多大呢?可以是什么形式呢?
队列中的元素是否存在优先级排序呢?
对队列中的元素进行操作时,是否应该有锁的控制呢?
一端IN,另一端OUT,这2个操作可以同时进行吗?
带着这些疑问,我们来对典型的BlockingQueue来进行分析。
ArrayBlockingQueue PK LinkedBlockingQueue
存储PK:
从名称上就可以知道,一个用的是数组,另一个用的是链表。看看源码验证下:
ArrayBlockingQueue:
LinkedBlockingQueue:
容量PK:
ArrayBlockingQueue可以通过构造方法指定容量:
而LinkedBlockingQueue如果在初始化时不指定容量,那么将是Integer.MAX_VALUE,这一点很重要,特别是在生产者的速度大于消费者的速度,由于此时无容量限制,将导致队列中的元素开始膨胀,那么将消耗掉大量系统资源。
锁PK:
在ArrayBlockingQueue中只有一个锁:
而在LinkedBlockingQueue中有2个锁:
其实到这里,我们已经可以大致猜测出,LinkedBlockingQueue对于take/put使用了分别的锁,从而比ArrayBlockingQueue在高并发下更具优势。
我们再来看看其他BlockingQueue:
DelayQueue:延迟队列,实际上是说,队列中的元素生效的话,有个时间差。
PriorityQueue:优先级队列,会提供Comparator来进行队列中的元素的排序。
SynchronousQueue:这个队列比较特殊,因为没有存储机制,实际上只是做了一个生产者和消费者的传递机制。
线程池介绍
如果任务到达时,才开始创建线程,这实际上会让任务的执行被延迟,于是产生了线程池的概念,如果在池子中已经存在了一批线程,那么任务到达时自然省去了线程创建的时间,相当于提高了响应速度。其次,如果线程执行完任务后,在放入池子中,这相当于在复用线程,达到了资源节约的目的。当然,如果任务的执行时间是远远大于线程的创建/销毁时间,其实就无所谓了。
快速创建线程池:Executors
Executors提供了一系列的快速创建线程池的方法,比如:
创建数量固定的/单个的/缓存的 线程池。
可以看到线程池的创建利用到了上面提及的BlockingQueue,队列中的元素就是任务Runnable。
方法返回的都是ExecutorService的实现类:ThreadPoolExecutor。
线程池的核心:ThreadPoolExecutor
我们直接来看看ThreadPoolExecutor的构造方法:
理解这些参数,对于理解线程池的原理有很大帮助:
corePoolSize:线程池的核心线程数量,是线程数目的一个稳定峰值。
maximumPoolSize:线程池的最大线程数量,如果corePoolSize依旧满足不了需要,那么可以让线程增长至maximumPoolSize,一旦需要下降,那么超出核心线程的那一部分线程资源将被回收。
workQueue:这个队列是待处理的任务队列。实际上,在ThreadPoolExecutor中除此之外还存在一个正在处理的工作队列workers:
keepAliveTime:超过核心线程数,又小于最大线程数目的线程在空闲的情况下,多久回收。
threadFactory:线程工厂,实际上用的是默认的DefaultThreadFactory,通过代码发现仅仅是针对Thread做了些设置(比如线程组/线程名称/后台/优先级等设置),将Runnable挂到Thread上而已。
handler:如果已经达到了最大线程数目,那么对于任务只能开始拒绝了,这个就是拒绝处理的策略类。