案例研究:CopyToAsync

返回该系列目录《基于Task的异步模式--全面介绍》



把一个流拷贝到另一个流是有用且常见的操作。Stream.CopyTo 方法在.Net 4中就已经加入来满足要求这个功能的场景,例如在一个指定的URL处下载数据:

public static byte[] DownloadData(string url)
{
    using(var request = WebRequest.Create(url))
    using(var response = request.GetResponse())
    using(var responseStream = response.GetResponseStream())
    using(var result = new MemoryStream())
    {
        responseStream.CopyTo(result);
        return result.ToArray();
    }
}

为了提高响应能力和伸缩性,我们想使用基于TAP模式来实现上面的功能。可以尝试按下面的来做:

public static async Task<byte[]> DownloadDataAsync(string url)
{
    using(var request = WebRequest.Create(url))
    {
        return await Task.Run(() =>
        {
            using(var response = request.GetResponse())
            using(var responseStream = response.GetResponseStream())
            using(var result = new MemoryStream())
            {
                responseStream.CopyTo(result);
                return result.ToArray();
            }
        }
    }
}

此实现如果用于UI线程会提升响应能力,因为它脱离了从网络流下载数据任务的调用线程以及把该网络流复制到最终将下载的数据转成一个数组的内存流。然而,该实现对伸缩性没有效果,因为它在等待数据下载的过程中,仍旧执行同步I/O和阻塞线程池线程。反之,我们想要的是下面的功能代码:

public static async Task<byte[]> DownloadDataAsync(string url)
{
    using(var request = WebRequest.Create(url))
    using(var response = await request.GetResponseAsync())
    using(var responseStream = response.GetResponseStream())
    using(var result = new MemoryStream())
    {
        await responseStream.CopyToAsync(result);
        return result.ToArray();
    }
}

不幸的是,在.Net 4中缺少异步的CopyToAsync方法,只有Stream类有一个同步的CopyTo方法。现在我们就自己提供一个实现:

public static void CopyTo(this Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    while((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0)
    {
        destination.Write(buffer, 0, bytesRead);
    }
}

为了提供一个异步的CopyTo实现,我们可以利用编译器实现TAP的能力,稍微地修改这个实现:

public static async Task CopyToAsync(this Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead);
    }
}

这里我们将返回类型从void改成了Task,将Read和Write分别换成了ReadAsync和WriteAsync,并且在ReadAsync和WriteAsync的调用前加了与上下文相关的await关键字前缀。.Net 4 中不存在ReadAsycn和WriteAsync,但是可以通过基于Task.Factory.FromAsync实现,关于这个描述在上一篇随笔中的“Tasks和APM”章节讲过:

public static Task<int> ReadAsync(
    this Stream source, byte [] buffer, int offset, int count)
{
    return Task<int>.Factory.FromAsync(source.BeginRead, source.EndRead,
        buffer, offset, count, null);
}

public static Task WriteAsync(
    this Stream destination, byte [] buffer, int offset, int count)
{
    return Task.Factory.FromAsync(
        destination.BeginWrite, destination.EndWrite,
        buffer, offset, count, null);
}

有了这些方法,我们可以成功地实现CopyToAsync方法。我们也可以通过添加一个CancellationToken到方法中以支持撤销请求,该CancellationToken将会在复制过程中的每次读写之后被监控到(如果ReadAsync和WriteAsync支持撤销,那么也可以将CancellationToken线程化到那些调用中):

public static async Task CopyToAsync(
    this Stream source, Stream destination,
    CancellationToken cancellationToken)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead);
        cancellationToken.ThrowIfCancellationRequested();
    }
}

【注意这种撤销在同步的CopyTo实现中也是有用的,传入的CancellationToken会启用撤销。实现会依赖一个从该方法返回的可取消的对象,但实现接收到那么对象已经太晚了,因为同步调用完成时,已经没有留下要取消的东西了。】

我们也加入了进度通知的支持,包括至今已经复制了多少数据:

public static async Task CopyToAsync(
    this Stream source, Stream destination,
    CancellationToken cancellationToken,
    IProgress<long> progress)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    long totalRead = 0;
    while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead);
        cancellationToken.ThrowIfCancellationRequested();
        totalRead += bytesRead;
        progress.Report(totalRead);
    }
}

有了该方法,我们现在可以完全实现我们的DownloadDataAsync方法了,包括加入撤销和进度支持:

public static async Task<byte[]> DownloadDataAsync(
    string url,
    CancellationToken cancellationToken,
    IProgress<long> progress)
{
    using(var request = WebRequest.Create(url))
    using(var response = await request.GetResponseAsync())
    using(var responseStream = response.GetResponseStream())
    using(var result = new MemoryStream())
    {
        await responseStream.CopyToAsync(
            result, cancellationToken, progress);
        return result.ToArray();
    }
}

给我们的CopyToAsync方法做进一步的优化也是可能的。比如,如果我们要使用两个buffer而不是一个,就可以在读取下一片数据时写入之前读取的数据,因此如果读取和写入都使用了异步了I/O就会产生交叉延迟:

public static async Task CopyToAsync(this Stream source, Stream destination)
{
    int i = 0;
    var buffers = new [] { new byte[0x1000], new byte[0x1000] };
    Task writeTask = null;
    while(true)
    {
        var readTask = source.ReadAsync(buffers[i], 0, buffers[i].Length))>0;
        if (writeTask != null) await Task.WhenAll(readTask, writeTask);
        int bytesRead = await readTask;
        if (bytesRead == 0) break;
        writeTask = destination.WriteAsync(buffers[i], 0, bytesRead);
        i ^= 1; // swap buffers
    }
}

消除不必要的上下文转换是另一个优化。正如之前提到的,默认await一个Task开始执行的时候,会传输回到当前的SynchronizationContext。在CopyToAsynch实现的情况下,使用这样的转换时没必要的,因为我们没有操作任何UI状态。我们可以发挥Task.ConfigureAwait的优势类关闭这个自动的转换。为了简化,上面的原始异步的实现修改如下:

public static Task CopyToAsync(this Stream source, Stream destination)
{
    var buffer = new byte[0x1000];
    int bytesRead;
    while((bytesRead = await
        source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) > 0)
    {
        await destination.WriteAsync(buffer, 0, bytesRead)
            .ConfigureAwait(false);
    }
}                                                                                           
返回该系列目录《基于Task的异步模式--全面介绍》


该系列终于结束了,谢谢大家多多支持。请继续关注我们博客,我会陆续出些系列性的文章。

作者:tkb至简 出处:http://www.cnblogs.com/farb/

QQ:782762625

欢迎各位多多交流!

本文版权归作者和博客园共有,欢迎转载。未经作者同意下,必须在文章页面明显标出原文链接及作者,否则保留追究法律责任的权利。
如果您认为这篇文章还不错或者有所收获,可以点击右下角的【推荐】按钮,因为你的支持是我继续写作,分享的最大动力!

时间: 2024-10-25 08:28:07

案例研究:CopyToAsync的相关文章

JavaEE Tutorials (30) - Duke综合案例研究示例

30.1Duke综合应用的设计和架构456 30.1.1events工程458 30.1.2entities工程459 30.1.3dukes—payment工程461 30.1.4dukes—resources工程461 30.1.5Duke商店工程461 30.1.6Duke货运工程46530.2构建和部署Duke综合案例研究应用467 30.2.1使用NetBeans IDE构建和部署Duke综合应用467 30.2.2使用Maven构建和部署Duke综合应用46730.3运行Duke综合

机器学习入门系列02,Regression 回归:案例研究

为什么要先进行案例研究? 没有比较好的数学基础,直接接触深度学习会非常抽象,所以这里我们先通过一个预测 Pokemon Go 的 Combat Power (CP) 值的案例,打开深度学习的大门. Regression (回归) 应用举例(预测Pokemon Go 进化后的战斗力) 比如估计一只神奇宝贝进化后的 CP 值(战斗力). 下面是一只妙蛙种子,可以进化为妙蛙草,现在的CP值是14,我们想估计进化后的CP值是多少:进化需要糖果,好处就是如果它进化后CP值不满意,那就不用浪费糖果来进化它了

JavaEE Tutorials (28) - Duke书店案例研究示例

28.1Duke书店的设计和架构43828.2Duke书店接口439 28.2.1Book Java持久化API实体439 28.2.2Duke书店中使用的企业bean440 28.2.3Duke书店中使用的Facelets页面和托管bean440 28.2.4Duke书店中使用的定制组件和其他定制对象441 28.2.5Duke书店中使用的属性文件442 28.2.6Duke书店中使用的部署描述文件44328.3运行Duke书店案例研究应用443 28.3.1使用NetBeans IDE构建和

JavaEE Tutorials (29) - Duke辅导案例研究示例

29.1Duke辅导应用的设计和架构44529.2主界面447 29.2.1主界面中使用的Java持久化API实体447 29.2.2主界面中使用的企业bean448 29.2.3主界面中使用的WebSocket端点448 29.2.4主界面中使用的Facelets文件448 29.2.5主界面中使用的辅助类449 29.2.6属性文件449 29.2.7Duke辅导应用中使用的部署描述文件45029.3管理界面450 29.3.1管理界面中使用的企业bean450 29.3.2管理界面中使用的

吴恩达《深度学习》-课后测验-第三门课 结构化机器学习项目(Structuring Machine Learning Projects)-Week2 Autonomous driving (case study) (case study)( 自动驾驶 (案例研究))

Week2 Autonomous driving (case study) (case study)( 自动驾驶 (案例研究)) \1. To help you practice strategies for machine learning, in this week we'll present another scenario and ask how you would act. We think this "simulator" of working in a machine l

案例研究RAID控制器应用程序中的everspin mram

everspin MRAM是为LSI Corporation(现在的Avago Technologies)RAID控制器卡上的日志存储器选择的存储器,该RAID卡具有6Gb/s和12Gb/sSAS存储连接.Everspin MRAM在其RAID磁盘阵列中执行写日志或数据日志功能. MRAM实时捕获事务信息,以便在发生系统故障时可以恢复数据.写入MRAM的数据日志还可以捕获系统状况和状态,以进行远程诊断和修复.MRAM芯片也包含在LSI公司针对第三方RAID卡和主板RAID(ROMB)解决方案的参

优化字符串的使用:案例研究

优化字符串的使用:案例研究 C++的std::string类模板是C++标准库中使用最广泛的特性之一.只要操作字符串的代码会被频繁地执行,那么就有优化的用武之地. 为什么字符串很麻烦 字符串在概念上很简单,但是想要实现高效的字符串却非常微妙.由于std::string中特性的特定组合的交互方式,使得实现高效的字符串几乎不可能. 字符串的某些行为会增加使用它们的开销,这一点与实现方式无关.字符串是动态分配的,它们在表达式中的行为与值相似,而且实现它们需要大量的复制操作. 字符串是动态分配的 字符串

ORACLE 11G DataGuard的一些高级管理案例研究

搭建完了ORACLE 11G dataguard后,也做了角色切换的实验,有switchover已经failover,感觉受益颇多,而后继续研究了下dataguard的一些高级管理功能,所谓冰山一角,ORACLE果然博大精深,总结记录如下:1,ORACLE 11G dataguard的高级管理1.1.READ ONLY/WRITE模式打开物理STANDBY一般standby都是可以设置为mount状态的,于物理standby 可以有效分担primary 数据库压力,提升资源利用,实际上说的就是这

算法(第4版)-1.5 案例研究:union-find算法

问题→ 动态连通性:当程序从输入中读取了整数对p q时,如果已知的所有整数对都不能说明p和q是相连的,那么则将这一对整数写入到输出中.如果已知的数据可以说明p和q 是相连的,那么程序应该忽略p q这对整数并继续处理输入中的下一对整数. 该问题的应用→ 网络,变量名等价性,数字集合等. 设计API→ public class UF   UF(int N) 以整数标识(0到N-1)初始化N个触点 void union(int p, int q) 在p和q之间添加一条连接 int find(int p