CLR 异步函数

CLR - I/O限制的异步操作

windows是如何执行I/O操作的?计算机的每个模块都有自己的微型处理器,当写文件到磁盘中时,操作系统将写文件的任务交给磁盘的处理单元就可以做其他的了。还有需要TCP/IP 与另一台电脑通信时,系统只要将发送的数据写入TCP的缓存区就可以做其他的了,发送数据由网卡处理单元完成。

但是这些模块的处理单元的计算能力远不如CPU快,如果将CPU的计算资源总是和这些模块的处理单元同步的话,就会影响应用程序的性能,给用户带来不好的体验。这一节就讲解如何执行计算限制的操作,利用线程池在多个CPU 内核上调度任务,使多个线程并发工作,从而高效使用系统资源。

1 Windows 如何执行I/O 操作

用一个从磁盘中读取文件中的数据为例:

1 程序通过构造一个 FileStream 对象来打开磁盘文件,然后调用 Read 方法从文件读取数据。

2 调用 FileStream 的Read 方法时,你的线程从托管代码转变为本机/用户模式代码,Read 内部调用 Win32 ReadFile 函数。

3 ReadFile 分配一个小的数据结构,称为 I/O 请求包 (I/O Request Packet,IRP)。 然后 ReadFile 将你的线程从本机/ 用户模式代码转变成本机/内核模式代码,向内核传递 IRP 数据结构,从而调用 Windows 内核。

4 根据 IRP 中的设备句柄, Windows 内核知道I/O 操作要传送给哪个硬件设备。因此,Windows 将IRP 传送给恰当的设备驱动程序的IRP 队列。

5 每个设备驱动程序都维护着自己的 IRP 队列,其中包含了机器上运行的所有进程发出的I/O 请求。IRP 数据包到达时,设备驱动程序将IRP 信息传递给物理硬件设备上安装的电路板。现在,硬件设备将执行这些请求的I/O 操作。

6 在硬件设备执行I/O 操作期间,发出了I/O 请求的线程将无事可做,所以 Windows 将线程变成睡眠状态,防止浪费CPU 时间。

7 最终,硬件设备会完成 I/O 操作。然后,Windows 会唤醒你的线程,把它调度给一个 CPU ,使它从内核模式返回用户模式,再返回至托管代码。

现在讨论一下 Windows 如何执行异步I/O 操作。引入了 CLR 的线程池。打开磁盘文件的方式任然是通过构造一个 FileStream 对象,但现在传递了一个 FileOptions.Asynchronous 标志。告诉Windows 我希望文件的读/写 操作以异步方式执行。

  • 1 现在调用 ReadAsync 而不是 Read 从文件中读取数据。ReadAsync 内部分配一个 Task<Int32> 对象来代表用于完成读取操作的代码。然后,ReadAsync 调用 Win32 ReadFile函数。
  • 2 ReadFile 分配 IRP 添加到硬盘驱动程序的 IRP 队列中。但线程不再阻塞,而是允许返回至你的代码。
  • 3 那什么时候以及什么方式处理最终读取的数据呢?

    注意:调用 ReadAsync 返回的是一个 Task<Int32> 对象,可在该对象调用 ContinueWith 来登记任务完成时执行的回调方法。也可以用C# 的异步函数功能简化编码。

  • 4 硬件设备处理好IRP 后,会将完成的 IRP 放到 CLR 的线程池队列中。将来的某个时候,一个线程池线程会提取完成的 IRP 并执行完成任务的代码,最终要么设置异常,要么返回结果。这样一来,Task 对象就知道操作在什么时候完成,代码可以开始运行并安全地访问 Byte[] 中的数据。

CLR 的线程池使用名为 “I/O完成端口”(I/O Completion Port)的 Windows 资源来引出我刚才描述的行为。CLR 在初始化时创建一个 I/O 完成端口。当你打开硬件设备时,这些设备可以和I/O 完成端口关联,使设备驱动程序知道完成的IRP 送到哪

以异步方式执行 I/O 操作有很多好处:

  • 1 将资源利用率降到最低,并减少上下文切换。
  • 2 每开始一次垃圾回收,CLR 都会挂起进程中的所有线程。所以,线程越少,垃圾回收器运行的速度越快。
  • 3 垃圾回收时,CLR 遍历所有线程栈来查找根,同样线程越少,栈的数量越少,使垃圾回收速率变得更快。

C# 的异步函数

Microsoft 设计了一个编程模型来帮助开发者利用这种异步操作能力。该模式利用了上一章的 Task 和 称为 异步函数 的一个C# 语言功能。以下代码使用异步函数来执行两个异步 I/O 操作。

private static async Task<String> IssueClientRequestAsync(String serverName, String message) {    using (var pipe = new NamedPipeClientStream(serverName, "PipeName", PipeDirection.InOut,PipeOptions.Asynchronous | PipeOptions.WriteThrough)) {        pipe.Connect(); // Must Connect before setting ReadMode        pipe.ReadMode = PipeTransmissionMode.Message;        // Asynchronously send data to the server        Byte[] request = Encoding.UTF8.GetBytes(message);        await pipe.WriteAsync(request, 0, request.Length);        // Asynchronously read the server‘s response        Byte[] response = new Byte[1000];        Int32 bytesRead = await pipe.ReadAsync(response, 0, response.Length);        return Encoding.UTF8.GetString(response, 0, bytesRead);    } // Close the pipe}

下面来解释一下上述代码中的异步函数执行过程。对于理解await 非常重要。

  • 方法标记为 async ,编译器就会将方法的代码转换成实现了状态机的一个类型。这就允许线程执行状态机中的一些代码并返回,方法不需要一直执行到结束。
  • 调用WriteAsync时,在WriteAsync 内部分配了一个 Task 对象并把它返回给IssueClientRequestAsync ,此时,C# await 操作符实际会在 Task 对象上调用ContinueWith ,向它传递用于恢复状态机的方法。然后线程从 IssueClientRequestAsync返回。
  • 在将来某个时候,网络设备驱动程序会结束向管道的写入,一个线程池线程会通知Task 对象,后者激活ContinueWith回调方法,造成一个线程恢复状态机。更具体的说,一个线程会重新进入 IssueClientRequestAsync 方法,但这次是从 await 操作符的位置开始的。
  • 方法现在执行编译器生成的、用于查询Task 对象状态的代码如果操作失败,会设置代表错误的一个异常。如果操作成功完成,await 操作符会返回结果。本列子中,WriteAsync 返回一个Task 而不是Task<TResult>, 所以无返回值。
  • 现在方法继续执行,分配一个Byte[] 并调用 NamedPipeClientStream 的异步 ReadAsync 方法。ReadAsync 内部创建一个 Task<Int32>对象并返回它。同样的,await 操作符实际会在Task<Int32>对象上调用 ContinueWith,向其传递用于恢复状态机的方法然后线程再次从 IssueClientRequestAsync 返回。
  • 将来的某个时候,服务器向客户机发送一个响应,网络设备驱动程序获得这个响应,一个线程池线程通知 Task<Int32>对象,后者恢复状态机。await 操作符造成编译器生成代码来查询 Task对象的Result 属性(一个 Int32)并将结果赋给局部变量 bytesRead。如果操作失败,则抛出异常。然后执行 IssueClientRequestAsync 剩余的代码,返回结果字符串并关闭管道。此时状态机执行完毕,垃圾回收器会回收任何内存。
  • 调用者如何知道 IssueClientRequestAsync 已经执行完毕它的状态机呢?一旦将方法标记为 async,编译器会自动生成代码,在状态机开始执行时创建一个 Task 对象。该Task 对象在状态机执行完毕时自动完成。注意 IssueClientRequestAsync 方法的返回类型是 Task<String>,它实际返回的是由编译器生成的代码为这个方法(IssueClientRequestAsync 方法)的调用者而创建的Task<String> 对象,Task 的Result 属性在本列中是 String 类型。在IssueClientRequestAsync 方法靠近尾部的地方,我反回了一个字符串。这造成编译器生成的代码完成它创建的 Task<String>对象,把对象的Result 属性设为返回的字符串。

注意:异步函数存在以下限制。

  • 1 不能转变为异步函数的情况,Main方法、构造器、属性访问器方法和 事件访问器方法。
  • 2 异步函数不能使用任何 out 或 ref 参数。
  • 3 不能在 catch ,finally 或 unsafe 块中使用 await 操作符。
  • 4 不能在 await 操作符之前获得一个支持线程所有权 或递归的锁,并在 await 操作符之后释放它。 因为 await 之前的代码由一个线程执行,而await 之后的代码由另一个线程执行。
  • 5 在查询表达式中,await 操作符只能在初始 from 子句的第一个集合表达式中使用,或者在 join 子句的集合表达式中使用。

异步函数扩展性

在扩展性方面,用Task对象包装一个将来完成的操作,就可以用await 操作符来等待该操作。下面是Jeffrey Richter 写的一个TaskLogger 类,它可以显示尚未完成的异步操作。我们可以在调试的时候使用。

public static class TaskLogger {    public enum TaskLogLevel { None, Pending }    public static TaskLogLevel LogLevel { get; set; }    public sealed class TaskLogEntry {    public Task Task { get; internal set; }    public String Tag { get; internal set; }    public DateTime LogTime { get; internal set; }    public String CallerMemberName { get; internal set; }    public String CallerFilePath { get; internal set; }    public Int32 CallerLineNumber { get; internal set; }        public override string ToString() {            return String.Format("LogTime={0}, Tag={1}, Member={2}, File={3}({4})",            LogTime, Tag ?? "(none)", CallerMemberName, CallerFilePath, CallerLineNumber);        }    }    private static readonly ConcurrentDictionary<Task,TaskLogEntry> s_log =    new ConcurrentDictionary<Task, TaskLogEntry>();    public static IEnumerable<TaskLogEntry> GetLogEntries() { return s_log.Values; }    public static Task<TResult> Log<TResult>(this Task<TResult> task, String tag = null,    [CallerMemberName] String callerMemberName = null,    [CallerFilePath] String callerFilePath = null,    [CallerLineNumber] Int32 callerLineNumber = -1) {    return (Task<TResult>)        Log((Task)task, tag, callerMemberName, callerFilePath, callerLineNumber);    }    public static Task Log(this Task task, String tag = null,    [CallerMemberName] String callerMemberName = null,    [CallerFilePath] String callerFilePath = null,    [CallerLineNumber] Int32 callerLineNumber = •1) {        if (LogLevel == TaskLogLevel.None) return task;        var logEntry = new TaskLogEntry {        Task = task,        LogTime = DateTime.Now,        Tag = tag,        CallerMemberName = callerMemberName,        CallerFilePath = callerFilePath,        CallerLineNumber = callerLineNumber        };        s_log[task] = logEntry;        task.ContinueWith(t => { TaskLogEntry entry; s_log.TryRemove(t, out entry); },        TaskContinuationOptions.ExecuteSynchronously);        return task;    }}

以下代码演示如何使用它

public static async Task Go() {    #if DEBUG    // Using TaskLogger incurs a memory and performance hit; so turn it on in debug builds    TaskLogger.LogLevel = TaskLogger.TaskLogLevel.Pending;    #endif    // Initiate 3 task; for testing the TaskLogger, we control their duration explicitly    var tasks = new List<Task> {        Task.Delay(2000).Log("2s op"),        Task.Delay(5000).Log("5s op"),740 PART V Threading        Task.Delay(6000).Log("6s op")    };    try {        // Wait for all tasks but cancel after 3 seconds; only 1 task should complete in time        // Note: WithCancellation is my extension method described later in this chapter        await Task.WhenAll(tasks).        WithCancellation(new CancellationTokenSource(3000).Token);    }    catch (OperationCanceledException) { }    // Ask the logger which tasks have not yet completed and sort    // them in order from the one that’s been waiting the longest    foreach (var op in TaskLogger.GetLogEntries().OrderBy(tle => tle.LogTime))        Console.WriteLine(op);}

编译并运行结果如下:

LogTime=7/16/2012 6:44:31 AM, Tag=6s op, Member=Go, File=C:\CLR via C#\Code\Ch28-1-IOOps.cs(332)LogTime=7/16/2012 6:44:31 AM, Tag=5s op, Member=Go, File=C:\CLR via C#\Code\Ch28-1-IOOps.cs(331)

取消I/O 操作

作者建议实现一个 WithCancellation 扩展方法来扩展 Task<TResult> (需要类似的重载版本来扩展 Task)。如下所示:

private struct Void { } // Because there isn‘t a non-generic TaskCompletionSource class.private static async Task<TResult> WithCancellation<TResult>(this Task<TResult> originalTask,CancellationToken ct) {    // Create a Task that completes when the CancellationToken is canceled    var cancelTask = new TaskCompletionSource<Void>();    // When the CancellationToken is canceled, complete the Task    using (ct.Register(        t => ((TaskCompletionSource<Void>)t).TrySetResult(new Void()), cancelTask)) {        // Create a Task that completes when either the original or        // CancellationToken Task completes        Task any = await Task.WhenAny(originalTask, cancelTask.Task);        // If any Task completes due to CancellationToken, throw OperationCanceledException        if (any == cancelTask.Task) ct.ThrowIfCancellationRequested();    }    // await original task (synchronously); if it failed, awaiting it    // throws 1st inner exception instead of AggregateException    return await originalTask;}

//现在可以像下面这样调用该扩展方法了。

public static async Task Go() {    // Create a CancellationTokenSource that cancels itself after # milliseconds    var cts = new CancellationTokenSource(5000); // To cancel sooner, call cts.Cancel()    var ct = cts.Token;    try {        // I used Task.Delay for testing; replace this with another method that returns a Task        await Task.Delay(10000).WithCancellation(ct);        Console.WriteLine("Task completed");    }    catch (OperationCanceledException) {        Console.WriteLine("Task cancelled");    }}

有的I/O 操作必须同步执行

FCL 不能以异步的方式高效地打开文件。另外,Windows 也没有提供函数以异步方式访问注册表、访问事件日志、获取目录的文件/子目录 或者更改文件/ 目录的属性等待。例如:Win32 CreateFile 方法(由FileStream 的构造器调用)总是以同步方式执行。

I/O 请求优先级 为什么要由 I/O 请求优先级? 线程要执行 I/O 请求以便从各种硬件设备中读写数据。如果一个低优先级线程获得了CPU 时间,它可以在非常短的时间里轻易地将成百上千 的 I/O 请求放入队列。由于I/O 请求一般需要时间来执行,所以一个低优先级线程可能挂起高优先级线程,使后者不能快速完成工作,从而严重影响系统的总体相依能力。 Windows 允许线程在发出 I/O 请求时指定优先级。欲知 I/O 优先级的详情,参考以下网址。

http://www.microsoft.com/whdc/driver/priorityio.mspx

作者写书的时候 FCL 还没有包含这个功能。作者提供了一个办法可以实现 I/O 优先级。可以采取 P/Invoke 本机 Win32 函数的方式。

public static void Main () {    using (ThreadIO.BeginBackgroundProcessing()) {    // Issue low-priority I/O requests in here (eg: calls to ReadAsync/WriteAsync)    }}

用上面这个方法告诉 Windows 你的线程要发出低优先级 I/O 请求。注意,这同时也会降低线程的CPU 调度优先级。可调用 EndBackgroundProcessing,或者在 BeginBackgroundProcessing 返回的值上调用 Dispose 。使线程恢复为发出普通优先级 I/O请求(以及普通的CPU 调度优先级)。

下面是我实现的一个应用于await 的方法。

private Task<long> DoSomethingLongAsync(string name){    Console.WriteLine($"****************DoSomethingLong Start {name} {Thread.CurrentThread.ManagedThreadId.ToString("00")} {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")}***************");    long lResult = 0;       return new Task<long>(()=> {        for (int i = 0; i < 100000; i++)        {            lResult += i;        }        Thread.Sleep(4000);        Console.WriteLine($"****************DoSomethingLong   End {name} {Thread.CurrentThread.ManagedThreadId.ToString("00")} {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")} {lResult}***************");        return lResult;    });}?private long ReDoSomethingLong(string name){    Console.WriteLine($"****************DoSomethingLong Start {name} {Thread.CurrentThread.ManagedThreadId.ToString("00")} {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")}***************");    long lResult = 0;    for (int i = 0; i < 100000; i++)    {        lResult += i;    }    Thread.Sleep(4000);?    Console.WriteLine($"****************DoSomethingLong   End {name} {Thread.CurrentThread.ManagedThreadId.ToString("00")} {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")} {lResult}***************");    return lResult;}
private async void btnAwaitAsync_Click(object sender, EventArgs e){    //Stopwatch watch = new Stopwatch();    //watch.Start();    Console.WriteLine($"****************btnAwaitAsync_Click Start {Thread.CurrentThread.ManagedThreadId.ToString("00")} {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")}***************");    long res = await DoSomethingLongAsync("btnTask_Click_001");     long res1 = await Task.Run(() => this.ReDoSomethingLong("btnTask_Click_001"));    //watch.Stop();    Console.WriteLine($"****************btnAwaitAsync_Click   End {Thread.CurrentThread.ManagedThreadId.ToString("00")} {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")}***************");}

原文地址:https://www.cnblogs.com/mingjie-c/p/11745210.html

时间: 2024-07-30 08:04:12

CLR 异步函数的相关文章

.Net异步函数存在的限制

本文摘录自CLR Via C# 第四版. 异步函数存在以下限制: 1.不能讲应用程序的Main方法转变成异步函数.另外,构造器.属性访问器方法和时间访问器方法不能转变成异步函数. 2.异步函数不能使用任何out或ref参数. 3.不能再catch,finally或unsafe快中使用await操作符. 4.不能再await操作符之前获得一个支持县城所有权或递归的锁,并在await操作符之后释放它.这是因为await之前的代码由一个线程执行,之后的代码则由另一个线程执行.在C# lock语句中使用

cuda 异步函数

为了提高cuda效率使用异步函数是一个很常规的选择,但是异步函数并没有我自己想象的这么智能. 它要你要异步传输的数据在主机端(host)不能被更改,即异步函数只是指示了一个传输的位置指针,并没有对这个数据进行缓存,到真正需要的时候,才会去主机内存中去寻找这个值.所以在做异步的时候要保证异步传输的主机端不能在异步完成(或者说拷贝完成)之前被修改.要不然这个数据就成了修改后的了. 实验代码如下 #include <stdio.h> #include <cuda_runtime.h> _

获取JavaScript异步函数的返回值

今天研究一个小问题: 怎么拿到JavaScript异步函数的返回值? 1.错误尝试 当年未入行时,我的最初尝试: ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <script> function getSomething() {  var r = 0;  setTimeout(function() {  r = 2;  }, 10);  return r; } function compute() {  var x = getSomething();  alert

同步函数与异步函数

依据微软的MSDN上的解说: (1)   同步函数:当一个函数是同步执行时,那么当该函数被调用时不会立即返回,直到该函数所要做的事情全都做完了才返回. (2)   异步函数:如果一个异步函数被调用时,该函数会立即返回尽管该函数规定的操作任务还没有完成. (3) 在一个线程中分别调用上述两种函数会对调用线程有何影响呢? 当一个线程调用一个同步函数时(例如:该函数用于完成写文件任务),如果该函数没有立即完成规定的操作,则该操作会导致该调用线程的挂起(将CPU的使用权交给系统,让系统分配给其他线程使用

tornado.gen.coroutine-编写异步函数

异步函数: 1. 返回Future 2. 必须有set_result( )或者set_exception( )调用. 这里展示一个异步socket读取的例子: 首先定义一个定时返回的服务器,来模拟耗时的操作 from tornado.tcpserver import TCPServer from tornado import ioloop from tornado import gen  from tornado.concurrent import Future def sleep(durati

ES2017异步函数现已正式可用

ES2017标准已于2017年6月份正式定稿了,并广泛支持最新的特性:异步函数.如果你曾经被异步 JavaScript 的逻辑困扰,这么新函数正是为你设计的. 异步函数或多或少会让你编写一些顺序的 JavaScript 代码,但是却不需要在 callbacks.generators 或 promise 中包含你的逻辑. 如下代码: function logger() {     let data = fetch('http://sampleapi.com/posts')     console.

Node.js用ES6原生Promise对异步函数进行封装

版权声明:本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可.转载请注明来源http://blog.csdn.net/azureternite 目录(?)[+] Promise的概念 Promise 对象用于异步(asynchronous)计算..一个Promise对象代表着一个还未完成,但预期将来会完成的操作. Promise的几种状态: pending:初始状态,即等待操作的执行 fulfilled:成功的操作 rejected:失败的操作 pending的状态

js异步函数队列

场景: 做直播,会有入场消息,入场特效,用户如果有坐骑,需要给他展示几秒钟的坐骑特效,如果几个人同时进场,那该怎么展示呢?这时候就会想到setTimeout函数,对,思路不错,但是,异步函数队列怎么实现呢?直接上代码: var Queue = function() { this.list = []; }; Queue.prototype = { constructor: Queue, queue: function(fn) { this.list.push(fn) return this; },

异步函数封装请确保异步性(Javascript需要养成的良好习惯)

背景假设: 你有许多的配置信息存放在服务器上,因为配置太多,不希望每次都把所有的配置信息都写到前端,希望能需要用的时候再获取就好了. 因为Javascript单线程运行,你不希望堵塞ui渲染于是你专门写了个异步获取函数(ajax获取后台信息) var getConfig=function(key,callback){ $.get('/config/'+key,function(config){ callback(null,config); },'json');};//使用该函数getConfig