线程队列(版本1+版本2)

版本1.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ClassLibrary1
{
    public class BaseThread
    {
        public event Action<object> ThreadAction;
        public event Action<object> CompeletedAction;
        public object Parameter { get; set; }
        public void Run()
        {
            ThreadAction?.Invoke(Parameter);
            CompeletedAction?.Invoke(Parameter);
        }

    }
    public class ThreadQueue
    {
        public readonly object lockObj = new object();
        public Queue<BaseThread> Queue;
        public ThreadQueue()
        {
            Queue = new Queue<BaseThread>();
        }
        public ThreadQueue(int poolSize)
        {
            Queue = new Queue<BaseThread>(poolSize);
        }

        public bool IsEmpty
        {
            get
            {
                return Queue.Count == 0;
            }
        }
        public void AddQueueItem(BaseThread item)
        {
            Queue.Enqueue(item);
            ThreadPool.QueueUserWorkItem(DoWork, null);
        }

        private void DoWork(object state)
        {
            while (true)
            {
                BaseThread item = null;

                lock (lockObj)
                {

                    if (Queue.Count > 0)
                        item = Queue.Dequeue();

                    else
                    {
                        return;
                    }
                }
                ThreadPool.QueueUserWorkItem(obj =>
                {
                    (obj as BaseThread).Run();
                }, item);

            }
        }

    }
    public class Program
    {

        public static void Main()
        {
            ThreadQueue workQueue = new ThreadQueue(100);
            for (int i = 1; i<=100; i++)
            {
                BaseThread th = new BaseThread();
                th.ThreadAction += a =>
                {

                };
                th.CompeletedAction += b =>
                {
                    Console.Write("队列a执行完成:" + b.ToString() + "\r\n");
                };
                th.Parameter = i;
                workQueue.AddQueueItem(th);
            }
            while (!workQueue.IsEmpty)
            {
                Console.Write("队列正在执行");
                Thread.Sleep(1000);
            }
            Console.Write("队列执行完必");
            Console.ReadKey();
        }
    }
}
2.版本2
using System.Collections.Generic;
using System;
using System.Threading;

/***********多线程的工作队列***************
 * 此工作队列保证线程安全性
 *
 *
 *
 *
 * *******/
namespace WorkQueue
{
    public delegate void RunThread(object threaParameter);
    public delegate void QueueException(Exception ex);
    /// <summary>
    /// 队列用到的实体
    /// </summary>
    public class QueueThreadEntity
    {
        /// <summary>
        /// 队列中每个元素执行的方法
        /// </summary>
        public event RunThread StartThreadHandle;
        /// <summary>
        /// 每个元素执行完毕事件
        /// </summary>
        public event RunThread EndThreadHandle;
        /// <summary>
        /// 执行异常回调函数
        /// </summary>
        public event QueueException EndExceptionHandle;
        /// <summary>
        /// 执行异常函数
        /// </summary>
        public event QueueException StartExceptionHandle;

        /// <summary>
        /// 线程参数
        /// </summary>
        public object ThreaParameter
        {
            get;
            set;
        }
        internal void Run()
        {
            try
            {
                StartThreadHandle(ThreaParameter);
            }
            catch (Exception ex)
            {
                if (StartExceptionHandle != null)
                {
                    StartExceptionHandle(ex);
                }
                else
                {
                    throw ex;
                }
            }
            if (EndThreadHandle != null)
            {
                try
                {
                    EndThreadHandle(ThreaParameter);
                }
                catch (Exception ex)
                {
                    if (EndExceptionHandle != null)
                    {
                        EndExceptionHandle(ex);
                    }
                    else
                    {
                        throw ex;
                    }
                }
            }
        }
    }
    public class WorkThreadQueue
    {
        WorkQueue<QueueThreadEntity> wk = null;
        /// <summary>
        ///
        /// </summary>
        /// <param name="poolSize">线程池的大小</param>
        public WorkThreadQueue(int poolSize)
        {
            wk = new WorkQueue<QueueThreadEntity>(poolSize);
            wk.UserWork += workQueue_UserWork;
        }
        public void AddQueue(QueueThreadEntity entity)
        {
            wk.EnqueueItem(entity);
        }
        private void workQueue_UserWork(object sender, WorkQueue<QueueThreadEntity>.EnqueueEventArgs e)
        {
            var item = e.Item;
            item.Run();
        }
        /// <summary>
        /// 队列处理是否需要单线程顺序执行
        /// ture表示单线程处理队列的T对象
        /// 默认为false,表明按照顺序出队,但是多线程处理item
        /// *****注意不要频繁改变此项****
        /// </summary>
        public bool IsOneThread
        {

            get
            {
                return wk.IsOneThread;
            }
            set
            {
                wk.IsOneThread = value;
            }
        }
        /// <summary>
        /// 判断队列是否为空
        /// </summary>
        /// <returns></returns>
        public bool IsEmpty()
        {
            return wk.IsEmpty();
        }

    }
    public delegate void UserWorkEventHandler<T>(object sender, WorkQueue<T>.EnqueueEventArgs e);
    public class WorkQueue<T>
    {
        private bool IsWorking; //表明处理线程是否正在工作
        private object lockIsWorking = new object();//对IsWorking的同步对象

        private Queue<T> queue; //实际的队列
        private object lockObj = new object(); //队列同步对象

        /// <summary>
        /// 绑定用户需要对队列中的item对象
        /// 施加的操作的事件
        /// </summary>
        public event UserWorkEventHandler<T> UserWork;

        public WorkQueue(int n)
        {
            queue = new Queue<T>(n);
        }

        public WorkQueue()
        {
            queue = new Queue<T>();
        }

        /// <summary>
        /// 谨慎使用此函数,
        /// 只保证此瞬间,队列值为空
        /// </summary>
        /// <returns></returns>
        public bool IsEmpty()
        {
            lock (lockObj)
            {
                return queue.Count == 0;
            }
        }

        private bool isOneThread;

        /// <summary>
        /// 队列处理是否需要单线程顺序执行
        /// ture表示单线程处理队列的T对象
        /// 默认为false,表明按照顺序出队,但是多线程处理item
        /// *****注意不要频繁改变此项****
        /// </summary>
        public bool IsOneThread
        {
            get
            {
                return isOneThread;
            }
            set
            {
                isOneThread = value;
            }

        }

        /// <summary>
        /// 向工作队列添加对象,
        /// 对象添加以后,如果已经绑定工作的事件
        /// 会触发事件处理程序,对item对象进行处理
        /// </summary>
        /// <param name="item">添加到队列的对象</param>
        public void EnqueueItem(T item)
        {
            lock (lockObj)
            {
                queue.Enqueue(item);
            }

            lock (lockIsWorking)
            {
                //if (!IsWorking)
                {
                    //IsWorking = true;
                    ThreadPool.QueueUserWorkItem(doUserWork);
                }
            }
        }
        /// <summary>
        /// 处理队列中对象的函数
        /// </summary>
        /// <param name="o"></param>
        private void doUserWork(object o)
        {
            try
            {
                T item;

                while (true)
                {
                    lock (lockObj)
                    {
                        if (queue.Count > 0)
                        {
                            item = queue.Dequeue();
                        }
                        else
                        {
                            return;
                        }
                    }
                    if (!item.Equals(default(T)))
                    {

                        if (isOneThread)
                        {
                            UserWork?.Invoke(this, new EnqueueEventArgs(item));
                        }
                        else
                        {
                            int i = 0;
                            ThreadPool.QueueUserWorkItem(obj =>
                            {
                                UserWork?.Invoke(this, new EnqueueEventArgs(obj));
                            }, item);
                        }

                    }

                }
            }
            finally
            {
                lock (lockIsWorking)
                {
                    IsWorking = false;
                }

            }
        }

        /// <summary>
        /// UserWork事件的参数,包含item对象
        /// </summary>
        public class EnqueueEventArgs : EventArgs
        {
            public T Item { get; private set; }
            public EnqueueEventArgs(object item)
            {
                try
                {
                    Item = (T)item;
                }
                catch (Exception)
                {

                    throw new InvalidCastException("object to T 转换失败");
                }
            }
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using WorkQueue;

namespace ConsoleApplication47
{
    class Program
    {
        static void Main(string[] args)
        {//实例并设置线程池的大小
            WorkThreadQueue queue = new WorkThreadQueue(10);

            for (var i = 1; i <= 100; i++)
            {
                //队列中的线程对象
                QueueThreadEntity entity = new QueueThreadEntity();
                entity.ThreaParameter = i;
                //队列中每个元素执行的函数
                entity.StartThreadHandle += (a) =>
                {

                    //Console.Write("当前参数为:" + a.ToString() + "\r\n");
                };
                //队列中每个元素执行完毕的函数
                entity.EndThreadHandle += (a) =>
                {
                    Console.Write("队列a执行完成:" + a.ToString() + "\r\n");
                };
                //将线程插入到队列
                queue.AddQueue(entity);
            }
            while (!queue.IsEmpty())
            {
                Console.Write("队列正在执行");
                Thread.Sleep(1000);
            }
            Console.Write("队列执行完必");

            Console.ReadKey();
        }
    }
}
时间: 2024-10-16 02:52:18

线程队列(版本1+版本2)的相关文章

转:Windows下的PHP开发环境搭建——PHP线程安全与非线程安全、Apache版本选择,及详解五种运行模式。

原文来自于:http://www.ituring.com.cn/article/128439 Windows下的PHP开发环境搭建——PHP线程安全与非线程安全.Apache版本选择,及详解五种运行模式. 今天为在Windows下建立PHP开发环境,在考虑下载何种PHP版本时,遭遇一些让我困惑的情况,为了解决这些困惑,不出意料地牵扯出更多让我困惑的问题. 为了将这些困惑一网打尽,我花了一下午加一晚上的时间查阅了大量资料,并做了一番实验后,终于把这些困惑全都搞得清清楚楚了. 说实话,之所以花了这么

11.python并发入门(part7 线程队列)

一.为什么要用队列? 队列是一种数据结构,数据结构是一种存放数据的容器,和列表,元祖,字典一样,这些都属于数据结构. 队列可以做的事情,列表都可以做,但是为什么我们还要去使用队列呢? 这是因为在多线程的情况下,列表是一种不安全的数据结构. 为什么不安全?可以看下面这个例子: #开启两个线程,这两个线程并发从列表中移除一个元素. import threading import time l1 = [1,2,3,4,5] def pri(): while l1: a = l1[-1] print a

高版本-&gt;低版本迁移,低版本客户端连接高版本数据库EXP导出报错EXP-00008,ORA-01455,EXP-00000

生产环境: 源数据库:RHEL + Oracle 11.2.0.3 目标数据库:HP-UX + Oracle 10.2.0.4 需求:迁移部分表  11.2.0.3-->10.2.0.4,若迁移范围内的有些表在目标库已经存在,则替换. 本次迁移数据量<1G. 初定方案:低版本的客户端连接到高版本数据库,用低版本导出,低版本导入. 1.采用初定方案,目标数据库所在服务器连接到源数据库,exp导出过程中报错. ZJCRNOPDB 36: sqlplus -version SQL*Plus: Rel

ANDROID PAD版本 PHONE版本 源码有什么 区别?

ANDROID PAD版本 PHONE版本 源码有什么 区别? 直接把frameworks/base/core/res/res/values/config.xml里面的<bool name="config_voice_capable">false</bool>改为false

关于消息推送的补充,主要介绍服务端的实现,包含object c 版本 c 版本 java 版本 php 版本 (转)

要实现消息推送功能,我们可以采用第三方(腾讯:信鸽:百度:云推送:极光推送:友盟):当然,因为各种原因,我们不能使用第三方的推送服务,那我们就需要自己编写服务端.在网上寻觅了很久,找到一篇很不错的讲解消息推送的文章,包含(object c 版本 c 版本 java 版本 php 版本)的后端实现,分享之. 原文地址:http://tanqisen.github.io/blog/2013/02/27/ios-push-apns/ 一步一步实现iOS应用PUSH功能 FEB 27TH, 2013 1

线程队列

在web应用中,单个进程或者机器的响应速度有限,类似大量数据导入导出的操作的数量如果不加限制,会导致服务器cpu被吃满,导致其他一些很简单的请求无法及时响应的问题.针对这个限制提出了如下要求.1. 先到达的请求先执行: 先入先出原则2. 只能同时执行若干请求:避免cpu被吃满3. 异步执行:如果长时间执行会长期占用iis的工作线程 基于上述的要求我设计了一个队列.这个队列我们需要稍微提一个组件,ParallelExtensionsExtras 这是微软提供的一个线程的扩展,具体的自行搜索下相关资

python全栈开发 * 线程队列 线程池 协程 * 180731

一.线程队列 队列:1.Queue 先进先出 自带锁 数据安全 from queue import Queue from multiprocessing import Queue (IPC队列)2.LifoQueue后进先出 后进先出 自带锁 数据安全 from queue import LifoQueue lq=LifoQueue(5) lq.put(123) lq.put(666) lq.put(888) lq.put(999) lq.put("love") print(lq.pu

线程队列 线程池 协程

1 . 线程队列 from multiprocessing Queue , JoinableQueue  #进程IPC队列 from queue import Queue  #线程队列  先进先出 from queue import LifoQueue  #后进先出的 方法都是一样的 : put , get , put_nowait , get_nowait , full , empty , qsize 队列 Queue : 先进先出 , 自带锁 , 数据安全 栈 LifoQueue : 后进先

python 线程队列,线程池

一. 线程队列 引入线程队列 : import queue #和普通队列引入方法相同 线程队列方法 : q = queue.Queue() #实例化对列,先进先出 q = queue.LifoQueue() #实例化队列,后进先出 ( Last in, first out ) q = queue.PriorityQueue() #实例化队列,优先级队列 优先级队列,put() 方法接收的是一个元组,第一个元素是优先级,第二个元素是数据 优先级如果为数字,按照数字大小比较 如果优先级是字符串或特殊

Python 线程----线程方法,线程事件,线程队列,线程池,GIL锁,协程,Greenlet

主要内容: 线程的一些其他方法 线程事件 线程队列 线程池 GIL锁 协程 Greenlet Gevent 一. 线程(threading)的一些其他方法 from threading import Thread import threading import time def work(): time.sleep(1) print("子线程对象>>>", threading.current_thread()) # 子线程对象 print("子线程名称>