个推基于 Apache Pulsar 的优先级队列方案

作者:个推平台研发工程师 祥子

一、业务背景

在个推的推送场景中,消息队列在整个系统中占有非常重要的位置。

当 APP 有推送需求的时候, 会向个推发送一条推送命令,接到推送需求后,我们会把APP要求推送消息的用户放入下发队列中,进行消息下发;当同时有多个APP进行消息下发时,难免会出现资源竞争的情况, 因此就产生了优先级队列的需求,在下发资源固定的情况下, 高优先级的用户需要有更多的下发资源。

二、基于 Kafka 的优先级队列方案

针对以上场景,个推基于 Kafka 设计了第一版的优先级队列方案。Kafka 是 LinkedIn 开发的一个高性能、分布式消息系统;Kafka 在个推有非常广泛的应用,如日志收集、在线和离线消息分发等。

架构

在该方案中,个推将优先级统一设定为高、中、低三个级别。具体操作方案如下:

  1. 对某个优先级根据 task (单次推送任务)维度,存入不同的 Topic,一个 task 只写入一个 Topic,一个 Topic 可存多个 task;
  2. 消费模块根据优先级配额(如 6:3:1),获取不同优先级的消息数,同一优先级轮询获取消息;这样既保证了高优先级用户可以更快地发送消息,又避免了低优先级用户出现没有下发的情况。

Kafka 方案遇到的问题

随着个推业务的不断发展,接入的 APP 数量逐渐增多,第一版的优先级方案也逐渐暴露出一些问题:

  1. 当相同优先级的 APP 在同一时刻推送任务越来越多时,后面进入的 task 消息会因为前面 task 消息还存在队列情况而出现延迟。如下图所示, 当 task1 消息量过大时,在task1 消费结束前,taskN 将一直处于等待状态。
  2. Kafka 在 Topic 数量由 64 增长到 256 时,吞吐量下降严重,Kafka 的每个 Topic、每个分区都会对应一个物理文件。当 Topic 数量增加时,消息分散的落盘策略会导致磁盘 IO 竞争激烈,因此我们不能仅通过增加 Topic 数量来缓解第一点中的问题。

基于上述问题,个推进行了新一轮的技术选型, 我们需要可以创建大量的 Topic, 同时吞吐性能不能比 Kafka 逊色。经过一段时间的调研,Apache Pulsar 引起了我们的关注。

三、为什么是 Pulsar

Apache Pulsar 是一个企业级的分布式消息系统,最初由 Yahoo 开发,在 2016 年开源,并于2018年9月毕业成为 Apache 基金会的顶级项目。Pulsar 已经在 Yahoo 的生产环境使用了三年多,主要服务于Mail、Finance、Sports、 Flickr、 the Gemini Ads platform、 Sherpa (Yahoo 的 KV 存储)。

架构

Topic 数量
Pulsar 可以支持百万级别 Topic 数量的扩展,同时还能一直保持良好的性能。Topic 的伸缩性取决于它的内部组织和存储方式。Pulsar 的数据保存在 bookie (BookKeeper 服务器)上,处于写状态的不同 Topic 的消息,在内存中排序,最终聚合保存到大文件中,在 Bookie 中需要更少的文件句柄。另一方面 Bookie 的 IO 更少依赖于文件系统的 Pagecache,Pulsar 也因此能够支持大量的主题。

消费模型
Pulsar 支持三种消费模型:Exclusive、Shared 和Failover。

Exclusive (独享):一个 Topic 只能被一个消费者消费。Pulsar 默认使用这种模式。

Shared(共享):共享模式,多个消费者可以连接到同一个 Topic,消息依次分发给消费者。当一个消费者宕机或者主动断开连接时,那么分发给这个消费者的未确认(ack)的消息会得到重新调度,分发给其他消费者。

Failover (灾备):一个订阅同时只有一个消费者,可以有多个备份消费者。一旦主消费者故障,则备份消费者接管。不会出现同时有两个活跃的消费者。

Exclusive和Failover订阅,仅允许一个消费者来使用和消费每个订阅的Topic。这两种模式都按 Topic 分区顺序使用消息。它们最适用于需要严格消息顺序的流(Stream)用例。

Shared 允许每个主题分区有多个消费者。同一个订阅中的每个消费者仅接收Topic分区的一部分消息。Shared最适用于不需要保证消息顺序队列(Queue)的使用模式,并且可以按照需要任意扩展消费者的数量。

存储
Pulsar 引入了 Apache BookKeeper 作为存储层,BookKeeper 是一个专门为实时系统优化过的分布式存储系统,具有可扩展、高可用、低延迟等特性。具体介绍,请参考 BookKeeper官网

Segment
BookKeeper以 Segment (在 BookKeeper 内部被称作 ledger) 作为存储的基本单元。从 Segment 到消息粒度,都会均匀分散到 BookKeeper 的集群中。这种机制保证了数据和服务均匀分散在 BookKeeper 集群中。

Pulsar 和 Kafka 都是基于 partition 的逻辑概念来做 Topic 存储的。最根本的不同是,Kafka 的物理存储是以 partition 为单位的,每个 partition 必须作为一个整体(一个目录)存储在某个 broker 上。 而 Pulsar 的 partition 是以 segment 作为物理存储的单位,每个 partition 会再被打散并均匀分散到多个 bookie 节点中。

这样的直接影响是,Kafka 的 partition 的大小,受制于单台 broker 的存储;而 Pulsar 的 partition 则可以利用整个集群的存储容量。

扩容
当 partition 的容量达到上限后,需要扩容的时候,如果现有的单台机器不能满足,Kafka 可能需要添加新的存储节点,并将 partition 的数据在节点之间搬移达到 rebalance 的状态。

而 Pulsar 只需添加新的 Bookie 存储节点即可。新加入的节点由于剩余空间大,会被优先使用,接收更多的新数据;整个扩容过程不涉及任何已有数据的拷贝和搬移。

Broker 故障
Pulsar 在单个节点失败时也会体现同样的优势。如果 Pulsar 的某个服务节点 broker 失效,由于 broker 是无状态的,其他的 broker 可以很快接管 Topic,不会涉及 Topic 数据的拷贝;如果存储节点 Bookie 失效,在集群后台中,其他的 Bookie 会从多个 Bookie 节点中并发读取数据,并对失效节点的数据自动进行恢复,对前端服务不会造成影响。

Bookie 故障
Apache BookKeeper 中的副本修复是 Segment (甚至是 Entry)级别的多对多快速修复。这种方式只会复制必须的数据,这比重新复制整个主题分区要精细。如下图所示,当错误发生时, Apache BookKeeper 可以从 bookie 3 和 bookie 4 中读取 Segment 4 中的消息,并在 bookie 1 处修复 Segment 4。所有的副本修复都在后台进行,对 Broker 和应用透明。

当某个 Bookie 节点出错时,BookKeeper会自动添加可用的新 Bookie 来替换失败的 Bookie,出错的 Bookie 中的数据在后台恢复,所有 Broker 的写入不会被打断,而且不会牺牲主题分区的可用性。

四、基于 Pulsar 的优先级队列方案

在设计思路上,Pulsar 方案和 Kafka 方案并没有多大区别。但在新方案中,个推技术团队借助 Pulsar 的特性,解决了 Kafka 方案中存在的问题。

  1. 根据 task 动态生成 Topic,保证了后进入的 task 不会因为其他 task 消息堆积而造成等待情况。
  2. 中高优先级 task 都独享一个 Topic,低优先级 task 共享 n 个 Topic。
  3. 相同优先级内,各个 task 轮询读取消息,配额满后流转至下一个优先级。
  4. 相同优先级内, 各个 task 可动态调整 quota, 在相同机会内,可读取更多消息。
  5. 利用 Shared 模式, 可以动态添加删除 consumer,且不会触发 Rebalance 情况。
  6. 利用 BookKeeper 特性,可以更灵活的添加存储资源。

五、Pulsar 其他实践

  1. 不同 subscription 之间相对独立,如果想要重复消费某个 Topic 的消息,需要使用不同的 subscriptionName 订阅;但是一直增加新的 subscriptionName,backlog 会不断累积。
  2. 如果 Topic 无人订阅,发给它的消息默认会被删除。因此如果 producer 先发送,consumer 后接收,一定要确保 producer 发送之前,Topic 有 subscription 存在(哪怕 subscribe 之后 close 掉),否则这段时间发送的消息会导致无人处理。
  3. 如果既没有人发送消息,又没有人订阅消息,一段时间后 Topic 会自动删除。
  4. Pulsar 的 TTL 等设置,是针对整个 namespace 起效的,无法针对单个 Topic。
  5. Pulsar 的键都建立在 zookeeper 的根目录上,在初始化时建议增加总节点名。
  6. 目前 Pulsar 的 java api 设计,消息默认需要显式确认,这一点跟 Kafka 不一样。
  7. Pulsar dashboard 上的 storage size 和 prometheus 上的 storage size (包含副本大小)概念不一样。
  8. dbStorage_rocksDB_blockCacheSize 设置的足够大;当消息体量大,出现backlog 大量堆积时, 使用默认大小(256M)会出现读耗时过大情况,导致消费变慢。
  9. 使用多 partition,提高吞吐。
  10. 在系统出现异常时,主动抓取 stats 和 stats-internal,里面有很多有用数据。
  11. 如果业务中会出现单 Topic 体量过大的情况,建议把 backlogQuotaDefaultLimitGB 设置的足够大(默认10G), 避免因为默认使用producer_request_hold 模式出现 block producer 的情况;当然可以根据实际业务选择合适的 backlogQuotaDefaultRetentionPolicy
  12. 根据实际业务场景主动选择 backlog quota。
  13. prometheus 内如果发现读耗时为空情况,可能是因为直接读取了缓存数据;Pulsar 在读取消息时会先读取 write cache, 然后读取 read cache;如果都没有命中, 则会在 RocksDB 中读取条目位子后,再从日志文件中读取该条目。
  14. 写入消息时, Pulsar 会同步写入 journal 和 write cache;write cache 再异步写入日志文件和 RocksDB; 所以有资源的话,建议 journal 盘使用SSD。

六、总结

现在, 个推针对优先级中间件的改造方案已经在部分现网业务中试运行,对于 Pulsar 的稳定性,我们还在持续关注中。
作为一个2016 年才开源的项目,Pulsar 拥有非常多吸引人的特性,也弥补了其他竞品的短板,例如跨地域复制、多租户、扩展性、读写隔离等。尽管在业内使用尚不广泛, 但从现有的特性来说, Pulsar 表现出了取代 Kafka 的趋势。在使用 Pulsar 过程中,我们也遇到了一些问题, 在此特别感谢翟佳和郭斯杰(两位均为 Stream Native 的核心工程师、开源项目 Apache Pulsar 的 PMC 成员)给我们提供的支持和帮助。

参考文献:

[1] 比拼 Kafka, 大数据分析新秀Pulsar 到底好在哪(https://www.infoq.cn/article/1UaxFKWUhUKTY1t_5gPq)

[2] 开源实时数据处理系统Pulsar:一套搞定Kafka+Flink+DB(https://juejin.im/post/5af414365188256717765441)

原文地址:https://blog.51cto.com/13031991/2378638

时间: 2024-08-18 11:44:35

个推基于 Apache Pulsar 的优先级队列方案的相关文章

初学算法-基于最小堆的优先级队列C++实现

笔者近日实现了最小堆类及其派生的优先级队列,特将代码奉上,不足之处还请指出! 在实现优先级队列时,笔者表示萌萌哒没有用过template写派生类,结果写完了出现error: *** was not decleared in this scope..后来各种补上this->才完事,在CSDN(笔者的帖子地址? http://bbs.csdn.net/topics/391806995)上提问后才知道是模板参数依赖,笔者表示涨姿势了.. /**  * The Minimum Heap Class an

在STICORP使用Apache Pulsar构建数据驱动的应用程序

" Apache Pulsar 提供了太多无法忽视的好处.我们决定部署Apache Pulsar,并对此非常满意.我们已经将超过30%的生产数据迁移到Pulsar上,并计划在未来六个月内将所有数据迁移到Pulsar上." 原作者:Daniel Ferreira Jorge, STICORP运营总监. 译者:Sijie Guo 背景介绍 在一家商业公司,采用任何一项新技术,包括开源技术,都有一定的风险,即使这项技术具有显著的技术优势.Apache Pulsar的引入经过了我们的深思熟虑和

优先级队列(PriprityQueue)是一种什么样的数据结构

优先级队列(PriprityQueue)是一种无界队列,基于优先级堆,它的元素根据自然顺序或者通过实现Comparator接口的自定义排序方式进行排序.这篇文章,我们将创建一个Items的优先级队列,基于价格排序,优先级队列用来实现迪科斯彻算法(Dijkstra algorithm)非常实用.值得注意的是他的迭代器并不保证有序,如果需要按顺序遍历,最好使用Arrays.sort(pd.toArray())方法.同时它的实现不是同步的,意味着在多线程中不是线程安全的对象,可以取而代之的是Prior

有序链表实现的优先级队列

package day1_29; public class Link { public long dDate; public Link next; public Link(long dDate){ this.dDate = dDate; } //打印链结点的方法 public void displayLink(){ System.out.println("["+dDate+"]"); } } =====================================

分布式消息队列Apache Pulsar

Pulsar简介 Apache Pulsar是一个企业级的分布式消息系统,最初由Yahoo开发并在2016年开源,目前正在Apache基金会下孵化.Plusar已经在Yahoo的生产环境使用了三年多,主要服务于Mail.Finance.Sports. Flickr. the Gemini Ads platform. Sherpa以及Yahoo的KV存储. Pulsar之所以能够称为下一代消息队列,主要是因为以下特性: 线性扩展.能够丝滑的扩容到成百上千个节点(Kafka扩容需要占用很多系统资源在

基于堆的最大最小优先级队列的实现

最大堆能够在O(1)的时间内取得集合中的最大值,并且在集合中加入新元素的时候,能够以O(Logn)的时间将新的元素插入到堆中. 当取出最大的元素时,能够以O(Logn)的时间重新将堆整理成最大堆.最小堆同理. 最大优先级队列的应用实例:基于优先级的作业调度,在所有等待调度的作业中,选择具有最大优先级作业进行处理.同时一个新的作业也可以插入到队列里面去. 例如可以实现自己的基于优先级的多线程作业调度程序. 最小优先级队列的应用实例:可以实现一个基于时间的作业调度程序,时间最小的被优先选择进行事件通

初学图论-Dijkstra单源最短路径算法基于优先级队列(Priority Queue)的实现

这一次,笔者使用了STL库中的优先级队列(Priority Queue)来完成Dijkstra算法中extract-min()语句(即从未选中的节点中选取一个距离原点s最小的点)的功能.由于优先级队列的插入.删除操作只需要logn的时间花费,因此降低了不少运行时间. 本文使用C++实现了这一基本算法.参考<算法导论>第24.3节. /**  * Dijkstra's Single Source Shortest Path Algorithm in C++  * Time Cost : O(Ml

如何基于RabbitMQ实现优先级队列

概述 由于种种原因,RabbitMQ到目前为止,官方还没有实现优先级队列,只实现了Consumer的优先级处理. 但是,迫于种种原因,应用层面上又需要优先级队列,因此需求来了:如何为RabbitMQ加入优先级队列特性. 查询资料后,得知RabbitMQ虽然官方没有支持此特性,但是社区已经有相关优先级队列插件了,并且这个插件被列在RabbitMQ官方网站中了. 地址如下:http://www.rabbitmq.com/community-plugins.html 插件安装 不要立刻下载这个url中

优先级队列与堆排序

转自:http://www.cnblogs.com/yangecnu/p/Introduce-Priority-Queue-And-Heap-Sort.html 在很多应用中,我们通常需要按照优先级情况对待处理对象进行处理,比如首先处理优先级最高的对象,然后处理次高的对象.最简单的一个例子就是,在手机上玩游戏的时候,如果有来电,那么系统应该优先处理打进来的电话. 在这种情况下,我们的数据结构应该提供两个最基本的操作,一个是返回最高优先级对象,一个是添加新的对象.这种数据结构就是优先级队列(Pri