.NET Core中使用IHostedService结合队列执行定时任务

最近遇到了这样的场景:每隔一段时间,需要在后台使用队列对一批数据进行业务处理。

Quartz.NET是一种选择,在 .NET Core中,可以使用IHostedService执行后台定时任务。在本篇中,首先尝试把队列还原到最简单、原始的状态,然后给出以上场景问题的具体解决方案。

假设一个队列有8个元素。现在abcd依次进入队列。

0 1 2 3 4 5 6 7
a b c d
head tail

ab依次出队列。

0 1 2 3 4 5 6 7
c d
head tail

可以想象,随着不断地入列出列,head和tail的位置不断往后,当tail在7号位的时候,虽然队列里还有空间,但此时数据就无法入队列了。

如何才可以继续入队列呢

首先想到的是数据搬移。当数据无法进入队列,首先让队列项出列,进入到另外一个新队列,这个新队列就可以再次接收数据入队列了。但是,搬移整个队列中的数据的时间复杂度为O(n),而原先出队列的时间复杂度是O(1),这种方式不够理想。

还有一种思路是使用循环队列。当tail指向最后一个位置,此时有新的数据进入队列,tail就来到头部指向0号位置,这样这个队列就可以循环使用了。

0 1 2 3 4 5 6 7
h i j
head tail

现在a入栈。

0 1 2 3 4 5 6 7
h i j a
tail head

队列有很多种实现

比如在生产消费模型中可以用阻塞队列。当生产队列为空的时候,为了不让消费者取数据,生产队列的Dequeue行为会被阻塞;而当生产队列满的时候,为了不让更多的数据进来,生产队列的Enqueue行为被阻塞。

线程安全的队列叫并发队列,如C#中的ConcurrentQueue

线程池内部也使用了队列机制。因为CPU的资源是有限的,过多的线程会导致CPU频繁地在线程之间切换。线程池内通过维护一定数量的线程来减轻CPU的负担。当线程池没有多余的线程可供使用,请求过来,一种方式是拒绝请求,另外一种方式是让请求队列阻塞,等到线程池内有线程可供使用,请求队列出列执行。用链表实现的队列是无界队列(unbounded queue),这种做法可能会导致过多的请求在排队,等待响应时间过长。用数组实现的队列是有界队列(bounded queue),当线程池已满,请求过来就会被拒绝。对有界队列来说数组的大小设置很讲究。

来模拟一个数组队列。

public class ArrayQueue
{
    private string[] items;
    private int n = 0; //数组长度
    private int head = 0;
    private int tail = 0;

    public ArrayQueue(int capacity)
    {
        n = capacity;
        items = new string[capacity];
    }

    public bool Enqueue(string item)
    {
        if(tail==n){
            return false;
        }
        items[tail] = item;
        ++tail;
        return true;
    }

    public string Dequeue()
    {
        if(head==null){
            return null;
        }
        string ret = items[head];
        ++head;
        return ret;
    }
}

以上就是一个最简单的、用数组实现的队列。

再次回到要解决的场景问题。解决思路大致是:实现IHostedService接口,在其中执行定时任务,每次把队列项放到队列中,并定义出队列的方法,在其中执行业务逻辑。

关于队列,通过以下的步骤使其在后台运行。

  • 队列项(MessageQueueItem):具备唯一标识、委托、添加到队列中的时间等属性
  • 队列(MessageQueue):维护着Dictionary<string, MessageQueueItem>静态字典集合
  • MessageQueueUtility类:决定着如何运行,比如队列执行的间隔时间、垃圾回收
  • MessageQueueThreadUtility类:维护队列线程,提供队列在后台运行的方法
  • Startup.cs中的Configure中调用MessageQueueThreadUtility中的方法使队列在后台运行

队列项(MessageQueueItem)

public class MessageQueueItem
{
    public MessageQueueItem(string key, Action action, string description=null)
    {
        Key = key;
        Action = action;
        Description = description;
        AddTime = DateTime.Now;
    }

    public string Key{get;set;}
    public Actioin Action{get;set;}
    public DateTime AddTime{get;set;}
    public string Description{get;set;}
}

队列(MessageQueue),维护着针对队列项的一个静态字典集合。

public class MessageQueue
{
    public static Dictionary<string, MessageQueueItem> MessageQueueDictionary = new Dictionary<string, MessageQueueItem>(StringComparer.OrdinalIgnoreCase);

    public static object MessageQueueSyncLock = new object();
    public static object OperateLock = new object();

    public static void OperateQueue()
    {
        lock(OperateLock)
        {
            var mq = new MessageQueue();
            var key = mq.GetCurrentKey();
            while(!string.IsNullOrEmpty(key))
            {
                var mqItem = mq.GetItem(key);
                mqItem.Action();
                mq.Remove(key);
                key = mq.GetCurrentKey();
            }
        }
    }

    public string GetCurrentKey()
    {
        lock(MessageQueueSyncLock)
        {
            return MessageQueueDictionary.Keys.FirstOrDefault();
        }
    }

    public MessageQueueItem GetItem(string key)
    {
        lock(MessageQueueSyncLock)
        {
            if(MessageQueueDictionary.ContainsKey(key))
            {
                return MessageQueueDictionary[key];
            }
            return null;
        }
    }

    public void Remove(string key)
    {
        lock(MessageQueueSyncLock)
        {
            if(MessageQueueDictionary.ContainsKey(key))
            {
                MessageQueueDictionary.Remove(key);
            }
        }
    }

    public MessageQueueItem Add(string key, Action actioin)
    {
        lock(MessageQueueSyncLock)
        {
            var mqItem = new MessageQueueItem(key, action);
            MessageQueueDictionary[key] = mqItem;
            return mqItem;
        }
    }

    public int GetCount()
    {
        lock(MessageQueueSyncLock)
        {
            return MessageQueueDictionary.Count;
        }
    }
}

MessageQueueUtility类, 决定着队列运行的节奏。

public class MessageQueueUtility
{
    private readonly int _sleepMilliSeconds;
    public MessageQueueUtility(int sleepMilliSeconds=1000)
    {
        _sleepMilliSeconds = sleepMilliSeoncds;
    }

    ~MessageQueueUtility()
    {
        MessageQueue.OperateQueue();
    }

    public void Run
    {
        do
        {
            MessageQueue.OperateQueue();
            Thread.Sleep(_sleepMilliSeconds);
        } while(true)
    }
}

MessageQueueThreadUtility类,管理队列的线程,并让其在后台运行。

public static class MessageQueueThreadUtility
{
    public static Dictionary<string, Thread> AsyncThreadCollection = new Dictioanry<string, Thread>();
    public static void Register(string threadUniqueName)
    {
        {
            MessageQueueUtility messageQueueUtility = new MessageQueueUtility();
            Thread messageQueueThread = new Thread(messageQueueUtility.Run){
                Name = threadUniqueName
            };
            AsyncThreadCollection.Add(messageQueueThread.Name, messageQueueThread);
        }

        AsyncThreadCollection.Values.ToList().ForEach(z => {
            z.IsBackground = true;
            z.Start();
        });
    }
}

Startup.cs中注册。

public class Startup
{
    public IServiceProvider ConfigureServices(IServiceCollection services)
    {
        ...
    }

    public void Configure(IApplicationBuilder app, IHostingEnvironment env...)
    {
        RegisterMessageQueueThreads();
    }

    private void RegisterMessageQueueThreads()
    {
        MessageQueueThreadUtility.Register("");
    }
}

最后在IHostedService的实现类中把队列项丢给队列。

public class MyBackgroundSerivce : IHostedService, IDisposable
{
    private Timer _timer;
    public IServiceProvider Services{get;}

    public MyBackgroundService(IServiceProvider services)
    {
        Serivces = services;
    }

    public void Dispose()
    {
        _timer?.Dispose();
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(10));
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _timer?.Change(Timeout.Infinite,0);
        return Task.CompletedTask;
    }

    private void DoWork(object state)
    {
        using(var scope = Services.CreateScope())
        {
            using(var db = scope.ServiceProvider.GetRequiredService<MyDbContext>())
            {
                ...
                var mq = new MessageQueue();
                mq.Add("somekey", DealQueueItem);
            }
        }
    }

    private void DealQueueItem()
    {
        var mq = new MessageQueue();
        var key = mq.GetCurrentKey();
        var item = mq.GetItem(key);
        if(item!=null)
        {
            using(var scope = Services.CreateScope())
            {
                using(var db = scope.ServiceProvider.GetRequiredService<MyDbContext>())
                {
                    //执行业务逻辑
                }
            }
        }
    }
}

当需要使用上下文的时候,首先通过IServiceProviderCreateScope方法得到ISerivceScope,再通过它的ServiceProvider属性获取依赖倒置容器中的上下文服务。

以上,用IHostedService结合队列解决了开篇提到的场景问题,如果您有很好的想法,我们一起交流吧。文中的队列部分来自"盛派网络"的Senparc.Weixin SDK源码。

原文地址:https://www.cnblogs.com/darrenji/p/10254957.html

时间: 2024-08-30 00:28:41

.NET Core中使用IHostedService结合队列执行定时任务的相关文章

java中服务器启动时,执行定时任务

package com.ripsoft.util; import java.util.Calendar; import java.util.Timer; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; public class TimerListener implements ServletContextListener{ private Timer timer = nu

.net core中使用redis 延迟队列

一.项目场景: 添加任务并设定任务的执行时间,然后按时间顺序由近到远依次执行. 二.思路: 可以利用redis的有序集合(SortedSet),用时间戳排序实现,大概的流程如下. 三.关键思路&代码段 写入任务 使用任务下一次的执行时间按分钟生成key,将同一分钟待执行的任务放到一个key中,这一步主要思考的问题是:拆分队列,设置各自的过期时间,如:过期时间 = 执行时间 + 5分钟,保证过期的队列自动删除,不会造成后续因消费能力不足而导致redis持续膨胀. IDictionary<dou

.NET Core 中基于 IHostedService 实现后台定时任务

.NET Core 2.0 引入了 IHostedService ,基于它可以很方便地执行后台任务,.NET Core 2.1 则锦上添花地提供了 IHostedService 的默认实现基类 BackgroundService ,在这篇随笔中分别用 Web 与 Console 程序体验一下. 首先继承 BackgroundService 实现一个 TimedBackgroundService public class TimedBackgroundService : BackgroundSer

WSL2+Docker部署RabbitMQ以及在Asp.net core 中使用RabbitMQ示例(1)

本文主要在于最近因疫情不能外出,在家研究的一些技术积累. 主要用到的技术以及知识点: WSL 2 WSL 2+Docker Docker+RabbitMQ 在ASP.NET Core中使用RabbitMQ消息队列    一.WSL 2 1.什么是WSL 2? WSL 2就是 适用于Linux的Windows子系统的第二代版本,全称 Windows Subsystem for Linux 2. 2.为什么要使用WSL2? 其实这里使用WSL2目的,纯碎是为了用Docker.以前微软实现的WSL有些

SpringMVC框架使用注解执行定时任务

在项目开发过程中,免不了会有一些定时任务.今天就给大家一个SpringMVC框架中利用注解的方式执行定时任务的示例代码 使用到的JAR文件: 点击下列Jar文件会跳到我的网盘下载 aopalliance-1.0.jarcommons-logging-1.1.3.jarspring-aop-3.2.4.RELEASE.jarspring-beans-3.2.4.RELEASE.jarspring-context-3.2.4.RELEASE.jarspring-core-3.2.4.RELEASE.

EF Core中执行Sql语句查询操作之FromSql,ExecuteSqlCommand,SqlQuery

一.目前EF Core的版本为V2.1 相比较EF Core v1.0 目前已经增加了不少功能. EF Core除了常用的增删改模型操作,Sql语句在不少项目中是不能避免的. 在EF Core中上下文,可以返货DbConnection ,执行sql语句.这是最底层的操作方式,代码写起来还是挺多的. 初次之外 EF Core中还支持 FromSql,ExecuteSqlCommand 连个方法,用于更方便的执行Sql语句. 另外,目前版本的EF Core 不支持SqlQuery,但是我们可以自己扩

[翻译]在 .NET Core 中的并发编程

原文地址:http://www.dotnetcurry.com/dotnet/1360/concurrent-programming-dotnet-core 今天我们购买的每台电脑都有一个多核心的 CPU,允许它并行执行多个指令.操作系统通过将进程调度到不同的内核来发挥这个结构的优点.然而,还可以通过异步 I/O 操作和并行处理来帮助我们提高单个应用程序的性能.在.NET Core中,任务 (tasks) 是并发编程的主要抽象表述,但还有其他支撑类可以使我们的工作更容易. 并发编程 - 异步 v

.Net Core中的通用主机(二)——托管服务

前文介绍了.Net core的通用主机的配置,在基础配置完成后,下一步就是注册我们的后台任务了..net core提供了一个通用的后台服务接口IHostedService,称为托管服务.一个注册托管服务的示例如下: hostBuilder.ConfigureServices((hostContext, services) =>{    services.AddHostedService<LifetimeEventsHostedService>();    services.AddHost

NET Core中怎么使用HttpContext.Current

NET Core中怎么使用HttpContext.Current 阅读目录 一.前言 二.IHttpContextAccessor 三.HttpContextAccessor 回到目录 一.前言 我们都知道,ASP.NET Core作为最新的框架,在MVC5和ASP.NET WebForm的基础上做了大量的重构.如果我们想使用以前版本中的HttpContext.Current的话,目前是不可用的,因为ASP.NET Core中是并没有这个API的. 当然我们也可以通过在Controller中访问