深入理解并发(二)--生产者及消费者

生产者及消费者问题,是线程操作中的一个经典案列。但由于线程运行的不确定性,生产者及消费者可能会产生一些问题:

试想,如果生产者线程向存储数据空间添加了部分信息,但没有添加全部,这时就切换到消费者线程,这时消费者线程将会把已经添加了的部分信息,后上一次的信息混淆了,导致出错。

或者,若生产者放数据,与消费者取数据的速度不匹配,也会出现问题:即可能会出现,生产者放了多条数据,消费者才取了一条,导致数据丢失;或生产者只放了一条数据,但消费者已经取了多条,这会导致重复取出数据。

举例说明:


class Info {
    private String name = null;
    private String gender = null;
    private String school = null;

    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getGender() {
        return gender;
    }
    public void setGender(String gender) {
        this.gender = gender;
    }
    public String getSchool() {
        return school;
    }
    public void setSchool(String school) {
        this.school = school;
    }
    @Override
    public String toString() {
        return "Info [name=" + name + ", gender=" + gender + ", school="
                + school + "]";
    }

}
class Produ implements Runnable {
    private Info info = null;
    public Produ(Info info) {
        this.info = info;
    }
    @Override
    public void run() {
        try {

            for (int i = 0; i < 10; i++) {
                if (i % 2 == 1) {
                    this.info.setName("qiu");
                    Thread.sleep((int)Math.random()*100);//通过加入延时,使问题更加容易显露出来
                    this.info.setSchool("whut");
                    this.info.setGender("boy");
                } else {
                    this.info.setName("cai");
                    Thread.sleep((int)Math.random()*100);
                    this.info.setSchool("scut");
                    this.info.setGender("girl");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

class Consum implements Runnable{
    private Info info;
    public Consum(Info info) {
        this.info = info;
    }
    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                System.out.println(info);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class ProducerAndConsume {
    public static void main(String args[]){
        Info info  = new Info();
        Produ producer = new Produ(info);
        Consum consumer = new Consum(info);
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

运行结果(由于线程的不确定性,所以每次运行的结果每次都不一样,这里就只粘一次运行结果):

Info [name=cai, gender=null, school=null]
Info [name=cai, gender=null, school=null]
Info [name=cai, gender=null, school=null]
Info [name=cai, gender=null, school=null]
Info [name=qiu, gender=girl, school=scut]
Info [name=qiu, gender=boy, school=whut]
Info [name=qiu, gender=boy, school=whut]
Info [name=qiu, gender=boy, school=whut]
Info [name=qiu, gender=boy, school=whut]
Info [name=qiu, gender=boy, school=whut]

从上述例子来看,之前提到的问题在结果上已经展现出来了。

解决方法1:

通过加入线程等待与唤醒,wait()和notify()(或者notifyAll),但实现过程是麻烦,并且会增加类与类之间的耦合,因此不推荐使用。这里也不多加介绍了。

解决方法2:

通过阻塞队列来解决这种任务协作问题。

在java.util.concurrent.BlockingQueue接口提供了阻塞队列。实现类分别有ArrayBlockingQueue,LinkedBlockingQueue,DelayQueue,PriorityBlockingQueue,SynchronousQueue等,

有了阻塞队列就不用担心本文一开始所担心的问题了。因为若队列为空,而此时消费者想从队列中获取对象,此时,阻塞队列会挂起消费者任务,等到有更多元素可用时才恢复消费者任务。更不用担心若消费者取出的速度小于生产者放入的速度的情况,因为生产者所生产的都会存入队列里等待消费者取出。

下面使用LinkedBlockingQueue阻塞队列来修改上面的例子。(其实可以使用ArrayBlockingQueue,两者的区别只是后者初始化时已经确定了队列的长度,即具有固定尺寸的,而前者是无界队列.)

在上述例子添加阻塞队列便可以解决上述两点问题了,代码如下:

import java.util.concurrent.LinkedBlockingQueue;

class Info {
    private String name = null;
    private String gender = null;
    private String school = null;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public String getSchool() {
        return school;
    }

    public void setSchool(String school) {
        this.school = school;
    }

    @Override
    public String toString() {
        return "Info [name=" + name + ", gender=" + gender + ", school="
                + school + "]";
    }
}

class Produ implements Runnable {
    private Info info = null;
    private LinkedBlockingQueue<Info> infoQueue;
    public Produ(LinkedBlockingQueue<Info> infoQueue) {
        this.infoQueue = infoQueue;
    }
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                info = new Info();
                if (i % 2 == 1) {
                    info.setName("qiu");
                    Thread.sleep((int)Math.random()*100);
                    info.setSchool("whut");
                    info.setGender("boy");
                } else {
                    info.setName("cai");
                    Thread.sleep((int)Math.random()*100);
                    info.setSchool("scut");
                    info.setGender("girl");
                }
                infoQueue.put(info);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class Consum implements Runnable {
    private Info info;
    private LinkedBlockingQueue<Info> infoQueue;
    public Consum(LinkedBlockingQueue<Info> infoQueue) {
        this.infoQueue = infoQueue;
    }
    public void run() {
        try {
            while((info = infoQueue.take())!=null){
                System.out.println(info);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class ProducerAndConsume {
    public static LinkedBlockingQueue<Info> infoQueue = new LinkedBlockingQueue<Info>();
    public static void main(String args[]) {
        Produ producer = new Produ(infoQueue);
        Consum consumer = new Consum(infoQueue);
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

运行结果:

Info [name=cai, gender=girl, school=scut]
Info [name=qiu, gender=boy, school=whut]
Info [name=cai, gender=girl, school=scut]
Info [name=qiu, gender=boy, school=whut]
Info [name=cai, gender=girl, school=scut]
Info [name=qiu, gender=boy, school=whut]
Info [name=cai, gender=girl, school=scut]
Info [name=qiu, gender=boy, school=whut]
Info [name=cai, gender=girl, school=scut]
Info [name=qiu, gender=boy, school=whut]

从运行结果可以看出,通过添加阻塞队列确实可以解决生产者及消费者之间由于效率不匹配所导致的问题。

因此通过引入阻塞队列,不需要像引入wait()与notifyAll()来增加类与类之间的耦合来处理生产者与消费者效率不统一的问题,仅仅在每个类里面与阻塞队列进行通信即可。

时间: 2024-10-12 18:12:20

深入理解并发(二)--生产者及消费者的相关文章

7.2.6 - 并发多线程 生产者,消费者

一 生产者消费者模型介绍 为什么要使用生产者消费者模型 生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这个问题于是引入了生产者和消费者模式. 什么是生产者和消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生

Java并发协作——生产者、消费者模型

概述 对于多线程程序来说,生产者和消费者模型是非常经典的模型.更加准确的说,应该叫"生产者-消费者-仓库模型".离开了仓库,生产者.消费者就缺少了共用的存储空间,也就不存在并非协作的问题了. 示例 定义一个场景.一个仓库只允许存放10件商品,生产者每次可以向其中放入一个商品,消费者可以每次从其中取出一个商品.同时,需要注意以下4点: 1.  同一时间内只能有一个生产者生产,生产方法需要加锁synchronized. 2.  同一时间内只能有一个消费者消费,消费方法需要加锁synchro

ZooKeeper典型应用(二) 生产者与消费者

In this tutorial, we show simple implementations of barriers and producer-consumer queues using ZooKeeper. We call the respective classes Barrier and Queue. These examples assume that you have at least one ZooKeeper server running. 本文将告诉你如何使用 Zookeep

生产者与消费者优点

在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者. 单单抽象出生产者和消费者,还够不上是生产者/消费者模式.该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介.生产者把数据放入缓冲区,而消费者从缓冲区取出数据. ◇解耦 假设生产者和消费者分别是两个类.如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖

并发无锁队列学习之二【单生产者单消费者】

1.前言 最近工作比较忙,加班较多,每天晚上回到家10点多了.我不知道自己还能坚持多久,既然选择了就要做到最好.写博客的少了.总觉得少了点什么,需要继续学习.今天继续上个开篇写,介绍单生产者单消费者模型的队列.根据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种.单生产者单消费者模型的队列操作过程是不需要进行加锁的.生产者通过写索引控制入队操作,消费者通过读索引控制出队列操作.二者相互之间对索引是独享,不存在竞争关系.如下图所示: 2.单生产者单消费者定长

JAVA基础再回首(二十五)——Lock锁的使用、死锁问题、多线程生产者和消费者、线程池、匿名内部类使用多线程、定时器、面试题

JAVA基础再回首(二十五)--Lock锁的使用.死锁问题.多线程生产者和消费者.线程池.匿名内部类使用多线程.定时器.面试题 版权声明:转载必须注明本文转自程序员杜鹏程的博客:http://blog.csdn.net/m366917 我们来继续学习多线程 Lock锁的使用 虽然我们可以理解同步代码块和同步方法的锁对象问题,但是我们并没有直接看到在哪里加上了锁,在哪里释放了锁,为了更清晰的表达如何加锁和释放锁,JDK5以后提供了一个新的锁对象Lock Lock void lock():获取锁 v

使用JUC并发工具包的Lock和Condition,实现生产者和消费者问题中的有界缓存

JDK5.0之前,用java实现生产者和消费者的唯一方式就是使用synchronized内置锁和wait/notify条件通知机制.JDK5.0之后提供了显示锁Lock和条件队列Condition,与内置锁和内置条件队列相对应,但是显示的锁和条件队列,功能更强大,更灵活.此外JDK5.0之后还提供了大量很有用的并发工具类,如BlockingQueue等,基于这些数据结构,能够方便.快速.高效的构建自己应用需要的效果.这里我们简单使用下显示锁和条件队列,来模拟有界缓存的实现,功能类似于JDK内置的

11.9-全栈Java笔记: 线程并发协作(生产者/消费者模式)

多线程环境下,我们经常需要多个线程的并发和协作.这个时候,就需要了解一个重要的多线程并发协作模型"生产者消费者模式". 什么是生产者? 生产者指的是负责生产数据的模块(这里模块可能是:方法.对象.线程.进程). 什么是消费者? 消费者指的是负责处理数据的模块(这里模块可能是:方法.对象.线程.进程). 什么是缓冲区? 消费者不能直接使用生产者的数据,它们之间有个"缓冲区".生产者将生产好的数据放入"缓冲区",消费者从"缓冲区"

Java线程:并发协作-生产者消费者模型

对于多线程程序来说,不管任何编程语言,生产者消费者模型都是最经典的. 实际上,准确的说应该是"生产者-消费者-仓储"模型,离开了仓储,生产者消费者模型就显得没有说服力了. 对于此模型,应该明确以下几点: 生产者仅仅在仓储未满时候生产,仓满则停止生产. 消费者仅仅在仓储有产品时候才能消费,仓空则等待. 当消费者发现仓储没有产品的时候会通知生产者生产. 生产者在生产出可消费产品时候,应该通知消费者去消费. 此模型将要结合java.lang.Object的wait与notify,notify