在我们使用异步工作流来抓取网页内容之前,需要引用 FSharp.PowerPack.dll 库,它包含了许多 .NET 方法的异步版本。开发独立的应用程序时,可以使用添加引用命令;在这一章,我们将使用互动开发模式,因此,创建一个新的 F# 脚本文件,使用 #r 指令(清单 13.1)。
清单13.1 使用异步工作流写代码 (F# Interactive)
> #r "FSharp.PowerPack.dll";;
> open System.IO
open System.Net;;
> let downloadUrl(url:string) = async{ [1]
let request =HttpWebRequest.Create(url)
let! response =request.AsyncGetResponse() [2]
use response =response [3]
let stream =response.GetResponseStream()
use reader = newStreamReader(stream)
return!reader.AsyncReadToEnd() };; [4]
val downloadUrl : string ->Async<string>
导入(opening)所有必需的命名空间以后, 我们定义一个使用异步工作流程的函数,它使用 async 值作为计算生成器[1]。可以轻松地证明,它就是普通的值;在 Visual Studio 中,如果在值的后面键入一个点(.),智能感知会显示它包含的所有常用计算生成器成员,比如,Bind 和 Return,以及其他几个基本操作,我们在以后会要到。输出的类型签名显示,计算类型是 Async<string>。在后面,我们会详细讨论这个类型。
清单 13.1 中的代码,使用一次 let! 结构,执行由 F# 库所提供的异步基本操作 AsyncGetResponse。这个方法返回Async<WebResponse> 类型,因此,let! 结构组合了两个异步操作,把实际的 WebResponse 值绑定到符号 response 上。这样,一旦异步操作完成后,我们就可以使用这个值了。
在下一行[3],使用到了use 基本操作,特定对象一旦超出作用域,就会释放。我们已经讨论过在通常 F# 程序中 use 的使用,它在异步工作流中的行为非常类似。工作流完成时,它会立即释放 HTTP 响应。我们使用值隐藏(value hiding),隐藏原来的 response 符号,声明一个将被释放新值。这是一种常用模式,因此,F# 提供了简便的方法,使用 use! 基本操作,简单地组合了 let! 和 use。既然我们知道了,就可以把上面的两行替换成一行:
use! response = request.AsyncGetResponse()
清单 13.1 的最后一行,我们使用了之前从没见过的基本操作 return![4],它可以运行其他的异步操作(就像使用 let! 基本操作),只是当操作完成时,会返回结果,而不是指定给符号。像 do! 基本操作一样,这也是简单的语法糖(syntactic sugar)。计算生成器不需要实现任何其他的成员,编译器就可以把代码看作是这样写的(实际的转换更简单):
let! text = reader.AsyncReadToEnd()
return text
现在,我们已经有了创建异步计算的 downloadUrl 函数,还应该确定如何用它来下载网页的内容。在清单 13.2 中可以看到,我们使用了 Async 模块中的函数来运行工作流。
清单13.2 进行异步计算 (F# Interactive)
> let downloadTask = downloadUrl("http://www.manning.com");; [1] <-- 生成异步工作流
val downloadTask : Async<string>
>Async.RunSynchronously(downloadTask);; [2] <-- 运行工作流,等待结果
val it : string = "<!DOCTYPE htmlPUBLIC "-//W3C//DTD XHTML 1.0
Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-tr
ansitional.dtd"><html><head>(...)"
> let tasks =
[ downloadUrl("http://www.tomasp.net");
downloadUrl("http://www.manning.com") ];;
val tasks : list<Async<string>>
> let all =Async.Parallel(tasks);; [3] <--把几个工作流组合成一个
val all : Async<string[]>
> Async.RunSynchronously(all);;
val it : string[] = [ "...";"..." ]
使用异步工作流写的代码是自动延迟的,因此,当我们执行第一行的 downloadUrl 函数,它还不会开始下载网页[1]。返回的值(Async<string> 类型)表示想要运行的计算,就像函数值表示以后可以执行的代码一样。Async 模块提供了运行工作流的方法,表 13.1 描述了其中一部分。
表 13.1 在标准 F# 库的 Async 模块中,几个处理异步工作流的基本操作
基本操作 |
基本操作的类型和描述 |
RunSynchronously |
Async<‘T> –> ‘T 在当前线程中启动给定的工作流。异步操作在工作流中使用时,工作流重新开始线程,用于调用异步回调。此操作会阻塞调用者线程,并等待工作流的结果。 |
Start |
Async<unit> –> unit 在后台(使用线程池线程)启动给定的工作流,并立即返回。工作流与随后的调用者代码并行执行。从签名可知,工作流不返回值。 |
CreateAsTask |
Async<‘T> -> Task<‘T> 这个方法仅在 .NET 4.0 使用。它把异步工作流打包成可用于执行它的 Task<‘T> 对象。任务可以用 Start 或 RunSynchronously 方法启动,其行为类似于 Async 基本操作。要获取工作流的结果,可以使用Result 属性,如果工作流尚未完成,就会阻塞。 |
Parallel |
seq<Async<‘a>> -> Async<array<‘a>> 得到异步工作流的集合,并返回一个工作流,以并行方式执行所有参数值。返回的工作流等待所有操作完成,然后,在一个数组中返回结果。 |
在清单 13.2 中,我们最初使用 Async.RunSynchronously[2],阻塞了调用线程,这对于以交互方式测试工作流,非常有用。在下一步,我们创建工作流值的列表。另外,在这里,不启动任何运行。我们有了这个集合以后,就可以使用 Async.Parallel 方法[3]生成一个工作流,以并行方式,执行列表中所有工作流。这时,仍不会执行任何原始的工作流;要执行,就需要再次使用 Async.RunSynchronously,以启动组合的工作流,并等待结果。组合的工作流启动所有工作流,并等待所有的工作流都完成。
在等待整体结果时,代码仍会阻塞,但运行的效率高了,它使用 .NET 线程池来平衡运行线程的最大值。如果我们创建几百个任务,它不可能创建几百个线程,因为,这样做效率不高相反,应该使用少量的线程。当工作流使用 let! 结构,进行异步基本操作的调用时,会在系统中注册一个回调,并释放这个线程。因为 .NET 使用线程池管理线程,完成工作的线程可以重用,以启动其他的异步工作流。当我们使用异步工作流时,以并行方式运行的任务数可以远远大于直接使用的线程数。
在本章,我们需要交互地获取数据,因此,我们对以并行方式运行工作流感兴趣的是,而不在意开发响应灵敏的图形应用程序。后一类应用程序(也称为响应式应用程序(reactive applications))也很重要,我们将在第十六章讨论这个主题。现在,我们已经看到了使用异步工作流的代码,下面,就看看它们是如何实现的。