How to: 使用 数据流 实现生产者-消费者模式

?

producer把消息发送到消息块,consumer从块读取消息。

安装:

  1. Install-Package Microsoft.Tpl.Dataflow
  2. ?
  3. using System.Threading.Tasks.Dataflow;

?

解释:

Produce方法随机生成字节,并Post到ITargetBlock对象;

Consumer方法从ISourceBlock对象读取字节;

可以使用BufferBlock来同时扮演源和目标对象。

Post():同步发送消息。

Complete():表明当前块(source block)已经没有数据更多的数据了。

Consumer方法使用await和async操作符异步地计算总的字节数。

OutputAvailableAsync():从source block收到一个通知(接收到Complete的通知),表明没有更多的数据可用。

?

  1. public
    static
    class DataflowProducerConsumer
  2. {
  3. ????// Demonstrates the production end of the producer and consumer pattern.
  4. ????static
    void Produce(ITargetBlock<byte[]> target)
  5. ????{
  6. ????????Random rand = new Random();
  7. ?
  8. ????????// In a loop, fill a buffer with random data and
  9. ????????// post the buffer to the target block.
  10. ????????for (int i = 0; i < 100; i++)
  11. ????????{
  12. ????????????// Create an array to hold random byte data.
  13. ????????????byte[] buffer = new
    byte[1024];
  14. ?
  15. ????????????// Fill the buffer with random bytes.
  16. ????????????rand.NextBytes(buffer);
  17. ?
  18. ????????????// Post the result to the message block.
  19. ????????????target.Post(buffer);
  20. ????????}
  21. ?
  22. ????????// Set the target to the completed state to signal to the consumer
  23. ????????// that no more data will be available.
  24. ????????target.Complete();
  25. ????}
  26. ?
  27. ????// Demonstrates the consumption end of the producer and consumer pattern.
  28. ????static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
  29. ????{
  30. ????????// Initialize a counter to track the number of bytes that are processed.
  31. ????????int bytesProcessed = 0;
  32. ?
  33. ????????// Read from the source buffer until the source buffer has no
  34. ????????// available output data.
  35. ????????while (await source.OutputAvailableAsync())
  36. ????????{
  37. ????????????byte[] data = source.Receive();
  38. ?
  39. ????????????// Increment the count of bytes received.
  40. ????????????bytesProcessed += data.Length;
  41. ????????}
  42. ?
  43. ????????return bytesProcessed;
  44. ????}
  45. ?
  46. ????static
    void Run(string[] args)
  47. ????{
  48. ????????// Create a BufferBlock<byte[]> object. This object serves as the
  49. ????????// target block for the producer and the source block for the consumer.
  50. ????????var buffer = new BufferBlock<byte[]>();
  51. ?
  52. ????????// Start the consumer. The Consume method runs asynchronously.
  53. ????????var consumer = ConsumeAsync(buffer);
  54. ?
  55. ????????// Post source data to the dataflow block.
  56. ????????Produce(buffer);
  57. ?
  58. ????????// Wait for the consumer to process all data.
  59. ????????consumer.Wait();
  60. ?
  61. ????????// Print the count of bytes processed to the console.
  62. ????????Console.WriteLine("Processed {0} bytes.", consumer.Result);
  63. ????}
  64. }

?

参考:https://msdn.microsoft.com/en-us/library/hh228601(v=vs.110).aspx

时间: 2024-11-07 05:35:42

How to: 使用 数据流 实现生产者-消费者模式的相关文章

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

并发编程基础之生产者消费者模式

一:概念 生产者消费者模式是java并发编程中很经典的并发情况,首先有一个大的容器,生产者put元素到 容器中,消费者take元素出来,如果元素的数量超过容器的容量时,生产者不能再往容器中put元素 ,处于阻塞状态,如果元素的数量等于0,则消费者不能在从容器中take数据,处于阻塞状态. 二:示例 /** * */ package com.hlcui.main; import java.util.LinkedList; import java.util.concurrent.ExecutorSe

生产者消费者模式

什么是生产者消费者模式   在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者.在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式.结构图如下: 生产者消费者模式的优点 1.解耦 假设生产者和消费者分别是两个类.如果让生产者直接调用消费者的某个方法,那

使用BlockingQueue的生产者消费者模式

BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.使用场景. 首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出:在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享.强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程. BlockingQueue的

生产者消费者模式(转)

本文转载自博文系列架构设计:生产者/消费者模式.文中对原文格式进行了稍加整理. 概述 今天打算来介绍一下“生产者/消费者模式”,这玩意儿在很多开发领域都能派上用场.由于该模式很重要,打算分几个帖子来介绍.今天这个帖子先来扫盲一把.如果你对这个模式已经比较了解,请跳过本扫盲帖,直接看下一个帖子(关于该模式的具体应用) . 看到这里,可能有同学心中犯嘀咕了:在四人帮(GOF)的23种模式里面似乎没听说过这种嘛!其实GOF那经典的23种模式主要是基于OO的(从书名<Design Patterns: E

生产者消费者模式(吃包子例子)

生产者-消费者问题是一个经典的进程同步问 题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制.在同一个进程地址空间内执行的两个线程生产者线程生产物品,然后将物品放置在一个空 缓冲区中供消费者线程消费.消费者线程从缓冲区中获得物品,然后释放缓冲区.当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费 者线程释放出一个空缓冲区.当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来. 生产者消费者模式是并发.多线程编程中经典的设计

Java 并发编程(四)阻塞队列和生产者-消费者模式

阻塞队列 阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法.如果队列已经满了,那么put方法将阻塞直到有空间可以用:如果队列为空,那么take方法将一直阻塞直到有元素可用.队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞.一种常见的阻塞生产者-消费者模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式. 意义:该模式能简化开发过程,因为他消除了生产者和消费者类之间的代

关于java中生产者消费者模式的理解

在说生产者消费者模式之前,我觉得有必要理解一下 Obj.wait(),与Obj.notify()方法.wait()方法是指在持有对象锁的线程调用此方法时,会释放对象锁,同时休眠本线程.notify()方法是持有相同的对象锁来唤醒休眠的线程,使其具有抢占cpu的资格.可以理解同步方法,同步方法的对象锁就是谁调用这个方法,这个对象就是对象锁. 根据李兴华老师的视频讲解,建立一个生产者类,一个消费者类,还有一个Info类,贴上代码: 1.生产者类 package com.company; /** *

python 多线程笔记(6)-- 生产者/消费者模式(续)

用 threading.Event() 也可以实现生产者/消费者模式 (自己拍脑袋想出来的,无法知道其正确性,请大神告知为谢!) import threading import time import random products = 20 class Producer(threading.Thread): '''生产者''' ix = [0] # 生产者实例个数 # 闭包,必须是数组,不能直接 ix = 0 def __init__(self): super().__init__() sel