并发任务管理器AsyncTaskManager

//--------------------------------------------------------------------------
//
//  Copyright (c) BUSHUOSX.  All rights reserved.
//
//  File: AsyncTaskManager.cs
//
//  Version:1.0.0.0
//
//  Datetime:20170812
//
//-------------------------------------------------------------------------- 

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace BUSHUOSX
{
    class AsyncTaskManager
    {
        /// <summary>
        /// 缓存的任务队列
        /// </summary>
        readonly Queue<Task> _taskQueue = new Queue<Task>();

        /// <summary>
        /// 工作锁,保护_taskQueue
        /// </summary>
        SpinLock _workLock;

        ///// <summary>
        ///// 入队锁
        ///// </summary>
        //SpinLock _taskEnQueueLock;
        ///// <summary>
        ///// 出队锁
        ///// </summary>
        //SpinLock _taskDeQueueLock;

        /// <summary>
        /// 工作信号,与MaxConcurrencyLevel控制并行量
        /// </summary>
        SemaphoreSlim _workSemaphore;

        /// <summary>
        /// 工作线程取消标志
        /// </summary>
        CancellationTokenSource ctsCancel;
        /// <summary>
        /// 工作线程
        /// </summary>
        Task _worker;

        /// <summary>
        /// 工作器状态
        /// </summary>
        private bool IsWorking { get; set; }

        /// <summary>
        /// 任务最大并发量
        /// </summary>
        public int MaxConcurrencyLevel { get; }

        /// <summary>
        /// 内部工作器将在队列中有任务时自动启动。否则由Start方法启动。
        /// </summary>
        public bool AutoRunWorker { get; }

        /// <summary>
        /// 工作器每一次清空队列任务,都调用
        /// </summary>
        private Action<Task> _callbackOnAllTaskComplited;

        /// <summary>
        /// 队列中的任务任务完成时,都将调用
        /// </summary>
        private Action<Task> _callbackOnAnyTaskComplited;

        /// <summary>
        /// 控制异步任务的并发量。
        /// 注意:只能严格控制stauts为Created的任务
        /// </summary>
        /// <param name="maxConcurrencyLevel">最大并发数</param>
        /// <param name="callbackOnAnyTaskComplited">如果不为null,则队列中的任何任务完成时都将传递给此回调</param>
        /// <param name="callbackOnAllTaskComplited">如果不为null,则内部队列为空时传递工作器给此回调</param>
        /// <param name="autoRunWorker">指示内部工作器将在队列中有任务时自动启动,还是由Start方法启动。</param>
        public AsyncTaskManager(int maxConcurrencyLevel, Action<Task> callbackOnAnyTaskComplited = null, Action<Task> callbackOnAllTaskComplited = null, bool autoRunWorker = true)
        {
            _callbackOnAnyTaskComplited = callbackOnAnyTaskComplited;
            _callbackOnAllTaskComplited = callbackOnAllTaskComplited;
            AutoRunWorker = autoRunWorker;
            MaxConcurrencyLevel = maxConcurrencyLevel < 0 ? int.MaxValue : maxConcurrencyLevel;
        }

        /// <summary>
        /// 排入一个任务到内部队列,该队列中的任务将被依次调用。
        /// </summary>
        /// <param name="task">要排队的任务。注意:只能严格控制stauts为Created的任务</param>
        /// <param name="callbackOnTaskComplited">此任务完成时回调</param>
        public void QueueTask(Task task, Action<Task> callbackOnTaskComplited = null)
        {
            if (task == null) return;
            if (null == callbackOnTaskComplited)
            {
                EnqueueTask(task);
            }
            else
            {
                EnqueueTask(task.ContinueWith(callbackOnTaskComplited));
            }
            if (AutoRunWorker)
            {
                notifyStartWork();
            }
        }

        //public void QueueTask(IEnumerable<Task> tasks, Action<Task> callbackOnTaskComplited = null)
        //{
        //    foreach (var item in tasks)
        //    {
        //        if (item == null) break;
        //        if (null == callbackOnTaskComplited)
        //        {
        //            EnqueueTask(item);
        //        }
        //        else
        //        {
        //            EnqueueTask(item.ContinueWith(callbackOnTaskComplited));
        //        }
        //    }
        //    if (AutoRunWorker)
        //    {
        //        notifyStartWork();
        //    }
        //}

        /// <summary>
        /// 返回此刻队列中的任务。
        /// </summary>
        /// <returns></returns>
        public Task[] GetQueueTask()
        {
            bool gotlock = false;
            try
            {
                _workLock.Enter(ref gotlock);
                if (_taskQueue.Count > 0)
                {
                    return _taskQueue.ToArray();
                }
                else
                {
                    return null;
                }
            }
            finally
            {
                _workLock.Exit();
            }
        }

        /// <summary>
        /// 启动内部工作器。
        /// 注意:为降低资源占用,该工作器在内部队列为空时会自动退出。
        /// </summary>
        public void Start()
        {
            notifyStartWork();
        }

        /// <summary>
        /// 挂起队列中剩余的任务。稍后可以使用Continue方法继续。
        /// </summary>
        public void Suspend()
        {
            stopWorkThread(false);
        }

        /// <summary>
        /// 停止工作器,并清空内部任务队列还未调用的任务。
        /// 已调用的任务还将继续运行。
        /// </summary>
        public void Cancel()
        {
            stopWorkThread(true);
        }

        private void stopWorkThread(bool clearTasks)
        {
            if (IsWorking)
            {
                ctsCancel.Cancel();
                if (clearTasks)
                {
                    bool gotlock = false;
                    try
                    {
                        _workLock.Enter(ref gotlock);
                        _taskQueue.Clear();
                    }
                    finally
                    {
                        if (gotlock)
                        {
                            _workLock.Exit();
                        }
                    }
                }
            }
        }

        /// <summary>
        /// 继续之前挂起的任务。
        /// </summary>
        public void Continue()
        {
            notifyStartWork();
        }

        /// <summary>
        /// 内部启动工作器
        /// </summary>
        private void notifyStartWork()
        {
            if (IsWorking) return;

            //初始化
            ctsCancel = new CancellationTokenSource();
            //_taskDeQueueLock = new SpinLock();
            //_taskEnQueueLock = new SpinLock();
            _workLock = new SpinLock();
            _workSemaphore = new SemaphoreSlim(MaxConcurrencyLevel, MaxConcurrencyLevel);

            //_worker = Task.Run(new Action(workerThread), ctsStop.Token);
            _worker = Task.Run(new Action(workerThread));
            _worker.ContinueWith((t) => { notifyEndWork(); });

            IsWorking = true;
        }

        /// <summary>
        /// 工作器结束时调用
        /// </summary>
        private void notifyEndWork()
        {
            if (IsWorking)
            {
                //ctsCancel = null;

                _callbackOnAllTaskComplited?.Invoke(_worker);

                IsWorking = false;
                Debug.WriteLine("工作线程结束……");
            }
        }

        /// <summary>
        /// 任务完成时调用
        /// </summary>
        /// <param name="task"></param>
        private void anyTaskComplited(Task task)
        {
            _workSemaphore.Release();
            //todo task
            _callbackOnAnyTaskComplited?.Invoke(task);
            //Debug.WriteLine("完成任务{0}:{1}", task.Id, task.Status.ToString());
        }

        /// <summary>
        /// 工作器线程执行方法。只应存在一个。
        /// </summary>
        private void workerThread()
        {
            Debug.WriteLine("工作线程启动……");

            Task tmp = null;
            while (true)
            {
                try
                {
                    _workSemaphore.Wait(ctsCancel.Token);
                }
                catch (OperationCanceledException e)
                {
                    break;
                }

                tmp = DequeueTask();
                if (tmp != null)
                {
                    if (tmp.Status == TaskStatus.Created)
                    {
                        tmp.Start();
                    }
                    tmp.ContinueWith(anyTaskComplited);
                }
                else
                {
                    Debug.WriteLine("workerAsync:null taskQueue");
                    break;
                }
            }
        }

        /// <summary>
        /// 排入任务,期望线程安全
        /// </summary>
        /// <param name="task"></param>
        private void EnqueueTask(Task task)
        {
            bool gotlock = false;
            try
            {
                _workLock.Enter(ref gotlock);
                _taskQueue.Enqueue(task);
            }
            finally
            {
                if (gotlock) _workLock.Exit();
            }

        }

        /// <summary>
        /// 弹出任务,期望线程安全
        /// </summary>
        /// <returns></returns>
        private Task DequeueTask()
        {
            bool gotlock = false;
            try
            {
                _workLock.Enter(ref gotlock);
                if (_taskQueue.Count > 0)
                {
                    return _taskQueue.Dequeue();
                }
                else
                {
                    return null;
                }
            }
            finally
            {
                if (gotlock) _workLock.Exit();
            }
        }
    }
}
时间: 2024-10-09 21:29:20

并发任务管理器AsyncTaskManager的相关文章

IIS处理并发请求时出现的问题及解决

原文链接:http://www.cnblogs.com/hgamezoom/p/3082538.html 一个ASP.NET项目在部署到生产环境时,当用户并发量达到200左右时,IIS出现了明显的请求排队现象,发送的请求都进入等待,无法及时响应,系统基本处于不可用状态.因经验不足,花了很多时间精力解决这个问题,本文记录了我查找问题的过程和最后解决方案,供大家参考. 软硬件环境: IBM刀片服务器,Intel至强处理器,4物理核,16个逻辑核心,内存32G Windows Server2008 E

走近并发编程之一 进程和线程

并发与并行,进程与线程不仅是操作系统中及其重要的概念,也是并发编程入门 必须要理解的核心知识. 什么是并发?并发与并行的区别 顺序编程:程序中的所有事物在任意时刻都只能执行一个步骤 并发:在同一时间段内,需要处理多个任务,而在每个时间点又只能处理一个,这就是并发. 假设我们要把多个任务分配给处理机,如果这台机器有多个处理器,显然可以同时执行这些任务,这就是并行. 不同于并行,并发的目的旨在最大限度的提高程序在单处理器上的效率.前者是在物理上的同时发生,而并发是在逻辑上的同时发生.如图,如果要在同

python并发编程之多进程

python并发编程之多进程 一.什么是进程 进程:正在进行的一个过程或者一个任务,执行任务的是CPU. 原理:单核加多道技术 二.进程与程序的区别 进程是指程序的运行过程 需要强调的是:同一个程序执行两次是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播放苍井空,另一个可以播放武藤兰. 三.并发与并行 无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务. (1)并发

Java并发编程:进程和线程

.title { text-align: center } .todo { font-family: monospace; color: red } .done { color: green } .tag { background-color: #eee; font-family: monospace; padding: 2px; font-size: 80%; font-weight: normal } .timestamp { color: #bebebe } .timestamp-kwd

Visual Studio并发Qpar优化效果

IOCP客户端的connect线程FOR循环中添加强制并行,1万/S并发connect+send+recv+close,任务管理器使用从60%降到20%. Visual Studio性能监控CPU使用率,StartTest线程使用率从27%降到1.6%,IOCP消息处理的Work线程使用率从30%提升至60%.

Java多线程并发编程

Thread和Runnable Runnable接口可以避免继承自Thread类的单继承的局限性. Runnable的代码可以被多个线程(Thread的实例)所共享,适合于多个线程共享资源(其实就是持有同一个runnable实例)的情况. 以火车站买票为例,分别以继承Thread类和实现Runnable接口这两种方式来模拟3个线程卖5张票: 使用Thread类模拟卖票 1 class MyThread extends Thread{ 2 3 private int ticketCount = 5

如何配置IIS处理多并发请求及存在的问题

很多时候多线程能快速高效独立的计算数据,应用比较多. 但今天遇到的多进程下的问题更是让人觉得复杂 多进程下static变量都要失效,就目前的平台和产品static使用是很多的,各种session.cache等,完全不适合多进程. 分布式系统之间不能相互使用进程内的变量,必须使用分布式缓存之类的远程容器,否则无法做到跨进程. 同样的Application变量也没法使用,必须做进程间通信. 分布式系统比普通系统复杂得多的,支持几千人在线的系统和支持数十万人在线的系统的架构是不同的. so,面对如此多

mmysql-最大链接数和最大并发数的区别

关于连接数和并发数的设置(针对Innodb引擎) 对于机器本身来说,进程数是说机器正在运行的进程数量,调出任务管理器就可以看到.连接数是指进程接收和发送数据的连接ip的数量.并发数是指进程同时发送数据到各个ip线程的数量. 对于mysql来说,连接数可以my.cnf或者my.ini中通过max_connections 设置.并发数可以通过innodb_thread_concurrency来设置. 我们查看MAX_Connections使用命令show VARIABLES like 'max_co

Java并发编程:线程、进程的创建

首先要理清下进程.线程和应用程序概念. 从一定意义上讲,进程就是一个应用程序在处理机上的一次执行过程,它是一个动态的概念,而线程是进程中的一部分,进程包含多个线程在运行. a. 进程是一个具有独立功能的程序关于某个数据集合的一次运行活动.它可以申请和拥有系统资源,是一个动态的概念,是一个活动的实体.它不只是程序的代码,还包括当前的活动,通过程序计数器的值和处理寄存器的内容来表示. b. 进程是一个"执行中的程序".程序是一个没有生命的实体,只有处理器赋予程序生命时,它才能成为一个活动的