并发编程(十四)—— ScheduledThreadPoolExecutor 实现原理与源码深度解析 之 DelayedWorkQueue

我们知道线程池运行时,会不断从任务队列中获取任务,然后执行任务。如果我们想实现延时或者定时执行任务,重要一点就是任务队列会根据任务延时时间的不同进行排序,延时时间越短地就排在队列的前面,先被获取执行。

队列是先进先出的数据结构,就是先进入队列的数据,先被获取。但是有一种特殊的队列叫做优先级队列,它会对插入的数据进行优先级排序,保证优先级越高的数据首先被获取,与数据的插入顺序无关。

实现优先级队列高效常用的一种方式就是使用堆。

什么是堆?

堆通常是一个可以被看做一棵树的数组对象。

堆(heap)又被为优先队列(priority queue)。尽管名为优先队列,但堆并不是队列。

因为队列中允许的操作是先进先出(FIFO),在队尾插入元素,在队头取出元素。

而堆虽然在堆底插入元素,在堆顶取出元素,但是堆中元素的排列不是按照到来的先后顺序,而是按照一定的优先顺序排列的。

这里来说明一下满二叉树的概念与完全二叉树的概念。

满二叉树

  除了叶子节点,所有的节点的左右孩子都不为空,就是一棵满二叉树,如下图。

可以看出:满二叉树所有的节点都拥有左孩子,又拥有右孩子。

完全二叉树

  不一定是一个满二叉树,但它不满的那部分一定在右下侧,如下图

堆总是满足下列性质:

  • 堆中某个节点的值总是不大于或不小于其父节点的值;
  • 堆总是一棵完全二叉树。
  • 最大值时,称为“最大堆”,也称大顶堆;
  • 最小值时,称为“最小堆”,也称小顶堆。

堆的实现

堆是一个二叉树,但是它最简单的方式是通过数组去实现二叉树,而且因为堆是一个完全二叉树,就不存在数组空间的浪费。怎么使用数组来存储二叉树呢?

就是用数组的下标来模拟二叉树的各个节点,比如说根节点就是0,第一层的左节点是1,右节点是2。由此我们可以得出下列公式:

1 // 对于n位置的节点来说:
2 int left = 2 * n + 1; // 左子节点
3 int right = 2 * n + 2; // 右子节点
4 int parent = (n - 1) / 2; // 父节点,当然n要大于0,根节点是没有父节点的

对于堆来说,只有两个操作,插入insert和删除remove,不管插入还是删除保证堆的成立条件,1.是完全二叉树,2.父节点的值不能小于子节点的值。

最大堆的插入(ADD)

 1 public void insert(int value) {
 2      // 第一步将插入的值,直接放在最后一个位置。并将长度加一
 3      store[size++] = value;
 4      // 得到新插入值所在位置。
 5      int index = size - 1;
 6      while(index > 0) {
 7          // 它的父节点位置坐标
 8          int parentIndex = (index - 1) / 2;
 9          // 如果父节点的值小于子节点的值,你不满足堆的条件,那么就交换值
10          if (store[index] > store[parentIndex]) {
11              swap(store, index, parentIndex);
12              index = parentIndex;
13          } else {
14              // 否则表示这条路径上的值已经满足降序,跳出循环
15              break;
16          }
17      }
18 }

主要步骤:

  • 直接将value插入到size位置,并将size自增,这样store数组中插入一个值了。
  • 要保证从这个叶节点到根节点这条路径上的节点,满足父节点的值不能小于子节点。
  • 通过int parentIndex = (index - 1) / 2得到父节点,如果比父节点值大,那么两者位置的值交换,然后再拿这个父节点和它的父父节点比较。

    直到这个节点值比父节点值小,或者这个节点已经是根节点就退出循环。

因为每次循环index都是除以2这种倍数递减的方式,所以它最多循环次数是(log N)次。

最大堆的删除(DELETE)

 1 public int remove() {
 2       // 将根的值记录,最后返回
 3       int result = store[0];
 4       // 将最后位置的值放到根节点位置
 5       store[0] = store[--size];
 6       int index = 0;
 7       // 通过循环,保证父节点的值不能小于子节点。
 8       while(true) {
 9           int leftIndex = 2 * index + 1; // 左子节点
10           int rightIndex = 2 * index + 2; // 右子节点
11           // leftIndex >= size 表示这个子节点还没有值。
12           if (leftIndex >= size) break;
13           int maxIndex = leftIndex;
14           //找到左右节点中较大的一个节点
15           if (store[leftIndex] < store[rightIndex]) maxIndex = rightIndex;
16           //与子节点中较大的子节点比较,如果子节点更大,则交换位置
17           //为什么要与较大的子节点比较呢?如果和较小的节点比较,没有交换位置,但有可能比较大的节点小
18           if (store[index] < store[maxIndex]) {
19               swap(store, index, maxIndex);
20               index = maxIndex;
21           } else {
22               //满足子节点比当前节点小,退出循环
23               break;
24           }
25       }
26       //返回最开始的第一个值
27       return result;
28 }

在堆中最大值就在根节点,所以操作步骤:

  1. 将根节点的值保存到result中。
  2. 将最后节点的值移动到根节点,再将长度减一,这样满足堆成立第一个条件,堆是一个完全二叉树。
  3. 使用循环,来满足堆成立的第二个条件,父节点的值不能小于子节点的值。
  4. 最后返回result。

每次循环我们都是以2的倍数递增,所以它也是最多循环次数是(log N)次。

所以通过堆这种方式可以快速实现优先级队列,它的插入和删除操作的效率都是O(log N)。

那么怎么实现堆排序?这个很简单,利用优先队列的特性:

  1. 先遍历数组。将数组中的值依次插入到堆中。
  2. 然后再用一个循环将值从堆中取出来。
 1 private static void headSort(int[] arr) {
 2       int size = arr.length;
 3       Head head = new Head(size);
 4       for (int i = 0; i < size; i++) {
 5           head.insert(arr[i]);
 6       }
 7       for (int i = 0; i < size; i++) {
 8           //  实现从大到小的排序
 9           arr[size - 1 - i] = head.remove();
10       }
11 }

堆排序的效率:因为每次插入数据效率是O(log N),而我们需要进行n次循环,将数组中每个值插入到堆中,所以它的执行时间是O(N * log N)级。

DelayedWorkQueue类

1 static class DelayedWorkQueue extends AbstractQueue<Runnable>
2         implements BlockingQueue<Runnable> {

从定义中看出DelayedWorkQueue是一个阻塞队列。并且DelayedWorkQueue是一个最小堆,最顶点的值最小,即堆中某个节点的值总是不小于其父节点的值。

属性

 1 // 初始时,数组长度大小。
 2 private static final int INITIAL_CAPACITY = 16;
 3 // 使用数组来储存队列中的元素。
 4 private RunnableScheduledFuture<?>[] queue =
 5     new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
 6 // 使用lock来保证多线程并发安全问题。
 7 private final ReentrantLock lock = new ReentrantLock();
 8 // 队列中储存元素的大小
 9 private int size = 0;
10
11 //特指队列头任务所在线程
12 private Thread leader = null;
13
14 // 当队列头的任务延时时间到了,或者有新的任务变成队列头时,用来唤醒等待线程
15 private final Condition available = lock.newCondition();

DelayedWorkQueue是用数组来储存队列中的元素,那么我们看看它是怎么实现优先级队列的。

插入元素方法

 1 public void put(Runnable e) {
 2     offer(e);
 3 }
 4
 5 public boolean add(Runnable e) {
 6     return offer(e);
 7 }
 8
 9 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
10     return offer(e);
11 }

我们发现与普通阻塞队列相比,这三个添加方法都是调用offer方法。那是因为它没有队列已满的条件,也就是说可以不断地向DelayedWorkQueue添加元素,当元素个数超过数组长度时,会进行数组扩容。

 1 public boolean offer(Runnable x) {
 2     if (x == null)
 3         throw new NullPointerException();
 4     RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
 5     // 使用lock保证并发操作安全
 6     final ReentrantLock lock = this.lock;
 7     lock.lock();
 8     try {
 9         int i = size;
10         // 如果要超过数组长度,就要进行数组扩容
11         if (i >= queue.length)
12             // 数组扩容
13             grow();
14         // 将队列中元素个数加一
15         size = i + 1;
16         // 如果是第一个元素,那么就不需要排序,直接赋值就行了
17         if (i == 0) {
18             queue[0] = e;
19             setIndex(e, 0);
20         } else {
21             // 调用siftUp方法,使插入的元素变得有序。
22             siftUp(i, e);
23         }
24         // 表示新插入的元素是队列头,更换了队列头,
25         // 那么就要唤醒正在等待获取任务的线程。
26         if (queue[0] == e) {
27             leader = null;
28             // 唤醒正在等待等待获取任务的线程
29             available.signal();
30         }
31     } finally {
32         lock.unlock();
33     }
34     return true;
35 }

数组扩容方法:

1 private void grow() {
2     int oldCapacity = queue.length;
3     // 每次扩容增加原来数组的一半数量。
4     int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
5     if (newCapacity < 0) // overflow
6         newCapacity = Integer.MAX_VALUE;
7     // 使用Arrays.copyOf来复制一个新数组
8     queue = Arrays.copyOf(queue, newCapacity);
9 }

插入元素排序siftUp方法:

 1 private void siftUp(int k, RunnableScheduledFuture<?> key) {
 2     // 当k==0时,就到了堆二叉树的根节点了,跳出循环
 3     while (k > 0) {
 4         // 父节点位置坐标, 相当于(k - 1) / 2
 5         int parent = (k - 1) >>> 1;
 6         // 获取父节点位置元素
 7         RunnableScheduledFuture<?> e = queue[parent];
 8         // 如果key元素大于父节点位置元素,满足条件,那么跳出循环
 9         // 因为是从小到大排序的。
10         if (key.compareTo(e) >= 0)
11             break;
12         // 否则就将父节点元素存放到k位置
13         queue[k] = e;
14         // 这个只有当元素是ScheduledFutureTask对象实例才有用,用来快速取消任务。
15         setIndex(e, k);
16         // 重新赋值k,寻找元素key应该插入到堆二叉树的那个节点
17         k = parent;
18     }
19     // 循环结束,k就是元素key应该插入的节点位置
20     queue[k] = key;
21     setIndex(key, k);
22 }

主要是三步:

  • 元素个数超过数组长度,就会调用grow()方法,进行数组扩容。
  • 将新元素e添加到优先级队列中对应的位置,通过siftUp方法,保证按照元素的优先级排序。
  • 如果新插入的元素是队列头,即更换了队列头,那么就要唤醒正在等待获取任务的线程。这些线程可能是因为原队列头元素的延时时间没到,而等待的。

我们来看看动画

假设现有元素 5 需要插入,为了维持完全二叉树的特性,新插入的元素一定是放在结点 6 的右子树;同时为了满足任一结点的值要小于左右子树的值这一特性,新插入的元素要和其父结点作比较,如果比父结点小,就要把父结点拉下来顶替当前结点的位置,自己则依次不断向上寻找,找到比自己大的父结点就拉下来,直到没有符合条件的值为止。

动画讲解:

  1. 在这里先将元素 5 插入到末尾,即放在结点 6 的右子树。
  2. 然后与父类比较, 6 > 5 ,父类数字大于子类数字,子类与父类交换。
  3. 重复此操作,直到不发生替换。

立即获取队列头元素

 1 public RunnableScheduledFuture<?> poll() {
 2     final ReentrantLock lock = this.lock;
 3     lock.lock();
 4     try {
 5         RunnableScheduledFuture<?> first = queue[0];
 6         // 队列头任务是null,或者任务延时时间没有到,都返回null
 7         if (first == null || first.getDelay(NANOSECONDS) > 0)
 8             return null;
 9         else
10             // 移除队列头元素
11             return finishPoll(first);
12     } finally {
13         lock.unlock();
14     }
15 }

当队列头任务是null,或者任务延时时间没有到,表示这个任务还不能返回,因此直接返回null。否则调用finishPoll方法,移除队列头元素并返回。

 1 // 移除队列头元素
 2 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
 3     // 将队列中元素个数减一
 4     int s = --size;
 5     // 获取队列末尾元素x
 6     RunnableScheduledFuture<?> x = queue[s];
 7     // 原队列末尾元素设置为null
 8     queue[s] = null;
 9     if (s != 0)
10         // 将队列最后一个元素移动到对列头元素位置,然后向下排序
11         // 因为移除了队列头元素,所以进行重新排序。
12         siftDown(0, x);
13     setIndex(f, -1);
14     return f;
15 }

这个方法与我们在第一节中,介绍堆的删除方法一样。

  1. 先将队列中元素个数减一。
  2. 将原队列末尾元素设置成队列头元素,再将队列末尾元素设置为null。
  3. 调用siftDown(0, x)方法,保证按照元素的优先级排序。

移除元素排序siftDown方法:

 1 private void siftDown(int k, RunnableScheduledFuture<?> key) {
 2     int half = size >>> 1;
 3     // 通过循环,保证父节点的值不能大于子节点。
 4     while (k < half) {
 5         // 左子节点, 相当于 (k * 2) + 1
 6         int child = (k << 1) + 1;
 7         // 左子节点位置元素
 8         RunnableScheduledFuture<?> c = queue[child];
 9         // 右子节点, 相当于 (k * 2) + 2
10         int right = child + 1;
11         // 如果左子节点元素值大于右子节点元素值,那么右子节点才是较小值的子节点。
12         // 就要将c与child值重新赋值
13         if (right < size && c.compareTo(queue[right]) > 0)
14             c = queue[child = right];
15         // 如果父节点元素值小于较小的子节点元素值,那么就跳出循环
16         if (key.compareTo(c) <= 0)
17             break;
18         // 否则,父节点元素就要和子节点进行交换
19         queue[k] = c;
20         setIndex(c, k);
21         k = child;
22     }
23     // 循环结束,k就是元素key应该插入的节点位置
24     queue[k] = key;
25     setIndex(key, k);
26 }

我们来看看动画

核心点:将最后一个元素填充到堆顶,然后不断的下沉这个元素。

假设要从节点 1 ,也可以称为取出节点 1 ,为了维持完全二叉树的特性 ,我们将最后一个元素 6 去替代这个 1 ;然后比较 1 和其子树的大小关系,如果比左右子树大(如果存在的话),就要从左右子树中找一个较小的值替换它,而它能自己就要跑到对应子树的位置,再次循环这种操作,直到没有子树比它小。

通过这样的操作,堆依然是堆,总结一下:

  • 找到要删除的节点(取出的节点)在数组中的位置
  • 用数组中最后一个元素替代这个位置的元素
  • 当前位置和其左右子树比较,保证符合最小堆的节点间规则
  • 删除最后一个元素

等待获取队列头元素

 1 public RunnableScheduledFuture<?> take() throws InterruptedException {
 2     final ReentrantLock lock = this.lock;
 3     lock.lockInterruptibly();
 4     try {
 5         for (;;) {
 6             RunnableScheduledFuture<?> first = queue[0];
 7             // 如果没有任务,就让线程在available条件下等待。
 8             if (first == null)
 9                 available.await();
10             else {
11                 // 获取任务的剩余延时时间
12                 long delay = first.getDelay(NANOSECONDS);
13                 // 如果延时时间到了,就返回这个任务,用来执行。
14                 if (delay <= 0)
15                     return finishPoll(first);
16                 // 将first设置为null,当线程等待时,不持有first的引用
17                 first = null; // don‘t retain ref while waiting
18
19                 // 如果还是原来那个等待队列头任务的线程,
20                 // 说明队列头任务的延时时间还没有到,继续等待。
21                 if (leader != null)
22                     available.await();
23                 else {
24                     // 记录一下当前等待队列头任务的线程
25                     Thread thisThread = Thread.currentThread();
26                     leader = thisThread;
27                     try {
28                         // 当任务的延时时间到了时,能够自动超时唤醒。
29                         available.awaitNanos(delay);
30                     } finally {
31                         if (leader == thisThread)
32                             leader = null;
33                     }
34                 }
35             }
36         }
37     } finally {
38         if (leader == null && queue[0] != null)
39             // 唤醒等待任务的线程
40             available.signal();
41         lock.unlock();
42     }
43 }

如果队列中没有任务,那么就让当前线程在available条件下等待。如果队列头任务的剩余延时时间delay大于0,那么就让当前线程在available条件下等待delay时间。

超时等待获取队列头元素

 1 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
 2     throws InterruptedException {
 3     long nanos = unit.toNanos(timeout);
 4     final ReentrantLock lock = this.lock;
 5     lock.lockInterruptibly();
 6     try {
 7         for (;;) {
 8             RunnableScheduledFuture<?> first = queue[0];
 9             // 如果没有任务。
10             if (first == null) {
11                 // 超时时间已到,那么就直接返回null
12                 if (nanos <= 0)
13                     return null;
14                 else
15                     // 否则就让线程在available条件下等待nanos时间
16                     nanos = available.awaitNanos(nanos);
17             } else {
18                 // 获取任务的剩余延时时间
19                 long delay = first.getDelay(NANOSECONDS);
20                 // 如果延时时间到了,就返回这个任务,用来执行。
21                 if (delay <= 0)
22                     return finishPoll(first);
23                 // 如果超时时间已到,那么就直接返回null
24                 if (nanos <= 0)
25                     return null;
26                 // 将first设置为null,当线程等待时,不持有first的引用
27                 first = null; // don‘t retain ref while waiting
28                 // 如果超时时间小于任务的剩余延时时间,那么就有可能获取不到任务。
29                 // 在这里让线程等待超时时间nanos
30                 if (nanos < delay || leader != null)
31                     nanos = available.awaitNanos(nanos);
32                 else {
33                     Thread thisThread = Thread.currentThread();
34                     leader = thisThread;
35                     try {
36                         // 当任务的延时时间到了时,能够自动超时唤醒。
37                         long timeLeft = available.awaitNanos(delay);
38                         // 计算剩余的超时时间
39                         nanos -= delay - timeLeft;
40                     } finally {
41                         if (leader == thisThread)
42                             leader = null;
43                     }
44                 }
45             }
46         }
47     } finally {
48         if (leader == null && queue[0] != null)
49             // 唤醒等待任务的线程
50             available.signal();
51         lock.unlock();
52     }
53 }

与take方法相比较,就要考虑设置的超时时间,如果超时时间到了,还没有获取到有用任务,那么就返回null。其他的与take方法中逻辑一样。

推荐博客

  https://www.cnblogs.com/chen-haozi/p/10227797.html

总结

使用优先级队列DelayedWorkQueue,保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取。

原文地址:https://www.cnblogs.com/java-chen-hao/p/10275910.html

时间: 2024-10-08 01:43:44

并发编程(十四)—— ScheduledThreadPoolExecutor 实现原理与源码深度解析 之 DelayedWorkQueue的相关文章

并发编程(四):ThreadLocal从源码分析总结到内存泄漏

一.目录 1.ThreadLocal是什么?有什么用? 2.ThreadLocal源码简要总结? 3.ThreadLocal为什么会导致内存泄漏? 二.ThreadLocal是什么?有什么用? 引入话题:在并发条件下,如何正确获得共享数据?举例:假设有多个用户需要获取用户信息,一个线程对应一个用户.在mybatis中,session用于操作数据库,那么设置.获取操作分别是session.set().session.get(),如何保证每个线程都能正确操作达到想要的结果? /** * 回顾sync

JVM CPU Profiler技术原理及源码深度解析

研发人员在遇到线上报警或需要优化系统性能时,常常需要分析程序运行行为和性能瓶颈.Profiling技术是一种在应用运行时收集程序相关信息的动态分析手段,常用的JVM Profiler可以从多个方面对程序进行动态分析,如CPU.Memory.Thread.Classes.GC等,其中CPU Profiling的应用最为广泛.CPU Profiling经常被用于分析代码的执行热点,如“哪个方法占用CPU的执行时间最长”.“每个方法占用CPU的比例是多少”等等,通过CPU Profiling得到上述相

第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析

王家林亲授<DT大数据梦工厂>大数据实战视频“Scala深入浅出实战经典”视频.音频和PPT下载!第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析百度云:http://pan.baidu.com/s/1pJ5jzHx腾讯微云:http://url.cn/aSawrm360云盘:http://yunpan.cn/cctL3QYACaVNa  访问密码 c0fb 信息来源于 DT大数据梦工厂微信公众账号:DT_Spark

spring5 源码深度解析----- 被面试官给虐懵了,竟然是因为我不懂@Configuration配置类及@Bean的原理

@Configuration注解提供了全新的bean创建方式.最初spring通过xml配置文件初始化bean并完成依赖注入工作.从spring3.0开始,在spring framework模块中提供了这个注解,搭配@Bean等注解,可以完全不依赖xml配置,在运行时完成bean的创建和初始化工作.例如: public interface IBean { } public class AppBean implements IBean{ } //@Configuration申明了AppConfig

【Java并发编程】21、线程池ThreadPoolExecutor源码解析

一.前言 JUC这部分还有线程池这一块没有分析,需要抓紧时间分析,下面开始ThreadPoolExecutor,其是线程池的基础,分析完了这个类会简化之后的分析,线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法.下面开始分析. 二.ThreadPoolExecutor数据结构 在ThreadPoolExecutor的内部,主要由BlockingQueue和AbstractQu

Java并发编程与技术内幕:CopyOnWriteArrayList、CopyOnWriteArraySet源码解析

林炳文Evankaka原创作品.转载请注明出处http://blog.csdn.net/evankaka 摘要:本文主要讲了Java中CopyOnWriteArrayList .CopyOnWriteArraySet的源码分析 一.CopyOnWriteArrayList源码分析 CopyOnWriteArrayList在java的并发场景中用得其实并不是非常多,因为它并不能完全保证读取数据的正确性.其主要有以下的一些特点:1.适合场景读多写少2.不能保证读取数据一定是正确 的,因为get时是不

Scala并发编程实战初体验及其在Spark源码中的应用解析之Scala学习笔记-56

package com.leegh.actor import scala.actors.Actor /** * @author Guohui Li */object First_Actor extends Actor { def act() { for (i <- 1 to 10) { println("Step : " + i) println(Thread.currentThread().getName) Thread.sleep(2000) } }} object Seco

Spark进阶视频之Scala并发编程实战初体验及其在Spark源码中的应用解析

王家林亲授<DT大数据梦工厂>大数据实战视频"Scala深入浅出实战经典"视频.音频和PPT下载! 欢迎广大Spark爱好者学习交流.也欢迎广大学习爱好者加入DT大数据梦工厂交流群:462923555DT大数据微信公众账号:DT_Spark 视频观看链接http://www.tudou.com/plcover/Yy5F5gsurSE/ 视频下载地址百度云:http://pan.baidu.com/s/1eQGqzEa腾讯微云: http://url.cn/SshT6b

java并发编程的艺术(三)---lock源码

jdk1.5以后,并发包中新增了lock接口, 它相对于synchronized,多了以下三个主要特性:尝试非阻塞地获取锁(尝试获取锁成功则持有).能被中断地获取锁(锁的进程能响应中断).超时获取锁(指定时间截止之前获取锁). 我们看看它接口中定义的api: 获取锁 可中断地获取锁 尝试非阻塞地获取锁,能够获取则返回true,否则false 超时获取锁,三种返回情况:1.当前线程在超时时间内获得了锁.2.当前线程在超时时间内被中断.3.超时时间内没获得锁 释放锁 获取等待通知组件,该组件和当前的