支持内部晋升的无锁并发优先级线程池

支持内部晋升的无锁并发优先级线程池

[TOC]

引言

在技术群讨论到一个有意思的业务需求,可以描述为:

有一个内部按照优先级进行任务排序的线程池。线程池会优先执行高优先级的任务。随着时间的流逝,线程池内部低优先级的任务的优先级会逐渐晋升变为高优先级,以避免被不断新增的高优先级任务阻塞导致饿死。

考虑到 JDK 已经为开发者提供了自定义线程池ThreadPoolExecutor以及优先级队列PriorityBlockingQueue,两者相结合并且定期调整队列中低优先级任务的优先级再进行resort将低优先级的任务调整到队列的前头,也可以一定程度上避免被饿死。

这种方案的问题在于resort的消耗比较高,并且还需要重新计算每一个任务的优先级。为此,引出我们下面的设计,希望使用无锁并发的数据结构存储任务,并且任务支持自动的优先级晋升,保证低优先级的任务最终能够执行而不会被不断增加的高优先级任务饿死。

欢迎加入技术交流群186233599讨论交流,也欢迎关注笔者公众号:风火说。

推导过程

如何实现优先级晋升

声明一个数组,按照循环队列的方式使用。每一个数组槽位上都挂载一个任务列表。有一个当前指针指向数组中的某一个槽位,该槽位即为当前最高优先级任务插入的槽位。指针数字递增方向优先级依次降低。指针以某种方式沿递增方向移动,因为指针指向的槽位代表最高优先级,因此指针的移动实际上意味着所有槽位的优先级都晋升了。

那么这里的优先级只能是离散化的整型数字,并且优先级的范围为 0 到 数组长度减 1 。最高优先级为0。

用图形化的方式表达就是如下的情况

图中优先级的范围是[0,6],current指针指向的槽位即为最高优先级,current左侧槽位为最低优先级,current右侧槽位为次高优先级。每一个槽位上都挂载一个队列,队列中的任务的优先级都相同(后续算法中可以看到会有不同的优先级混合)。

每次取任务时总是从current指针指向的槽位的队列读取任务。当一定时间流逝后,current指针沿着右侧移动一位,此时意味着所有槽位的优先级都被晋升了,除了原本的current指向的槽位,它变为了最低优先级槽位。

由于current指针总是在移动,因此最终会移动到之前低优先级的槽位,此时该槽位下的任务就成了最高优先级任务,被读取执行。这样就避免了在运行过程不断有高优先级任务被加入导致原本的低优先级饿死的情况发生。

数据结构设计

根据上面的优先级晋升思路,显然应该有一个数组,其不同的槽位代表着不同的优先级。每一个槽位上挂载一个 MPMC 类型的队列,用于该优先级下任务的添加和读取。

使用一个当前指针,该指针指向的槽位为最高优先级槽位。

一个指针产生的问题

如果只有一个指针,意味着读取任务时,从该指针指向的槽位读取,因此此时指针指向的槽位是最高优先级。而插入任务的时候,需要根据当前指针进行计算。这种模式在优先级晋升时存在并发问题。

当指针从槽位1指向更新到槽位2。此时槽位1可能还存在部分剩余的任务,这部分任务的实际优先级应该是高于槽位2当中的。而如果在这个时候插入最低优先级的任务,可能就会插入到槽位1中。那么槽位1的任务队列实际就混合了最高优先级和最低优先级的任务,无法区分。

为了解决不同优先级任务在同一个队列中混合的问题,我们可以在指针移动时,将之前槽位的剩余移动到当前槽位的队列头。这实际上就意味着要求队列是出于双端队列模式。但是因为指针移动和任务移动无法原子化进行,还是会造成槽位1的队列中最高优先级任务和最低优先级任务混在一起的情况。

从实现效果而言,我们需要的是在指针移动的时候,保证槽位1中剩余的原本高优先级的任务执行完毕后才能去执行槽位2这个“原本的次高优先级,现在的最高优先级“的任务。从效果来看,并不需要一定移动任务,可以通过一种手段,保证槽位1中原本高优先级任务执行完毕后再去执行槽位2的任务即可。

基于这种考量,我们将一个指针拆分为两个:任务插入指针和任务读取指针。

任务插入指针和任务读取指针

基于并发读写的考虑,两个指针都是AtomicInteger类型。两个指针的作用分别为:

  • 任务插入指针:该指针指向的槽位为当前最高优先级槽位(后续会引入轮次这个概念,因此这里对当前加粗)。
  • 任务读取指针:从结构体中获取任务时使用该指针指向的槽位上获取任务队列进行任务读取。

任务插入指针和任务读取指针分离的好处在于,任务插入指针的移动意味着不同槽位优先级的实际晋升。而读取可以依照读取指针指向的槽位上的队列读取任务,直到对应优先级的任务读取完毕后再移动读取指针到下一个槽位。这样一来,保证了按照入列的顺序被公平的处理,也保证了同一个时间单位高优先级的任务优于低优先级任务被处理,也避免了单一指针移动需要的任务拷贝带来的不同优先级任务污染问题。

任务插入指针如何移动

插入指针可以按照两种策略移动:

  • 自然时间流逝移动,一定时间后移动。
  • 以读取次数为单位,一定次数后移动。

如果选择策略一,需要后台配置一个线程,按照固定时间移动插入指针;如果选择策略二,需要一个全局的AtomicInteger对象,用于次数判定。

如果选择方案一,可能会存在一种场景,往线程池中投入了大量的同一个优先级的任务,使得某个槽位上的队列长度很长。如果任务处理相对缓存,则任务插入指针可能会被移动多次。这种移动会使得槽位上队列有了很多不同优先级的任务。而读取任务时按照优先级逐步去处理,这使得产生了这么多不同的优先级实际上意义是不大的。

因此采用策略二会更加合适一些。

由于读取任务时是多线程的,因此策略二实现上需要注意的点包括:

  • AtomicInteger#incrementAndGet实现任务读取次数累加。如果返回的数字是阈值的倍数,则意味着可以移动任务插入指针。
  • 使用AtomicInteger#incrementAndGet来移动插入指针。

在这里对插入指针移动的并发考量在于,由于读取线程对读取计数使用AtomicInteger#incrementAndGet方式累加是必然成功,而返回数值是晋升阈值的倍数时必然需要实现插入指针的递增。因为递增的必然性,因此同样使用AtomicInteger#incrementAndGet方式来实现。

任务插入指针移动到同一位置导致的优先级任务混合问题

假定系统初始状态,插入和读取指针都指向了槽位1,在槽位1上插入了大量的任务。随着任务的读取,插入指针移动到了槽位2,此时该槽位上插入了一些任务。随着任务的读取,插入指针继续移动,移动过数组的长度后,再次指向了槽位2。假定此时读取指针仍然在槽位1,而如果这个时候插入插入任务。那么实际上槽位2队列中任务应该分为两种:前半部分是上一个轮次插入的任务,后半部分是当前刚插入的任务。

如果读取指针移动到槽位2,应该将前半部分任务执行完毕后就去执行槽位3上的任务,而不是将所有的任务都执行完。因此槽位3上的任务实际优先级应该高于槽位2队列中后半部分的任务。

基于上述情况,问题可以转化为依靠读取指针在读取任务时,如何识别当前队列中不是本轮次要处理的任务进而移动读取指针?

考虑到任务插入指针和任务读取指针本身是有值的,这个值单调递增,实际上可以看成是一种“顺序”概念的表达。因此任务的准备添加时,可以将插入指针的值加上任务的优先级,声明为任务的插入优先级。读取指针在读取任务时,只有当前任务的插入优先级等于读取指针的值,意味着该任务时本轮次读取指针应该要处理的任务。如果读取的任务的插入优先级与读取指针不等时,意味着当前队列不能再读取任务,应该移动读取指针。

通过任务本身的插入优先级避免了不同轮次的任务在一个队列中被混合导致的优先级混乱。

任务读取指针如何移动

上个章节提出任务的插入优先级,解决了不同轮次的任务在同一个队列可能会混合的问题。这个问题的解决引出了读取指针的移动策略:在读取到的任务的插入优先级与读取指针的值不等时意味着需要移动。

但是这里又产生了新的问题:并发移动读取指针的问题。在读取并发的情况下,会遇到一个问题:读取出来的任务的优先级不符合指针,此时要重新放回队列,但是重新放入,就可能和任务的插入混合,造成数据混乱。

有几种可能的解决方式:

  • 任务的读取采用Sync关键字修饰,如果读取任务不符合,则放回,并且移动指针。由于没有读取并发,但仍然可能因为读取的放回和新任务的添加造成数据混乱。
  • 采用分段机制,每一个分段是一个队列,分段和分段构成一个队列。一个分段内的优先级是固定的,因此当分段耗尽时,就是切换读取指针的时候。

策略一并不能彻底解决问题,在这里我们采用策略二的方案。

策略二的引入实际上改变了上面的一个数据结构,也即是数组存储的元素不再是一个任务队列,而是一个分段队列。而每一个分段内部又存储了任务队列,并且分段的队列的任务的插入优先级均是相同的。这意味着分段在创建的时候就具备了插入优先级这个值。分段和分段的插入优先级必然不同,这个结构就天然的支持了轮次的概念。

分段结构的引入导致了数据结构的变化,这实际上会改变任务插入和任务读取的流程。下文会再来细说具体的实现。分析到这里,读取指针的移动时机就很明白了,在分段内数据耗尽,就意味着某个具体插入优先级的任务都被读取完毕了。

当然,考虑到读写并发的原因,读取线程发现分段内数据耗尽并不意味着该插入优先级的任务全被读取了,后文会针对并发场景在处理流程上解决。

插入和读取并发

插入和读取可能在同一个槽位同一个分段上并发。分段的队列本身是支持MPMC的,这并没有问题。

可能会出现一种并发异常就是插入线程读取了插入指针的值,并且准备插入数据,但是因为线程调度的原因,失去了CPU资源,尚未完成数据插入。此时读取线程将槽位内的任务读取完毕后认为没有数据,则移动了读取指针到下一个槽位。在读取指针移动后,插入线程才完成数据的插入。这样导致本来应该是高优先级的任务变成最低优先级槽位上的任务。而当下一轮次读取指针再次指向该槽位时,读取指针获取的到任务的任务优先级又会和读取指针本身的数值冲突。

针对并发的异常场景,有一种常见的解决思路就是二次检查。也就是读取线程在移动任务读取指针后,再次检查下当前分段内是否出现了新的任务,如果有,则协助迁移到下一个槽位上;写入线程在放入任务后,检查是否读取指针移动过,如果有,则协助迁移到下一个槽位上。

然而,读取线程检查分段内的队列是否剩余,写入线程检查读取指针是否移动,这些状态都是在动态变化的,仍然会产生一些其他问题。双重检查一般会引入一个终止状态来来减少可能的变化场景。在这里,我们为分段引入状态:使用中和终止。一个分段初始化时是使用中状态,当读取线程认为该分段内的任务都被消耗后,则应该更新为终止状态。一旦分段进入终止状态,则被抛弃,不应该再有任务数据添加到该分段中。

通过分段状态,我们可以将任务区分为终止前添加到分段和终止后添加到分段两类。前者需要被正常读取,后者则需要迁移到其它合适的分段中再被处理。

到这里为止,我们针对数据结构和其元素属性的变化就完成了。

将数组通过循环队列的方式来表达不同的优先级。通过任务写指针的移动来实现内部任务优先级的晋升。通过读指针来实现任务严格按照优先级顺序被处理,且避免低优先级任务被高优先级任务饿死。数组的元素指向一个该槽位上插入优先级最低的分段。一同散列到同一个槽位上的分段按照插入优先级的顺序形成队列。

代码实现

整个代码当中,最为复杂的就是任务的插入和读取,下面分别来设计流程。

任务插入

上面推导过程分析了插入和读取并发可能导致的冲突场景。这里我们细化其解决流程。对于插入线程而言,要处理的情况包括有:

  • 元素对应槽位上没有分段。
  • 元素对应槽位上的分段的插入优先级和插入指针的值不相等。
  • 元素对应槽位上分段列表中插入优先级与插入指针相符的分段处于终止状态
  • 元素对应槽位上的分段插入优先级与插入指针相等,且处于使用状态。

可以看到,只有第四种情况任务可以在当前分段插入成功,且插入完毕后还需要再次检查分段的状态。基于这些考量,我们将插入流程设计为

可以看到,这个流程中没有处理槽位上没有分段的情况,这个在下一个章节我们会分析。

任务的读取

有了分段的存在,读取指针的移动判定更加复杂,读取线程可能碰到的场景有:

  • 读取指针散列的槽位上没有分段。
  • 读取指针散列的槽位上有分段且状态为使用,分段内没有任务。
  • 读取指针散列的槽位上有分段且状态为使用,分段内有任务。
  • 读取指针散列的槽位上有分段且状态为关闭,分段内没有任务。
  • 读取指针散列的槽位上有分段且状态为关闭,分段内有任务。

只有第三种情况可以读取任务并且进行处理。有了轮次这个概念,读取指针永远只会读取槽位上的第一个分段。如果槽位上没有分段,或者分段的插入优先级与读取指针不同,或者分段内没有任务,则可以考虑移动读取指针。注意,分段状态为关闭并不是读取指针移动的条件,原因下面会分析。

但是移动读取指针的时候首先需要考虑当前读取指针是否已经处于(写入指针的值+最低优先级数字),如果是的话,意味着已经处于边界,不应该在移动。

分段状态的更新只能由读取线程来进行。当读取线程发现该分段已经没有任务了,首先应该通过CAS的方式更新分段状态。CAS竞争成功的线程再次检查分段内是否出现了新的任务,如果出现的话,则提取任务,完成任务读取。为何不将任务移动到下一个槽位。因为下一个槽位上可能还没有分段,此时读取线程可能和写入线程竞争槽位上的分段写入。如果写入线程竞争成功,读取线程移动过去的任务数据的优先级就放到了错误的分段中;如果读取线程竞争成功,则读取线程创建的分段必须是第一个分段,否则任务还是移动到错误的地方。

解决这个问题最好的办法就是不解决。不移动任务,仍然在该分段上读取任务直到任务耗尽。然后再尝试移动读取指针。而对于写入线程而言,当其发现分段的状态变为终止后,是提取出任务重新执行完整的放入流程,不会有并发的问题。

再次梳理下没有任务情况下的流程,应该是通过CAS修改分段的状态。无论成功或失败,都可以继续检查队列是否有任务,如果有的话,则返回读取到的任务。如果没有的话,则CAS将读取指针+1。竞争成功的线程将当前分段的下一个分段设置给槽位,并且重新执行读取流程。竞争失败的线程则反复检查读取指针的值,发现变化后,重新执行读取流程。

这里有一个并发冲突需要考虑,当读取线程尝试将当前分段的下一个分段设置为槽位的值时,可能此时当前分段的下一个分段是null,而写入线程正在尝试为当前分段设置下一个分段。这种情况下可能导致下一个分段丢失。特别的,如果当前分段的下一个分段已经被设置,并且有任务被放入其中,丢失这个分段就意味着数据丢失。

为了避免这个情况,在当前分段的下一个分段为null时,就不能将下一个分段(属性值)设置给槽位。这使得在读取到分段时,需要首先检查分段的优先级,确认是否本轮次。如果是的话,再执行后续的流程。否则要么移动(该分段没有下一个分段),要么将该分段的下一个分段设置给槽位后,在移动。

从这个角度出发,我们可以在初始化的时候,将数组中的元素都填充一个分段。这样写入线程就不需要处理槽位上可能为空的场景了。

基于此,我们将读取任务的变化为:

  • 槽位上的分段优先级小于读取指针,且分段状态为终止。
  • 槽位上的分段优先级等于读取指针。
  • 槽位上的分段优先级大于读取指针。

第一种情况,如果该分段有下一个分段,CAS更新到槽位上;如果没有,则CAS移动读取指针。

第二种情况,按照上面分析的流程进行处理即可。

第三种情况,CAS移动读取指针。

综上,我们可以将读取流程设计为

包装为BlockQueue

在JDK提供的ThreadPoolExecutor类的构造方法中,需要传入BlockingQueue作为队列的接口。显然,上述的存储结构并不能支持BlockQueue,需要考虑包装。

显然,上面的存储结果在写入的时候并不会阻塞,因此只需要考虑如何包装读取数据不存在时的阻塞等待即可。

简单的方式就是在读取失败的获取锁,并且在队列空的condition对象执行等待;插入任务的时候执行唤醒。

效果展现

测试代码如下

首先添加一定量的高优先级任务,随后添加5个低优先级,最后通过CountLatch模拟在运行过程中添加高优先级任务。

如果单纯按照优先级排序,则需要所有高优先级任务输出完毕后才会输出低优先级任务,显然这是错误的。正确的实现应该是先输出第一批高优先级任务,再输出低优先级任务,最后输出第三批高优先级任务。运行代码,看到结果如下

与我们的预期相吻合。

来源:站长资讯

原文地址:https://www.cnblogs.com/1994jinnan/p/12178052.html

时间: 2024-10-10 12:20:45

支持内部晋升的无锁并发优先级线程池的相关文章

加锁并发算法 vs 无锁并发算法

Heinz Kabutz 在上周举办了一次成功 JCrete研讨会,我在会上参加了对一种新的 StampedLock(于JSR166中 引入) 进行的评审.StampedLock (邮戳锁) 旨在解决系统中共享资源的争用问题.在一个系统中,如果多个需要读写某一共享状态的程序并发访问这个共享对象时,争用问题就产生了.在设计 上,StampedLock 试图通过一种“乐观读取”的方式来减小系统开销,从而提供比 ReentrantReadWriteLock(重入读写锁) 更好的性能. 在评审过程中,我

17.并发编程--线程池

并发编程线程池 合理利用线程池能够带来三个好处. 第一:降低资源消耗.通过重复利用已创建的线程降低线程创建和销毁造成的消耗. 第二:提高响应速度.当任务到达时,任务可以不需要的等到线程创建就能立即执行. 第三:提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控.但是要做到合理的利用线程池,必须对其原理了如指掌. 1. Executor 框架简介 在 Java 5 之后,并发编程引入了一堆新的启动.调度和管理 线

优先级线程池实现

运维在升级,无聊写博客 最近在实现消息通知平台上面,对于针对不同的通知需要设置优先级,实现当通知队列堵塞的时候可以有限推送高优先级的消息.为了保证通知队列的高效并发,通知队列的消费端是采用多线程并发处理的,因此需要实现一个可以实现优先级的多线程处理逻辑: 对于ThreadPollExecutor来说, public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUn

java并发与线程池应用

一.创建线程池 Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制.Executor基于生产者---消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者.如果要使用Executor,必须将任务表述为一个Runnable. (1) public static ExecutorService newFixedThreadPoo

Java并发编程——线程池的使用

在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,

java并发编程-线程池的使用

参考文章:http://www.cnblogs.com/dolphin0520/p/3932921.html 深入剖析线程池实现原理 将从下面几个方面讲解: 1.线程池状态 2.任务的执行 3.线程池中的线程初始化 4.任务缓存队列及排队策略 5.任务拒绝策略 6.线程池的关闭 7.线程池容量的动态调整 1.线程池状态 在ThreadPoolExcutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态: volatile int runState

Java并发编程——线程池

一.Java中的ThreadPoolExecutor类 二.深入剖析线程池实现原理 三.使用示例 四.如何合理配置线程池的大小 一.Java中的ThreadPoolExecutor类 java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类.下面我们来看一下ThreadPoolExecutor类的具体实现源码. 在ThreadPoolExecutor类中提供了四个构造方法: public c

java之并发编程线程池的学习

如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类. corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小.举个简单的例子: 假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务. 因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做:

java并发:线程池、饱和策略、定制、扩展

一.序言 当我们需要使用线程的时候,我们可以随时新建一个线程,这样实现起来非常简便,但在某些场景下存在缺陷:如果需要同时执行多个任务(即并发的线程数量很多),频繁地创建线程会降低系统的效率,因为创建和销毁线程均需要一定的时间.线程池可以使线程得到复用,所谓线程复用就是线程在执行完一个任务后并不被销毁,该线程可以继续执行其他的任务. 二.Executors提供的线程池 Executors是线程的工厂类,也可以说是一个线程池工具类,Executors提供的线程都是通过参数设置来实现不同的线程池机制.