TPL Dataflow库的几个扩展函数
TPL Dataflow是微软面向高并发应用而推出的新程序库。借助于异步消息传递与管道,它可以提供比线程池更好的控制。本身TPL库在DataflowBlock类中提供了不少扩展函数,用起来还是非常方便的,但感觉还是不够全(当然,MS没必要设计大而全的接口),前段时间写个小程序的时候用到了它,当时顺便写了几个扩展函数,这里记录一下,如果后续有扩展再继续补充。
static class DataFlowExtension { /// <summary> ///同步发送所有数据至TargetBlock /// </summary> public static void PostAll<T>(this ITargetBlock<T> target, IEnumerable<T> source) { var isSuccess = source.All(i => target.Post(i)); if (!isSuccess) { throw new InvalidOperationException(); } target.Complete(); }
/// <summary> ///异步发送所有数据至TargetBlock /// </summary> public static async Task PostAllAsync<T>(this ITargetBlock<T> target, IEnumerable<T> source) { foreach (var item in source) { await target.SendAsync(item); } target.Complete(); } /// <summary> ///同步从数据源中获取所有数据 /// </summary> public static IReadOnlyList<T> ReceiveAll<T>(this IReceivableSourceBlock<T> source) { IList<T> output; if (!source.TryReceiveAll(out output)) { throw new InvalidOperationException(); }
return output as IReadOnlyList<T>; }
/// <summary> ///异步从数据源中获取所有数据 /// </summary> public static async Task<IReadOnlyList<T>> ReceiveAllAsync<T>(this ISourceBlock<T> source) { List<T> output = new List<T>(); while (await source.OutputAvailableAsync()) { output.Add(source.Receive()); } return output; } }
这几个扩展函数本身是对DataflowBlock类中的函数二次封装,没有太多的功能,基本上每个函数都只有几行,主要为了使用更加方便罢了,由于实现简单,扩充它也是非常方便的。