.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列

  三年前写过基于ConcurrentQueue的异步队列,今天在整理代码的时候发现当时另外一种实现方式-使用BlockingCollection实现,这种方式目前依然在实际项目中使用。关于BlockingCollection的基本使用请查阅MSDN源码实现

下面直接上代码:(代码已经放到了我的github上)

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Danny.Infrastructure.Helper;

namespace Danny.Infrastructure.Collections
{
    /// <summary>
    /// 一个基于BlockingCollection实现的多线程的处理队列
    /// </summary>
    public class ProcessQueue<T>
    {
        private  BlockingCollection<T> _queue;
        private CancellationTokenSource _cancellationTokenSource;
        private CancellationToken _cancellToken;
        //内部线程池
        private List<Thread> _threadCollection;

        //队列是否正在处理数据
        private int _isProcessing;
        //有线程正在处理数据
        private const int Processing = 1;
        //没有线程处理数据
        private const int UnProcessing = 0;
        //队列是否可用
        private volatile bool _enabled = true;
        //内部处理线程数量
        private int _internalThreadCount;

        public event Action<T> ProcessItemEvent;
        //处理异常,需要三个参数,当前队列实例,异常,当时处理的数据
        public event Action<dynamic,Exception,T> ProcessExceptionEvent;

        public ProcessQueue()
        {
            _queue=new BlockingCollection<T>();
            _cancellationTokenSource = new CancellationTokenSource();
            _internalThreadCount = 1;
            _cancellToken = _cancellationTokenSource.Token;
            _threadCollection = new List<Thread>();
        }

        public ProcessQueue(int internalThreadCount):this()
        {
            this._internalThreadCount = internalThreadCount;
        }

        /// <summary>
        /// 队列内部元素的数量
        /// </summary>
        public int GetInternalItemCount()
        {
            return _queue.Count;
        }

        public void Enqueue(T items)
        {
            if (items == null)
            {
                throw new ArgumentException("items");
            }

            _queue.Add(items);
            DataAdded();
        }

        public void Flush()
        {
            StopProcess();

            while (_queue.Count != 0)
            {
                T item=default(T);
                if (_queue.TryTake(out item))
                {
                    try
                    {
                        ProcessItemEvent(item);
                    }
                    catch (Exception ex)
                    {
                        OnProcessException(ex,item);
                    }
                }
            }
        }

        private void DataAdded()
        {
            if (_enabled)
            {
                if (!IsProcessingItem())
                {
                    ProcessRangeItem();
                    StartProcess();
                }
            }
        }

        //判断是否队列有线程正在处理
        private bool IsProcessingItem()
        {
            return !(Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) == UnProcessing);
        }

        private void ProcessRangeItem()
        {
            for (int i = 0; i < this._internalThreadCount; i++)
            {
                ProcessItem();
            }
        }

        private void ProcessItem()
        {
            Thread currentThread = new Thread((state) =>
            {
                T item=default(T);
                while (_enabled)
                {
                    try
                    {
                        try
                        {
                            item = _queue.Take(_cancellToken);
                            ProcessItemEvent(item);
                        }
                        catch (OperationCanceledException ex)
                        {
                            DebugHelper.DebugView(ex.ToString());
                        }

                    }
                    catch (Exception ex)
                    {
                        OnProcessException(ex,item);
                    }
                }

            });

            _threadCollection.Add(currentThread);
        }

        private void StartProcess()
        {
            foreach (var thread in _threadCollection)
            {
                thread.Start();
            }
        }

        private void StopProcess()
        {
            this._enabled = false;
            foreach (var thread in _threadCollection)
            {
                if (thread.IsAlive)
                {
                    thread.Join();
                }
            }
            _threadCollection.Clear();
        }

        private void OnProcessException(Exception ex,T item)
        {
            var tempException = ProcessExceptionEvent;
            Interlocked.CompareExchange(ref ProcessExceptionEvent, null, null);

            if (tempException != null)
            {
                ProcessExceptionEvent(this,ex,item);
            }
        }

    }
}

使用方法:

class Program
    {
        static void Main(string[] args)
        {
            ProcessQueue<int> processQueue = new ProcessQueue<int>();
            processQueue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent;
            processQueue.ProcessItemEvent += ProcessQueue_ProcessItemEvent;

            processQueue.Enqueue(1);
            processQueue.Enqueue(2);
            processQueue.Enqueue(3);

        }

        /// <summary>
        /// 该方法对入队的每个元素进行处理
        /// </summary>
        /// <param name="value"></param>
        private static void ProcessQueue_ProcessItemEvent(int value)
        {
            Console.WriteLine(value);
        }

        /// <summary>
        ///  处理异常
        /// </summary>
        /// <param name="obj">队列实例</param>
        /// <param name="ex">异常对象</param>
        /// <param name="value">出错的数据</param>
        private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value)
        {
            Console.WriteLine(ex.ToString());
        }
    }
时间: 2024-11-10 12:34:54

.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列的相关文章

.Net中的并行编程-1.路线图(转)

大神,大神,膜拜膜拜,原文地址:http://www.cnblogs.com/zw369/p/3834559.html 目录 .Net中的并行编程-1.路线图 分析.Net里线程同步机制 .Net中的并行编程-2.ConcurrentStack的实现与分析 .Net中的并行编程-3.ConcurrentQueue实现与分析 .Net中的并行编程-4.实现高性能异步队列 .Net中的并行编程-5.流水线模型实战 .Net中的并行编程-6.常用优化策略 .Net中的并行编程-7.基于Blocking

.Net中的并行编程-4.实现高性能异步队列

上文<.Net中的并行编程-3.ConcurrentQueue实现与分析>分析了ConcurrentQueue的实现,本章就基于ConcurrentQueue实现一个高性能的异步队列,该队列主要用于实时数据流的处理并简化多线程编程模型.设计该队列时考虑以下几点需求(需求来自公司的一个实际项目): 1. 支持多线程入队出队,尽量简化多线程编程的复杂度. 2. 支持事件触发机制,数据入队时才进行处理而不是使用定时处理机制, 而且内部能阻塞消费者线程. 3. 出队时数据处理的顺序要保证和入队时是一致

.Net中的并行编程-3.ConcurrentQueue实现与分析

在上文<.Net中的并行编程-2.ConcurrentQueue的实现与分析> 中解释了无锁的相关概念,无独有偶BCL提供的ConcurrentQueue也是基于原子操作实现, 由于ConcurrentQueue的代码较多所以本文主要分析几个常用操作: 入队(EnQueue) .出队(TryDequeue) .是否为空(IsEmpty).获取队列内元素数量(Count). 一.ConcurrentQueue内部结构: 1.实现原理 众所周知,在普通的非线程安全队列有两种实现方式: 1.使用数组

.Net中的并行编程-5.流水线模型实战

自己在Excel整理了很多想写的话题,但苦于最近比较忙(其实这是借口).... 上篇文章<.Net中的并行编程-4.实现高性能异步队列>介绍了异步队列的实现,本篇文章介绍我实际工作者遇到了处理多线程问题及基于异步队列底层数据结构的解决方案. 需求如下:1.提供数据服写入务供上层应用调用,数据写入服务处理的吞吐量要达到60w/s每秒,也就是用户每秒发送60w的数据然后通过数据写入服务写到数据库中(数据库为公司自主研发的实时数据库). 2.尽量简化上层应用调用服务的复杂度. 一.分析性能瓶颈: 1

NET中的并行编程(TPL)——多线程、异步、任务和并行计算

https://masuit.com/1201 谈一谈.NET中的并行编程(TPL)——多线程.异步.任务和并行计算 懒得勤快 发表于2018-04-26 19:41:00 | 最后修改于2018-06-27 23:44:40 .NET 多线程 异步 高并发 分类:.NET开发技术 | 评论总数:0条 | 热度:2243℃ 我要编辑 写在前面: 在做了几个月的高并发项目的过程中,其实发现自己真的提升了不少,所以也想把这段时间的收获分享给大家,然后写这篇文章发现,写下来是一发不可收拾,所以这篇文章

.Net中的并行编程-6.常用优化策略

            本文是.Net中的并行编程第六篇,今天就介绍一些我在实际项目中的一些常用优化策略.      一.避免线程之间共享数据 避免线程之间共享数据主要是因为锁的问题,无论什么粒度的锁,最好的线程之间同步方式就是不加锁,这个地方主要措施就是找出数据之间的哪个地方需要共享数据和不需要共享数据的地方,再设计上避免多线程之间共享数据. 在以前做过的某项目,开始时设计的方案: 开始设计时所有的数据都放入到了公共队列,然后队列通知多个线程去处理数据,队列采用互斥锁保证线程同步,造成的结果就

.Net中的并行编程-2.ConcurrentStack的实现与分析

在上篇文章<.net中的并行编程-1.基础知识>中列出了在.net进行多核或并行编程中需要的基础知识,今天就来分析在基础知识树中一个比较简单常用的并发数据结构--.net类库中无锁栈的实现. 首先解释一下什么这里“无锁”的相关概念. 所谓无锁其实就是在普通栈的实现方式上使用了原子操作,原子操作的原理就是CPU在系统总线上设置一个信号,当其他线程对同一块内存进行访问时CPU监测到该信号存在会,然后当前线程会等待信号释放后才能对内存进行访问.原子操作都是由操作系统API实现底层由硬件支持,常用的操

.net中的并行编程系列-1.基础知识

最近半年一直研究用.net进行并行程序的开发与设计,再研究的过程中颇有收获,所以画了一个图总结了一下并行编程的基础知识点,这些知识点是并行编程的基础,有助于我们编程高性能的程序,里面的某些结构实现机制也蕴含着丰富的软件设计思想,在后续的文章中我会对图里面提到某些数据结构或同步机制的源码进行分析. 注:虽然使用的平台是.net ,但大部分知识点和平台以及语言无关,相关数据结构其他相关平台都有实现,包括优化手段都非常相似. .net中的并行编程系列-1.基础知识,布布扣,bubuko.com

spark 中的RDD编程 -以下基于Java api

1.RDD介绍:     RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行化. Spark中的RDD就是一个不可变的分布式对象集合.每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上.RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象. 用户可以使用两种方法创建RDD:读取一个