生产消费模式:多线程读写队列ConcurrentQueue

需求:现需要将多个数据源的数据导入到目标数据库,这是一个经典的生产消费应用的例子。

直接上代码,看下实现:

            // 初始化列队缓冲区 队列大小为100
            IDataCollection<List<T>> queue = new QueueCollection<List<T>>(100);

            //开启X个后台任务,读取RabbitMQ队列信息, 把列队信息插入缓冲区队列
            var count = 1;
            for (int i = 0; i < count; i++)
            {
                Task.Factory.StartNew(() => new Producer<List<T>>(queue).Start(new RabbitSource<List<T>>().Get));
            }

            //开启X个后台任务,主动获取数据库数据,作为数据生产者,插入到缓冲区队列,
            for (int i = 0; i < count; i++)
            {
                Task.Factory.StartNew(() => new Producer<List<T>>(queue).Start(new DatabaseSource<List<T>>().Get));
            }

            //开启X个后台任务,主动获取读取缓冲区列队,作为数据消息者,把数据插入到ES库,
            for (int i = 0; i < count; i++)
            {
                Task.Factory.StartNew(() => new Customer<List<T>>(queue).Start(new Elastic().Insert));
            }

队列我们采用线程安全的ConcurrentQueue队列:

/// <summary>
    /// 缓冲区队列
    /// ConcurrentQueue线程安全,不用考虑锁的问题
    /// </summary>
    public class QueueCollection<T> :IDataCollection<T>
    {
        //队列最大值
        private int _maxSize;

        /// <summary>
        /// 线程安全的队列
        /// </summary>
        private ConcurrentQueue<T> _queue;

        public QueueCollection(int maxSize)
        {
            this._maxSize = maxSize;
            _queue = new ConcurrentQueue<T>();
        }

        public bool isPopWaiting()
        {
            return !_queue.Any();
        }

        public bool isPushWaiting()
        {
            return this._maxSize == _queue.Count;
        }

        public T Pop()
        {
            T _obj = default(T);
            if (!_queue.IsEmpty)
                _queue.TryDequeue(out _obj);

            return _obj;
        }

        public void Push(T t)
        {
            if (this._maxSize > _queue.Count)
            {
                _queue.Enqueue(t);
            }
        }
    }

如果我们不使用这个队列,只要满足IDataCollection接口,也可以进行替换:

public interface IDataCollection<T>
    {
        /// <summary>
        /// 插入数据
        /// </summary>
        /// <param name="t"></param>
        void Push(T t);

        /// <summary>
        /// 取出数据
        /// </summary>
        /// <returns></returns>
        T Pop();

        /// <summary>
        /// 是否插入数据等待
        /// </summary>
        /// <returns></returns>
        bool isPushWaiting();

        /// <summary>
        /// 是否取出数据等待
        /// </summary>
        /// <returns></returns>
        bool isPopWaiting();

    }

生产者:

 public class Producer<T> : ITransientDependency
    {
        private int sleep;

        private IDataCollection<T> bufferQueue;

        public Producer(IDataCollection<T> queue)
        {
            sleep = 3000;
            bufferQueue = queue;
        }

        public void Start(Action<Action<T>> methodCall)
        {
            //入队
            methodCall((bills) =>
            {
                this.Enqueue(bills);
            });
        }

        private void Enqueue(T t)
        {
            var isWaiting = true;

            while (isWaiting)
            {
                if (!bufferQueue.isPushWaiting())
                {
                    this.bufferQueue.Push(t);
                    isWaiting = false;
                }
                else
                {
                    //生产者等待时间
                    Thread.Sleep(sleep);
                }
            }
        }
    }

消费者:

/// <summary>
    /// 消费者
    /// </summary>
    public class Customer<T>
    {
        //产品缓存队列
        private IDataCollection<T> _queue;

        //消费者等待时间
        private int Spead = 5000;//消费者等待时间

        public Customer(IDataCollection<T> queue)
        {
            this._queue = queue;
        }

        public void Start(Action<T> method)
        {
            while (true)
            {
                if (!_queue.isPopWaiting())
                {
                    T box = this._queue.Pop();

                    method(box);
                }
                else
                {
                    Thread.Sleep(Spead);
                }
            }
        }
    }

方法委托,也写了个基类,其实意义并不大,只是为了规范, 防止方法命名随意起。

    public interface IDataSource<T>
    {
        void Get(Action<T> func);
    }

最后,在DataSource的get方法中,调用 func即可。

时间: 2024-11-08 21:19:40

生产消费模式:多线程读写队列ConcurrentQueue的相关文章

使用C#的泛型队列Queue实现生产消费模式

本篇体验使用C#的泛型队列Queue<T>实现生产消费模式. 如果把生产消费想像成自动流水生产线的话,生产就是流水线的物料,消费就是某种设备对物料进行加工的行为,流水线就是队列. 现在,要写一个体现生产消费模式的泛型帮助类,比如叫ProducerConsumer<T>. 该类肯定会维护一个有关生产.物料的Queue<T>类型的字段,还存在一个有关消费.Action<T>类型的字段. 在ProducerConsumer类的构造函数中,为Action<T&

多线程十大经典案例之一 双线程读写队列数据

本文配套程序下载地址为:http://download.csdn.net/detail/morewindows/5136035 转载请标明出处,原文地址:http://blog.csdn.net/morewindows/article/details/8646902 欢迎关注微博:http://weibo.com/MoreWindows 在<秒杀多线程系列>的前十五篇中介绍多线程的相关概念,多线程同步互斥问题<秒杀多线程第四篇一个经典的多线程同步问题>及解决多线程同步互斥的常用方法

秒杀多线程第十六篇 多线程十大经典案例之一 双线程读写队列数据

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] 本文配套程序下载地址为:http://download.csdn.net/detail/morewindows/5136035 转载请标明出处,原文地址:http://blog.csdn.net/morewindows/article/details/8646902 欢迎关注微博:http://weibo.com/MoreWindows 在<秒杀多线程系列>的前十五篇中介绍多线程的相关概念,多线程同步互斥问题<秒杀多

多线程面试题系列(16):多线程十大经典案例之一 双线程读写队列数据

前十五篇中介绍多线程的相关概念,多线程同步互斥问题(第四篇)及解决多线程同步互斥的常用方法--关键段.事件.互斥量.信号量.读写锁.为了让大家更加熟练运用多线程,将会有十篇文章来讲解十个多线程使用案例,相信看完这十篇后会让你能更加游刃有余的使用多线程. 首先来看第一篇--第十六篇 多线程十大经典案例之一 双线程读写队列数据 <多线程十大经典案例之一双线程读写队列数据>案例描述: MFC对话框中一个按钮的响应函数实现两个功能:显示数据同时处理数据,因此开两个线程,一个线程显示数据(开了一个定时器

Java的多线程实现生产/消费模式

Java的多线程实现生产/消费模式 在Java的多线程中,我们经常使用某个Java对象的wait(),notify()以及notifyAll() 方法实现多线程的通讯,今天就使用Java的多线程实现生产/消费模式,需求如下: 线程A ProductThread 继承Thread 实现生产数据 若线程共享的数据不为NULL,则生产线程进入等待状态 线程B CustomThread 继承Thread 实现消费数据(输出到控制台) 当线程共享数据为NULL的时候,进入等待状态 线程B 消费完数据之后,

异步简析之BlockingCollection实现生产消费模式

目前市面上有诸多的产品实现队列功能,比如Redis.MemCache等... 其实c#中也有一个基础的集合类专门用来实现生产/消费模式 (生产模式还是建议使用Redis等产品) 下面是官方的一些资料和介绍: BlockingCollection是一个线程安全集合类,可提供以下功能: 实现制造者-使用者模式. 通过多线程并发添加和获取项. 可选最大容量. 集合为空或已满时通过插入和移除操作进行阻塞. 插入和移除"尝试"操作不发生阻塞,或在指定时间段内发生阻塞. 封装实现 IProduce

PHP 高级编程之多线程-消息队列

Home  |  Mirror  |  Search  |  杂文  |  ITEYE 博客  |  OSChina 博客  |  51CTO 博客  |  Linkedin PHP 高级编程之多线程 http://netkiller.github.io/journal/thread.php.html Mr. Neo Chen (netkiller), 陈景峰(BG7NYT) 中国广东省深圳市龙华新区民治街道溪山美地 518131 +86 13113668890 +86 755 29812080

IOS多线程及队列的使用

IOS多线程及队列的使用 分类: ios多线程2013-12-11 17:56 1898人阅读 评论(0) 收藏 举报 多线程 最近搞一款塔防游戏,提到塔防,自然就想到了A星寻路.的确,它是一种高效的寻路算法.但当很多怪物同时在调用A星算法来寻找一条最近的路径来到达目的地时,我发现会很卡.我都不能接受这个卡屏,更何况是玩家呢.所有我一直都在努力去优化A星算法.虽然有所改善,但卡的问题还是存在.实在没辙了,我想到了队列线程.之前都没接触过这个东东,还好在网上找到很详细的线程介绍.当然,我只是用到了

c# 多线程排队队列实现的源码

[csharp] view plaincopy using System; using System.Threading; using System.Collections; using System.Collections.Generic; // 将线程同步事件封装在此类中, // 以便于将这些事件传递给 Consumer 和 // Producer 类. public class SyncEvents { public SyncEvents() { // AutoResetEvent 用于"