异步简析之BlockingCollection实现生产消费模式

目前市面上有诸多的产品实现队列功能,比如Redis、MemCache等...

其实c#中也有一个基础的集合类专门用来实现生产/消费模式 (生产模式还是建议使用Redis等产品)

下面是官方的一些资料和介绍:

BlockingCollection是一个线程安全集合类,可提供以下功能:

  • 实现制造者-使用者模式。
  • 通过多线程并发添加和获取项。
  • 可选最大容量。
  • 集合为空或已满时通过插入和移除操作进行阻塞。
  • 插入和移除“尝试”操作不发生阻塞,或在指定时间段内发生阻塞。
  • 封装实现 IProducerConsumerCollection 的任何集合类型
  • 使用取消标记执行取消操作。
  • 支持使用 foreach(在 Visual Basic 中,使用 For Each)的两种枚举:
    • 只读枚举。
    • 在枚举项时将项移除的枚举。

BlockingCollection 支持限制和阻塞。 限制意味着可以设置集合的最大容量。 限制在某些情况中很重要,因为它使你能够控制内存中的集合的最大大小,并可阻止制造线程移动到离使用线程前方太远的位置。

可以看到该类是完全线程安全的,因此用来做生产/消费是非常合适的

如下代码演示了该类在异步环境中很好的执行着生产和消费任务

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace AsyncLearning
{
    public class TestProduceAndConsumer
    {
        /// <summary>
        /// BlockingCollection是线程安全的集合类型,支持多线程同时读写
        /// </summary>
        private readonly BlockingCollection<string> _blockingQueue =
            new BlockingCollection<string>();

        private void Produce()
        {
            for (int i = 0; i < 10; i++) //限制生产1000次
            {
                var now = DateTime.Now.ToString(CultureInfo.InvariantCulture);
                Debug.WriteLine($"第{i+1}次生产! {now}");
                _blockingQueue.Add(now);
                Thread.Sleep(1000);  //特意减慢生产过程以至于不会太快。。。方便演示
            }

            _blockingQueue.CompleteAdding(); //标记生产完成
        }

        private void Consume()
        {
            int i = 1;
            while (!_blockingQueue.IsCompleted)
            {
                var x = _blockingQueue.Take();
                Debug.WriteLine($"第{i}次消费 {x}");
                i++;
                Thread.Sleep(2000);  //故意减慢消费
            }
        }

        [Fact]
        public void Test()
        {

            Task.WaitAll(Task.Run(() => { Produce(); }), Task.Run(() => { Consume(); }));
        }
    }
}

输出如下:

?

可见Demo很好的按照设定执行了代码逻辑,由于故意设定了不同的sleep时间,可以看到消费是晚于生产的,而消费全部完成后本Demo的任务全部结束

原文地址:https://www.cnblogs.com/linkanyway/p/produce-consume-task-async.html

时间: 2024-11-15 00:04:22

异步简析之BlockingCollection实现生产消费模式的相关文章

Linux网络性能优化方法简析

Linux网络性能优化方法简析 2010-12-20 10:56 赵军 IBMDW 字号:T | T 性能问题永远是永恒的主题之一,而Linux在网络性能方面的优势则显而易见,这篇文章是对于Linux内核中提升网络性能的一些优化方法的简析,以让我们去后台看看魔术师表演用的盒子,同时也看看内核极客们是怎样灵活的,渐进的去解决这些实际的问题. AD:2014WOT全球软件技术峰会北京站 课程视频发布 对于网络的行为,可以简单划分为 3 条路径:1) 发送路径,2) 转发路径,3) 接收路径,而网络性

[转载] Thrift原理简析(JAVA)

转载自http://shift-alt-ctrl.iteye.com/blog/1987416 Apache Thrift是一个跨语言的服务框架,本质上为RPC,同时具有序列化.发序列化机制:当我们开发的service需要开放出去的时候,就会遇到跨语言调用的问题,JAVA语言开发了一个UserService用来提供获取用户信息的服务,如果服务消费端有PHP/Python/C++等,我们不可能为所有的语言都适配出相应的调用方式,有时候我们会很无奈的使用Http来作为访问协议;但是如果服务消费端不能

Android -- MediaPlayer内部实现简析

Android -- MediaPlayer内部实现简析 在之前的博客中,已经介绍了使用MediaPlayer时要注意的内容.现在,这里就通过一个MediaPlayer代码实例,来进一步分析MediaPlayer内部是如何运作.实现的:当然这里的分析只截止到底层调用播放器之前,因为播放器这块实在是没搞懂. 我们使用的例子来源于之前MediaPlayer Playback译文中的官方实例: String url = "http://........"; // your URL here

PostgreSQL的 pg_hba.conf 文件简析

最近试用PostgreSQL 9.04,将pg_hba.conf配置的一些心得分享. pg_hba.conf是客户端认证配置文件,定义如何认证客户端. 下面是常用的pg_hba.conf配置: 1 2 3 4 5 6 7 8 9 10 # TYPE  DATABASE  USER  CIDR-ADDRESS  METHOD   # "local" is for Unix domain socket connections only local    all      all      

linux网络流控-htb算法简析

项目中用tc,htb做流控期间,研究了htb(分层令牌桶)算法的实现.觉得这种思想在类似与有消费优先级的生产者消费者场景中也很适用.该算法过于复杂,碍于嘴拙遂在标题中加了简析,只介绍核心思想和关键代码的实现. 一个栗子: tc qdisc add dev eth0 root handle 1: htb tc class add dev eth0 parent 1: classid 1:1 htb rate 100mibps tc class add dev eth0 parent 1:1 cla

0002 - Spring MVC 拦截器源码简析:拦截器加载与执行

1.概述 Spring MVC中的拦截器(Interceptor)类似于Servlet中的过滤器(Filter),它主要用于拦截用户请求并作相应的处理.例如通过拦截器可以进行权限验证.记录请求信息的日志.判断用户是否登录等. 2.简单示例 2.1.继承 HandlerInterceptorAdapter 抽象类实现一个拦截器.代码如下: public class DemoInterceptor extends HandlerInterceptorAdapter { @Override    pu

简析 Golang IO 包

简析 Golang IO 包 io 包提供了 I/O 原语(primitives)的基本接口.io 包中定义了四个最基本接口 Reader.Writer.Closer.Seeker 用于表示二进制流的读.写.关闭和寻址操作.这些原语和接口是对底层操作的封装,因此如没有特殊说明,这些原语和接口都不能被视为线程安全的. Reader Reader 接口封装了基本的 Read 方法.Read 读取长度为 len(p) 字节的数据,并写入到 p.返回结果包含读取数据字节数(0 <= n <= len(

web应用构架LAMT及tomcat负载简析

Httpd    (mod_jk.so) workers.properties文件 uriworkermap.properties文件 <--AJP1.3--> Tomcat  --> jdk 大致流程:apache服务器通过mod_jk.so 模块处理jsp文件的动态请求.通过tomcat worker等待执行servlet/JSP的tomcat实例.使用 AJP1.3协议与tomcat通信.tomcat有借助jdk解析. 负载就是 多台tomcat.共同解析apache发送的jsp请

CentOS的网络配置简析

我们在进行对CentOS的网络配置时,一般会从IP地址(IPADDR).子网掩码(NETMASK).网关(Gateway).主机名(HOSTNAME).DNS服务器等方面入手.而在CentOS中,又有着不同的命令或配置文件可以完成这些配置操作,接下来,我们将从ifcfg系命令,iproute2系命令以及配置文件3个方面来简析网络配置的方法. 一.ifcfg系命令 ifcfg系命令包括ifconfig,route,netstat和hostname. 1.ifconfig命令 用来配置一个网络接口.