多线程生产者消费者问题处理

一、比较低级的办法是用wait和notify来解决这个问题。

消费者生产者问题:

这个问题是一个多线程同步问题的经典案例,生产者负责生产对象,消费者负责将生成者产生的对象取出,两者不断重复此过程。这过程需要注意几个问题:

不论生产者和消费者有几个,必须保证:

1.生产者每次产出的对象必须不一样,产生的对象有且仅有出现一次;

2.消费者每次取出的对象必须不一样,取出的对象有且仅有出现一次;

3.一定是先产生该对象,然后对象才能被取出,顺序不能乱;

第一种情况:
多个生产者轮流负责生产,多个消费者负责取出。一旦生产者产生一个对象,其他生产者不能生产,只能由消费者执行取出操作;

需要的对象有商品类、消费者、生产者;

//测试类
public class ProducerConsumer {

    public static void main(String[] args) {
        // 定义资源对象
        Resource r = new Resource();

        //定义一个生产者和一个消费者
        Producer p = new Producer(r);
        Consumer c = new Consumer(r);

        //启动四个线程,2个负责生产者,两个消费者
        Thread t1 = new Thread(p);
        Thread t2 = new Thread(p);
        Thread t3 = new Thread(c);
        Thread t4 = new Thread(c);

        t1.start();
        t2.start();
        t3.start();
        t4.start();
    }

}

//商品类
class Resource{
    private String name;
    private int count = 1;
    private  boolean flag = false;

    //产生商品
    public synchronized void set(String name) {
        while (flag) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.name = name + "---" + count++;
        System.out.println(Thread.currentThread().getName() + " 生产者" + this.name);
        flag = true;
        //唤醒所有线程
        this.notifyAll();

    }
    //取出商品
    public synchronized void out() {
            while (!flag) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + " 消费者________" + this.name);
            flag = false;
            this.notifyAll();
    }
}

//定义生产者
class Producer implements Runnable{

    private Resource res;

    public Producer(Resource res) {
        this.res = res;
    }

    @Override
    public void run() {
        while (true) {
            res.set("+商品+");
        }
    }
}

//定义消费者
class Consumer implements Runnable{

    private Resource res;

    Consumer(Resource res) {
        this.res = res;
    }

    @Override
    public void run() {
        while (true) {
            res.out();
        }
    }
}

  

运行结果是产生一个,随即取出一个,循环往复,其运行结果的部分如下:

Thread-2 消费者________+商品+---67821
Thread-1 生产者+商品+---67822
Thread-3 消费者________+商品+---67822
Thread-0 生产者+商品+---67823
Thread-2 消费者________+商品+---67823
Thread-1 生产者+商品+---67824
Thread-3 消费者________+商品+---67824
Thread-0 生产者+商品+---67825
Thread-2 消费者________+商品+---67825
Thread-1 生产者+商品+---67826
Thread-3 消费者________+商品+---67826
Thread-0 生产者+商品+---67827
Thread-2 消费者________+商品+---67827
Thread-1 生产者+商品+---67828
Thread-3 消费者________+商品+---67828
Thread-0 生产者+商品+---67829
Thread-2 消费者________+商品+---67829
Thread-1 生产者+商品+---67830
Thread-3 消费者________+商品+---67830
Thread-0 生产者+商品+---67831
Thread-2 消费者________+商品+---67831
Thread-1 生产者+商品+---67832

  

第二种情况:
目标:生产者与消费者轮换着抢夺执行权,但是生产者最多可以库存5个,消费者最多可以连续取出5个

此时需要定义一种中间对象:仓库类。该类是生产者和消费者共享的一块区域,里面数据类型选择链表结果存放产生的对象。仓库是有容量上限的,当数量达到上限后,生产者不允许继续生产产品.当前线程进入等待状态,等待其他线程唤醒。当仓库没有产品时,消费者不允许继续消费,当前线程进入等待状态,等待其他线程唤醒。

第一种解决方式,采用同步代码块(synchronized),结合着 wait() 和 notifyAll() 的方法,具体代码如下:

package Thread;
/**
 * 2个消费者,3个生产者
 */

import java.util.LinkedList;

public class ProConThreadDemo {
    public static void main(String[] args) {
        Respository res = new Respository();

        //定义2个消费者,3个生产者
        Worker p1 = new Worker(res,"手机");
        Worker p2 = new Worker(res,"电脑");
        Worker p3 = new Worker(res,"鼠标");
        Constomer c1 = new Constomer(res);
        Constomer c2 = new Constomer(res);

        Thread t1 = new Thread(p1,"甲");
        Thread t2 = new Thread(p2,"乙");
        Thread t3 = new Thread(p3,"丙");
        Thread t4 = new Thread(c1,"aaa");
        Thread t5 = new Thread(c2,"bbb");

        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();

    }

}
//仓库类
class Respository{

    private LinkedList<Product> store = new LinkedList<Product>();

    //生产者的方法,用于向仓库存货
    //最多只能有一个线程同时访问该方法.
    public synchronized void push(Product p,String ThreadName){
        //设置仓库库存最多能存5个商品
        /* 仓库容量最大值为5,当容量等于5的时候进入等待状态.等待其他线程唤醒
         * 唤醒后继续循环,等到仓库的存量小于5时,跳出循环继续向下执行准备生产产品.
         */
        while (store.size()==5){
            try {
                System.out.println(ThreadName+" 发现:仓库已满,赶紧叫人运走");
                //因为仓库容量已满,无法继续生产,进入等待状态,等待其他线程唤醒.
                this.wait();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        this.notifyAll();
        store.addLast(p);
        System.out.println(ThreadName+" 给仓库添加 "+p.Name+p.Id+"号名称为 "+" 当前库存量为:"+store.size());
        //为了方便观察运行结果,每次生产完后等待0.1秒
        try {
            Thread.sleep(100);
        }catch (InterruptedException e){
            e.printStackTrace();
        }

    }

    //消费者的方法,用于仓库出货
    //最多只能有一个线程同时访问该方法.
    public synchronized void pop(String ThreadName){
        /* 当仓库没有存货时,消费者需要进行等待.等待其他线程来唤醒
         * 唤醒后继续循环,等到仓库的存量大于0时,跳出循环继续向下执行准备消费产品.
         */
        while (store.size()==0){
            try {
                System.out.println(ThreadName+" 发现:仓库空了,赶紧安排生产");
                //因为仓库容量已空,无法继续消费,进入等待状态,等待其他线程唤醒.
                this.wait();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        this.notifyAll();
        //定义对象。存放pollFirst()方法删除的对象,
        Product p = store.pollFirst();
        System.out.println(ThreadName+"买走 "+p.Name+p.Id+" 当前库存量为:"+store.size());
        //为了方便观察运行结果,每次取出后等待0.1秒
        try {
            Thread.sleep(100);
        }catch (InterruptedException e){
            e.printStackTrace();
        }

    }

}

//产品类
class Product{
    //产品的唯一标识Id
    public int Id;
    //产品的名称
    public String Name;

    public Product(String name, int id) {
        Name = name;
        Id = id;
    }

}

//生产者
class Worker implements Runnable{
    //关键字volatile 是为了保持 Id 的可见性,一旦Id被修改,其他任何线程用到Id的地方,都会相应修改
    //否则下方run方法容易出问题,生产商品的Id和名称 与到时候消费者取出商品的Id和名称不一致
    public volatile Integer Id = 0;

    public volatile String name;

    //引用一个产品
    private Product p;
    //引用一个仓库
    Respository res;

    boolean flag = true;

    public Worker(Respository res,String name) {
        this.res = res;
        this.name = name;
    }

    @Override
    public void run() {
        while (flag){
            p  = new Product(name,Id);
            res.push(new Product(this.p.Name,Id++),Thread.currentThread().getName());
        }
    }
}

class Constomer implements Runnable{
    boolean flag = true;

    //引用一个仓库
    Respository res;

    public Constomer(Respository res) {
        this.res = res;
    }

    @Override
    public void run() {
        while (flag) {

            res.pop(Thread.currentThread().getName());

        }

    }
}

  

运行结果如下,可见仓库最多库存为5个,接近于实际生产

aaa 发现:仓库空了,赶紧安排生产
乙 给仓库添加 电脑0号名称为  当前库存量为:1
丙 给仓库添加 鼠标0号名称为  当前库存量为:2
甲 给仓库添加 手机0号名称为  当前库存量为:3
bbb买走 电脑0 当前库存量为:2
bbb买走 鼠标0 当前库存量为:1
甲 给仓库添加 手机1号名称为  当前库存量为:2
丙 给仓库添加 鼠标1号名称为  当前库存量为:3
乙 给仓库添加 电脑1号名称为  当前库存量为:4
aaa买走 手机0 当前库存量为:3
aaa买走 手机1 当前库存量为:2
aaa买走 鼠标1 当前库存量为:1
aaa买走 电脑1 当前库存量为:0
aaa 发现:仓库空了,赶紧安排生产
乙 给仓库添加 电脑2号名称为  当前库存量为:1
丙 给仓库添加 鼠标2号名称为  当前库存量为:2
甲 给仓库添加 手机2号名称为  当前库存量为:3
bbb买走 电脑2 当前库存量为:2
bbb买走 鼠标2 当前库存量为:1
甲 给仓库添加 手机3号名称为  当前库存量为:2
丙 给仓库添加 鼠标3号名称为  当前库存量为:3
乙 给仓库添加 电脑3号名称为  当前库存量为:4
aaa买走 手机2 当前库存量为:3
乙 给仓库添加 电脑4号名称为  当前库存量为:4
乙 给仓库添加 电脑5号名称为  当前库存量为:5

  

第二种方法,利用 lock类 替代 synchronized的使用,这样可以优化代码,主要是在唤醒的时候可以根据条件去唤醒指定的某些线程。例如:当库存为空的时候,第一种方法是唤醒所有等待的线程,也包括取出的线程;而此时lock类 可以设置在库存为空的时候,只唤醒生产线程,取出的线程依旧处于等待状态,具体代码如下:

package Thread;

import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProConThreadPool {

    public static void main(String[] args) {

       Respository res = new Respository();

       Worker p1 = new Worker(res,"手机");
       Worker p2 = new Worker(res,"电脑");
       Worker p3 = new Worker(res,"鼠标");

       Constomer c1 = new Constomer(res);
       Constomer c2 = new Constomer(res);

       Thread t1 = new Thread(p1,"甲");
       Thread t2 = new Thread(p2,"乙");
       Thread t3 = new Thread(p3,"丙");
       Thread t4 = new Thread(c1,"aaa");
       Thread t5 = new Thread(c2,"bbb");

       t1.start();
       t2.start();
       t3.start();
       t4.start();
       t5.start();

    }

}
//仓库类
class Respository{

    private Lock lock = new ReentrantLock();

    private LinkedList<Product> store = new LinkedList<Product>();

    private Condition condition_pro = lock.newCondition();
    private Condition condition_con = lock.newCondition();

    public LinkedList<Product> getStore() {
        return store;
    }

    public void setStore(LinkedList<Product> store) {
        this.store = store;
    }
    //向仓库存货
    public  void push(Product p,String ThreadName) throws InterruptedException{
        lock.lock();
        try {
            //设置仓库库存最多能存5个商品
            while (store.size()==5){
                    System.out.println(ThreadName+" 发现:仓库已满,赶紧叫人运走");
                    condition_pro.await();
            }
            condition_con.signalAll();
            store.addLast(p);
            System.out.println(ThreadName+" 给仓库添加 "+p.Name+p.Id+"号名称为 "+" 当前库存量为:"+store.size());

        }finally {
            lock.unlock();
        }
        try {
            Thread.sleep(100);
        }catch (InterruptedException e){
            e.printStackTrace();
        }

    }

    //仓库出货
    public void pop(String ThreadName) throws InterruptedException
    {
        lock.lock();
        try{
            while (store.size()==0){

                    System.out.println(ThreadName+" 发现:仓库空了,赶紧安排生产");
                    condition_con.await();
            }
            condition_pro.signalAll();
            Product p = store.pollFirst();
            System.out.println(ThreadName+"买走 "+p.Name+p.Id+" 当前库存量为:"+store.size());

        }
        finally {
            lock.unlock();
        }
        try {
            Thread.sleep(100);
        }catch (InterruptedException e){
            e.printStackTrace();
        }

    }

}

class Product{
    public int Id;

    public String Name;

    public Product(String name, int id) {
        Name = name;
        Id = id;
    }

}

class Worker implements Runnable{

    public volatile Integer Id = 0;

    public volatile String name;

    //引用一个产品
    private Product p;
    //引用一个仓库
    Respository res;

    boolean flag = true;

    public Worker(Respository res,String name) {
        this.res = res;
        this.name = name;
    }

    @Override
    public void run(){
        while (flag){
                p  = new Product(name,Id);
                try {
                    res.push(new Product(this.p.Name,Id++),Thread.currentThread().getName());
                }catch (InterruptedException e){
                    e.printStackTrace();
                }

            }
    }
}

class Constomer implements Runnable{
    boolean flag = true;

    //引用一个仓库
    Respository res;

    public Constomer(Respository res) {
        this.res = res;
    }

    @Override
    public void run() {
        while (flag) {
            try {
                res.pop(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

  

运行结果与上面类似:

aaa 发现:仓库空了,赶紧安排生产
bbb 发现:仓库空了,赶紧安排生产
丙 给仓库添加 鼠标0号名称为  当前库存量为:1
乙 给仓库添加 电脑0号名称为  当前库存量为:2
甲 给仓库添加 手机0号名称为  当前库存量为:3
aaa买走 鼠标0 当前库存量为:2
bbb买走 电脑0 当前库存量为:1
bbb买走 手机0 当前库存量为:0
乙 给仓库添加 电脑1号名称为  当前库存量为:1
甲 给仓库添加 手机1号名称为  当前库存量为:2
aaa买走 电脑1 当前库存量为:1
丙 给仓库添加 鼠标1号名称为  当前库存量为:2
aaa买走 手机1 当前库存量为:1
甲 给仓库添加 手机2号名称为  当前库存量为:2
乙 给仓库添加 电脑2号名称为  当前库存量为:3
bbb买走 鼠标1 当前库存量为:2
丙 给仓库添加 鼠标2号名称为  当前库存量为:3
aaa买走 手机2 当前库存量为:2
甲 给仓库添加 手机3号名称为  当前库存量为:3
bbb买走 电脑2 当前库存量为:2
乙 给仓库添加 电脑3号名称为  当前库存量为:3
丙 给仓库添加 鼠标3号名称为  当前库存量为:4

二、比较赞的办法是用Semaphore 或者 BlockingQueue来实现生产者消费者模型。

BlockingQueue 是线程安全的,并且在调用 put,take 方法时会阻塞线程。

基于以上特性,可以不加任何锁解决生产者消费者问题。

直接上代码:

public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> bq = new LinkedBlockingQueue<>(2);

        CountDownLatch cdl = new CountDownLatch(2);

        Thread t1 = new Thread(()->{ // 生产者线程
                try {
                    for (int i = 0; i < 100; i++)
                        bq.put("z" + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    cdl.countDown();
                }
        });

        Thread t2 = new Thread(()->{ // 消费者线程
                try {
                    for (int i = 0; i < 100; i++)
                        System.out.println(bq.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    cdl.countDown();
                }
        });
        t2.start();
        t1.start();
        cdl.await(); // 等待两个线程结束
        System.out.println(bq.size());

    }

  

参考:https://blog.csdn.net/zjt980452483/article/details/81348668

     https://blog.csdn.net/qq_29697901/article/details/90405141

原文地址:https://www.cnblogs.com/wjqhuaxia/p/11746675.html

时间: 2024-08-28 06:03:21

多线程生产者消费者问题处理的相关文章

多线程-生产者消费者

正解博客:https://blog.csdn.net/u011863767/article/details/59731447 永远在循环(loop)里调用 wait 和 notify,不是在 If 语句 现在你知道wait应该永远在被synchronized的背景下和那个被多线程共享的对象上调用,下一个一定要记住的问题就是,你应该永远在while循环,而不是if语句中调用wait.因为线程是在某些条件下等待的--在我们的例子里,即"如果缓冲区队列是满的话,那么生产者线程应该等待",你可

多线程生产者/消费者模式实现

参考书籍<java多线程编程核心技术> 都是基于wait/notify实现的 一个生产者和一个消费者:操作值 1 package com.qf.test10.pojo; 2 3 /** 4 * @author qf 5 * @create 2018-09-18 15:59 6 */ 7 public class Entity { 8 public static String value = ""; 9 } 1 package com.qf.test10; 2 3 impor

java多线程 生产者消费者模式

package de.bvb; /** * 生产者消费者模式 * 通过 wait() 和 notify() 通信方法实现 * */ public class Test1 { public static void main(String[] args) { Godown godown = new Godown(50); for (int i = 0; i < 5; i++) { new ProducerThread(i * 10, godown).start(); new ConsumerThre

[多线程] 生产者消费者模型的BOOST实现

说明 如果 使用过程中有BUG 一定要告诉我:在下面留言或者给我邮件(sawpara at 126 dot com) 使用boost::thread库来实现生产者消费者模型中的缓冲区! 仓库内最多可以存放 capacity 个产品. 条件变量 condition_put 标记是否可以往仓库中存放一个产品. 条件变量 condition_get 标记是否可以从仓库中取出一个产品. 互斥量 mutexer 用于保证当前仓库只有一个线程拥有主权. 实现 #include <queue> #inclu

多线程生产者消费者模式

闲着没事,写了个生产者消费者模式玩玩,顺便熟悉下notify的用法. package sync; public class Test { public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); producer.start(); Consumer consumer = test.new Consumer(); consumer.setName

多线程生产者消费者案例

题目需求:写一个生产者消费者容器,支持多个生产者消费者同时访问,容器里最多放十个数,需要get()和put()方法,当容器中没数据时,生产者开始生产数据,消费者等待,数据量满十个时,生产者等待,消费者开始消费. /** * Created by canner on 2018/11/30. */ public class MyContainer<T> { private final int MAX = 10; private final LinkedList<T> list = ne

多线程--生产者/消费者线程模型

//程序演进1 //thread loop,忙等Busy wait //不断的检查是不是该做什么事情了:为了减少CPU占用,sleep睡眠一会 //while (1) //{ // do_something(); // sleep(time); //} //程序演进2 //while (1) //{ // read_form_intput(); // do_something(); //} //程序员演进3,消息模型 //while (waitForMsg) //{ // if (isQuitM

java 多线程-生产者消费者模式-管程法

生产者消费者模式管程法通过容器中介,将数据放入和取出 wait()导致当前线程等待,直到另一个线程调用该对象的notify()或notyfyAll()方法notify()唤醒正在等待对象监视器的单个线程,notifyAll()唤醒正在等待对象监视器的所有线程 public class tuble { public static void main(String[]args) { SynContainer container=new SynContainer(); new Productor(co

java 多线程生产者消费者

class Res { private String name; private int count = 1; private boolean flag; public synchronized void set(String name) { while (flag) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.name = name + "--" + cou