关于生产者/消费者/订阅者模式的那些事

生产者/消费者模式


简介

用来干嘛的?

生产者/消费者模式的产生主要目的就是为了解决非同步的生产与消费之间的问题。

什么是非同步呢?

比如我刚刚生产了某个产品,而此时你正在打游戏,没空来取,要打完游戏来取,这就导致了我生产产品和你取产品是两个非同步的动作,你不知道我什么时候生产完产品,而我也不知道你什么时候来取。

而生产者/消费者模式就是解决这个非同步问题的,因为肯定不可能我生产完一个就给你打个电话叫你来取,然后等你取完我再生产下一个,这是多么低效的一种做法。所以这个模式运用而生,这个模式在生活中也有很好的体现,如:快递员派信这个例子,我就是生产者,快递员就是消费者,而生产者与消费者之间是通过什么来解决这种非同步的问题呢?就是一个存储中介,作为快递员派信这个例子中,信箱就是这个存储中介,每次我只要把写完的信扔入信箱,而快递员隔三差五的就会来取一次信,这两个动作是完全异步的,我把信扔入信箱后就不需要管什么了,之后肯定有快递员来取。

如何设计

设计图

生产者/消费者模式能够解决非同步的生产与消费的问题,归功就是存储中介的作用,因为生产者只要把生产完的物品放入存储中介中就行了,而不必关系消费者什么时候来取,当消费者需要时自然会来取,当存储中介满了的话,那么生产者将停止生产,因为再生产就没地放了,这时候就需要等待消费者消费了,而当存储中介没有时,这时候消费者来取那肯定取不到,所以也需要Wait,等待生产者生产后才能取到。所以这就有了下面这个设计图:

定位

从上图中可以知道,一个完整的生产者/消费者模式具有生产者、消费者、存储中介以及产品。这四个缺一不可,而关于它们的定位也至关重要

1 . Product

对于产品,什么样的对象才能成为产品呢,一是根据当时的业务逻辑来判断,比如执行完某些操作后的产生的Result,二是必须保持每个产品之间的完整性和独立性,保证各个产品之间互不影响、互不关联。

2 . Store

对于存储中介,它肯定是一块具有额定大小的存储空间,而这个存储空间一般来说具有FIFO的数据结构,比如JDK内置了具有阻塞作用的有界队列:ArrayBlockingQueue、LinkedBlockingQueue。并且存储中介需要起到生产者与消费者解耦的作用,这样的好处是当后期生产者或者消费者的生产方式或处理方式变了,这样只需要改变一方,而另外一方则不需要调整。而且它负责协调生产者与消费者之间的生产消费关系。

3 . Producer

对于生产者,它是具有配置Product各种属性的一个对象,可以设计成Factory、Builder、装饰者模式等等,一般来说生产者有单独的一个线程用来生产产品,当然如果量大的话可以用多个线程去生产,不过需要处理一下线程同步的问题(Semaphore|synchronized|ThreadLocal)

4 . Consumer

对于消费者,和Producer差不多,主要就是用来处理Product的,一般也有单独的一个线程去处理Product。

实例

1 . 生产者

AppleProducer

public class AppleProducer {
    private Store<Apple> mStore;
    private static ExecutorService mWorkThread = Executors.newFixedThreadPool(5);
    private String name;

    public AppleProducer setName(String name) {
        this.name = name;
        return this;
    }

    public AppleProducer bindStore(Store<Apple> store) {
        this.mStore = store;
        return this;
    }

    public void production() {
        mWorkThread.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
                Apple apple = new Apple("第" + name + "个产品");
                mStore.push(apple);
            }
        });
    }
}

2 . 消费者

AppleConsumer

public class AppleConsumer {
    private Store<Apple> mStore;
    private ExecutorService mWorkThread = Executors.newFixedThreadPool(1);

    public AppleConsumer bindStore(Store<Apple> store) {
        this.mStore = store;
        return this;
    }

    public void consume() {
        mWorkThread.submit(new Runnable() {
            @Override
            public void run() {
                for (; ; ) {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                    }
                    Apple apple = mStore.take();
                    System.out.println("apple:" + apple.getName() + "消费了");
                }
            }
        });

    }
}

3 . 存储中介

Store

public class Store<T> {
    private BlockingQueue<T> mQueue = new ArrayBlockingQueue<>(10, true);

    public void push(T t) {
        try {
            mQueue.put(t);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        System.out.println("product生产了...");
    }

    public T take() {
        T t = null;
        try {
            t = mQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        System.out.println("product取出了...");
        return t;
    }

    public void release() {
        if (mQueue.isEmpty())
            return;
        mQueue.clear();
    }
}

4 . 产品

public class Apple {
    private String name;

    public Apple(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

5 . Test

public class Test {
    public static void main(String[] args) {
        Store<Apple> store = new Store<>();

        new AppleConsumer()
                .bindStore(store)
                .consume();

        for (int i = 0; i < 30; i++) {
            new AppleProducer()
                    .bindStore(store)
                    .setName(i + "")
                    .production();
        }
    }
}

result:

product生产了…

product生产了…

product生产了…

product生产了…

product生产了…

product取出了…

apple:第1个产品消费了

product生产了…

product生产了…

product生产了…

product生产了…

product生产了…

product生产了…

product取出了…

apple:第0个产品消费了

product生产了…

product取出了…

product生产了…

apple:第3个产品消费了

product取出了…

apple:第2个产品消费了

product生产了…

product取出了…

apple:第4个产品消费了

product生产了…

product取出了…

apple:第5个产品消费了

product生产了…

product取出了…

apple:第6个产品消费了

product生产了…



对于生产者,为了避免生产线程数量过多采取了一个线程池控制生产线程的数量,而生产者每一件产品都是由单独的一个线程来生产,对于消费者,用一个线程去轮询取队列里的产品,有则取出,没有则阻塞等待,由于ArrayBlockingQueue本身是支持并发的,所以在多线程共同操作一个存储队列的情况下,并不会有并发的问题。

所以生产者/消费者模式也支持多生产——多消费的模式:

对于有众多种类不同的生产者,可以用一个工厂类来管理。

数据源/订阅者模式

关于这个模式,其实是生产者/消费者模式的变体,这种模式并不需要存储中介,而是通过一个DataSource空壳来包装数据,对于发布者提交了一个Task后,将立即返回一个DataSource,对于任务执行完后的结果,如果你想获取则必须通过datasource.subscribe(new XxxSubscriber(…))来订阅获取执行后的结果,而如果不通过数据源订阅的方式来获取而直接通过datasource.getData()获取则返回null,因为DataSource只是一个获取数据的空壳。这种模式在Fresco源码中有很好的体现,用了大量的这种模式。

栗子:

        DataSource<Apple> dataSource = ProducerFactory.newAppleProducer().submit(name);
        //Apple apple = dataSource.getData();// apple is null!
        dataSource.subscribe(new BaseDataSubscriber<Apple>() {
            @Override
            protected void onSuccess(DataSource<Apple> dataSource) {
                        Apple apple = dataSource.getData();
                        //...
            }

            @Override
            protected void onFailure(DataSource<Apple> dataSource,Throwable throwable) {
                        //...
            }
        });
时间: 2024-10-02 23:51:49

关于生产者/消费者/订阅者模式的那些事的相关文章

RabbitMQ下的生产消费者模式与订阅发布模式

??所谓模式,就是在某种场景下,一类问题及其解决方案的总结归纳.生产消费者模式与订阅发布模式是使用消息中间件时常用的两种模式,用于功能解耦和分布式系统间的消息通信,以下面两种场景为例: 数据接入 ??假设有一个用户行为采集系统,负责从App端采集用户点击行为数据.通常会将数据上报和数据处理分离开,即App端通过REST API上报数据,后端拿到数据后放入队列中就立刻返回,而数据处理则另外使用Worker从队列中取出数据来做,如下图所示. ??这样做的好处有:第一,功能分离,上报的API接口不关心

go 生产者消费者模型与发布订阅模型

作者:Gundy_链接:https://www.jianshu.com/p/dc94f2099277 生产者消费者模型 并发编程中最常见的例子就是生产者消费者模式,该模式主要通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度.简单地说,就是生产者生产一些数据,然后放到成果队列中,同时消费者从成果队列中来取这些数据.这样就让生产消费变成了异步的两个过程.当成果队列中没有数据时,消费者就进入饥饿的等待中:而当成果队列中数据已满时,生产者则面临因产品挤压导致CPU被剥夺的下岗问题. /

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

并发编程基础之生产者消费者模式

一:概念 生产者消费者模式是java并发编程中很经典的并发情况,首先有一个大的容器,生产者put元素到 容器中,消费者take元素出来,如果元素的数量超过容器的容量时,生产者不能再往容器中put元素 ,处于阻塞状态,如果元素的数量等于0,则消费者不能在从容器中take数据,处于阻塞状态. 二:示例 /** * */ package com.hlcui.main; import java.util.LinkedList; import java.util.concurrent.ExecutorSe

生产者消费者模式

什么是生产者消费者模式   在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者.在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式.结构图如下: 生产者消费者模式的优点 1.解耦 假设生产者和消费者分别是两个类.如果让生产者直接调用消费者的某个方法,那

使用BlockingQueue的生产者消费者模式

BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.使用场景. 首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出:在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享.强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程. BlockingQueue的

生产者消费者模式(转)

本文转载自博文系列架构设计:生产者/消费者模式.文中对原文格式进行了稍加整理. 概述 今天打算来介绍一下“生产者/消费者模式”,这玩意儿在很多开发领域都能派上用场.由于该模式很重要,打算分几个帖子来介绍.今天这个帖子先来扫盲一把.如果你对这个模式已经比较了解,请跳过本扫盲帖,直接看下一个帖子(关于该模式的具体应用) . 看到这里,可能有同学心中犯嘀咕了:在四人帮(GOF)的23种模式里面似乎没听说过这种嘛!其实GOF那经典的23种模式主要是基于OO的(从书名<Design Patterns: E

生产者消费者模式(吃包子例子)

生产者-消费者问题是一个经典的进程同步问 题,该问题最早由Dijkstra提出,用以演示他提出的信号量机制.在同一个进程地址空间内执行的两个线程生产者线程生产物品,然后将物品放置在一个空 缓冲区中供消费者线程消费.消费者线程从缓冲区中获得物品,然后释放缓冲区.当生产者线程生产物品时,如果没有空缓冲区可用,那么生产者线程必须等待消费 者线程释放出一个空缓冲区.当消费者线程消费物品时,如果没有满的缓冲区,那么消费者线程将被阻塞,直到新的物品被生产出来. 生产者消费者模式是并发.多线程编程中经典的设计

Java 并发编程(四)阻塞队列和生产者-消费者模式

阻塞队列 阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法.如果队列已经满了,那么put方法将阻塞直到有空间可以用:如果队列为空,那么take方法将一直阻塞直到有元素可用.队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞.一种常见的阻塞生产者-消费者模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式. 意义:该模式能简化开发过程,因为他消除了生产者和消费者类之间的代