一个C#多线程的工作队列

多线程添加元素到队列中,队列根据绑定

的事件进行自动处理,可以设置WorkSequential属性来实现对队列处理的单线程(严格顺序处理)或者多线程处理(循序出队,但是

多线程处理,不保证对队列元素的处理顺利)的选择。

代码

/***********多线程的工作队列***************
 * 此工作队列保证线程安全性
 *
 *
 *
 *
 * *******/
namespace WorkQueue
{
    using System.Collections.Generic;
    using System;
    using System.Threading;

    public delegate void UserWorkEventHandler<T>(object sender, WorkQueue<T>.EnqueueEventArgs e);
    public class WorkQueue<T>
    {
        private bool IsWorking; //表明处理线程是否正在工作
        private object lockIsWorking = new object();//对IsWorking的同步对象

        private Queue<T> queue; //实际的队列
        private object lockObj = new object(); //队列同步对象

        /// <summary>
        /// 绑定用户需要对队列中的item对象
        /// 施加的操作的事件
        /// </summary>
        public event UserWorkEventHandler<T> UserWork;

        public WorkQueue(int n)
        {
            queue = new Queue<T>(n);
        }

        public WorkQueue()
        {
            queue = new Queue<T>();
        }

        /// <summary>
        /// 谨慎使用此函数,
        /// 只保证此瞬间,队列值为空
        /// </summary>
        /// <returns></returns>
        public bool IsEmpty()
        {
            lock (lockObj)
            {
                return queue.Count == 0;
            }
        }

        private bool isOneThread;

        /// <summary>
        /// 队列处理是否需要单线程顺序执行
        /// ture表示单线程处理队列的T对象
        /// 默认为false,表明按照顺序出队,但是多线程处理item
        /// *****注意不要频繁改变此项****
        /// </summary>
        public bool WorkSequential
        {
            get
            {
                return isOneThread;
            }
            set
            {
                isOneThread = value;
            }

        }

        /// <summary>
        /// 向工作队列添加对象,
        /// 对象添加以后,如果已经绑定工作的事件
        /// 会触发事件处理程序,对item对象进行处理
        /// </summary>
        /// <param name="item">添加到队列的对象</param>
        public void EnqueueItem(T item)
        {
            lock (lockObj)
            {
                queue.Enqueue(item);
            }

            lock (lockIsWorking)
            {
                if (!IsWorking)
                {
                    IsWorking = true;
                    ThreadPool.QueueUserWorkItem(doUserWork);
                }
            }

        }

        /// <summary>
        /// 处理队列中对象的函数
        /// </summary>
        /// <param name="o"></param>
        private void doUserWork(object o)
        {
            try
            {
                T item;

                while (true)
                {
                    lock (lockObj)
                    {
                        if (queue.Count > 0)
                        {
                            item = queue.Dequeue();
                        }
                        else
                        {
                            return;
                        }
                    }
                    if (!item.Equals(default(T)))
                    {

                        if (isOneThread)
                        {
                            if (UserWork != null)
                            {
                                UserWork(this, new EnqueueEventArgs(item));
                            }
                        }
                        else
                        {
                            ThreadPool.QueueUserWorkItem(obj =>
                                                             {
                                                                 if (UserWork != null)
                                                                 {
                                                                     UserWork(this, new EnqueueEventArgs(obj));
                                                                 }
                                                             }, item);
                        }

                    }

                }
            }
            finally
            {
                lock (lockIsWorking)
                {
                    IsWorking = false;
                }

            }
        }

        /// <summary>
        /// UserWork事件的参数,包含item对象
        /// </summary>
        public class EnqueueEventArgs : EventArgs
        {
            public T Item { get; private set; }
            public EnqueueEventArgs(object item)
            {
                try
                {
                    Item = (T)item;
                }
                catch (Exception)
                {

                    throw new InvalidCastException("object to T 转换失败");
                }
            }
        }
    }
}
代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.IO;
using WorkQueue;
namespace Program
{
    class Program
    {
        private static List<string> list=new List<string>(1000);
        static  StreamWriter sw = new StreamWriter(new FileStream("test.dat", FileMode.Create));
        static void Main(string[] args)
        {
            WorkQueue<int> workQueue=new WorkQueue<int>(1000);
            workQueue.UserWork += new UserWorkEventHandler<int>(workQueue_UserWork);
           // workQueue.WorkSequential = true;
            ThreadPool.QueueUserWorkItem(o =>
                                             {
                                                 for (int i = 0; i < 1000; i++)
                                                 {
                                                     workQueue.EnqueueItem(i);

                                                 }
                                             });
            Console.ReadLine();

            list.ForEach(str=>sw.WriteLine(str));
            Console.WriteLine(workQueue.IsEmpty());
            sw.Close();
        }

        static void workQueue_UserWork(object sender, WorkQueue<int>.EnqueueEventArgs e)
        {

            StringBuilder sb=new StringBuilder();
            sb.Append(e.Item).Append("\t\t").Append(DateTime.Now.ToString("u")+"\t\t").Append(Thread.CurrentThread.ManagedThreadId);
            list.Add(sb.ToString());
            Thread.Sleep(15);
        }
    }
}
时间: 2024-11-05 17:48:08

一个C#多线程的工作队列的相关文章

一个比较多线程(并行)和非多线程下完成同等任务(I/O频繁)所需开销的案例

//非多线程 package test; import java.io.*; import java.security.DigestInputStream; import java.security.MessageDigest; /* * 利用DigestInputStream完成消息摘要计算, * 先调用此摘要输入流的一个 read 方法,之后在关联的消息摘要上调用一个 digest方法. * 本案例包含两个程序,其中一个采用多线程,相互比较. */ public class DigestTh

基于c++11新标准开发一个支持多线程高并发的网络库

背景 新的c++11标准出后,c++语法得到了很多的扩展,比起以往任何时候都要灵活和高效,提高了程序编码的效率,为软件开发人员节省了不少的时间. 之前我也写过基于ACE的网络服务器框架,但ACE毕竟有些臃肿,内部对象关系错综复杂,容易给人造成只见树木不见森林的错觉. 所以打算用c++11开发一个较为简洁,高效,支持高并发的网络库. 开源         花了两三周,终于把基础的结构开发完成,代码也开源在github上,网址是 https://github.com/lichuan/fly 欢迎各位

一个关于多线程和DbHelper的问题

我的初衷是这样的:在多线程环境下,每个数据库编号对应一个DbHelper对象. 下面是代码,不知道这样写有什么问题. 1 namespace TestDAL 2 { 3 public class DB 4 { 5 private static string[] ConnString = new[] 6 { 7 "unknown", "Data Source=163.163.1.100;Initial Catalog=xiaomi;User Id=sa;Password=123

一个java多线程面试题

线程a 打印 数字 0--12: 线程b 打印 字母 a--z; 打印结果:0ab1cd2ef3gh4ij5kl6mn7op8qr9st10uv11wx12yz 要求用到 线程间传值: 分析:线程a打印一个数字,线程b打印两个字母 , 进行13次循环, 通过公共资源类进行线程间传值 public class FftThreadTest {public static void main(String[] args) { //创建final资源对象 final Business business =

一个java多线程实例

import java.util.List; import java.util.ArrayList; import java.util.Queue; import java.util.LinkedList; public class Test3 { public static void main(String[] args) { final Manager3 m3 = new Manager3(); m3.init(); Thread t1 = new Thread(new Runnable()

一个简单多线程购票Demo

package thread; public class Test02 { //定义初始票数 public static int chepiao = 20; public static void main(String[] args) { Test02 t = new Test02(); //匿名类创建线程 Thread t1 = new Thread() { @Override public void run() { //加同步锁 synchronized(Test02.class) { //

多线程概念

一个进程是由一个或者N个线程组成的! 线程:cpu调度和分配的基本单位!电脑中真正执行的是线程! 在同一个时间点,我们的电脑只能运行一个线程 多线程: 如果在一个进程中,同时运行多个线程,来完成不同的工作,我们称之为多线程! CUP不能同时运行多个线程! 一个CPU在同一个时间点,只能运行一个线程,单线程运行的速度太快,我们肉眼无法分辨,所以我们认为是多线程! 生活中进入地铁站的例子: 场景1:现在地铁站有1个进站口,同时来了5个人! 需要排队进站! 场景2:现在地铁站有5个进站口,同时来了5个

ios 多线程开发(一)简介

简介 线程是在一个程序中并发的执行代码的方法之一.虽然有一些新的技术(operations, GCD)提供了更先进高效的并发实现,OS X和iOS同时也提供了创建和维护线程的接口. 这里将要介绍线程相关的包以及如何使用他们.同时也会介绍程序中多线程代码的同步. 关于线程开发 多年以来,电脑的最大处理速度受制于单个处理器的处理速度.当单核处理器开始达到他们的上线时,芯片市场转向了多核的设计,这样电脑就可以同时处理多个任务了.OS X在处理系统任务时使用了这些优势,你自己的程序也可以使用这些优势.

Java面试之多线程

35. 并行和并发有什么区别? 并行:多个处理器或多核处理器同时处理多个任务. 并发:多个任务在同一个 CPU 核上,按细分的时间片轮流(交替)执行,从逻辑上来看那些任务是同时执行. 如下图: 并发 = 两个队列和一台咖啡机. 并行 = 两个队列和两台咖啡机. 36. 线程和进程的区别? 一个程序下至少有一个进程,一个进程下至少有一个线程,一个进程下也可以有多个线程来增加程序的执行速度. 37. 守护线程是什么? 守护线程是运行在后台的一种特殊进程.它独立于控制终端并且周期性地执行某种任务或等待