为Disruptor 写的一个简单实用的.Net扩展

disruptor   用户封装自己的消费者,把消费者注入到消费者容器,消费者容器实现自动创建 缓存队列,生产者;

文中用到的 disruptor   C#移植源代码

https://github.com/bingyang001/disruptor-net-3.3.0-alpha

作者博客 http://www.cnblogs.com/liguo/p/3296166.html

消费者容器:

/// <summary>
    /// 消费者管理器
    /// </summary>
    /// <typeparam name="TProduct">产品</typeparam>
    public class Workers<TProduct> where TProduct : Producer<TProduct>, new()
    {
        private readonly WorkerPool<TProduct> _workerPool;

        public Workers(List<IWorkHandler<TProduct>> handers, IWaitStrategy waitStrategy = null, int bufferSize = 1024*64)
        {
            if (handers == null || handers.Count == 0)
                throw new ArgumentNullException("消费事件处理数组为空!");
            if (handers.Count == 1)
                _ringBuffer = RingBuffer<TProduct>.CreateSingleProducer(() => new TProduct(), bufferSize,
                    waitStrategy ?? new YieldingWaitStrategy());
            else
            {
                _ringBuffer = RingBuffer<TProduct>.CreateMultiProducer(() => new TProduct(), bufferSize,
                    waitStrategy ?? new YieldingWaitStrategy());
            }
            _workerPool = new WorkerPool<TProduct>(_ringBuffer
                , _ringBuffer.NewBarrier()
                , new FatalExceptionHandler()
                , handers.ToArray());
            _ringBuffer.AddGatingSequences(_workerPool.getWorkerSequences());
        }

        public void Start()
        {
            _workerPool.start(TaskScheduler.Default);
        }

        public Producer<TProduct> CreateOneProducer()
        {
            return new Producer<TProduct>(this._ringBuffer);
        }
        public void DrainAndHalt()
        {
            _workerPool.drainAndHalt();
        }

        private readonly RingBuffer<TProduct> _ringBuffer;
    }

生产者(产品): 所有的产品都应该继承自生产者

/// <summary>
    /// 生产者对象
    /// </summary>
    /// <typeparam name="TProduct">产品类型</typeparam>
    public class Producer<TProduct> where TProduct:Producer<TProduct>
    {

        long _sequence;
        private RingBuffer<TProduct> _ringBuffer;
        public Producer()
        {

        }
        public Producer(RingBuffer<TProduct> ringBuffer )
        {
            _ringBuffer = ringBuffer;
        }
        /// <summary>
        /// 获取可修改的产品
        /// </summary>
        /// <returns></returns>
        public Producer<TProduct> Enqueue()
        {
            long sequence = _ringBuffer.Next();
            Producer<TProduct> producer = _ringBuffer[sequence];
            producer._sequence = sequence;
            if (producer._ringBuffer == null)
                producer._ringBuffer = _ringBuffer;
            return producer;
        }
        /// <summary>
        /// 提交产品修改
        /// </summary>
        public void Commit()
        {
            _ringBuffer.Publish(_sequence);
        }
    }

--------------------------------------------------------

以上就实现了,测试代码

先创建 产品对象:

/// <summary>
        /// 产品/继承生产者
        /// </summary>
        public class Product : Producer<Product>
        {
            //产品包含的属下随便定义,无要求,只需要继承自生产者就行了
            public long Value { get; set; }
            public string Guid { get; set; }
        }

创建消费者对象

 /// <summary>
        /// 消费处理对象
        /// </summary>
        public class WorkHandler : IWorkHandler<Product>
        {

            public void OnEvent(Product @event)
            {
                //Test是测试对象数据准确(数据重复或者丢失数据)
                Test.UpdateCacheByOut(@event.Guid);
                //收到产品,在这里写处理代码

            }

        }

测试代码:

可创建1个或者多个的生产者对象,消费者处理对象;不一定太多,多不一定快; 建议生产者创建一个就行了,多线程操作一个生产者对象; 消费者对象可以根据实际情况创建多少个;

           //创建2个消费者,2个生产者, 2个消费者表示,框架会有2个线程去处理消费产品            Workers<Product> workers = new Workers<Product>(
            new List<IWorkHandler<Product>>() {new WorkHandler(), new WorkHandler()});

            Producer<Product> producerWorkers = workers.CreateOneProducer();
            Producer<Product> producerWorkers1 = workers.CreateOneProducer();           //开始消费             workers.Start();

产品生产:

可以在任何引用生产者的地方,把产品放进队列中. 这里 放入队列的方法和平时不太一样.  这里采用的是,从队列里面拿去一个位置,然后把产品放进去; 具体的做法 ,找生产者,获取一个产品对象,然后修改产品属性,最后提交修改.

  var obj = producer.Enqueue();
           //修改产品属性
                obj.Commit();

以上是关键代码:

完整的测试类 : 包含测试数据正确性,  性能,在不校验正确性的时候,每秒ops 1千万左右.

 class Test
    {
        public static long PrePkgInCount = 0;
        public static long PrePkgOutCount = 0;
        public static long PkgInCount = 0;
        public static long PkgOutCount = 0;
        static ConcurrentDictionary<string, string> InCache = new ConcurrentDictionary<string, string>();
        static ConcurrentDictionary<string, string> OutCache = new ConcurrentDictionary<string, string>();
        private static long Seconds;

        static void Main(string[] args)
        {
            Workers<Product> workers = new Workers<Product>(
            new List<IWorkHandler<Product>>() {new WorkHandler(), new WorkHandler()});

            Producer<Product> producerWorkers = workers.CreateOneProducer();
            Producer<Product> producerWorkers1 = workers.CreateOneProducer();

            workers.Start();
            Task.Run(delegate
            {
                while (true)
                {
                    Thread.Sleep(1000);
                    Seconds++;
                    long intemp = PkgInCount;
                    long outemp = PkgOutCount;
                    Console.WriteLine(
                        $"In ops={intemp - PrePkgInCount},out ops={outemp - PrePkgOutCount},inCacheCount={InCache.Count},OutCacheCount={OutCache.Count},RunningTime={Seconds}");
                    PrePkgInCount = intemp;
                    PrePkgOutCount = outemp;
                }

            });
            Task.Run(delegate { Run(producerWorkers); });
            Task.Run(delegate { Run(producerWorkers); });
            Task.Run(delegate { Run(producerWorkers1); });
            Console.Read();

        }

        public static void Run(Producer<Product> producer)
        {
            for (int i = 0; i < int.MaxValue; i++)
            {

                var obj = producer.Enqueue();
                CheckRelease(obj as Product);
                obj.Commit();
            }
        }

        public static  void CheckRelease(Product publisher)
        {
            Interlocked.Increment(ref PkgInCount);
            return; //不检查正确性
            publisher.Guid = Guid.NewGuid().ToString();
            InCache.TryAdd(publisher.Guid, string.Empty);

        }

        public static void UpdateCacheByOut(string guid)
        {
            Interlocked.Increment(ref Test.PkgOutCount);
            if (guid != null)
                if (InCache.ContainsKey(guid))
                {
                    string str;
                    InCache.TryRemove(guid, out str);
                }
                else
                {
                    OutCache.TryAdd(guid, string.Empty);
                }

        }
        /// <summary>
        /// 产品/继承生产者
        /// </summary>
        public class Product : Producer<Product>
        {
            //产品包含的属下随便定义,无要求,只需要继承自生产者就行了
            public long Value { get; set; }
            public string Guid { get; set; }
        }

        /// <summary>
        /// 消费处理对象
        /// </summary>
        public class WorkHandler : IWorkHandler<Product>
        {

            public void OnEvent(Product @event)
            {

                Test.UpdateCacheByOut(@event.Guid);
                //收到产品,在这里写处理代码

            }

        }
    }
时间: 2024-10-23 02:26:43

为Disruptor 写的一个简单实用的.Net扩展的相关文章

虚幻4,BP写了一个简单的三线跑酷工程

BP写了一个简单的三线跑酷 链接: http://pan.baidu.com/s/1jILE4V8 密码: 96ua

开发一个简单实用的android紧急求助软件

之前女朋友一个人住,不怎么放心,想找一个紧急求助的软件,万一有什么突发情况,可以立即知道.用金山手机卫士的手机定位功能可以知道对方的位置状态,但不能主动发送求助信息,在网上了很多的APK,都是鸡肋功能,都需要解锁.并打开软件,真正的紧急情况可能没有时间来完成这一系列操作. 于是我自己做了一个这样的软件,在紧急情况下,连续按电源键5次即可发送求救短信和位置信息给事先指定的用户,这个操作在裤兜里就能完成.原理很简单,就是设置监听器捕获屏幕的开关,在较短的时间内屏幕开关达到一定次数后,触发手机定位,定

java写的一个简单学生管理系统[改进]

用Java写的一个简单学生管理系统 import java.util.*; public class student_cj {  public static void main(String[] args){      Scanner in=new Scanner(System.in);   System.out.print("请输入学生人数:");   int num=in.nextInt();//学生人数   String[] str=new String[num];//结合一行数

写了一个简单的CGI Server

之前看过一些开源程序的源码,也略微知道些Apache的CGI处理程序架构,于是用了一周时间,用C写了一个简单的CGI Server,代码算上头文件,一共1200行左右,难度中等偏上,小伙伴可以仔细看看,对于学生来说,拿来当简历,含金量还是足够的.如果把程序里所涉及的HTTP协议,Linux下POSIX编程等等搞清楚,我想找工作中肯定是有足够的竞争力的,当然我也只是皮毛而已,不再班门弄斧了,下面简单的说下程序流程吧,方便小伙伴们阅读. 程序源代码:戳我 在说程序流程之前,我先简单说下CGI吧,CG

用qt写的一个简单到不能在简单的上位机

学QT时,写的一个简单得不能再简单的串口上位机,用来控制单片机上的2个LED.假设一个是只有开和关的状态.一个可以调节亮度.上位机的界面如下图: 其中,波特率,数据位,停止位下拉值在设计师里面添加.剩下的功能,基本由代码实现.通信使用的协议也是随便写的.很简单和随意.图片是老弟手绘的. 下面贴代码 (*^__^*) #include "mainwindow.h" #include "ui_mainwindow.h" #include <QtSerialPort

写的一个简单定时器(非独立线程)

//Callback.h #ifndef __CALLBACK_H__ #define __CALLBACK_H__ typedef void (*T_CallBack)(void *); typedef struct { T_CallBack cb; void *obj; }ST_CallBack; int __NewTimer(void* obj, int interval, bool isloop, T_CallBack cb); void __DeleteTimer(int handle

写了一个简单可用的IOC

根据<架构探险从零开始写javaweb框架>内容写的一个简单的 IOC 学习记录    只说明了主要的类,从上到下执行的流程,需要分清主次,无法每个类都说明,只是把整个主线流程说清楚,避免陷入细节中.学习过程最大的收获,框架也是人写的,没学过感觉很神秘高端.现在看来大概率是,未知往往觉得是高不可攀.http://naotu.baidu.com/file/6c3da879a4495b6bd369f71dcb726f05?token=ed8c0d49d4ee7bbd 原文地址:https://ww

如何创建一个简单的VS Code扩展

注:本文提到的代码示例下载地址>How to create a simple extension for VS Code VS Code 是微软推出的一款轻量级的代码编辑器,免费,开源,支持多种语言,还能安装各种扩展.没有用过的同学可以下载下来感受一下,具体参见官方文档. 假设VS Code你已经安装好了,也已经大概玩过一遍了.接下来我们就开始讲讲怎么创建一个简单的VS Code扩展. 首先要装下node.js,然后通过命令行安装Yeoman,我们要通过这个工具来自动生成扩展代码: >npm

浮动布局写了一个简单的页面

正在学习的路上...... 这两天写了一个比较简单的页面,主要使用了浮动和定位.左边的属于滚动页面,右边的list属于固定.先上图片: 主要使用了float:left/right. 1.下面是HTML <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title></title> <link type="