?
本文旨在演示ActionBlock的使用。
大致流程:
输入路径——读取字节——计算——传输到打印
?
- // Demonstrates how to provide delegates to exectution dataflow blocks.
- class DataflowExecutionBlocks
- {
- ????// 计算文件中包含零字节的总数
- ????static
int CountBytes(string path) - ????{
- ????????byte[] buffer = new
byte[1024]; - ????????int totalZeroBytesRead = 0;
- ????????using (var fileStream = File.OpenRead(path))
- ????????{
- ????????????int bytesRead = 0;
- ????????????do
- ????????????{
- ????????????????bytesRead = fileStream.Read(buffer, 0, buffer.Length);
- ????????????????totalZeroBytesRead += buffer.Count(b => b == 0);
- ????????????} while (bytesRead > 0);
- ????????}
- ?
- ????????return totalZeroBytesRead;
- ????}
- ?
- ????static
void Run(string[] args) - ????{
- ????????// 创建一个临时目录
- ????????string tempFile = Path.GetTempFileName();
- ?
- ????????// 随机写入数据
- ????????using (var fileStream = File.OpenWrite(tempFile))
- ????????{
- ????????????Random rand = new Random();
- ????????????byte[] buffer = new
byte[1024]; - ????????????for (int i = 0; i < 512; i++)
- ????????????{
- ????????????????rand.NextBytes(buffer);
- ????????????????fileStream.Write(buffer, 0, buffer.Length);
- ????????????}
- ????????}
- ?
- ????????// 创建一个ActionBlock<int> 对象来打印 读取到的字节数
- ????????var printResult = new ActionBlock<int>(zeroBytesRead =>
- ????????{
- ????????????Console.WriteLine("{0} contains {1} zero bytes.",
- ????????????????Path.GetFileName(tempFile), zeroBytesRead);
- ????????});
- ?
- ????????// 创基一个 TransformBlock<string, int>对象来调用CountBytes函数,并返回计算结果
- ????????var countBytes = new TransformBlock<string, int>(
- ????????????new Func<string, int>(CountBytes));
- ?
- ????????// 将两个块链接起来:TranformBlock<string,int>对象和ActionBlock对象。
- ????????countBytes.LinkTo(printResult);
- ?
- ????????// 创建一个连续任务:当TransformBlock<string, int>完成时,通知打印结果已完成
- ????????countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });
- ?
- ????????// 输入临时目录
- ????????countBytes.Post(tempFile);
- ?
- ????????// 标识结束
- ????????countBytes.Complete();
- ?
- ????????// 等待打印完成
- ????????printResult.Completion.Wait();
- ?
- ????????File.Delete(tempFile);
- ????}
- }
?
TransformBlock:一般用作传输和计算。类似函数式编程中的Map操作。
Func<TInput,TOutput>委托,通过Post,输入一个TInput参数。
Complete(),表明已完成
Completion任务,完成之后,当前任务结束。本质上来讲,TranformBlock运行在一个Task中。
?
ActionBlock:
Action<TInput> action委托,单一输入,执行诸如打印之类的操作。
同样也有Complete()和Completion任务。
?
同时,以上两个均支持异步方法:
Transform
Action
?
- var countBytesAsync = new TransformBlock<string, int>(async path =>
- {
- ????byte[] buffer = new
byte[1024]; - ????int totalZeroBytesRead = 0;
- ????using (var fileStream = new FileStream(
- ????????path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
- ????{
- ????????int bytesRead = 0;
- ????????do
- ????????{
- ????????????// Asynchronously read from the file stream.
- ????????????bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
- ????????????totalZeroBytesRead += buffer.Count(b => b == 0);
- ????????} while (bytesRead > 0);
- ????}
- ?
- ????return totalZeroBytesRead;
- });
时间: 2024-11-05 13:39:30