自动化控制之线程池的使用

昨天反编译看了公司项目首席架构师实现的线程池。非常之惊讶,在没有.net 4可用的年代。那思想和那技术是相当的可以啊。佩服。

这里说的线程池是一个类我总觉得这样叫有点不名副其实。其实就是一个类内部实现了FIFO队列,把临时数据放到这个队列里,“线程池类”内容按照入队的先后次序触发一个负责解析校验等的事件,并且把数据传递个这个事件。

好了,上代码:

    /// <summary>
    /// 自定义线程池类,不依赖.net Queue实现了先进先出处理队列里数据
    /// </summary>
    public class CoreThreadPool : IDisposable
    {
        /// <summary>
        /// 队列元素申明
        /// </summary>
        [StructLayout(LayoutKind.Sequential)]
        private class PoolData
        {
            /// <summary>
            /// 外部要求放入队列的数据
            /// </summary>
            public object Data;
            /// <summary>
            /// 需要执行的命令(Exit/Command(自定义))
            /// </summary>
            public CoreThreadPool.PoolCommand Command;
            public PoolData()
            {
                this.Command = CoreThreadPool.PoolCommand.Exit;
            }
            public PoolData(object data)
            {
                this.Data = data;
                this.Command = CoreThreadPool.PoolCommand.Command;
            }
            public PoolData(CoreThreadPool.PoolCommand cmd)
            {
                this.Command = cmd;
            }
        }
        protected enum PoolCommand
        {
            Command,
            Exit
        }
        protected SafeFileHandle complatePort;
        /// <summary>
        /// 线程池主线程
        /// </summary>
        protected Thread thread;
        protected volatile bool isOpened;
        [method: CompilerGenerated]
        [CompilerGenerated]
        public event Action<object> Exceute;
        [method: CompilerGenerated]
        [CompilerGenerated]
        public event Action<object> ExitExceute;
        /// <summary>
        /// 线程池是否正在运行
        /// </summary>
        public bool IsOpened
        {
            get
            {
                return this.isOpened;
            }
            set
            {
                this.isOpened = value;
            }
        }
        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        private static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);
        [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
        private static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort, out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey, out IntPtr lpOverlapped, uint dwMilliseconds);
        [DllImport("Kernel32", CharSet = CharSet.Auto)]
        private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped);
        /// <summary>
        /// 启动线程池的主线程
        /// </summary>
        public void Start()
        {
            isOpened = true;
            if (thread != null)
            {
                throw new Exception("线程池已经是启动状态!");
            }
            complatePort = CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, IntPtr.Zero, 0u);
            if (complatePort.IsInvalid)
            {
                throw new Exception(string.Format("创建IOCP出错!原因是:{0}", Marshal.GetLastWin32Error().ToString()));
            }
            thread = new Thread(new ParameterizedThreadStart(this.Run));
            thread.Start(complatePort);
        }
        /// <summary>
        /// 外部提交数据对象到队列
        /// </summary>
        /// <param name="data"></param>
        public void Post(object data)
        {
            PostData(new CoreThreadPool.PoolData(data));
        }
        /// <summary>
        /// 线程池主线程执行逻辑
        /// </summary>
        /// <param name="CompletionPortID"></param>
        private void Run(object CompletionPortID)
        {
            SafeFileHandle completionPort = (SafeFileHandle)CompletionPortID;
            while (IsOpened)
            {
                uint num;
                IntPtr intPtr;
                IntPtr value;
                //从队列里取出最前面的对象
                CoreThreadPool.GetQueuedCompletionStatus(completionPort, out num, out intPtr, out value, 4294967295u);
                if (num > 0u)
                {
                    GCHandle gCHandle = GCHandle.FromIntPtr(value);
                    CoreThreadPool.PoolData poolData = (CoreThreadPool.PoolData)gCHandle.Target;
                    gCHandle.Free();
                    if (poolData.Command != CoreThreadPool.PoolCommand.Command)
                    {
                        IsOpened = false;
                        break;
                    }
                    RaiseExecute(poolData.Data);
                }
            }
            RaiseExitExecute("线程池已经停止。");
            isOpened = false;
            thread = null;
        }
        /// <summary>
        /// 触发Execute事件
        /// </summary>
        /// <param name="data"></param>
        private void RaiseExecute(object data)
        {
            Exceute?.Invoke(data);
        }
        /// <summary>
        /// 触发ExitExecute事件
        /// </summary>
        /// <param name="data"></param>
        private void RaiseExitExecute(object data)
        {
            ExitExceute?.Invoke(data);
        }
        /// <summary>
        /// 结束线程池主线程
        /// </summary>
        public void Stop()
        {
            PostData(new PoolData(PoolCommand.Exit));
            IsOpened = false;
        }
        /// <summary>
        /// 内部提交数据到线程池队列中
        /// </summary>
        /// <param name="data"></param>
        private void PostData(PoolData data)
        {
            if (complatePort.IsClosed)
            {
                return;
            }
            GCHandle value = GCHandle.Alloc(data);
            PostQueuedCompletionStatus(complatePort, (uint)IntPtr.Size, IntPtr.Zero, GCHandle.ToIntPtr(value));
        }
        public void Dispose()
        {
            if (this.thread != null && this.thread.ThreadState != System.Threading.ThreadState.Stopped)
            {
                this.Stop();
            }
        }
    }

  代码亮点:队列是使用操作系统的,使用windows api实现的。牛吧。

由于现在项目已经依赖.net 4了。于是进行了模仿,经过一番测试,发现差不多,不过还是觉得在多线程环境下使用 ConcurrentQueue会更好些呢。

 /// <summary>
    /// 自定义线程池类,使用ConcurrentQueue实现了先进先出处理队列里数据
    /// </summary>
    public class CoolThreadPool<T>
    {
        protected ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
        protected Thread thread;
        private volatile bool isOpened;
        public bool IsOpened
        {
            get
            {
                return isOpened;
            }
        }

        public event Action<T> Exceute;
        public event Action StopedExceute;
        /// <summary>
        /// 启动线程池的主线程
        /// </summary>
        public void Start()
        {
            if (thread != null)
            {
                throw new Exception("线程池已经是启动状态!");
            }
            thread = new Thread(Run);
            thread.Start();
            isOpened = thread != null;
        }

        /// <summary>
        /// 线程池主线程执行逻辑
        /// </summary>
        private void Run()
        {
            while (isOpened)
            {
                T temp;
                queue.TryDequeue(out temp);
                if (temp != null)
                {
                    RaiseExecute(temp);
                }
                else break;
            }
            isOpened = false;
            thread = null;
            RaiseStopedExceute();
        }

        /// <summary>
        /// 触发Execute事件
        /// </summary>
        /// <param name="data"></param>
        private void RaiseExecute(T data)
        {
            Exceute?.Invoke(data);
        }

        /// <summary>
        /// 触发停止Execute事件
        /// </summary>
        /// <param name="data"></param>
        private void RaiseStopedExceute()
        {
            StopedExceute?.Invoke();
        }

        /// <summary>
        /// 结束线程池主线程
        /// </summary>
        public void Stop()
        {
            PostData(default(T));
            isOpened = false;
        }

        /// <summary>
        /// 外部提交数据对象到队列
        /// </summary>
        /// <param name="data"></param>
        public void Post(T data)
        {
            PostData(data);
        }

        /// <summary>
        /// 内部提交数据到线程池队列中
        /// </summary>
        /// <param name="data"></param>
        private void PostData(T data)
        {
            queue.Enqueue(data);
        }

        public void Dispose()
        {
            if (this.thread != null && this.thread.ThreadState != System.Threading.ThreadState.Stopped)
            {
                this.Stop();
            }
        }
    }

测试代码:

string[] temp = new string[]
            {
                "This is 1.",
                "This is 2.",
                "This is 3.",
                "This is 4.",
                "This is 5.",
                "This is 6.",
                "This is 7.",
                "This is 8.",
                "This is 9.",
                "This is 10.",
                "This is 11.",
                "This is 12."
            };
            pool.Exceute += Pool_Exceute1;
            pool.Start();
            foreach (string data in temp)
            {
                pool.Post(data);
            }

  

时间: 2024-10-29 22:35:13

自动化控制之线程池的使用的相关文章

控制每次线程池的并发线程的最大个数

[本人原创],欢迎交流和分形技术,转载请附上如下内容: 作者:itshare [转自]http://www.cnblogs.com/itshare/ 1. 实验目的:       使用线程池的时候,有时候需要考虑服务器的最大线程数目和程序最快执行所有业务逻辑的取舍.并非逻辑线程越多也好,而且新的逻辑线程必须会在线程池的等待队列中等待 ,直到线程池中工作的线程执行完毕,才会有系统线程取出等待队列中的逻辑线程,进行CPU运算. 2.  解决问题:     <a>如果不考虑服务器实际可支持的最大并行

线程的控制和线程池

一.WaitHandle: ”.Net 中提供了一些线程间更自由通讯的工具,他们提供了通过"信号"进行通讯的机制 可以通过ManualResetEvent,AutoResetEvent(他是在开门并且一个 WaitOne 通过后自动关门)来进行线程间的通讯 waitOne:    等待开门 Set:           开门 Reset:       关门 static void Main(string[] args) { ManualResetEvent mre = new Manu

java使用默认线程池踩过的坑(二)

云智慧(北京)科技有限公司 陈鑫 是的.一个线程不可以启动两次.那么它是怎么推断的呢? public synchronized void start() { /** * A zero status valuecorresponds to state "NEW". 0相应的是state NEW */ if (threadStatus!= 0) //假设不是NEW state,就直接抛出异常! throw newIllegalThreadStateException(); group.ad

通过ThreadPoolExecutor源码分析线程池实现原理

为什么要用线程池 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性.使用线程池可以重复利用已创建的线程降低线程创建和销毁带来的消耗,随之即可提高响应速度(当一个任务到达时,不需要重新创建线程来为之服务,重用已有线程),还可以通过线程池控制线程资源统一分配和监控等. 线程池工厂Executors JDK 提供了创建线程池的工厂类 Executors,该类提供了创建线程池的静态方法: public static ExecutorService newFixedThreadP

Java并发程序设计(6)线程池之线程数量的控制

1.1. ExecutorService ExecutorService是线程池的接口. Executors是用于创建不同线程池的工具类. 1.2. 线程数量固定的线程池 ExecutorService executorService = Executors.newFixedThreadPool(2); for(int j=0;j<10;j++){ final int t = j; executorService.execute( new Runnable(){ @Override public

java线程池的自带监控,线程池返回值的控制

当使用java线程池的时候,返回值类型用future<T> 来接收,比如: Future<String> future = fixedThreadPool.submit(new Thread());返回的结果必须由线程执行完毕后才会返回,我们可以利用这一点进行线程堵塞,监控线程的结束时间. package com.test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.

记5.28大促压测的性能优化&mdash;线程池相关问题

目录: 1.环境介绍 2.症状 3.诊断 4.结论 5.解决 6.对比java实现 废话就不多说了,本文分享下博主在5.28大促压测期间解决的一个性能问题,觉得这个还是比较有意思的,值得总结拿出来分享下. 博主所服务的部门是作为公共业务平台,公共业务平台支持上层所有业务系统(2C.UGC.直播等).平台中核心之一的就是订单域相关服务,下单服务.查单服务.支付回调服务,当然结算页暂时还是我们负责,结算页负责承上启下进行下单.结算.跳支付中心.每次业务方进行大促期间平台都要进行一次常规压测,做到心里

记5.28大促压测的性能优化—线程池相关问题

目录: 1.环境介绍 2.症状 3.诊断 4.结论 5.解决 6.对比java实现 废话就不多说了,本文分享下博主在5.28大促压测期间解决的一个性能问题,觉得这个还是比较有意思的,值得总结拿出来分享下. 博主所服务的部门是作为公共业务平台,公共业务平台支持上层所有业务系统(2C.UGC.直播等).平台中核心之一的就是订单域相关服务,下单服务.查单服务.支付回调服务,当然结算页暂时还是我们负责,结算页负责承上启下进行下单.结算.跳支付中心.每次业务方进行大促期间平台都要进行一次常规压测,做到心里

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把