三年前我写过一个基于ConcurrentQueue的异步队列。今天整理代码时,发现了当时的另一种实现方式——使用BlockingCollection实现,这种方式目前依然在实际项目中使用。关于BlockingCollection的基本用法,请查阅MSDN。

源码实现
下面直接上代码(代码已经放到了我的GitHub上):
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Danny.Infrastructure.Helper;

namespace Danny.Infrastructure.Collections
{
    /// <summary>
    /// A multi-threaded processing queue built on <see cref="BlockingCollection{T}"/>.
    /// Items passed to <see cref="Enqueue"/> are handed to <see cref="ProcessItemEvent"/>
    /// on dedicated worker threads; failures raised while processing are reported
    /// through <see cref="ProcessExceptionEvent"/>.
    /// </summary>
    /// <typeparam name="T">Type of the items stored in the queue.</typeparam>
    public class ProcessQueue<T>
    {
        // Value of _isProcessing once the worker threads have been created and started.
        private const int Processing = 1;

        // Value of _isProcessing while no worker thread has been started yet.
        private const int UnProcessing = 0;

        private readonly BlockingCollection<T> _queue;
        private readonly CancellationTokenSource _cancellationTokenSource;
        private readonly CancellationToken _cancellToken;

        // Worker threads owned by this queue.
        private readonly List<Thread> _threadCollection;

        // Processing / UnProcessing flag, switched atomically by IsProcessingItem().
        private int _isProcessing;

        // Set to false by StopProcess(); nothing in this class sets it back to true.
        private volatile bool _enabled = true;

        // Number of worker threads created when the first item arrives.
        private int _internalThreadCount;

        /// <summary>
        /// Raised once for every dequeued item.
        /// </summary>
        public event Action<T> ProcessItemEvent;

        /// <summary>
        /// Raised when processing an item throws. Arguments: this queue instance,
        /// the exception, and the item that was being processed.
        /// </summary>
        public event Action<dynamic, Exception, T> ProcessExceptionEvent;

        /// <summary>
        /// Creates a queue that processes items on a single worker thread.
        /// </summary>
        public ProcessQueue()
        {
            _queue = new BlockingCollection<T>();
            _cancellationTokenSource = new CancellationTokenSource();
            _internalThreadCount = 1;
            _cancellToken = _cancellationTokenSource.Token;
            _threadCollection = new List<Thread>();
        }

        /// <summary>
        /// Creates a queue that processes items on the given number of worker threads.
        /// </summary>
        /// <param name="internalThreadCount">Number of worker threads to create.</param>
        public ProcessQueue(int internalThreadCount) : this()
        {
            _internalThreadCount = internalThreadCount;
        }

        /// <summary>
        /// Returns the number of items currently waiting in the queue.
        /// </summary>
        public int GetInternalItemCount()
        {
            return _queue.Count;
        }

        /// <summary>
        /// Adds an item to the queue and, on the first call, starts the worker threads.
        /// </summary>
        /// <param name="items">The item to enqueue; must not be null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="items"/> is null.</exception>
        public void Enqueue(T items)
        {
            if (items == null)
            {
                // ArgumentNullException derives from ArgumentException, so callers that
                // catch ArgumentException keep working.
                throw new ArgumentNullException("items");
            }

            _queue.Add(items);
            DataAdded();
        }

        /// <summary>
        /// Stops the worker threads and then processes every item still in the queue
        /// on the calling thread. The queue stays disabled afterwards: items enqueued
        /// later are stored but only processed by another call to this method.
        /// </summary>
        public void Flush()
        {
            StopProcess();

            while (_queue.Count != 0)
            {
                T item;
                if (_queue.TryTake(out item))
                {
                    try
                    {
                        // With no subscribers this invocation throws NullReferenceException,
                        // which is reported through OnProcessException below.
                        ProcessItemEvent(item);
                    }
                    catch (Exception ex)
                    {
                        OnProcessException(ex, item);
                    }
                }
            }
        }

        // Creates and starts the worker threads the first time data arrives on an enabled queue.
        private void DataAdded()
        {
            if (!_enabled)
            {
                return;
            }

            if (!IsProcessingItem())
            {
                ProcessRangeItem();
                StartProcess();
            }
        }

        // Atomically switches _isProcessing from UnProcessing to Processing.
        // Returns false only for the caller that performed the switch.
        private bool IsProcessingItem()
        {
            return Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) != UnProcessing;
        }

        // Creates _internalThreadCount worker threads (not yet started).
        private void ProcessRangeItem()
        {
            for (int i = 0; i < _internalThreadCount; i++)
            {
                ProcessItem();
            }
        }

        // Creates one worker thread that takes items and raises ProcessItemEvent until the queue is stopped.
        private void ProcessItem()
        {
            Thread currentThread = new Thread((state) =>
            {
                while (_enabled)
                {
                    // Declared per iteration so that a failing Take() never reports
                    // the item of a previous iteration to the exception handler.
                    T item = default(T);
                    try
                    {
                        try
                        {
                            item = _queue.Take(_cancellToken);
                            ProcessItemEvent(item);
                        }
                        catch (OperationCanceledException ex)
                        {
                            // StopProcess() clears _enabled before cancelling the token,
                            // so the loop condition ends this thread after a cancellation.
                            DebugHelper.DebugView(ex.ToString());
                        }
                    }
                    catch (Exception ex)
                    {
                        OnProcessException(ex, item);
                    }
                }
            });

            _threadCollection.Add(currentThread);
        }

        // Starts every worker thread created by ProcessRangeItem().
        private void StartProcess()
        {
            foreach (var thread in _threadCollection)
            {
                thread.Start();
            }
        }

        // Disables the queue, wakes up blocked workers and waits for all of them to finish.
        private void StopProcess()
        {
            _enabled = false;

            // Workers waiting in Take(_cancellToken) on an empty queue only return when
            // the token is cancelled; without this the Join() below would block forever.
            _cancellationTokenSource.Cancel();

            foreach (var thread in _threadCollection)
            {
                if (thread.IsAlive)
                {
                    thread.Join();
                }
            }

            _threadCollection.Clear();
        }

        // Raises ProcessExceptionEvent with this instance, the exception and the affected item.
        private void OnProcessException(Exception ex, T item)
        {
            // Invoke the captured copy: the event field may become null if the last
            // subscriber is removed between the null check and the call.
            var handler = ProcessExceptionEvent;
            if (handler != null)
            {
                handler(this, ex, item);
            }
        }
    }
}
使用方法:
// Minimal usage sample: subscribe to both events, then enqueue a few integers.
class Program
{
    static void Main(string[] args)
    {
        var queue = new ProcessQueue<int>();

        // Handlers are attached before the first Enqueue call.
        queue.ProcessItemEvent += ProcessQueue_ProcessItemEvent;
        queue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent;

        for (int number = 1; number <= 3; number++)
        {
            queue.Enqueue(number);
        }
    }

    /// <summary>
    /// Handles each enqueued element by writing it to the console.
    /// </summary>
    /// <param name="value">The dequeued element.</param>
    private static void ProcessQueue_ProcessItemEvent(int value)
    {
        Console.WriteLine(value);
    }

    /// <summary>
    /// Handles a processing failure by writing the exception to the console.
    /// </summary>
    /// <param name="obj">The queue instance that reported the failure.</param>
    /// <param name="ex">The exception that was thrown.</param>
    /// <param name="value">The element that was being processed.</param>
    private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value)
    {
        string details = ex.ToString();
        Console.WriteLine(details);
    }
}
时间: 2024-11-10 12:34:54