1分钟实现“延迟消息”功能

非常棒的一个微信公众号

转载地址:http://mp.weixin.qq.com/s/eDMV25YqCPYjxQG-dvqSqQ

一、缘起

很多时候,业务有“在一段时间之后,完成一个工作任务”的需求。

例如:滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。

一般来说怎么实现这类“48小时后自动评价为5星”需求呢?

常见方案:启动一个cron定时任务,每小时跑一次,将完成时间超过48小时的订单取出,置为5星,并把评价状态置为已评价。

假设订单表的结构为:t_order(oid, finish_time, stars, status, …),更具体的,定时任务每隔一个小时会这么做一次:

select oid from t_order where finish_time > 48hours and status=0;

update t_order set stars=5 and status=1 where oid in[…];

如果数据量很大,需要分页查询,分页update,这将会是一个for循环。

方案的不足:

(1)轮询效率比较低

(2)每次扫库,已经被执行过记录,仍然会被扫描(只是不会出现在结果集中),有重复计算的嫌疑

(3)时效性不够好,如果每小时轮询一次,最差的情况下,时间误差会达到1小时

(4)如果通过增加cron轮询频率来减少(3)中的时间误差,(1)中轮询低效和(2)中重复计算的问题会进一步凸显

如何利用“延时消息”,对于每个任务只触发一次,保证效率的同时保证实时性,是今天要讨论的问题。

 

二、高效延时消息设计与实现

高效延时消息,包含两个重要的数据结构:

(1)环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)

(2)任务集合,环上每一个slot是一个Set<Task>

同时,启动一个timer,这个timer每隔1s,在上述环形队列中移动一格,有一个Current Index指针来标识正在检测的slot。

Task结构中有两个很重要的属性:

(1)Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务

(2)Task-Function:需要执行的任务指针

假设当前Current Index指向第一格,当有延时消息到达之后,例如希望3610秒之后,触发一个延时消息任务,只需:

(1)计算这个Task应该放在哪一个slot,现在指向1,3610秒之后,应该是第11格,所以这个Task应该放在第11个slot的Set<Task>中

(2)计算这个Task的Cycle-Num,由于环形队列是3600格(每秒移动一格,正好1小时),这个任务是3610秒后执行,所以应该绕3610/3600=1圈之后再执行,于是Cycle-Num=1

Current Index不停的移动,每秒移动到一个新slot,这个slot中对应的Set<Task>,每个Task看Cycle-Num是不是0:

(1)如果不是0,说明还需要多移动几圈,将Cycle-Num减1

(2)如果是0,说明马上要执行这个Task了,取出Task-Funciton执行(可以用单独的线程来执行Task),并把这个Task从Set<Task>中删除

使用了“延时消息”方案之后,“订单48小时后关闭评价”的需求,只需将在订单关闭时,触发一个48小时之后的延时消息即可:

(1)无需再轮询全部订单,效率高

(2)一个订单,任务只执行一次

(3)时效性好,精确到秒(控制timer移动频率可以控制精度)

三、总结

环形队列是一个实现“延时消息”的好方法,开源的MQ好像都不支持延迟消息,不妨自己实现一个简易的“延时消息队列”,能解决很多业务问题,并减少很多低效扫库的cron任务。

另外,关于MQ的可达性、幂等性未来撰文另述。

时间: 2024-10-27 00:44:19

1分钟实现“延迟消息”功能的相关文章

170307、1分钟实现“延迟消息”功能

一.缘起 很多时候,业务有"在一段时间之后,完成一个工作任务"的需求. 例如:滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星. 一般来说怎么实现这类"48小时后自动评价为5星"需求呢? 常见方案:启动一个cron定时任务,每小时跑一次,将完成时间超过48小时的订单取出,置为5星,并把评价状态置为已评价. 假设订单表的结构为:t_order(oid, finish_time, stars, status, -),更具体的,定时任务每隔一个小时会这

基于redis的延迟消息队列设计

需求背景 用户下订单成功之后隔20分钟给用户发送上门服务通知短信 订单完成一个小时之后通知用户对上门服务进行评价 业务执行失败之后隔10分钟重试一次 类似的场景比较多 简单的处理方式就是使用定时任务 假如数据比较多的时候 有的数据可能延迟比较严重,而且越来越多的定时业务导致任务调度很繁琐不好管理. 队列设计 目前可以考虑使用rabbitmq来满足需求 但是不打算使用,因为目前太多的业务使用了另外的MQ中间件. 开发前需要考虑的问题? 及时性 消费端能按时收到 同一时间消息的消费权重 可靠性 消息

Spring Boot RabbitMQ 延迟消息实现完整版

概述 曾经去网易面试的时候,面试官问了我一个问题,说 下完订单后,如果用户未支付,需要取消订单,可以怎么做 我当时的回答是,用定时任务扫描DB表即可.面试官不是很满意,提出: 用定时任务无法做到准实时通知,有没有其他办法? 我当时的回答是: 可以用队列,订单下完后,发送一个消息到队列里,并指定过期时间,时间一到,执行回调接口. 面试官听完后,就不再问了.其实我当时的思路是对的,只不过讲的不是很专业而已.专业说法是利用延迟消息. 其实用定时任务,确实有点问题,原本业务系统希望10分钟后,如果订单未

C# RabbitMQ延迟队列功能实战项目演练

一.需求背景 当用户在商城上进行下单支付,我们假设如果8小时没有进行支付,那么就后台自动对该笔交易的状态修改为订单关闭取消,同时给用户发送一份邮件提醒.那么我们应用程序如何实现这样的需求场景呢?在之前的<C# Redis缓存过期实现延迟通知实战演练>分享课程中阿笨最后总结的时候说过Redis Pub/Sub是一种并不可靠地消息机制,他不会做信息的存储,只是在线转发,那么肯定也没有ack确认机制,另外只有订阅段监听时才会转发!我们是否有更好的方式去实现呢?今天给大家分享的比较好的解决方案就是通过

电商项目实战(架构八)——RabbitMQ实现延迟消息

一.前言 RabbitMQ是一个开源的消息队列,轻量级且易于部署,并支持多种消息协议.RabbitMQ可以部署在分布式和联合配置中,以满足高规模.高可用性的需求.本文整合RabbitMQ实现延迟消息的过程,以发送延迟消息取消超时订单为例. 二.RabbitMQ的安装和使用 1.安装Erlang,下载地址:http://erlang.org/download/otp_win_64_21.3.exe 2.安装RabbitMQ,下载地址:https://dl.bintray.com/rabbitmq/

springmvc(18)使用WebSocket 和 STOMP 实现消息功能

[0]README 1)本文旨在 介绍如何 利用 WebSocket 和 STOMP 实现消息功能: 2)要知道, WebSocket 是发送和接收消息的 底层API,而SockJS 是在 WebSocket 之上的 API:最后 STOMP(面向消息的简单文本协议)是基于 SockJS 的高级API (干货--简而言之,WebSocket 是底层协议,SockJS 是WebSocket 的备选方案,也是 底层协议,而 STOMP 是基于 WebSocket(SockJS) 的上层协议) 3)b

rabttmq php延迟消息 相关代码(网上没搜到自己琢磨着弄好了)

前言:  作为一个运维人员不背锅,谁背呢! 正文: 网上都是2种办法去实现,第二种我就不说了,要升级rabbitmq 别人也说什么什么不行 好吧~今天按照网上说的 TTL + DLX 的方式来实现 延迟消息(java的 python的相关代码都有 就是没有 拍huang片的!!!!) 撸代码咯: 1.首先得要创建一个新的队列 新的交换机来存放延迟消息 并且设置新的队列消息ttl到期后 转发的 交换机 和 routeing key (我的老的交换机为e_test  key 为k1) 主要代码为:

如何在微信的自定义菜单上实现“历史消息”功能?

最近在开发微信的接口发现可以通过菜单实现微信的历史消息(历史消息是指订阅号或者服务号每次群发的消息),拿出来与大家分享,操作方法如下: 1.首先进入到您维护的微信订阅号或者服务号,点击帐号详情(右上的小人像). 2.进入帐号详情,找到“查看消息”并点击进入. 3.进入到历史消息页面后,点击右上角的“更多按钮”(三个竖排的点),找到复制链接,复制这个链接,这个链接就是我们要找的历史消息链接. 4.在微信中增加一个”历史消息“菜单,设置为视图类型,将菜单试图地址设置为刚才复制的链接,然后保存生效.

OpenSIPS离线消息功能配置

按照项目的需求,需要使用OpenSIPS的离线功能,现在将配置过程记录以备后用. OpenSIPS离线消息功能依赖于msilo模块(http://www.opensips.org/html/docs/modules/1.11.x/msilo.html) 修改OpenSIPS的配置文件opensips.cfg添加 loadmodule "msilo.so" #加载msilo模块 modparam("msilo", "db_table", "