利用BlockingCollection实现生产者和消费者队列,实现写文本

  最近开发几个小项目,需要把结果写到txt文件里面,并且按照时间进行分文件,由于对于效率要求较高,所以采用 生产者和消费者 模型来进行写出文本,线程中只需要添加队列就立即返回,而不需要等待写文件的时间

public class WriteItem : IDisposable
    {
        public string Filename { get; }
        public Encoding Encode { get; }
        public bool Append { get; }
        public bool TimeName { get; }

        private StreamWriter _writer;

        private readonly BlockingCollection<string> _blocking = new BlockingCollection<string>();

        public void Write(string msg)
        {
            _blocking.Add(msg);
        }

        public void WriteLine(string msg)
        {
            Write(msg + Environment.NewLine);
        }

        public WriteItem(string filename, Encoding encode, bool append = true, bool timeName = false)
        {
            Filename = filename;
            Encode = encode;
            Append = append;
            TimeName = timeName;

            if (timeName && string.IsNullOrEmpty(Path.GetExtension(filename)))
            {
                this.Filename = Path.Combine(this.Filename, DateTime.Now.ToString("yyyy-MM-dd") + ".txt");
            }
            var dir = Path.GetDirectoryName(this.Filename);
            Directory.CreateDirectory(dir ?? throw new InvalidOperationException());
            _writer = new StreamWriter(this.Filename, this.Append, this.Encode) { AutoFlush = true };
            Task.Factory.StartNew(() =>
            {
                foreach (var s in _blocking.GetConsumingEnumerable())
                {
                    if (TimeName)
                    {
                        var fileNameWithoutExtension = Path.GetFileNameWithoutExtension(filename);
                        var nowDay = DateTime.Now.ToString("yyyy-MM-dd").ToString();
                        if (fileNameWithoutExtension != nowDay)
                        {
                            _writer.Dispose();
                            _writer = new StreamWriter(this.Filename, this.Append, this.Encode) { AutoFlush = true };
                        }
                    }
                    _writer.Write(s);
                }

            }, TaskCreationOptions.LongRunning);
        }

        public void Dispose()
        {
            _writer?.Dispose();
            _blocking?.Dispose();
        }
    }

  然后再写了个字典来维护:

 public class FileWriteQueue
    {
        private static readonly Dictionary<string, WriteItem> Dictionary = new Dictionary<string, WriteItem>();

        public static void AddOrUpdate(string key, WriteItem item)
        {
            if (Dictionary.ContainsKey(key))
            {
                Dictionary[key].Dispose();
                Dictionary[key] = item;
            }
            else
            {
                Dictionary.Add(key, item);
            }
        }

        public static WriteItem Get(string key)
        {
            return Dictionary[key];
        }
    }

  在实际使用添加WirteItem,设置好输出目录就行了:

            FileWriteQueue.AddOrUpdate("success",new WriteItem(Path.Combine("结果","成功"),Encoding.Default,true,true));
            FileWriteQueue.AddOrUpdate("error", new WriteItem(Path.Combine("结果", "失败"), Encoding.Default, true, true));
            for (int i = 0; i < 1000; i++)
            {
                FileWriteQueue.Get("success").WriteLine(i.ToString());
            }

原文地址:https://www.cnblogs.com/mldcy/p/8331619.html

时间: 2024-11-08 18:16:13

利用BlockingCollection实现生产者和消费者队列,实现写文本的相关文章

使用事件等待句柄EventWaitHandler 实现生产者、消费者队列

using System; using System.Threading; using System.Collections.Generic; class ProducerConsumerQueue : IDisposable { EventWaitHandle _wh = new AutoResetEvent (false); Thread _worker; readonly object _locker = new object(); Queue<string> _tasks = new

并发无锁队列学习之二【单生产者单消费者】

1.前言 最近工作比较忙,加班较多,每天晚上回到家10点多了.我不知道自己还能坚持多久,既然选择了就要做到最好.写博客的少了.总觉得少了点什么,需要继续学习.今天继续上个开篇写,介绍单生产者单消费者模型的队列.根据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种.单生产者单消费者模型的队列操作过程是不需要进行加锁的.生产者通过写索引控制入队操作,消费者通过读索引控制出队列操作.二者相互之间对索引是独享,不存在竞争关系.如下图所示: 2.单生产者单消费者定长

并发无锁队列学习(单生产者单消费者模型)

1.引言 本文介绍单生产者单消费者模型的队列.根据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种.单生产者单消费者模型的队列操作过程是不需要进行加锁的.生产者通过写索引控制入队操作,消费者通过读索引控制出队列操作.二者相互之间对索引是独享,不存在竞争关系.如下图所示: 2.单生产者单消费者定长队列 这种队列要求每次入队和出队的内容是定长的,即生产者写入队列和消费者读取队列的内容大小事相同的.linux内核中的kfifo就是这种队列,提供了读和写两个索引.

RabbitMQ基础概念详解(一)——环境配置及模拟生产者和消费者简单消息发送

一.简介: RabbitMq 是实现了高级消息队列协议(AMQP)的开源消息代理中间件.消息队列是一种应用程序对应用程序的通行方式,应用程序通过写消息,将消息传递于队列,由另一应用程序读取 完成通信.而作为中间件的 RabbitMq 无疑是目前最流行的消息队列之一. AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然.

消息队列_RabbitMQ-0002.深入MQ生产者/信道/交换机/队列/消费者?

形象说明: 比喻: RabbitMQ提供的消息投递服务类似于现实生活中的快递公司,双11我们可能会买很多东西,自然会陆续收到很多寄自淘宝店主由快递公司发来的快件,但是可能很多时候买回来的东西并不合心意,自然会陆续通过快递公司退回快件,所以回归到架构,这里的快件就相当于消息,我们相当于应用程序,淘宝店主相当于服务器,而快递公司相当于路由器,应用程序可以发送和接收消息,服务器也可以发送和接收消息,所以当应用程序连接到RabbitMQ时,就必须做一个决定:我是发送还是接收哪? 现实: 生产者(Prod

Java多线程15:Queue、BlockingQueue以及利用BlockingQueue实现生产者/消费者模型

Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的.在FIFO队列中,所有新元素都插入队列的末尾. Queue中的方法 Queue中的方法不难理解,6个,每2对是一个也就是总共3对.看一下JDK API就知道了: 注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议. Blockin

用阻塞队列和线程池简单实现生产者和消费者场景

本例子仅仅是博主学习阻塞队列和后的一些小实践,并不是真正的应用场景! 生产者消费者场景是我们应用中最常见的场景,我们可以通过ReentrantLock的Condition和对线程进行wait,notify同通信来实现生产者和消费者场景,前者可以实现多生产者和多消费者模式,后者仅可以实现一生产者,一消费者模式. 今天我们就利用阻塞队列来实现下生产者和消费者模式(里面还利用了线程池). 看过我关于阻塞队列博文的朋友已经知道,阻塞队列其实就是由ReentrantLock实现的! 场景就不描述了,为简单

生产者、消费者、队列

''' queue队列,什么是队列?排队干一件事,谁去维护排队的关系?预防插队等 队列是一个有顺序的容器,有列表了还要队列干什么. 根本区别是列表里拿走一个数据,数据还在里面.队列是数据取走了就没了 为什么要用队列?提高双方效率,解耦合,生产者向队列里放,(队列),消费者从队列里取 1.先入先出 queue.Queue(maxsize=0) queue.put()放数据 (如果放满了再放就会阻塞,用queue.put(block=False)或q.put_nowait()抛异常,或者判断队列长度

利用多线程编写 生产者-消费者 关系

package ace; import java.util.ArrayList; import java.util.Timer; import java.util.TimerTask; /** * 利用多线程编写 生产者-消费者 关系 */ public class ProductionAndConsumption { private static ArrayList<String> products = new ArrayList<String>(); // 产品 private