有赞延迟队列设计

延迟队列,顾名思义它是一种带有延迟功能的消息队列。 那么,是在什么场景下我才需要这样的队列呢?

背景

我们先看看以下业务场景:

  • 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存?
  • 如何定期检查处于退款状态的订单是否已经退款成功?
  • 新创建店铺,N天内没有上传商品,系统如何知道该信息,并发送激活短信?等等

为了解决以上问题,最简单直接的办法就是定时去扫表。每个业务都要维护一个自己的扫表逻辑。 当业务越来越多时,我们会发现扫表部分的逻辑会非常类似。我们可以考虑将这部分逻辑从具体的业务逻辑里面抽出来,变成一个公共的部分。
那么开源界是否已有现成的方案呢?答案是肯定的。Beanstalkd(http://kr.github.io/beanstalkd/), 它基本上已经满足以上需求。但是,在删除消息的时候不是特别方便,需要更多的成本。而且,它是基于C语言开发的,当时我们团队主流是PHP和Java,没法做二次开发。于是我们借鉴了它的设计思路,用Java重新实现了一个延迟队列。

设计目标

  • 消息传输可靠性:消息进入到延迟队列后,保证至少被消费一次。
  • Client支持丰富:由于业务上的需求,至少支持PHP和Python。
  • 高可用性:至少得支持多实例部署。挂掉一个实例后,还有后备实例继续提供服务。
  • 实时性:允许存在一定的时间误差。
  • 支持消息删除:业务使用方,可以随时删除指定消息。

整体结构

整个延迟队列由4个部分组成:

  • Job Pool用来存放所有Job的元信息。
  • Delay Bucket是一组以时间为维度的有序队列,用来存放所有需要延迟的/已经被reserve的Job(这里只存放Job Id)。
  • Timer负责实时扫描各个Bucket,并将delay时间大于等于当前时间的Job放入到对应的Ready Queue。
  • Ready Queue存放处于Ready状态的Job(这里只存放Job Id),以供消费程序消费。

如下图表述:

设计要点

基本概念

  • Job:需要异步处理的任务,是延迟队列里的基本单元。与具体的Topic关联在一起。
  • Topic:一组相同类型Job的集合(队列)。供消费者来订阅。

消息结构

每个Job必须包含一下几个属性:

  • Topic:Job类型。可以理解成具体的业务名称。
  • Id:Job的唯一标识。用来检索和删除指定的Job信息。
  • Delay:Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)
  • TTR(time-to-run):Job执行超时时间。单位:秒。
  • Body:Job的内容,供消费者做具体的业务处理,以json格式存储。

具体结构如下图表示:TTR的设计目的是为了保证消息传输的可靠性。

消息状态转换

每个Job只会处于某一个状态下:

  • ready:可执行状态,等待消费。
  • delay:不可执行状态,等待时钟周期。
  • reserved:已被消费者读取,但还未得到消费者的响应(delete、finish)。
  • deleted:已被消费完成或者已被删除。

下面是四个状态的转换示意图:

消息存储

在选择存储介质之前,先来确定下具体的数据结构:

  • Job Poll存放的Job元信息,只需要K/V形式的结构即可。key为job id,value为job struct。
  • Delay Bucket是一个有序队列。
  • Ready Queue是一个普通list或者队列都行。

能够同时满足以上需求的,非redis莫属了。
bucket的数据结构就是redis的zset,将其分为多个bucket是为了提高扫描速度,降低消息延迟。

通信协议

为了满足多语言Client的支持,我们选择Http通信方式,通过文本协议(json)来实现与Client端的交互。 目前支持以下协议:

  • 添加:{‘command’:’add’, ’topic’:’xxx’, ‘id’: ‘xxx’, ‘delay’: 30, ’TTR’: 60, ‘body’:‘xxx‘}
  • 获取:{‘command’:’pop’, ’topic’:’xxx‘}
  • 完成:{‘command’:’finish’, ‘id’:’xxx‘}
  • 删除:{‘command’:’delete’, ‘id’:’xxx‘}

body也是一个json串。 
Response结构:{’success’:true/false, ‘error’:’error reason’, ‘id’:’xxx’, ‘value’:’job body‘} 
强调一下:job id是由业务使用方决定的,一定要保证全局唯一性。这里建议采用topic+业务唯一id的组合。

举例说明一个Job的生命周期

  • 用户对某个商品下单,系统创建订单成功,同时往延迟队列里put一个job。job结构为:{‘topic‘:‘orderclose’, ‘id‘:‘ordercloseorderNoXXX’, ‘delay’:1800 ,’TTR‘:60 , ‘body‘:’XXXXXXX’}
  • 延迟队列收到该job后,先往job pool中存入job信息,然后根据delay计算出绝对执行时间,并以轮询(round-robbin)的方式将job id放入某个bucket。
  • timer每时每刻都在轮询各个bucket,当1800秒(30分钟)过后,检查到上面的job的执行时间到了,取得job id从job pool中获取元信息。如果这时该job处于deleted状态,则pass,继续做轮询;如果job处于非deleted状态,首先再次确认元信息中delay是否大于等于当前时间,如果满足则根据topic将job id放入对应的ready queue,然后从bucket中移除;如果不满足则重新计算delay时间,再次放入bucket,并将之前的job id从bucket中移除。
  • 消费端轮询对应的topic的ready queue(这里仍然要判断该job的合理性),获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。
  • 消费端处理完业务后向服务端响应finish,服务端根据job id删除对应的元信息。

现有物理拓扑

目前采用的是集中存储机制,在多实例部署时Timer程序可能会并发执行,导致job被重复放入ready queue。为了解决这个问题,我们使用了redis的setnx命令实现了简单的分布式锁,以保证每个bucket每次只有一个timer thread来扫描。

设计不足的地方

timer是通过独立线程的无限循环来实现,在没有ready job的时候会对CPU造成一定的浪费。 
消费端在reserve job的时候,采用的是http短轮询的方式,且每次只能取的一个job。如果ready job较多的时候会加大网络I/O的消耗。
数据存储使用的redis,消息在持久化上受限于redis的特性。
scale-out的时候依赖第三方(nginx)。

未来架构方向

基于wait/notify方式的Timer实现。
提供TCP长连的API,实现push或者long-polling的消息reserve方法。
拥有自己的存储方案(内嵌数据库、自定义数据结构写文件),确保消息的持久化。
实现自己的name-server。
考虑提供周期性任务的直接支持。

 
如无特殊说明,本文版权归 本文作者及有赞技术团队 所有,采用 署名-非商业性使用 4.0 国际许可协议 进行许可。
转载请注明:来自有赞技术团队博客 http://tech.youzan.com/queuing_delay/

时间: 2024-12-20 23:48:12

有赞延迟队列设计的相关文章

RabbitMQ延迟队列(Python版)

原创Bge的博客 最后发布于2019-02-13 18:20:39 阅读数 401 收藏展开欢迎访问个人博客最近在做一任务时,遇到需要延迟处理的数据,最开始的做法是现将数据存储在数据库,然后写个脚本,隔五分钟扫描数据表再处理数据,实际效果并不好.因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列.功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1.Time To Live(TTL)消息超时机

灵感来袭,基于Redis的分布式延迟队列

延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. Redisson Delayed Queue 如果你项目中使用了redisson,那么恭喜你,使用延迟队列将非常的简单. 基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能.该功能可以用来实现消息传送延迟按几何增长或几何衰减的发

基于redis的延迟消息队列设计

需求背景 用户下订单成功之后隔20分钟给用户发送上门服务通知短信 订单完成一个小时之后通知用户对上门服务进行评价 业务执行失败之后隔10分钟重试一次 类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理. 队列设计 目前可以考虑使用rabbitmq来满足需求 但是不打算使用,因为目前太多的业务使用了另外的MQ中间件. 开发前需要考虑的问题? 及时性 消费端能按时收到 同一时间消息的消费权重 可靠性 消息

消息队列设计精要【转】

消息队列已经逐渐成为企业IT系统内部通信的核心手段.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能,成为异步RPC的主要手段之一. 当今市面上有很多主流的消息中间件,如老牌的ActiveMQ.RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify.MetaQ.RocketMQ等. 本文不会一一介绍这些消息队列的所有特性,而是探讨一下自主开发设计一个消息队列时,你需要思考和设计的重要方面.过程中我们会参考这些成熟消息队列的很多重要思想. 本文首先会阐述什么时候你需要

消息队列设计精要(转载)

消息队列已经逐渐成为企业IT系统内部通信的核心手段.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能,成为异步RPC的主要手段之一.当今市面上有很多主流的消息中间件,如老牌的ActiveMQ.RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify.MetaQ.RocketMQ等.本文不会一一介绍这些消息队列的所有特性,而是探讨一下自主开发设计一个消息队列时,你需要思考和设计的重要方面.过程中我们会参考这些成熟消息队列的很多重要思想.本文首先会阐述什么时候你需要一个消

【转】消息队列设计精要

介绍的比较全面,可以借鉴学习:原文连接:http://tech.meituan.com/mq-design.html 消息队列已经逐渐成为企业IT系统内部通信的核心手段.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能,成为异步RPC的主要手段之一.当今市面上有很多主流的消息中间件,如老牌的ActiveMQ.RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的Notify.MetaQ.RocketMQ等.本文不会一一介绍这些消息队列的所有特性,而是探讨一下自主开发设计一个消息

基于2PC和延迟队列的分布式事务实现

背景 分布式多消息事务问题  在消息队列使用场景中,有时需要同时下发多条消息,但现在的消息队列比如kafka只支持单条消息的事务保证,不能保证多条消息,今天说的这个方案就时kafka内部的一个子项目中基于2PC和延迟更新来实现分布式事务 2PC  2PC俗称两阶段提交,通过将一个操作分为两个阶段:准备阶段和提交阶段来尽可能保证操作的原子执行(实际上不可能,大家有个概念先) 延迟更新  延迟更新其实是一个很常用的技术手段,简单来说,当某个操作条件不满足时,通过一定手段将数据暂存,等条件满足时在进行

【RabbitMQ】一文带你搞定RabbitMQ延迟队列

本文口味:鱼香肉丝? ?预计阅读:10分钟 一.说明 在上一篇中,介绍了RabbitMQ中的死信队列是什么,何时使用以及如何使用RabbitMQ的死信队列.相信通过上一篇的学习,对于死信队列已经有了更多的了解,这一篇的内容也跟死信队列息息相关,如果你还不了解死信队列,那么建议你先进行上一篇文章的阅读. 这一篇里,我们将继续介绍RabbitMQ的高级特性,通过本篇的学习,你将收获: 什么是延时队列 延时队列使用场景 RabbitMQ中的TTL 如何利用RabbitMQ来实现延时队列 二.本文大纲

如何实现延迟队列

延迟队列的需求各位应该在日常开发的场景中经常碰到.比如: 用户登录之后5分钟给用户做分类推送: 用户多少天未登录给用户做召回推送: 定期检查用户当前退款账单是否被商家处理等等场景. 一般这种场景和定时任务还是有很大的区别,定时任务是你知道任务多久该跑一次或者什么时候只跑一次,这个时间是确定的.延迟队列是当某个事件发生的时候需要延迟多久触发配套事件,引子事件发生的时间不是固定的. 业界目前也有很多实现方案,单机版的方案就不说了,现在也没有哪个公司还是单机版的服务,今天我们一一探讨各种方案的大致实现