c# 异步任务队列(可选是否使用单线程执行任务,以及自动取消任务)


使用demo,(.net framework 4.0 自行添加async wait 扩展库)

    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("主线程"+Thread.CurrentThread.ManagedThreadId);
            var asyncTaskQueue = new AsyncTaskQueue
            {
                AutoCancelPreviousTask = true, // 自动取消之前的任务
                UseSingleThread = true // 使用单线程执行任务
            };

            // 快速启动20个任务
            for (var i = 1; i < 20; i++)
            {
                Test(asyncTaskQueue, i);
            }
            Console.WriteLine("运行结束");
            Console.ReadKey();
        }

        public static async void Test(AsyncTaskQueue taskQueue, int num)
        {
            var result = await taskQueue.Run(() =>
            {
                // 长时间耗时任务
                Thread.Sleep(5000);
                Console.WriteLine("输入的是" + num);
                return num * 100;
            });
            Console.WriteLine("当前线程" + Thread.CurrentThread.ManagedThreadId + "输出的的" + result);
        }
    }

这里是实现代码

#region summary

//   ------------------------------------------------------------------------------------------------
//   <copyright file="AsyncTaskQueue.cs" >
//     作者:mokeyish
//   </copyright>
//   ------------------------------------------------------------------------------------------------

#endregion

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Security.Permissions;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    /// <summary>
    /// 异步任务队列
    /// </summary>
    public class AsyncTaskQueue : IDisposable
    {
        private bool _isDisposed;
        private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>();
        private Thread _thread;
        private AutoResetEvent _autoResetEvent;

        /// <summary>
        /// 异步任务队列
        /// </summary>
        public AsyncTaskQueue()
        {
            _autoResetEvent = new AutoResetEvent(false);
            _thread = new Thread(InternalRuning) {IsBackground = true};
            _thread.Start();
        }

        private bool TryGetNextTask(out AwaitableTask task)
        {
            task = null;
            while (_queue.Count > 0)
            {
                if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) return true;
                task.Cancel();
            }
            return false;
        }

        private AwaitableTask PenddingTask(AwaitableTask task)
        {
            lock (_queue)
            {
                Debug.Assert(task != null);
                _queue.Enqueue(task);
                _autoResetEvent.Set();
            }
            return task;
        }

        private void InternalRuning()
        {
            while (!_isDisposed)
            {
                if (_queue.Count == 0)
                {
                    _autoResetEvent.WaitOne();
                }
                while (TryGetNextTask(out var task))
                {
                    if (task.IsCancel) continue;

                    if (UseSingleThread)
                    {
                        task.RunSynchronously();
                    }
                    else
                    {
                        task.Start();
                    }
                }
            }
        }

        /// <summary>
        /// 是否使用单线程完成任务.
        /// </summary>
        public bool UseSingleThread { get; set; } = true;

        /// <summary>
        /// 自动取消以前的任务。
        /// </summary>
        public bool AutoCancelPreviousTask { get; set; } = false;

        /// <summary>
        /// 执行任务
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public AwaitableTask Run(Action action)
            => PenddingTask(new AwaitableTask(new Task(action, new CancellationToken(false))));

        /// <summary>
        /// 执行任务
        /// </summary>
        /// <typeparam name="TResult"></typeparam>
        /// <param name="function"></param>
        /// <returns></returns>
        public AwaitableTask<TResult> Run<TResult>(Func<TResult> function)
            => (AwaitableTask<TResult>) PenddingTask(new AwaitableTask<TResult>(new Task<TResult>(function)));

        /// <inheritdoc />
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// 析构任务队列
        /// </summary>
        ~AsyncTaskQueue() => Dispose(false);

        private void Dispose(bool disposing)
        {
            if (_isDisposed) return;
            if (disposing)
            {
                _autoResetEvent.Dispose();
            }
            _thread = null;
            _autoResetEvent = null;
            _isDisposed = true;
        }

        /// <summary>
        /// 可等待的任务
        /// </summary>
        public class AwaitableTask
        {
            private readonly Task _task;

            /// <summary>
            /// 初始化可等待的任务。
            /// </summary>
            /// <param name="task"></param>
            public AwaitableTask(Task task) => _task = task;

            /// <summary>
            /// 任务的Id
            /// </summary>
            public int TaskId => _task.Id;

            /// <summary>
            /// 任务是否取消
            /// </summary>
            public bool IsCancel { get; private set; }

            /// <summary>
            /// 开始任务
            /// </summary>
            public void Start() => _task.Start();

            /// <summary>
            /// 同步执行开始任务
            /// </summary>
            public void RunSynchronously() => _task.RunSynchronously();

            /// <summary>
            /// 取消任务
            /// </summary>
            public void Cancel() => IsCancel = true;

            /// <summary>
            /// 获取任务等待器
            /// </summary>
            /// <returns></returns>
            public TaskAwaiter GetAwaiter() => new TaskAwaiter(this);

            /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary>
            [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
            public struct TaskAwaiter : INotifyCompletion
            {
                private readonly AwaitableTask _task;

                /// <summary>
                /// 任务等待器
                /// </summary>
                /// <param name="awaitableTask"></param>
                public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask;

                /// <summary>
                /// 任务是否完成.
                /// </summary>
                public bool IsCompleted => _task._task.IsCompleted;

                /// <inheritdoc />
                public void OnCompleted(Action continuation)
                {
                    var This = this;
                    _task._task.ContinueWith(t =>
                    {
                        if (!This._task.IsCancel) continuation?.Invoke();
                    });
                }
                /// <summary>
                /// 获取任务结果
                /// </summary>
                public void GetResult() => _task._task.Wait();
            }
        }

        /// <summary>
        /// 可等待的任务
        /// </summary>
        /// <typeparam name="TResult"></typeparam>
        public class AwaitableTask<TResult> : AwaitableTask
        {
            /// <summary>
            /// 初始化可等待的任务
            /// </summary>
            /// <param name="task">需要执行的任务</param>
            public AwaitableTask(Task<TResult> task) : base(task) => _task = task;

            private readonly Task<TResult> _task;

            /// <summary>
            /// 获取任务等待器
            /// </summary>
            /// <returns></returns>
            public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);

            /// <summary>
            /// 任务等待器
            /// </summary>
            [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
            public new struct TaskAwaiter : INotifyCompletion
            {
                private readonly AwaitableTask<TResult> _task;

                /// <summary>
                /// 初始化任务等待器
                /// </summary>
                /// <param name="awaitableTask"></param>
                public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;

                /// <summary>
                /// 任务是否已完成。
                /// </summary>
                public bool IsCompleted => _task._task.IsCompleted;

                /// <inheritdoc />
                public void OnCompleted(Action continuation)
                {
                    var This = this;
                    _task._task.ContinueWith(t =>
                    {
                        if (!This._task.IsCancel) continuation?.Invoke();
                    });
                }

                /// <summary>
                /// 获取任务结果。
                /// </summary>
                /// <returns></returns>
                public TResult GetResult() => _task._task.Result;
            }
        }
    }
}

时间: 2024-11-14 21:40:27

c# 异步任务队列(可选是否使用单线程执行任务,以及自动取消任务)的相关文章

异步任务队列Celery在Django中的使用

前段时间在Django Web平台开发中,碰到一些请求执行的任务时间较长(几分钟),为了加快用户的响应时间,因此决定采用异步任务的方式在后台执行这些任务.在同事的指引下接触了Celery这个异步任务队列框架,鉴于网上关于Celery和Django结合的文档较少,大部分也只是粗粗介绍了大概的流程,在实践过程中还是遇到了不少坑,希望记录下来帮助有需要的朋友. 一.Django中的异步请求 Django Web中从一个http请求发起,到获得响应返回html页面的流程大致如下:http请求发起 --

在python下比celery更加简单的异步任务队列RQ

前言: 这里介绍一个python下,比celery更加简单的异步工具,真的是很简单,当然他的功能没有celery多,复杂程度也没有celery大,文档貌似也没有celery多,但是为啥会介绍rq这个东西 因为他够简单. 当然他虽然简单,但是也是需要中间人的,也就是 Broker,这里只能是redis了. 他没有celery支持的那么多,比如 redis rabbitmq mongodb mysql之类的. 说回来,咱们用rq,就是看重他的简单. 如对celery有兴趣,可以看看我以前写过的博文.

Django使用Celery异步任务队列

1  Celery简介 Celery是异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务执行. 任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息队列(MQ.Redis). 1.1  Celery原理 Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成. 消息中间件:Celery本身不提供消息服务,但

异步/同步,阻塞/非阻塞,单线程/多线程概念梳理

最近看了一文说到异步是单线程的,顿时就傻眼了,对相关概念和同事进行了一些讨论和总结. 文中的描述是以我的理解来说的,可能不一定准确甚至正确,有错欢迎指正. 这三个概念我认为是描述不同的维度的,概念正交. 异步 同步 异步和同步是不同的流程设计风格. 但存在依赖关系的操作之间是同步的,也就是如果操作B依赖操作A的返回,那么B必须要在A结束后才能执行. 比如你要读取文件然后对文件内容进行处理,那么读取内容和处理内容就是同步的. 而异步这是操作间没有依赖关系,或者先后顺序并不重要. 比如用户登陆要给登

高性能网站优化-确保异步加载脚本时保持执行顺序

<高性能网站建设进阶指南> 脚本如果按照常规方式加载,不仅会阻塞页面中其他内容的下载,还会阻塞脚本后面所有元素的渲染.异步加载脚本可以避免这种阻塞现象,从而提高页面加载速度.但是性能的提升是要付出代价的.代码的异步执行可能会出现竞争状态.简单地说就是页面内部的脚本需要的标示符如果是在外部文件中定义的,而当外部文件异步加载的时候,如果没有保证外部文件和内部脚本执行顺序,很有可能会出现未定义标示符的错误 当异步加载的外部脚本与行内脚本之间存在代码依赖时,就需要通过一种保证执行顺序的方法来整合这两个

Orleans的单线程执行模型

Orleans在默认情况下只创建一个grain的实例,并以单线程模型执行.如果同一个grain实例,在Orleans存在多个实例,就会产生并发冲突,单线程执行模型就可以完全避免并发冲突了. 但在特殊场景下,有些实例是需要创建多个实例或者以非单线程的执行方式来满足性能的需要; 如何支持创建多个实例 对于了解负载均衡的人,如果web服务器支持无状态(分布式Sesson或者cookie身份识别),会很容易做负载.同样的,对于grain来说,如果是无状态的,那么在系统中创建任意多的实例都是一样的,不存在

Celery异步任务队列/周期任务+ RabbitMQ + Django

一.Celery介绍和基本使用  Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子: 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情. 你想做一个定时任务,比如每天检测一下你

cerely-分布式异步任务队列

Celery 是一个强大的 分布式任务队列 的 异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行.我们通常使用它来实现异步任务(async task)和定时任务(crontab). 在Celery中几个基本的概念,需要先了解下,不然不知道为什么要安装下面的东西.概念:Broker.Backend. broker broker是一个消息传输的中间件或消息队列,可以理解为一个邮箱. 每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery

JS高阶---为什么说JS是单线程执行的???

大纲: [主体] (1)如何证明JS运行是单线程的? 上述代码执行顺序结果为:fn()→timeout 1111→timeout 2222 接下来对上述代码做下修改 执行结果如右图所示 接下来点击确认,关闭弹框,再往后1s执行timeoout 1111,再过1s执行timeout 2222 从这里可以看出,alert暂停了计时,点击确定后才会恢复程序执行和计时 最后总结如下 (2)为什么JS是单线程模式? . 原文地址:https://www.cnblogs.com/jianxian/p/119