线程安全之ConcurrentQueue<T>队列

  最近在弄一个小项目,大概600w行的数据,要进行数据清洗,因数据量偏大,如果单线程去执行,会造成效率偏低,只能用多线程了,但采用多线程存在线程安全问题,于是查了下资料,发现有ConcurrentQueue<T>该数据结构,完美的解决了我目前问题。

   采自msdn上面解释:表示线程安全的先进先出 (FIFO) 集合。

先说说简单的用法吧:(来自msdn)

  1.Enqueue(T) 将对象添加到 ConcurrentQueue<T> 的结尾处。

  2.TryDequeue(T) 尝试移除并返回位于并发队列开头处的对象。

  3.Count 获取 ConcurrentQueue<T> 中包含的元素数

  4.IsEmpty 获取一个值,该值指示 ConcurrentQueue<T> 是否为空。

下面是小项目的实现方案,采用最简单的方式(生产者/消费者模式),先将数据写入到队列中,再由消费者进行消费,以下是我写的一个小Demo,用于学习,不对的地方请各位多多指教!

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ThreadCQueue
{
    class Program
    {
        static void Main(string[] args)
        {
            Task t = RunProgram();
            t.Wait();
            Console.WriteLine("ok");
            Console.ReadKey();
        }

        static async Task RunProgram()
        {
            var taskQueue = new ConcurrentQueue<CustomTask>();
            //生产
            var taskSource = Task.Run(() => TaskProducer(taskQueue));
            await taskSource;
            //消费者
            var processors = new Task[4];
            for (var i = 1; i <= 4; i++)
            {
                string processordId = i.ToString();
                processors[i - 1] = Task.Run(() => TaskProcessor(taskQueue, $"Processor {processordId}"));
            }
            await Task.WhenAll(processors);
        }
        static async Task TaskProducer(ConcurrentQueue<CustomTask> queue)
        {
            for (var i = 1; i <= 20; i++)
            {
                await Task.Delay(50);
                var workItem = new CustomTask { Id = i };
                queue.Enqueue(workItem);
            }
        }
        static async Task TaskProcessor(ConcurrentQueue<CustomTask> queue, string name)
        {
            CustomTask workItem;
            await GetRandomDelay();
            while (queue.TryDequeue(out workItem))
            {
                Console.WriteLine($"消费 {workItem.Id}===>{name}");
                await GetRandomDelay();
            }
        }

        static Task GetRandomDelay()
        {
            int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
            return Task.Delay(delay);
        }
    }
    class CustomTask
    {
        public int Id { get; set; }
    }
}

  

原文地址:https://www.cnblogs.com/SmallHan/p/11874867.html

时间: 2024-08-30 16:01:50

线程安全之ConcurrentQueue<T>队列的相关文章

python学习笔记-Day11 (线程、进程、queue队列、生产消费模型、携程)

线程使用 ###方式一 import threading def f1(arg): print(arg) t = threading.Thread(target=f1, args=(123,)) t.start() # start会调用run方法执行 # t是threading.Thread类的一个对象 # t.start()就会以线程的方式执行函数,可以使用pycharm ctrl选择start方法 # 找到Thread类的start方法,在start方法的注释中就已经写明,会去调用run()

JAVA线程池ThreadPoolExecutor与阻塞队列BlockingQueue .

从Java5开始,Java提供了自己的线程池.每次只执行指定数量的线程,java.util.concurrent.ThreadPoolExecutor 就是这样的线程池.以下是我的学习过程. 首先是构造函数签名如下: [java] view plain copy print ? public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<

线程池ThreadPoolExecutor与阻塞队列BlockingQueue应用

作者QQ:1095737364    QQ群:123300273     欢迎加入! 1.线程池介绍 JDK5.0以上: java.util.concurrent.ThreadPoolExecutor 构造函数签名: public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Reje

spring线程池ThreadPoolTaskExecutor与阻塞队列BlockingQueue

一: ThreadPoolTaskExecutor是一个spring的线程池技术,查看代码可以看到这样一个字段: private ThreadPoolExecutor threadPoolExecutor; 可以发现,spring的  ThreadPoolTaskExecutor是使用的jdk中的java.util.concurrent.ThreadPoolExecutor进行实现, 直接看代码: @Override protected ExecutorService initializeExe

线程间通信推荐用队列

# 队列是数据安全的,意思是自带锁,多线程间通信时,使用队列是好的,不会出现同一个数据被多个线程抢占,使用其他数据类型进行通信则需要自己实现锁功能 import queue # 普通先进先出队列 # q = queue.Queue() # 没有参数则表示队列没有指定深度 # q.put(1) # 如果队列指定了深度则到了深度后,会阻塞在这里等待队列有空间后再向队列中放入数据 # q.put_nowait() # 队列满了也不会阻塞在这里,但会有异常 # q.get() # 从队列中获取数据,无数

swift实现线程安全的栈和队列

实现一个线程安全的栈 这里使用数组来存储栈的数据.不足之处在于本例中的Stack可以无限扩容,更好的是初始化时候指定一个最大容量,防止不断扩容申请内存导致内存不够的问题.这里的线程安全使用一个串行队列来保证,实际上也可以通过加锁或者信号量甚至自旋锁来解决. struct Stack<Element> { private var items: [Element] private var queue = DispatchQueue(label: "StackOperationQueue&

[ovs][dpdk] ovs-dpdk 线程数,收包队列,core绑定

http://docs.openvswitch.org/en/latest/intro/install/dpdk/?highlight=dpdk 绑定2,4,6, 8核 [[email protected] ~]# ovs-vsctl set Open_vSwitch . other_config:pmd-cpu-mask=0x0154 设置4个收包队列 [[email protected] ~]# ovs-vsctl set Interface dpdk-p0 options:n_rxq=4

死锁,线程协作(同步,阻塞队列,Condition,管道流)

synchronized死锁 package com.thread.demo.deadlock; public class DeadLock { private static Object lock1 = new Object(); private static Object lock2 = new Object(); public static void main(String[] args) { // 创建线程1 new Thread(new Runnable() { @Override p

生产消费模式:多线程读写队列ConcurrentQueue

需求:现需要将多个数据源的数据导入到目标数据库,这是一个经典的生产消费应用的例子. 直接上代码,看下实现: // 初始化列队缓冲区 队列大小为100 IDataCollection<List<T>> queue = new QueueCollection<List<T>>(100); //开启X个后台任务,读取RabbitMQ队列信息, 把列队信息插入缓冲区队列 var count = 1; for (int i = 0; i < count; i++