C# Task 源代码(2)

上篇已经讲到Task 的默认的TaskScheduler 为ThreadPoolTaskScheduler.

这时我们回到原来的task 的start方法,在代码最后,调用了 ScheduleAndStart(true) 这个方法。接着看这个方法

 [SecuritySafeCritical] // Needed for QueueTask
        internal void ScheduleAndStart(bool needsProtection)
        {
            Contract.Assert(m_taskScheduler != null, "expected a task scheduler to have been selected");
            Contract.Assert((m_stateFlags & TASK_STATE_STARTED) == 0, "task has already started");

            // Set the TASK_STATE_STARTED bit
            if (needsProtection)
            {
                if (!MarkStarted())
                {
                    // A cancel has snuck in before we could get started.  Quietly exit.
                    return;
                }
            }
            else
            {
                m_stateFlags |= TASK_STATE_STARTED;
            }

            if (s_asyncDebuggingEnabled)
            {
                AddToActiveTasks(this);
            }

            if (AsyncCausalityTracer.LoggingOn && (Options & (TaskCreationOptions)InternalTaskOptions.ContinuationTask) == 0)
            {
                //For all other task than TaskContinuations we want to log. TaskContinuations log in their constructor
                AsyncCausalityTracer.TraceOperationCreation(CausalityTraceLevel.Required, this.Id, "Task: "+((Delegate)m_action).Method.Name, 0);
            }

            try
            {
                // Queue to the indicated scheduler.
                m_taskScheduler.InternalQueueTask(this);
            }
            catch (ThreadAbortException tae)
            {
                AddException(tae);
                FinishThreadAbortedTask(true, false);
            }
            catch (Exception e)
            {
                // The scheduler had a problem queueing this task.  Record the exception, leaving this task in
                // a Faulted state.
                TaskSchedulerException tse = new TaskSchedulerException(e);
                AddException(tse);
                Finish(false);

                // Now we need to mark ourselves as "handled" to avoid crashing the finalizer thread if we are called from StartNew()
                // or from the self replicating logic, because in both cases the exception is either propagated outside directly, or added
                // to an enclosing parent. However we won‘t do this for continuation tasks, because in that case we internally eat the exception
                // and therefore we need to make sure the user does later observe it explicitly or see it on the finalizer.

                if ((Options & (TaskCreationOptions)InternalTaskOptions.ContinuationTask) == 0)
                {
                    // m_contingentProperties.m_exceptionsHolder *should* already exist after AddException()
                    Contract.Assert(
                        (m_contingentProperties != null) &&
                        (m_contingentProperties.m_exceptionsHolder != null) &&
                        (m_contingentProperties.m_exceptionsHolder.ContainsFaultList),
                            "Task.ScheduleAndStart(): Expected m_contingentProperties.m_exceptionsHolder to exist " +
                            "and to have faults recorded.");

                    m_contingentProperties.m_exceptionsHolder.MarkAsHandled(false);
                }
                // re-throw the exception wrapped as a TaskSchedulerException.
                throw tse;
            }
        }

开始先做契约参数认证,接着保护数值判断。我们要看的是AddToActiveTasks(this)这个方法,注意在他之前有个判断,在s_asyncDebuggingEnabled 为true 的情况才会执行,当然默认的是false。

  // This dictonary relates the task id, from an operation id located in the Async Causality log to the actual
        // task. This is to be used by the debugger ONLY. Task in this dictionary represent current active tasks.
        private static readonly Dictionary<int, Task> s_currentActiveTasks = new Dictionary<int, Task>();
 [FriendAccessAllowed]
        internal static bool AddToActiveTasks(Task task)
        {
            Contract.Requires(task != null, "Null Task objects can‘t be added to the ActiveTasks collection");
            lock (s_activeTasksLock)
            {
                s_currentActiveTasks[task.Id] = task;
            }
            //always return true to keep signature as bool for backwards compatibility
            return true;
        }

这个就是僵我们要执行task 对象放入一个字典中,放入的目的是做什么呢?当然就是为何方便查询和管理。这个方法在正常流程是不会执行的。这里觉得有些奇怪的写法,Task 类里面有个静态静态字典,用于存放自己执行的类集合。当然说到管理和查询,断然我是不会放在这个类,令起新类也好。

这里的代码方法参数验证都是采用契约验证,其实我个人并不是很赞同这东西,虽然C++也有这个。我倒更希望是原本的异常抛出,或者日志记录,或者其他自定义方式。

接着看核心方法 m_taskScheduler.InternalQueueTask(this); 前面我们已经看到默认的m_taskScheduler为ThreadPoolTaskScheduler。接着看代码

 [SecurityCritical]
        internal void InternalQueueTask(Task task)
        {
            Contract.Requires(task != null);

            task.FireTaskScheduledIfNeeded(this);

            this.QueueTask(task);
        }
 [SecurityCritical]
        protected internal override void QueueTask(Task task)
        {
            if ((task.Options & TaskCreationOptions.LongRunning) != 0)
            {
                // Run LongRunning tasks on their own dedicated thread.
                Thread thread = new Thread(s_longRunningThreadWork);
                thread.IsBackground = true; // Keep this thread from blocking process shutdown
                thread.Start(task);
            }
            else
            {
                // Normal handling for non-LongRunning tasks.
                bool forceToGlobalQueue = ((task.Options & TaskCreationOptions.PreferFairness) != 0);
                ThreadPool.UnsafeQueueCustomWorkItem(task, forceToGlobalQueue);
            }
        }

现在为止就开始清晰明朗了,看到QueueTask 方法,我已经可以看到task 对象已经传到Threadpool 里面了。至此,可以说到task 一般都是在ThreadPool 里面运行。接着我们再看ThreadpoolTaskScheduler让几个重要的方法

  [SecurityCritical]
        protected internal override bool TryDequeue(Task task)
        {
            // just delegate to TP
            return ThreadPool.TryPopCustomWorkItem(task);
        }

        [SecurityCritical]
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return FilterTasksFromWorkItems(ThreadPool.GetQueuedWorkItems());
        }

        /// <summary>
        /// This internal function will do this:
        ///   (1) If the task had previously been queued, attempt to pop it and return false if that fails.
        ///   (2) Propagate the return value from Task.ExecuteEntry() back to the caller.
        ///
        /// IMPORTANT NOTE: TryExecuteTaskInline will NOT throw task exceptions itself. Any wait code path using this function needs
        /// to account for exceptions that need to be propagated, and throw themselves accordingly.
        /// </summary>
        [SecurityCritical]
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {

        -----------------------
        }
这里有GetScheduledTasks()方法,这个方法就是用来获得当前的Task的,对于去珍断task 的运行状态非常有帮助。至此。我们一步一步看到Task 是如何运行的,当然到达Theadpool可以继续看下去。注意了ThreadPoolTaskScheduler 的访问修饰符是internal sealed,所以在用task 的时候无法用到他,还有里面的方法访问修饰符都是protected 的。到此,我们正常来运行task,还是没法获得到task的本身运行状态。很多人在代码中为了实现某个功能都会大量的使用task,每个人的写法有不一样,task 运行是否成功,是否发生异常 对于整个项目的运行至关重要。那么如何管理,如何查看task 的运行状态呢,在C# code 我们如果想把task 的异常接管到主线程种,必须task wait,但是很多task 都是无需直到返回结果,但是实际上我们还是要关心他的运行状态,那么如何来做,如何来看呢。1.常规做法,鉴于很多人喜欢TaskFactory.StartNew() 这个写法,所以想把所有的task的加入到一个队列中比较麻烦,因为启动task 的写法很多。所以各自的task的里面自己处理异常,写好日志。2.使用TaskScheduler,看代码的目的除了了解运行过程,更加了解如和使用这个类,我们只需要写上自己的TaskScheduler,当然继承这个类,是需要实现某些必须方法的,不管是task的start还是TaskFactory的StartNew方法,我们都可以注入自己的TaskScheduler,这样正如TaskScheduler设计初衷一样,所有的task 运行都会交给他来管理,默认的ThreadPoolTaskScheduler是没法使用的(访问修饰符),除非采用一些其他手段,这里不多介绍。所以只能自己重新去实现这个类的相关细节。
时间: 2024-08-26 01:03:35

C# Task 源代码(2)的相关文章

【Spark Core】任务运行机制和Task源代码浅析1

引言 上一小节<TaskScheduler源代码与任务提交原理浅析2>介绍了Driver側将Stage进行划分.依据Executor闲置情况分发任务,终于通过DriverActor向executorActor发送任务消息. 我们要了解Executor的运行机制首先要了解Executor在Driver側的注冊过程.这篇文章先了解一下Application和Executor的注冊过程. 1. Task类及其相关 1.1 Task类 Spark将由Executor运行的Task分为ShuffleMa

C# Task 源代码阅读(1)

平时我们开发中,经常使用Task,后续的.net版本种很多都和Task有关,比如asyn,await有了Task 我们很少就去关注Thread 了.Task 给我们带来了很多的便利之处.是我们更少的去关注执行的历程,更多的去关注逻辑.但是有些时候,有些应用.又不得不考虑task 的运行状况,比如这个任务成功与否,是否发生异常.经常听别人说到task 是在线程池执行的,那我们今天就来看看task 到底在做什么了,他执行的时候又做些哪些工作. 大家可以从这里可以看到Task 的源代码,也可以从ref

celery(一) application

Application application celery在使用之前,必须首先实例化.e.g. app = Celery() app 是线程安全的,即:不同配置.组件和任务的多个app可以共存在同一个进程空间. 任务注册表(task-registry) 在Celery中发送一个task 消息,这个消息并不包含任何源代码(函数体).而是只有你所期望执行的task的名字.每个worker有一个任务注册表(task-registry),它是task 名称与 task 源代码(函数)的映射.每当你定义

Spark源代码分析之六:Task调度(二)

话说在<Spark源代码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这种方法针对接收到的ReviveOffers事件进行处理.代码例如以下: // Make fake resource offers on all executors     // 在全部的executors上提供假的资源(抽象的资源.也就是资源的对象信息,我是这么理解的)     private def makeOffers() {       /

Spark技术内幕: Task向Executor提交的源代码解析

在上文<Spark技术内幕:Stage划分及提交源代码分析>中,我们分析了Stage的生成和提交.可是Stage的提交,仅仅是DAGScheduler完毕了对DAG的划分,生成了一个计算拓扑,即须要依照顺序计算的Stage,Stage中包括了能够以partition为单位并行计算的Task.我们并没有分析Stage中得Task是怎样生成而且终于提交到Executor中去的. 这就是本文的主题. 从org.apache.spark.scheduler.DAGScheduler#submitMis

Hadoop源代码分析(Task的内部类和辅助类)

从前面的图中,我们可以发现Task有很多内部类,并拥有大量类成员变量,这些类配合Task完成相关的工作,如下图. MapOutputFile管理着Mapper的输出文件,它提供了一系列get方法,用于获取Mapper需要的各种文件,这些文件都存放在一个目录下面.我们假设传入MapOutputFile的JobID为job_200707121733_0003,TaskID为task_200707121733_0003_m_000005.MapOutputFile的根为{mapred.local.di

YYAsyncLayer源代码解析

前言 简书地址:http://www.jianshu.com/p/a5baa43b71c8 本文的中文注释代码demo更新在我的github上. 在研究iOS UI性能优化上,异步绘制一直是一个离不开的话题.最近在研究Facebook的开源框架AsyncDisplayKit的时候,找到了YYKit作者所实现的YYAsyncLayer.从这个项目了解异步绘制的方法. 项目结构 YYAsyncLayer项目较为简单,一共就三个文件: YYSentinel:线程安全的计数器. YYTransactio

Swoole源代码学习记录(十三)——Server模块具体解释(上)

Swoole版本号:1.7.5-stable Github地址:https://github.com/LinkedDestiny/swoole-src-analysis 最终能够正式进入Server.c模块了-- 在之前的分析中,能够看到非常多相关模块的声明都已经写在了Server.h中,就是由于这些模块构成了Server的核心部分.而Server本身,则是一个最上层的对象,它包含了核心的Reactor和Factory模块,存放了消息队列的key值,控制着所有的Connection.所有PHP层

线程池ThreadPoolExecutor、Executors参数详解与源代码分析

欢迎探讨,如有错误敬请指正 如需转载,请注明出处 http://www.cnblogs.com/nullzx/ 1. ThreadPoolExecutor数据成员 Private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0)); ctl主要用于存储线程池的工作状态以及池中正在运行的线程数.显然要在一个整型变量存储两个数据,只能将其一分为二.其中高3bit用于存储线程池的状态,低位的29bit用于存储正在运行的线程数. 线