异步编程与我们所看过的其他并行编程形式有一些不同,讨论的其他主题可以有大量的线程并行运行,可以完全利用系统中可用的处理器;而在异步编程中,需要避免阻塞线程,我们在这一章的第一节“线程、内存、锁定和阻塞”中已经对阻塞线程的概念有所了解了。阻塞的线程是不能工作的线程,因为它需要等待其他任务的完成;线程等待的通常任务是操作系统执行的输入输出,但有时也可能是等待锁,因此会进行临界区。线程是相对昂贵的资源,每个线程分配 1 MB 的堆(stack),以及操作系统内核为管理大量线程而产生的其他相关消耗。在性能至上的代码中,保持阻塞线程数量在较低的水平上是至关重要的;理论上,只要做到有多少的处理器,就有多少的线程,就不会有阻塞的线程了。
注意
为了概要地了解一下用这些方法能够达到什么样的结果,可以看一下Amanda Laucher 2009 年在 Lang.NET 上的演讲,她讲解了如何使用F# 工作流并行化 C# 的程序,以及实现了一些令人印象深刻的结果:www.langnetsymposium.com/2009/speakers.aspx。
在这一节,我们将学习如何使用 .NET 框架的异步编程模型(asynchronous programming model)避免输入输出期间线程的阻塞。异步编程模型的意思是,在有关流的类上,使用一对Begin/End 方法,比如 BeginRead/EndRead;典型地,这一对方法执行某种输入输出任务,比如读文件。这种编程方法的名声很不好,因为需要找到能在Begin/End 调用之间保持状态的好方法。这一节我们直接讨论编程模型,相反,将看一下 F# 的一个功能,异步工作流(asynchronous workflows),看如何用它来避免在其他
.NET 语言中与使用异步编程模型相关的工作。为了更详细了解异步编程模型,以及使用的困难,请参考Jeffrey Richter 在 MSDN 上的文章《Asynchronous Device Operations》(http://msdn.microsoft.com/en-us/magazine/cc163415.aspx)。
异步工作流不是由 .NET 的异步编程模型所专用。在下一节“消息传递”中,我们将学习如何使用这些工作流与 F# 的邮箱(mailboxes)来协调大量不同的任务,它可以等待任务完成而不阻塞线程。
了解 F# 中异步工作流的第一步是了解它的语法。创建异步工作流,使用一元语法(monadic syntax),同我们在第三章中见过的序列表达式相似;基本语法使用关键字async,加用大括号括起来的工作流表达式:async { ... }。简单的工作流程序像这样使用工作流:
open System.IO
// a function to read a text fileasynchronusly
let readFile file =
async{ let! stream = File.AsyncOpenText(file)
let! fileContents = stream.AsyncReadToEnd()
return fileContents }
// create an instance of the workflow
let readFileWorkflow = readFile"mytextfile.txt"
// invoke the workflow and get the contents
let fileContents = Async.RunSynchronouslyreadFileWorkflow
编译这个程序,需要引用 FSharp.PowerPack.dll。程序中的 readFile 函数创建一个工作,异步读文件,然后返回文件的内容;接下来,创建工作流的实例 readFileWorkflow;最后,运行这个工作流,获得文件的内容。很重要的一点,是要理解,只调用 readFile 函数并不真正读文件;相反,它报建工作流的新实例,然后,运行这个工作流,去执行读文件的任务;Async.RunSynchronously 函数真正负责运行工作流。工作流实例是一种小型的数据结构,有点像一段小程序,能够解释一些要做的工作。
关注这个示例最重要的是 let 后面的感叹号(let!),通常读作let bang。工作流/一元语法可以为 let! 赋予不同的含义。在异步工作流中,它表示将要发生的异步操作;在异步操作发生期间工作流停止运行;在线程池中插入一个回调函数,当这个异步操作完成时被调用,如果发出原始调用的线程没有空闲,就可能发生在不同的线程上;异步调用之后,原始线程被释放,可以继续其他工作。
你可能已经注意到,let! 是用在有 Async前缀的一些专用方法,在 FSharp.PowerPack.dll 中,这些函数被定义成类型扩增(type augmentations),它与 C# 的扩展方法(extension
methods)等价,这些方法处理对 Begin/End方法对的调用。如果没有现成的 Async 方法,我们自己创建也很简单,使用 Async.Primitive 函数和 Begin/End 方法对。
简单的步骤可能像这样:
第一步:主程序线程启动打开文件流的进程,在线程池中插入回调函数,当这个操作完成时使用,而这个线程现在空闲可以继续做其他工作;
第二步:当文件流已经打开,线程池线程(A thread pool thread)被激活,开始读文件的内容,在线程池中插入回调函数,当这个操作完成时使用。因为它是一个线程池线程,所以,它将返回到线程池;
第三步:当已经完成读文件,线程池线程被激活,将返回从文件中读到的文本数据,并返回到线程池;
第四步:因为我们已经使用了 sync.RunSynchronously 函数,主程序线程将等待工作流的结果,接收文件的内容。
在这个简单的示例中,你还可能会发现一点缺陷,没有阻塞主程序线程等待输入输出,但是,因为我们等待异步工作流完成,也就阻塞了主程序线程,直到输入输出完成。换一种方式,在它自己的[ 线程吗? ]上运行异步工作流并等待结果,就没有或几乎没有优势了。然而,并行运行几个工作流相当简单;同时运行几个工作流有一个明显的优势,因为原始线程在它启动了第一个异步任务之后,不会被阻塞,就是说,它是空闲的,可以继续运行其他异步任务。
要说明这个也很简单,我们把原来的示例作一点修改,不是读一个文件,而是读三个文件。而把这个与同步版本的程序作一下比较,有助于发现它们之间的差别。我们先看一下同步版本:
open System
open System.IO
open System.Threading
let print s =
lettid = Thread.CurrentThread.ManagedThreadId
Console.WriteLine(sprintf"Thread %i: %s" tid s)
let readFileSync file =
print(sprintf "Beginning file %s" file)
letstream = File.OpenText(file)
letfileContents = stream.ReadToEnd()
print(sprintf "Ending file %s" file)
fileContents
// invoke the workflow and get the contents
let filesContents =
[|readFileSync "text1.txt";
readFileSync"text2.txt";
readFileSync"text3.txt"; |]
这个程序相当简单,其中还有一些调试代码,显示处理文件的开始与结束。现在再看一下异步版本:
open System
open System.IO
open System.Threading
let print s =
lettid = Thread.CurrentThread.ManagedThreadId
Console.WriteLine(sprintf"Thread %i: %s" tid s)
// a function to read a text fileasynchronusly
let readFileAsync file =
async{ do print (sprintf "Beginning file %s" file)
let! stream = File.AsyncOpenText(file)
let! fileContents = stream.AsyncReadToEnd()
do print (sprintf "Ending file %s" file)
return fileContents }
let filesContents =
Async.RunSynchronously
(Async.Parallel[ readFileAsync "text1.txt";
readFileAsync "text2.txt";
readFileAsync "text3.txt"; ])
另外,这个版本也包含了一些调试代码,因此,可以看到程序是如何运行的。最大的改变是现在使用了 Async.Parallel函数,把几个工作流组合成一个工作流。这样,当第一个线程完成处理第一个异步调用之后,就空闲了,可以继续处理其他工作流。看看下面两个程序的运行结果就知道了:
同步结果:
Thread 1: Beginning file text1.txt
Thread 1: Ending file text1.txt
Thread 1: Beginning file text2.txt
Thread 1: Ending file text2.txt
Thread 1: Beginning file text3.txt
Thread 1: Ending file text3.txt
异步结果:
Thread 3: Beginning file text1.txt
Thread 4: Beginning file text2.txt
Thread 3: Beginning file text3.txt
Thread 4: Ending file text2.txt
Thread 4: Ending file text1.txt
Thread 4: Ending file text3.txt
两组结果完全不同。对于同步结果,每一个 Beginning file 后面跟一个 Ending file,且出现在同一个线程中;第二种情况下,所有 Beginningfile 的实例同时发生,且在两个不同的线程中,这是因为每一个线程完成了异步操作以后,它就空闲了可以继教启动另一个操作。输入输出一旦完成之后,Ending file 就发生了。
异步编程(Asynchronous Programming)