ParallelProgramming-多消费者,多生产者同时运行并行

在上一篇文章演示了并行的流水线操作(生产者和消费者并行同时执行),C#是通过BlockingCollection这个线程安全的对象作为Buffer,并且结合Task来实现的。但是上一篇文章有个缺陷,在整个流水线上,生产者和消费者是唯一的。本文将演示多个消费者多个生产者同时并行执行。

一、多消费者、多生产者示意图

与前一篇文章演示的流水线思想类似,不同之处就是本文的topic:消费者和生产者有多个,以buffer1为例,起生产者有两个,消费者有两个,现在有三个纬度的并行:

  1. Action1和Action2并行(消费者和生产者并行)
  2. 消费者并行(Action2.1和Action2.2并行)
  3. 生产者并行(Action1.1和Action1.2并行)

二、实现

2.1 代码

 class PiplelineDemo
    {
        PRivate int seed;
        public PiplelineDemo()
        {
            seed = 10;
        }

        public void Action11(BlockingCollection<string> output)
        {
            for (var i = 0; i < seed; i++)
            {
                output.Add(i.ToString());//initialize data to buffer1
            }
        }

        public void Action12(BlockingCollection<string> output)
        {
            for (var i = 0; i < seed; i++)
            {
                output.Add(i.ToString());//initialize data to buffer1
            }
        }

        public void Action21(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            foreach (var item in input.GetConsumingEnumerable())
            {
                var itemToInt = int.Parse(item);
                output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2
            }
        }

        public void Action22(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            foreach (var item in input.GetConsumingEnumerable())
            {
                var itemToInt = int.Parse(item);
                output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2
            }
        }

        public void Action31(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            foreach (var item in input.GetConsumingEnumerable())
            {
                output.Add((item));// add new data to buffer3
            }
        }

        public void Action32(BlockingCollection<string> input, BlockingCollection<string> output)
        {
            foreach (var item in input.GetConsumingEnumerable())
            {
                output.Add((item));// add new data to buffer3
            }
        }
        public void Pipeline()
        {
            var buffer1 = new BlockingCollection<string>(seed * 2);
            var buffer2 = new BlockingCollection<string>(seed * 2);
            var buffer3 = new BlockingCollection<string>(seed * 2);
            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            var stage11 = taskFactory.StartNew(() => Action11(buffer1));
            var stage12 = taskFactory.StartNew(() => Action12(buffer1));
            Task.Factory.ContinueWhenAll(new Task[] { stage11, stage12 }, (tasks) =>
            {
                buffer1.CompleteAdding();
            });
            var stage21 = taskFactory.StartNew(() => Action21(buffer1, buffer2));
            var stage22 = taskFactory.StartNew(() => Action22(buffer1, buffer2));
            Task.Factory.ContinueWhenAll(new Task[] { stage21, stage22 }, (tasks) =>
            {
                buffer2.CompleteAdding();
            });
            var stage31 = taskFactory.StartNew(() => Action31(buffer2, buffer3));
            var stage32 = taskFactory.StartNew(() => Action32(buffer2, buffer3));
            Task.Factory.ContinueWhenAll(new Task[] { stage31, stage32 }, (tasks) =>
            {
                buffer3.CompleteAdding();
            });
            Task.WaitAll(stage11, stage12, stage21, stage22, stage31, stage32);
            foreach (var item in buffer3.GetConsumingEnumerable())//print data in buffer3
            {
                Console.WriteLine(item);
            }
        }
    }

2.2 运行结果

2.3 代码解释

  1. Action11和Action12相对比较好理解。初始化数据到buffer1。
  2. Action2.1和Action2.2相对比较费解,他们同时接受buffer1作为输入,为什么最终的结果Buffer2没有产生重复? 最后由Action21,action22同时产生的buffer3为什么也没有重复?这就是GetConsumingEnumerable这个方法的功劳。这个方法会将buffer的数据分成多份给多个消费者,如果一个value已经被一个消费者获取,那么其他消费者将不会再拿到这个值。这就回答了为什么没有重复这个问题。
  3. 上面方法同时使用了多任务延续(ContinueWhenAll)对buffer的调用CompleteAdding方法:该方法非常重要,如果没有调用这个方法,程序会进入死锁,因为消费者(consumer)会处于一直的等待状态。
时间: 2024-10-31 09:32:14

ParallelProgramming-多消费者,多生产者同时运行并行的相关文章

Java程序设计之消费者和生产者

新建一个Break类,表示食物数量. public class Break { public static final int MAX = 10; //最多一次性煮十个面包 Stack<Integer> stack = new Stack<Integer>(); public static int food_count = 0; //统计食物的个数 //做面包 public synchronized void make_food(int number){ stack.push(nu

mutex&condition variable 黄金搭档之 多消费者多生产者

Condition Variable都会搭配一个Mutex来用.我们知道Mutex的普通意义上是维持一个互斥变量,从而保证一个或一组操作的原子性.同样,简单的说Mutex加在Condition Variable上也是为了保证它的原子性了.Condition Variable,有条件的唤醒机制.最经典不过的就是生产--消息者模型了.但有一个细节,消费者需要有"产品"才能继续它的消费行为,因此当消费者发现"产品"被消费完了?它该怎么办?没错,普通情况下它就会进入等待挂起

消费者与生产者(多线程)

1 /** 2 * 要求:假定有10个消费者去消费生产的产品, 产品要保证顺序被其他任意消费者消费,上一个消费完了下一个才能消费 3 * 4 * @author trfizeng 5 * 6 */ 7 public class SCThread { 8 public static void main(String[] args) { 9 System.out.println("begin:" + (System.currentTimeMillis()) / 1000); 10 for

[Java基础] java多线程关于消费者和生产者

多线程: 生产与消费1.生产者Producer生产produce产品,并将产品放到库存inventory里:同时消费者Consumer从库存inventory里消费consume产品.2.库存inventory的总量(最大库存量100)是有限的.如果库存inventory满了,生产者不能在库存inventory继续生产produce产品,须等待状态.等待产品被消费者Consumer消费consume了,再往库存inventory生产produce产品.3.若库存inventory空了,消费者Co

java利用lock和unlock实现消费者与生产者问题(多线程)

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class LockAndUnlockDemo {     public static void main(String[] args) {         Clerk2 c=new Clerk2();        

通过消费者和生产者的多线程程序,了解Java的wait()和notify()用法

仓库类 public class Store { private int size = 0;//当前容量 private final int MAX = 10;//最大容量 //向仓库中增加货物 public synchronized void add() { while(size >= MAX)//每次执行都要检查是否已满 { try { wait();//如果满了,进入等待池等待 } catch (InterruptedException e) { e.printStackTrace();

微观经济学(七):市场和福利 - 消费者、生产者与市场效率

前言 配置资源的一种方式是交给市场控制,但是这种配置方式是否合意呢?在本章中,我们要讨论福利经济学(welfare economics)这个主题,即研究资源配置如何影响经济福利的一门学问.这种分析将得出一个影响深远的的结论:市场上的供求均衡可以使买者和卖者得到的总利益最大化. 消费者剩余 支付意愿 假设你有一张崭新的猫王首张专辑想要卖出,卖出的一种方法是举行一场拍卖会.四个猫王迷出现在你的拍卖会上,他们每个人都想拥有这张专辑,但每个人愿意为此支付的价格都有限.下面表格列出了这四个人可能的买者中每

java 模拟实现消费者和生产者问题

题目要求 用java代码模拟实现:一个人不断往箱子里放苹果,另一个人不断从箱子里取苹果,箱子只能放5个苹果,苹果数量无限.要求不使用java.util.concurrent包中的类. 思路 这道题主要考,java并发编程.Object.wai().Object.notify()方法的使用.循环队列的使用 1.使用两个线程分别模拟放苹果和取苹果. 2.定义一个类放苹果,类里主要是对一个数组的封装 注意: Object.wait()及Object.notify()方法的使用,下面是摘自JDK1.6文

二、Kafka基础实战:消费者和生产者实例

一.Kafka消费者编程模型 1.分区消费模型 分区消费伪代码描述 main() 获取分区的size for index =0 to size create thread(or process) consumer(Index) 第index个线程(进程) consumer(index) 创建到kafka broker的连接: KafkaClient(host,port) 指定消费参数构建consumer: SimpleConsumer(topic, partitions) 设置消费offset