//-------------------------------------------------------------------------- // // Copyright (c) BUSHUOSX. All rights reserved. // // File: AsyncTaskManager.cs // // Version:1.0.0.0 // // Datetime:20170812 // //-------------------------------------------------------------------------- using System; using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; namespace BUSHUOSX { class AsyncTaskManager { /// <summary> /// 缓存的任务队列 /// </summary> readonly Queue<Task> _taskQueue = new Queue<Task>(); /// <summary> /// 工作锁,保护_taskQueue /// </summary> SpinLock _workLock; ///// <summary> ///// 入队锁 ///// </summary> //SpinLock _taskEnQueueLock; ///// <summary> ///// 出队锁 ///// </summary> //SpinLock _taskDeQueueLock; /// <summary> /// 工作信号,与MaxConcurrencyLevel控制并行量 /// </summary> SemaphoreSlim _workSemaphore; /// <summary> /// 工作线程取消标志 /// </summary> CancellationTokenSource ctsCancel; /// <summary> /// 工作线程 /// </summary> Task _worker; /// <summary> /// 工作器状态 /// </summary> private bool IsWorking { get; set; } /// <summary> /// 任务最大并发量 /// </summary> public int MaxConcurrencyLevel { get; } /// <summary> /// 内部工作器将在队列中有任务时自动启动。否则由Start方法启动。 /// </summary> public bool AutoRunWorker { get; } /// <summary> /// 工作器每一次清空队列任务,都调用 /// </summary> private Action<Task> _callbackOnAllTaskComplited; /// <summary> /// 队列中的任务任务完成时,都将调用 /// </summary> private Action<Task> _callbackOnAnyTaskComplited; /// <summary> /// 控制异步任务的并发量。 /// 注意:只能严格控制stauts为Created的任务 /// </summary> /// <param name="maxConcurrencyLevel">最大并发数</param> /// <param name="callbackOnAnyTaskComplited">如果不为null,则队列中的任何任务完成时都将传递给此回调</param> /// <param name="callbackOnAllTaskComplited">如果不为null,则内部队列为空时传递工作器给此回调</param> /// <param name="autoRunWorker">指示内部工作器将在队列中有任务时自动启动,还是由Start方法启动。</param> public AsyncTaskManager(int maxConcurrencyLevel, Action<Task> callbackOnAnyTaskComplited = null, Action<Task> callbackOnAllTaskComplited = null, bool autoRunWorker = true) { _callbackOnAnyTaskComplited = callbackOnAnyTaskComplited; _callbackOnAllTaskComplited = callbackOnAllTaskComplited; AutoRunWorker = autoRunWorker; MaxConcurrencyLevel = maxConcurrencyLevel < 0 ? int.MaxValue : maxConcurrencyLevel; } /// <summary> /// 排入一个任务到内部队列,该队列中的任务将被依次调用。 /// </summary> /// <param name="task">要排队的任务。注意:只能严格控制stauts为Created的任务</param> /// <param name="callbackOnTaskComplited">此任务完成时回调</param> public void QueueTask(Task task, Action<Task> callbackOnTaskComplited = null) { if (task == null) return; if (null == callbackOnTaskComplited) { EnqueueTask(task); } else { EnqueueTask(task.ContinueWith(callbackOnTaskComplited)); } if (AutoRunWorker) { notifyStartWork(); } } //public void QueueTask(IEnumerable<Task> tasks, Action<Task> callbackOnTaskComplited = null) //{ // foreach (var item in tasks) // { // if (item == null) break; // if (null == callbackOnTaskComplited) // { // EnqueueTask(item); // } // else // { // EnqueueTask(item.ContinueWith(callbackOnTaskComplited)); // } // } // if (AutoRunWorker) // { // notifyStartWork(); // } //} /// <summary> /// 返回此刻队列中的任务。 /// </summary> /// <returns></returns> public Task[] GetQueueTask() { bool gotlock = false; try { _workLock.Enter(ref gotlock); if (_taskQueue.Count > 0) { return _taskQueue.ToArray(); } else { return null; } } finally { _workLock.Exit(); } } /// <summary> /// 启动内部工作器。 /// 注意:为降低资源占用,该工作器在内部队列为空时会自动退出。 /// </summary> public void Start() { notifyStartWork(); } /// <summary> /// 挂起队列中剩余的任务。稍后可以使用Continue方法继续。 /// </summary> public void Suspend() { stopWorkThread(false); } /// <summary> /// 停止工作器,并清空内部任务队列还未调用的任务。 /// 已调用的任务还将继续运行。 /// </summary> public void Cancel() { stopWorkThread(true); } private void stopWorkThread(bool clearTasks) { if (IsWorking) { ctsCancel.Cancel(); if (clearTasks) { bool gotlock = false; try { _workLock.Enter(ref gotlock); _taskQueue.Clear(); } finally { if (gotlock) { _workLock.Exit(); } } } } } /// <summary> /// 继续之前挂起的任务。 /// </summary> public void Continue() { notifyStartWork(); } /// <summary> /// 内部启动工作器 /// </summary> private void notifyStartWork() { if (IsWorking) return; //初始化 ctsCancel = new CancellationTokenSource(); //_taskDeQueueLock = new SpinLock(); //_taskEnQueueLock = new SpinLock(); _workLock = new SpinLock(); _workSemaphore = new SemaphoreSlim(MaxConcurrencyLevel, MaxConcurrencyLevel); //_worker = Task.Run(new Action(workerThread), ctsStop.Token); _worker = Task.Run(new Action(workerThread)); _worker.ContinueWith((t) => { notifyEndWork(); }); IsWorking = true; } /// <summary> /// 工作器结束时调用 /// </summary> private void notifyEndWork() { if (IsWorking) { //ctsCancel = null; _callbackOnAllTaskComplited?.Invoke(_worker); IsWorking = false; Debug.WriteLine("工作线程结束……"); } } /// <summary> /// 任务完成时调用 /// </summary> /// <param name="task"></param> private void anyTaskComplited(Task task) { _workSemaphore.Release(); //todo task _callbackOnAnyTaskComplited?.Invoke(task); //Debug.WriteLine("完成任务{0}:{1}", task.Id, task.Status.ToString()); } /// <summary> /// 工作器线程执行方法。只应存在一个。 /// </summary> private void workerThread() { Debug.WriteLine("工作线程启动……"); Task tmp = null; while (true) { try { _workSemaphore.Wait(ctsCancel.Token); } catch (OperationCanceledException e) { break; } tmp = DequeueTask(); if (tmp != null) { if (tmp.Status == TaskStatus.Created) { tmp.Start(); } tmp.ContinueWith(anyTaskComplited); } else { Debug.WriteLine("workerAsync:null taskQueue"); break; } } } /// <summary> /// 排入任务,期望线程安全 /// </summary> /// <param name="task"></param> private void EnqueueTask(Task task) { bool gotlock = false; try { _workLock.Enter(ref gotlock); _taskQueue.Enqueue(task); } finally { if (gotlock) _workLock.Exit(); } } /// <summary> /// 弹出任务,期望线程安全 /// </summary> /// <returns></returns> private Task DequeueTask() { bool gotlock = false; try { _workLock.Enter(ref gotlock); if (_taskQueue.Count > 0) { return _taskQueue.Dequeue(); } else { return null; } } finally { if (gotlock) _workLock.Exit(); } } } }
时间: 2024-10-09 21:29:20