基于阻塞队列的生产者消费者C#并发设计

这是从上文的<<图文并茂的生产者消费者应用实例demo>>整理总结出来的,具体就不说了,直接给出代码,注释我已经加了,原来的code请看<<.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列>>,我改成适合我的版本了,直接给code:

调用code:

static void Main(string[] args)
        {
            ProcessQueue<int> processQueue = new ProcessQueue<int>();
            processQueue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent;
            processQueue.ProcessItemEvent += ProcessQueue_ProcessItemEvent;

            for (int i = 0; i < 50; i++)
            {
                processQueue.Enqueue(i);
            }

            Console.WriteLine("阻塞队列的数量: {0}", processQueue.GetInternalItemCount());

            processQueue.Flush();

            Console.Read();
        }

        /// <summary>
        /// 该方法对入队的每个元素进行处理
        /// </summary>
        /// <param name="value"></param>
        private static void ProcessQueue_ProcessItemEvent(int value)
        {
            Console.WriteLine("输出: {0}", value);
        }

        /// <summary>
        ///  处理异常
        /// </summary>
        /// <param name="obj">队列实例</param>
        /// <param name="ex">异常对象</param>
        /// <param name="value">出错的数据</param>
        private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value)
        {
            Console.WriteLine(ex.ToString());
        }

封装的队列:

public class ProcessQueue<T>
    {
        private BlockingCollection<T> _queue;
        private CancellationTokenSource _cancellationTokenSource;
        private CancellationToken _cancellToken;
        //内部线程池
        private List<Thread> _threadCollection;

        //队列是否正在处理数据
        private int _isProcessing;
        //有线程正在处理数据
        private const int Processing = 1;
        //没有线程处理数据
        private const int UnProcessing = 0;
        //队列是否可用
        private volatile bool _enabled = true;
        //内部处理线程数量
        private int _internalThreadCount;
        // 消费者处理事件
        public event Action<T> ProcessItemEvent;
        //处理异常,需要三个参数,当前队列实例,异常,当时处理的数据
        public event Action<dynamic, Exception, T> ProcessExceptionEvent;

        public ProcessQueue()
        {
            _queue = new BlockingCollection<T>();
            _cancellationTokenSource = new CancellationTokenSource();
            _internalThreadCount = 3;
            _cancellToken = _cancellationTokenSource.Token;
            _threadCollection = new List<Thread>();
        }

        public ProcessQueue(int internalThreadCount) : this()
        {
            this._internalThreadCount = internalThreadCount;
        }

        /// <summary>
        /// 队列内部元素的数量
        /// </summary>
        public int GetInternalItemCount()
        {
            //return _queue.Count;
            return _threadCollection.Count;
        }
        //生产者生产
        public void Enqueue(T items)
        {
            if (items == null)
            {
                throw new ArgumentException("items");
            }

            _queue.Add(items);
            DataAdded();
        }

        public void Flush()
        {
            StopProcess();

            while (_queue.Count != 0)
            {
                T item = default(T);
                if (_queue.TryTake(out item))
                {
                    try
                    {
                        ProcessItemEvent(item);
                    }
                    catch (Exception ex)
                    {
                        OnProcessException(ex, item);
                    }
                }
            }
        }
        // 通知消费者消费队列元素
        private void DataAdded()
        {
            if (_enabled)
            {
                if (!IsProcessingItem())
                {
                    Console.WriteLine("DataAdded");
                    ProcessRangeItem();
                    StartProcess();
                }
            }
        }

        //判断是否队列有线程正在处理
        private bool IsProcessingItem()
        {
            // 替换第一个参数, 如果相等
            //int x = Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing);
            return !(Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) == UnProcessing);
        }
        // 多消费者消费
        private void ProcessRangeItem()
        {
            for (int i = 0; i < this._internalThreadCount; i++)
            {
                ProcessItem();
            }
        }
        // 开启消费处理
        private void ProcessItem()
        {
            Thread currentThread = new Thread((state) =>
            {
                T item = default(T);
                while (_enabled)
                {
                    try
                    {
                        try
                        {
                            if (!_queue.TryTake(out item))
                            {
                                //Console.WriteLine("阻塞队列为0时的item: {0}", item);
                                //Console.WriteLine("ok!!!");
                                break;
                            }
                            // 处理事件
                            ProcessItemEvent(item);
                        }
                        catch (OperationCanceledException ex)
                        {
                            DebugHelper.DebugView(ex.ToString());
                        }

                    }
                    catch (Exception ex)
                    {
                        OnProcessException(ex, item);
                    }
                }
            });
            _threadCollection.Add(currentThread);
        }
        // 开启消费者
        private void StartProcess()
        {
            //Console.WriteLine("线程的数量: {0}", _threadCollection.Count);
            foreach (var thread in _threadCollection)
            {
                thread.Start();
                thread.IsBackground = true;
            }
        }
        // 终止运行
        private void StopProcess()
        {
            this._enabled = false;
            foreach (var thread in _threadCollection)
            {
                if (thread.IsAlive)
                {
                    thread.Join();
                }
            }
            _threadCollection.Clear();
        }

        private void OnProcessException(Exception ex, T item)
        {
            var tempException = ProcessExceptionEvent;
            Interlocked.CompareExchange(ref ProcessExceptionEvent, null, null);

            if (tempException != null)
            {
                ProcessExceptionEvent(this, ex, item);
            }
        }

    }

原文地址:https://www.cnblogs.com/zhiyong-ITNote/p/8312965.html

时间: 2024-10-13 16:11:43

基于阻塞队列的生产者消费者C#并发设计的相关文章

Java并发(基础知识)—— 阻塞队列和生产者消费者模式

1.阻塞队列 BlockingQueue是线程安全的Queue版本,从它的名字就可以看出,它是一个支持阻塞的Queue实现:当向空BlockingQueue请求数据时,它会阻塞至BlockingQueue非空:当向一个已满BlockingQueue插入数据时,线程会阻塞至BlockingQueue可插入. BlockingQueue 的方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 fa

并发编程—— 阻塞队列和生产者-消费者模式

Java并发编程实践 目录 并发编程—— ConcurrentHashMap 并发编程—— 阻塞队列和生产者-消费者模式 概述 第1部分 为什么要使用生产者和消费者模式 第2部分 什么是生产者消费者模式 第3部分 代码示例 第1部分 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

阻塞队列和生产者-消费者模式、DelayQueue

1.ArrayDeque, (数组双端队列) 2.PriorityQueue, (优先级队列) 3.ConcurrentLinkedQueue, (基于链表的并发队列) 4.DelayQueue,                                         (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 5.ArrayBlockingQueue,           (基于数组的并发阻塞队列) 6.LinkedBlockingQueue,        (基

使用阻塞队列实现生产者-消费者模型

生产者-消费者模问题 /** * 使用阻塞队列实现生产者-消费者模型 * 阻塞队列只允许元素以FIFO的方式来访问 * @author Bingyue * */ public class ProducerCustomerPattern { public static void main(String[] args) { //生产者和消费者共享的存储区域 BlockingQueue<Integer> blockQueue=new LinkedBlockingQueue(); /** * 此处外部

Java 并发编程(四)阻塞队列和生产者-消费者模式

阻塞队列 阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法.如果队列已经满了,那么put方法将阻塞直到有空间可以用:如果队列为空,那么take方法将一直阻塞直到有元素可用.队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞.一种常见的阻塞生产者-消费者模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式. 意义:该模式能简化开发过程,因为他消除了生产者和消费者类之间的代

11.python并发入门(part8 基于线程队列实现生产者消费者模型)

一.什么是生产者消费者模型? 生产者就是生产数据的线程,消费者指的就是消费数据的线程. 在多线程开发过程中,生产者的速度比消费者的速度快,那么生产者就必须等待消费者把数据处理完,生产者才会产生新的数据,相对的,如果消费者处理数据的速度大于生产者,那么消费者就必须等待生产者. 为了解决这种问题,就有了生产者消费者模型. 生产者与消费者模型,是通过一个容器,来解决生产者和消费者之间的耦合性问题,生产者和消费者之间并不会直接通信,这样生产者就无需等待消费者处理完数据,生产者可以直接把数据扔给队列,这个

阻塞队列实现生产者消费者模式

阻塞队列的特点:当队列元素已满的时候,阻塞插入操作: 当队列元素为空的时候,阻塞获取操作: 生产者线程:Producer 1 package test7; 2 3 import java.util.concurrent.BlockingQueue; 4 5 public class Producer implements Runnable{ 6 7 private final BlockingQueue queue; 8 public Producer(BlockingQueue queue){

【译】使用阻塞队列解决生产者-消费者问题

如果你想避免使用错综复杂的wait–notify的语句,BlockingQueue非常有用.BlockingQueue可用于解决生产者-消费者问题,如下代码示例.对于每个开发人员来说,生产者消费者问题已经非常熟悉了,这里我将不做详细描述. 为什么BlockingQueue适合解决生产者消费者问题 任何有效的生产者-消费者问题解决方案都是通过控制生产者put()方法(生产资源)和消费者take()方法(消费资源)的调用来实现的,一旦你实现了对方法的阻塞控制,那么你将解决该问题. Java通过Blo