Java多线程设计模式(2)生产者与消费者模式

1 Producer-Consumer Pattern

Producer-Consumer Pattern主要就是在生产者与消费者之间建立一个“桥梁参与者”,用来解决生产者线程与消费者线程之间速度的不匹配。

当要从某个线程Produccer参与者将数据传输给其它线程Consumer参与者的时候,此时就可以在中间加一个Channel参与者,在Channel参与者中以某种方式存放接受的数据,再以某方式来获取收到的数据,Channel就可以来缓存两个线程之间传输的数据,在Channel参与者为了保证安全性,也要用Guarded Suspension Pattern模式。

Channel参与者作为一个中间者,当Channel参与者从Producer参与者接收到数据,可以用三种方式将数据按顺序 传递给Consumer参与者。

1 队列,这是一种按照FIFO的方式存储数据,即最先到达的数据最先传输给Consumer参与者。在Java中,可以利用数组形式来存放,每次从数组下标最前端获取数据,而从数组下标最后端来缓存数据。也可以利用LinkedList来存放,每次缓存数据的时候,利用LinkedList.addLast(obj),每次获取数据的时候利用LinkedList.removeFirst();移除并且返回队列的第一个元素。

2 堆栈,这是一种以LIFO的方式存储数据,即最先到达的数据最后传输给Consumer参与者。在Java中,对于堆栈的实现,可以直接使用LinkedList,利用pop()从栈顶弹出一个数据来获取数据,利用push(obj)来向堆栈中缓存一个数据

3 优先级队列,对于缓存的数据设置一些优先级来存储。

生产者与消费者模式,其实就是线程之间的合作关系,同时又包含了互斥关系。所谓的合作就是生产者生成产品,提供消费者消费。所谓的互斥就是生产者和消费者对于中间的缓冲区是互斥访问的。

实例:

几个厨师制作食物,将物品放置在桌子上,但是桌子放置的盘子有限,消费者可以从桌子上获取食物来吃。当桌子上有空位置的时候,厨师就可以继续放置做好的食物,且通知消费者来吃,但是满了就只能一直等待消费者吃了有空的位置。而消费者每次取食物的时候,如果桌子上面有食物,则就取走,并且通知厨师来做食物,如果没有则就等待。

生产者Producer代码:

package whut.producer;
import java.util.Random;
public class MakerThread extends Thread{
    private final Table table;
    private final Random random;
    private static int id=0;
    public MakerThread(String name,Table table,long seed)
    {
        super(name);
        this.table=table;
        this.random=new Random(seed);
    }

    public void run()
    {
        try{
            while(true)
            {
                Thread.sleep(random.nextInt(1000));
                String cake=" [Cake No."+nextId()+" by "+Thread.currentThread().getName()+"]";
                table.put(cake);

            }
        }catch(InterruptedException e)
        {
        }
    }
    //为了使得所有实例共享该字段
    public static synchronized int nextId()
    {
        return id++;
    }
}

消费者Consumer代码:

package whut.producer;
import java.util.Random;
public class EaterThread extends Thread{
    private final Table table;
    private final Random random;
    public EaterThread(String name,Table table,long seed)
    {
        super(name);
        this.table=table;
        this.random=new Random(seed);
    }
    public void run()
    {
        try{
            while(true)
            {
                String cake=table.take();
                Thread.sleep(random.nextInt(1000));
            }
        }catch(InterruptedException e)
        {
        }
    }
}

Channel中间缓冲区,关键部分

package whut.producer;
public class Table {
private final String[] cakes;//利用数组来作为缓冲区
    private int head;//下一次蛋糕取的位置
    private int tail;//下一次蛋糕放置位置
    private int count;//桌子上蛋糕的总数
    public Table(int count)
    {
        this.cakes=new String[count];
        this.head=0;
        this.tail=0;
        this.count=0;
    }

    public synchronized void put(String cake)throws InterruptedException
    {
        System.out.println(Thread.currentThread().getName()+" puts "+cake);
        while(count>=cakes.length)
        {
            System.out.println(Thread.currentThread().getName()+" Begin wait....");
            wait();
            System.out.println(Thread.currentThread().getName()+" End wait....");
        }
        cakes[tail]=cake;
        tail=(tail+1)%cakes.length;
        count++;
        notifyAll();
    }

    //取蛋糕
    public synchronized String take()throws InterruptedException
    {
        while(count<=0)
        {
            System.out.println(Thread.currentThread().getName()+" Begin wait....");
            wait();
            System.out.println(Thread.currentThread().getName()+" End wait....");
        }
        String cake=cakes[head];
        head=(head+1)%cakes.length;
        count--;
        notifyAll();
        System.out.println(Thread.currentThread().getName()+" gets "+cake);
        return cake;
    }
}

  测试类:

package whut.producer;
public class MainTest {
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Table table=new Table(3);
        new MakerThread("MakerThread-1",table,31415).start();
        new MakerThread("MakerThread-2",table,92653).start();
        new MakerThread("MakerThread-3",table,58979).start();

        new EaterThread("EaterThread-1",table,32384).start();
        new EaterThread("EaterThread-2",table,32384).start();
        new EaterThread("EaterThread-3",table,32384).start();
        //可以通过调用interrupt来去中断结束任何线程
    }
}
时间: 2024-12-05 10:24:18

Java多线程设计模式(2)生产者与消费者模式的相关文章

Java并发编程(4)--生产者与消费者模式介绍

一.前言 这种模式在生活是最常见的,那么它的场景是什么样的呢? 下面是我假象的,假设有一个仓库,仓库有一个生产者和一个消费者,消费者过来消费的时候会检测仓库中是否有库存,如果没有了则等待生产,如果有就先消费直至消费完成:而生产者每天的工作就是先检测仓库是否有库存,如果没有就开始生产,满仓了就停止生产等待消费,直至工作结束.下图是根据假象画的流程图: 那么在程序中怎么才能达到这样的效果呢?下面介绍三种方式实现. 二.使用notify() 和 wait()实现 相信大家这两个方法都不陌生,它是Obj

java多线程中的生产者与消费者之等待唤醒机制@Version1.0

一.生产者消费者模式的学生类成员变量生产与消费demo,第一版1.等待唤醒:    Object类中提供了三个方法:    wait():等待    notify():唤醒单个线程    notifyAll():唤醒所有线程2.为什么这些方法不定义在Thread类中呢?  这些方法的调用必须通过锁对象调用,而我们刚才使用的锁对象是任意锁对象.  所以,这些方法必须定义在Object类中.3.当我们在使用多线程的时候有的时候需要,一条线程产生一个数据,另一条线程接着消费一个数据,一边生产一边消费,

java多线程中的生产者与消费者之等待唤醒机制@Version2.0

二.生产者消费者模式的学生类成员变量生产与消费demo, @Version2.0 在学生类中添加同步方法:synchronized get()消费者,synchronized set()生产者 最终版的代码中: 把student的成员变量给私有化了, 把设置和获取的功能给封装成了功能,并加了同步, 设置或者获取的线程里面只需要调用方法即可. 1.等待唤醒:    Object类中提供了三个方法:    wait():等待    notify():唤醒单个线程    notifyAll():唤醒所

Java多线程与并发——生产者与消费者应用案例

多线程的开发中有一个最经典的操作案例,就是生产者-消费者,生产者不断生产产品,消费者不断取走产品. package com.vince; /** * 生产者与消费者案例 * @author Administrator * */ public class ThreadDemo4 { public static void main(String[] args) { // TODO 自动生成的方法存根 Food food=new Food(); Producter p=new Producter(fo

JAVA并发实现五(生产者和消费者模式Condition方式实现)

package com.subject01; import java.util.PriorityQueue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Condition是在java 1.5中才出现的,它用来替代传统的Object的wait().notify()

Java多线程设计模式(4)线程池模式

前序: Thread-Per-Message Pattern,是一种对于每个命令或请求,都分配一个线程,由这个线程执行工作.它将“委托消息的一端”和“执行消息的一端”用两个不同的线程来实现.该线程模式主要包括三个部分: 1,Request参与者(委托人),也就是消息发送端或者命令请求端 2,Host参与者,接受消息的请求,负责为每个消息分配一个工作线程. 3,Worker参与者,具体执行Request参与者的任务的线程,由Host参与者来启动. 由于常规调用一个方法后,必须等待该方法完全执行完毕

多线程生产者、消费者模式中,如何停止消费者?多生产者情况下对“毒丸”策略的应用。

生产者.消费者模式是多线程中的经典问题.通过中间的缓冲队列,使得生产者和消费者的速度可以相互调节. 对于比较常见的单生产者.多消费者的情况,主要有以下两种策略: 通过volatile boolean producerDone =false 来标示是否完成.生产者结束后标示为true, 消费者轮询这个变量来决定自己是否退出. 这种方式对producerDone产生比较大的争用,实现起来也有诸多问题需要考虑. 比较经典的"毒丸"策略,生产者结束后,把一个特别的对象:"毒丸&quo

Java多线程设计模式wait和notify机制总结

Java多线程设计模式wait和notify机制总结: wait和notify方法必须写在synchronized方法内,即在调用wait和notify方法前,需先获得对象锁: 调用wait方法则释放锁:wait方法返回后,需获得对象锁才可继续执行下面语句: 多个线程wait时,若另外的线程调用notify方法后,由JVM决定唤醒其中一个线程: 多个线程wait时,若另外的线程调用notifyAll方法,则唤醒所有wait线程,但是只有其中一个线程可以获得对象锁,执行wait下面的语句,其余的等

python 生产者与消费者模式

生产者与消费者模式 1. 队列 先进先出 2. 栈 先进后出 Python的Queue模块中提供了同步的.线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue.这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么就做完),能够在多线程中直接使用.可以使用队列来实现线程间的同步. 用FIFO队列实现上述生产者与消费者问题的代码如下: import threading import time from q