.net core中使用redis 延迟队列

一、项目场景:

添加任务并设定任务的执行时间,然后按时间顺序由近到远依次执行。

二、思路:

可以利用redis的有序集合(SortedSet),用时间戳排序实现,大概的流程如下。

三、关键思路&代码段

  1. 写入任务

    使用任务下一次的执行时间按分钟生成key,将同一分钟待执行的任务放到一个key中,这一步主要思考的问题是:拆分队列,设置各自的过期时间,如:过期时间 = 执行时间 + 5分钟,保证过期的队列自动删除,不会造成后续因消费能力不足而导致redis持续膨胀。

IDictionary<double, string> dic = new Dictionary<double, string>()
{
    {
       interLineCacheModel.NextHandTimeSpan,
       interLineCacheModel.CacheRoute
    }
};
var taskKey = GetKey(interLineCacheModel.NextHandTime);
await _buildInterLineRepository.ZAddAsync(taskKey, dic);
await _buildInterLineRepository.ExpireAsync(taskKey, new DateTimeOffset(interLineCacheModel.NextHandTime.AddMinutes(5)).ToUnixTimeSeconds());
private string GetKey(DateTime dateTime)
{
    return $"IRTask{dateTime.ToString("yyyyMMddHHmm")}";
}
  1. 消费服务

    因为是一个有序集合,所以队列会自动按时间戳的大小来排序,这样就自动实现了由近到远依次执行,使用当前时间生成key,来获取对应的task,每次可以获取1条或者N条。

var taskKey = GetKey(DateTime.Now);
var routeList = await _buildInterLineRepository.ZRangeAsync(taskKey, 0, 0);

再拿到对应的执行时间戳,与当前时间做比较,如果还没有到执行时间就跳出队列。

var nextHandleTs = await _buildInterLineRepository.ZScoreAsync(taskKey, route);
if (long.TryParse(nextHandleTs, out var nextHandleTimeSpan))
{
    var nextHandleTime = DateTimeOffset.FromUnixTimeMilliseconds(nextHandleTimeSpan).ToBeiJingDateTime();
    if (nextHandleTime > DateTime.Now)
    {
       continue;
    }
}

最后一步,使用ZRemAsync,以确保谁删除成功谁执行,解决多台机器同时拿到数据的问题

var success = await _buildInterLineRepository.ZRemAsync(taskKey, route) > 0;
if (success)
{
    //todo
}

注意事项:因为我们是按时间分钟来生成key,所以到时间临界点的时候,如果消费能力不足会导致key仍然有遗留任务,如果对此很敏感,可以在临界点将时间回退1秒,再获取出所有的任务。stop = -1 能拿出所有集合。

if second = 0
{
    addsecond(-1);
    var routeList = await _buildInterLineRepository.ZRangeAsync(taskKey, 0, -1);
}

四、注册消费服务

方式很多,这里使用的是IHostedService实现后台任务,具体可以参考:https://docs.microsoft.com/zh-cn/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.0&tabs=visual-studio

public class InterLineLoopService : IHostedService
    {
        private readonly ILog _log;
        private readonly IInterLineTaskService _interLineTaskService;

        public InterLineLoopService(
            ILog log,
            IInterLineTaskService interLineTaskService)
        {
            _log = Guard.ArgumentNotNull(nameof(log), log);
            _interLineTaskService = Guard.ArgumentNotNull(nameof(interLineTaskService), interLineTaskService);
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _log.Info("LoopDelayMessage start....", "Domain", "InterLineLoopService", "StartAsync");
            return _interLineTaskService.LoopDelayMessage();
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }
    }

再到startup中注册下

services.AddHostedService<InterLineLoopService>();

原文地址:https://www.cnblogs.com/yinpeng186/p/11994055.html

时间: 2024-11-08 16:33:17

.net core中使用redis 延迟队列的相关文章

如何在ASP.NET Core中使用Redis

注:本文提到的代码示例下载地址> https://code.msdn.microsoft.com/How-to-use-Redis-in-ASPNET-0d826418 Redis是一个开源的内存中的数据结构存储系统,可以用作数据库.缓存和消息中间件.它支持多种类型的数据结构:字符串,哈希表,列表,集合,有序集等等. Redis 官方没有推出Windows版本,倒是由Microsoft Open Tech提供了Windows 64bit 版本支持. 如何在Windows机器上安装Redis=>

.NET Core中使用IHostedService结合队列执行定时任务

最近遇到了这样的场景:每隔一段时间,需要在后台使用队列对一批数据进行业务处理. Quartz.NET是一种选择,在 .NET Core中,可以使用IHostedService执行后台定时任务.在本篇中,首先尝试把队列还原到最简单.原始的状态,然后给出以上场景问题的具体解决方案. 假设一个队列有8个元素.现在abcd依次进入队列. 0 1 2 3 4 5 6 7 a b c d head tail ab依次出队列. 0 1 2 3 4 5 6 7 c d head tail 可以想象,随着不断地入

谈谈在.NET Core中使用Redis和Memcached的序列化问题

前言 在使用分布式缓存的时候,都不可避免的要做这样一步操作,将数据序列化后再存储到缓存中去. 序列化这一操作,或许是显式的,或许是隐式的,这个取决于使用的package是否有帮我们做这样一件事. 本文会拿在.NET Core环境下使用Redis和Memcached来当例子说明,其中,Redis主要是用StackExchange.Redis,Memcached主要是用EnyimMemcachedCore. 先来看看一些我们常用的序列化方法. 常见的序列化方法 或许,比较常见的做法就是将一个对象序列

mvc core 中使用 redis

redis 下载安装路径: https://github.com/MicrosoftArchive/redis/releases 右键打开cmd命令行,运行命令:   .\redis-server.exe redis.windows.conf 将redis安装成服务:在该文件夹下运行命令:redis-server.exe --service-install redis.windows.conf 连接客户端: .\redis-cli -h 127.0.0.1 -p 6379 windows下我们如

灵感来袭,基于Redis的分布式延迟队列

延迟队列 延迟队列,也就是一定时间之后将消息体放入队列,然后消费者才能正常消费.比如1分钟之后发送短信,发送邮件,检测数据状态等. Redisson Delayed Queue 如果你项目中使用了redisson,那么恭喜你,使用延迟队列将非常的简单. 基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能.该功能可以用来实现消息传送延迟按几何增长或几何衰减的发

三分钟学会Redis在.NET Core中做缓存中间件

原文:三分钟学会Redis在.NET Core中做缓存中间件 大家好,今天给大家说明如何在.NET Core中使用Redis,我们在想要辩论程序的好与坏,都想需要一个可视化工具,我经常使用的是一位国内大牛开发的免费工具,其Github地址为: https://github.com/qishibo/AnotherRedisDesktopManager/releases ,它真的很给力,Redis的安装在 https://github.com/MicrosoftArchive/redis/relea

WSL2+Docker部署RabbitMQ以及在Asp.net core 中使用RabbitMQ示例(1)

本文主要在于最近因疫情不能外出,在家研究的一些技术积累. 主要用到的技术以及知识点: WSL 2 WSL 2+Docker Docker+RabbitMQ 在ASP.NET Core中使用RabbitMQ消息队列    一.WSL 2 1.什么是WSL 2? WSL 2就是 适用于Linux的Windows子系统的第二代版本,全称 Windows Subsystem for Linux 2. 2.为什么要使用WSL2? 其实这里使用WSL2目的,纯碎是为了用Docker.以前微软实现的WSL有些

C#.Net Core 操作Docker中的redis数据库

做软件开发的人,会在本机安装很多开发时要用到的软件,比如数据库,有MS SQL Server,MySQL,等,如果每种数据库都按照在本机确实有点乱,这个时候我们就想用虚拟机来隔离,这样就不会扰乱本机一些配置啊环境变量啊等等.但虚机机太耗硬盘了,而且还物理隔离内存,分给它多少内存,本机就少多少内存,对于现在的笔记本电脑,大多都是固态硬盘256G,512G太昂贵了,这个时候Docker 就派上用场了,我把数据库按照Docker容器里,它是进程隔离的,干净又快捷,下面就来介绍一下,在Docker中安装

有赞延迟队列设计

延迟队列,顾名思义它是一种带有延迟功能的消息队列. 那么,是在什么场景下我才需要这样的队列呢? 背景 我们先看看以下业务场景: 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存? 如何定期检查处于退款状态的订单是否已经退款成功? 新创建店铺,N天内没有上传商品,系统如何知道该信息,并发送激活短信?等等 为了解决以上问题,最简单直接的办法就是定时去扫表.每个业务都要维护一个自己的扫表逻辑. 当业务越来越多时,我们会发现扫表部分的逻辑会非常类似.我们可以考虑将这部分逻辑从具体的业务逻辑里面