?
producer把消息发送到消息块,consumer从块读取消息。
安装:
- Install-Package Microsoft.Tpl.Dataflow
- ?
- using System.Threading.Tasks.Dataflow;
?
解释:
Produce方法随机生成字节,并Post到ITargetBlock对象;
ConsumeAsync方法从ISourceBlock对象读取字节;
可以使用BufferBlock来同时扮演源和目标对象。
Post():同步发送消息。
Complete():表明当前块(source block)已经没有更多的数据了。
ConsumeAsync方法使用async和await关键字异步地计算总的字节数。
OutputAvailableAsync():异步等待source block的输出;有数据可读时返回true,当块已完成(收到Complete的通知)且没有更多的数据可用时返回false。
?
/// <summary>
/// Demonstrates the producer/consumer pattern with TPL Dataflow: a producer
/// posts byte buffers to a dataflow block and a consumer asynchronously reads
/// them back and totals their sizes.
/// </summary>
public static class DataflowProducerConsumer
{
    // Number of buffers the producer posts before completing the target.
    private const int MessageCount = 100;

    // Size, in bytes, of each buffer the producer posts.
    private const int MessageSize = 1024;

    /// <summary>
    /// Production end of the pattern: posts <c>MessageCount</c> buffers of
    /// random bytes to <paramref name="target"/> and then marks it complete.
    /// </summary>
    /// <param name="target">The block that receives the generated buffers.</param>
    private static void Produce(ITargetBlock<byte[]> target)
    {
        Random rand = new Random();

        for (int i = 0; i < MessageCount; i++)
        {
            // A fresh array per message: the consumer receives the same array
            // instance, so a buffer must not be reused after it has been posted.
            byte[] buffer = new byte[MessageSize];
            rand.NextBytes(buffer);

            // The return value of Post is not checked here.
            target.Post(buffer);
        }

        // Signal to the consumer that no more data will be posted.
        target.Complete();
    }

    /// <summary>
    /// Consumption end of the pattern: reads buffers from
    /// <paramref name="source"/> until it reports that no more output is
    /// available, and totals their lengths.
    /// </summary>
    /// <param name="source">The block to read buffers from.</param>
    /// <returns>
    /// A task whose result is the total number of bytes received.
    /// </returns>
    private static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        // OutputAvailableAsync yields false once the source has completed and
        // has no remaining output, which ends the loop.
        while (await source.OutputAvailableAsync())
        {
            // Await the item instead of calling the blocking Receive(), so this
            // async method never stalls the calling thread.
            // NOTE(review): assumes this is the only consumer of the source;
            // with competing consumers the item could be taken by someone else
            // between the availability check and the receive - confirm.
            byte[] data = await source.ReceiveAsync();

            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    /// <summary>
    /// Wires a producer and a consumer to one shared buffer block, waits for
    /// the consumer to finish, and prints the number of bytes it processed.
    /// </summary>
    /// <param name="args">Not used by this method.</param>
    private static void Run(string[] args)
    {
        // The BufferBlock is both the producer's target and the consumer's source.
        var buffer = new BufferBlock<byte[]>();

        // Start the consumer first; it runs asynchronously while data is produced.
        var consumer = ConsumeAsync(buffer);

        // Post source data to the dataflow block.
        Produce(buffer);

        // Run is synchronous, so block here until the consumer has drained
        // the buffer.
        consumer.Wait();

        Console.WriteLine("Processed {0} bytes.", consumer.Result);
    }
}
?
参考:https://msdn.microsoft.com/en-us/library/hh228601(v=vs.110).aspx
时间: 2024-11-07 05:35:42