Parallel Programming-实现并行操作的流水线(生产者、消费者)

本文介绍如何使用C#实现并行执行的流水线(生产者消费者):

  1. 流水线示意图
  2. 实现并行流水线

一、流水线示意图

上图演示了流水线,action1接收input,然后产生结果保存在buffer1中,action2读取buffer1中由action1产生的数据,以此类推指导action4完成产生Output。

以上也是典型的生产者消费者模式。

上面的模式如果使用普通常规的串行执行是很简单的,按部就班按照流程图一步一步执行即可。如果为了提高效率,想使用并行执行,也就是说生产者和消费者同时并行执行,该怎么办么?

二、实现并行流水线

2.1 代码

class PiplelineDemo
    {
        private int seed;
        public PiplelineDemo()
        {
            seed = 10;
        }

        public void Action1(BlockingCollection<string> output)
        {
            try
            {
                for (var i = 0; i < seed; i++)
                {
                    output.Add(i.ToString());//initialize data to buffer1
                }
            }
            finally
            {
                output.CompleteAdding();
            }
        }

        public void Action2(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            try
            {
                foreach (var item in input.GetConsumingEnumerable())
                {
                    var itemToInt = int.Parse(item);
                    output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2
                }
            }
            finally
            {
                output.CompleteAdding();
            }
        }

        public void Action3(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            try
            {
                foreach (var item in input.GetConsumingEnumerable())
                {
                    output.Add(item);//set data into buffer3
                }
            }
            finally
            {
                output.CompleteAdding();
            }
        }

        public void Pipeline()
        {
            var buffer1 = new BlockingCollection<string>(seed);
            var buffer2 = new BlockingCollection<string>(seed);
            var buffer3 = new BlockingCollection<string>(seed);
            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            var stage1 = taskFactory.StartNew(() => Action1(buffer1));
            var stage2 = taskFactory.StartNew(() => Action2(buffer1, buffer2));
            var stage3 = taskFactory.StartNew(() => Action3(buffer2, buffer3));

            Task.WaitAll(stage1, stage2, stage3);
            foreach(var item in buffer3.GetConsumingEnumerable())//print data in buffer3
            {
                Console.WriteLine(item);
            }
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            new PiplelineDemo().Pipeline();
            Console.Read();
        }
    }

2.2 运行结果

预期打印出了0-9自我相乘的结果。

2.3 代码解释

代码本身的逻辑和本文开始的流程图是一一对应的。

BlockingCollection<T>是.Net里面的一个线程安全集合。实现了IProducerConsumerCollection<T>.

  1. Add方法:将元素加入集合
  2. CompleteAdding方法:告诉消费者,在当调用该方法之前的元素处理完之后就不要再等待处理了,可以结束处理了。这个非常重要,一定要执行,所以放在finally中(就算exception也要执行)
  3. GetConsumingEnumberable,给消费者返回一个可以便利的集合

GetConsumingEnumberable是一个非常强大的东东,专门写一片文章介绍介绍。

时间: 2024-11-20 03:42:05

Parallel Programming-实现并行操作的流水线(生产者、消费者)的相关文章

OpenMP实现生产者消费者模型

生产者消费者模型已经很古老了吧,最近写了个OpenMP版的此模型之实现,来分享下. 先说一下模型的大致做法是: 1.生产者需要取任务,生产产品. 2.消费者需要取产品,消费产品. 生产者在生产某个产品之后,要告知消费者此产品已经可以使用了.消费者通过获得可以使用这个信号来取得产品,进一步消费产品. 比如,我们有N个图像需要对每一个图像作滤波或者变换等处理,并且把处理后的结果存到硬盘上. 那么生产者可以将N个图像看成N个任务,每个任务都是独立的,每个任务的计算结果可以看成是产品,消费者就是取这个产

python 生产者消费者线程模型

python 多线程生产者消费者模型: 一个生产者多个消费者 The Queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module imple

13 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件  queue队列 生产者消费者模型 Queue队列 开发一个线程池

本节内容 操作系统发展史介绍 进程.与线程区别 python GIL全局解释器锁 线程 语法 join 线程锁之Lock\Rlock\信号量 将线程变为守护进程 Event事件 queue队列 生产者消费者模型 Queue队列 开发一个线程池 进程 语法 进程间通讯 进程池 操作系统发展史 手工操作(无操作系统) 1946年第一台计算机诞生--20世纪50年代中期,还未出现操作系统,计算机工作采用手工操作方式. 手工操作程序员将对应于程序和数据的已穿孔的纸带(或卡片)装入输入机,然后启动输入机把

进击的Python【第九章】:paramiko模块、线程与进程、各种线程锁、queue队列、生产者消费者模型

一.paramiko模块 他是什么东西? paramiko模块是用python语言写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接. 先来个实例: 1 import paramiko 2 # 创建SSH对象 3 ssh = paramiko.SSHClient() 4 5 # 允许连接不在know_hosts文件中的主机 6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 7 # 连接服务器 8 ss

多线程、生产者消费者模型

目录 生产者消费者模型 生产者消费者模型 为什么要使用生产者和消费者模式 什么是生产者消费者模式 基于队列实现生产者消费者模型 多线程 什么是线程 开启线程的两种方式 线程与进程区别 Tread类的常用属性 守护线程 线程锁 生产者消费者模型 生产者消费者模型 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多

生产者消费者模型Java实现

生产者消费者模型 生产者消费者模型可以描述为: ①生产者持续生产,直到仓库放满产品,则停止生产进入等待状态:仓库不满后继续生产: ②消费者持续消费,直到仓库空,则停止消费进入等待状态:仓库不空后,继续消费: ③生产者可以有多个,消费者也可以有多个: 生产者消费者模型 对应到程序中,仓库对应缓冲区,可以使用队列来作为缓冲区,并且这个队列应该是有界的,即最大容量是固定的:进入等待状态,则表示要阻塞当前线程,直到某一条件满足,再进行唤醒. 常见的实现方式主要有以下几种. ①使用wait()和notif

go 生产者消费者模型与发布订阅模型

作者:Gundy_链接:https://www.jianshu.com/p/dc94f2099277 生产者消费者模型 并发编程中最常见的例子就是生产者消费者模式,该模式主要通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度.简单地说,就是生产者生产一些数据,然后放到成果队列中,同时消费者从成果队列中来取这些数据.这样就让生产消费变成了异步的两个过程.当成果队列中没有数据时,消费者就进入饥饿的等待中:而当成果队列中数据已满时,生产者则面临因产品挤压导致CPU被剥夺的下岗问题. /

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

4.利用python生成器实现简单的“生产者消费者”模型

假如说,没有生成器这种对象,那么如何实现这种简单的"生产者消费者"模型呢? import time def producer(): pro_list = [] for i in range(10000): print "包子%s制作ing" %(i) time.sleep(0.5) pro_list.append("包子%s" %i) return pro_list def consumer(pro_list): for index,stuffe