Java多线程-----实现生产者消费者模式的几种方式

   1 生产者消费者模式概述

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,

直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,

才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式

   2 实现生产者消费者模式

   产品类

package com.thread.pc.blockingqueue;

import java.util.UUID;

/**
 * 产品类
 *
 * @author yyx 2018年12月22日
 */
public class Product {
    private UUID proCode; // 产品唯一编码

    public Product(UUID proCode) {
        super();
        this.proCode = proCode;
    }

    public UUID getProCode() {
        return proCode;
    }

    public void setProCode(UUID proCode) {
        this.proCode = proCode;
    }

}

   生产者

package com.thread.pc.blockingqueue;

/**
 * 生产者
 * @author yyx 2018年12月22日
 */
public class Producer implements Runnable {
    private Warehouse warehouse;

    public Producer(Warehouse warehouse) {
        super();
        this.warehouse = warehouse;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            warehouse.addProduct();
        }
    }

}

   消费者

package com.thread.pc.blockingqueue;
/**
 * 消费者
 * @author yyx 2018年12月22日
 */
public class Consumer implements Runnable {
    private Warehouse warehouse;

    public Consumer(Warehouse warehouse) {
        super();
        this.warehouse = warehouse;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            warehouse.removeProduct();
        }
    }
}

2.1 使用lock、condition和await、singalAll

   仓库类

package com.thread.pc.lockcondition;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * 仓库类
 *
 * @author yyx 2018年12月22日
 */
public class Warehouse {
    private final int MAX_SIZE = 10;
    private List<Product> listProduct;
    private Lock lock;
    private Condition producerCondition;
    private Condition consumerCondition;

    public Warehouse(List<Product> listProduct, Lock lock, Condition producerCondition, Condition consumerCondition) {
        super();
        this.listProduct = listProduct;
        this.lock = lock;
        this.producerCondition = producerCondition;
        this.consumerCondition = consumerCondition;
    }

    public void addProduct() {
        lock.lock();
        try {
            String currentName = Thread.currentThread().getName();
            if (listProduct.size() >= MAX_SIZE) {
                try {
                    System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
                    producerCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                Product product = new Product(UUID.randomUUID());
                System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
                listProduct.add(product);
                consumerCondition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    public void removeProduct() {
        lock.lock();
        try {
            String currentName = Thread.currentThread().getName();
            if (listProduct.size() <= 0) {
                try {
                    System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
                    consumerCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                Product product = listProduct.get(0);
                System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
                listProduct.remove(0);
                producerCondition.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }
}

   测试类

package com.thread.pc.lockcondition;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 测试类
 * @author 2018年12月22日
 */
public class TestModel {
    public static void main(String[] args) {
        List<Product> listProduct=new ArrayList<Product>();
        Lock lock = new ReentrantLock();
        Condition producerCondition = lock.newCondition();
        Condition consumerCondition = lock.newCondition();

        Warehouse warehouse = new Warehouse(listProduct,lock, producerCondition, consumerCondition);
        Producer producer = new Producer(warehouse);
        Consumer consumer = new Consumer(warehouse);

        new Thread(producer).start();
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

2.2 使用synchronized修饰方法

   仓库类

package com.thread.pc.synchronizedmethod;

import java.util.List;
import java.util.UUID;

/**
 * 仓库类
 *
 * @author yyx 2018年12月22日
 */
public class Warehouse {
    private final int MAX_SIZE = 10;
    private List<Product> listProduct;

    public Warehouse(List<Product> listProduct) {
        super();
        this.listProduct = listProduct;
    }

    public void addProduct() {
        synchronized (listProduct) {
            String currentName = Thread.currentThread().getName();
            if (listProduct.size() >= MAX_SIZE) {
                try {
                    System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
                    listProduct.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                Product product = new Product(UUID.randomUUID());
                System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
                listProduct.add(product);
                listProduct.notifyAll();
            }
        }
    }

    public void removeProduct() {
        synchronized (listProduct) {
            String currentName = Thread.currentThread().getName();
            if (listProduct.size() <= 0) {
                try {
                    System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
                    listProduct.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                Product product = listProduct.get(0);
                System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
                listProduct.remove(0);
                listProduct.notifyAll();
            }
        }
    }
}

   测试类

package com.thread.pc.synchronizedmethod;

import java.util.ArrayList;
import java.util.List;

/**
 * 测试类
 *
 * @author yyx 2018年12月22日
 */
public class TestModel {
    public static void main(String[] args) {
        List<Product> listProduct = new ArrayList<Product>();

        Warehouse warehouse = new Warehouse(listProduct);
        Producer producer = new Producer(warehouse);
        Consumer consumer = new Consumer(warehouse);

        new Thread(producer).start();
        new Thread(producer).start();
        new Thread(consumer).start();
        new Thread(consumer).start();
    }
}

2.3 使用synchronized修饰代码块

   仓库类

package com.thread.pc.synchronizedcodeblock;

import java.util.List;
import java.util.UUID;
/**
 * 仓库类
 * @author yyx 2018年12月22日
 */
public class Warehouse {
    private final int MAX_SIZE = 10;
    private List<Product> listProduct;

    public Warehouse(List<Product> listProduct) {
        super();
        this.listProduct = listProduct;
    }

    public synchronized void addProduct() {
        String currentName = Thread.currentThread().getName();
        if (listProduct.size() >= MAX_SIZE) {
            try {
                System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            Product product = new Product(UUID.randomUUID());
            System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
            listProduct.add(product);
            notifyAll();
        }
    }

    public synchronized void removeProduct() {
        String currentName = Thread.currentThread().getName();
        if (listProduct.size() <= 0) {
            try {
                System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            Product product = listProduct.get(0);
            System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
            listProduct.remove(0);
            notifyAll();
        }
    }
}

   测试类

package com.thread.pc.synchronizedcodeblock;

import java.util.ArrayList;
import java.util.List;
/**
 * 测试类
 * @author yyx 2018年12月22日
 */
public class TestModel {
    public static void main(String[] args) {
        List<Product> listProduct=new ArrayList<Product>();

        Warehouse warehouse = new Warehouse(listProduct);
        Producer producer = new Producer(warehouse);
        Consumer consumer = new Consumer(warehouse);

        new Thread(producer).start();
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

2.4 使用BlockingQueue

   仓库类

package com.thread.pc.blockingqueue;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;
/**
 * 仓库
 * @author yyx 2018年12月22日
 */
public class Warehouse {
    private final int MAX_SIZE = 10;
    private BlockingQueue<Product> blockingQueue;

    public Warehouse(BlockingQueue<Product> blockingQueue) {
        super();
        this.blockingQueue = blockingQueue;
    }

    public void addProduct() {
        String currentName = Thread.currentThread().getName();
        if (blockingQueue.size() >= MAX_SIZE) {
            System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
        } else {
            Product product = new Product(UUID.randomUUID());
            System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
            try {
                blockingQueue.put(product);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void removeProduct() {
        String currentName = Thread.currentThread().getName();
        if (blockingQueue.size() <= 0) {
            System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
        } else {
            try {
                Product product = blockingQueue.take();
                System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

   测试类

package com.thread.pc.blockingqueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * 测试类
 * @author yyx 2018年12月22日
 */
public class TestModel {
    public static void main(String[] args) {
        BlockingQueue<Product> blockingQueue = new LinkedBlockingQueue<>(10);

        Warehouse warehouse = new Warehouse(blockingQueue);
        Producer producer = new Producer(warehouse);
        Consumer consumer = new Consumer(warehouse);

        new Thread(producer).start();
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

原文地址:https://www.cnblogs.com/fengfuwanliu/p/10147953.html

时间: 2024-08-06 14:49:53

Java多线程-----实现生产者消费者模式的几种方式的相关文章

JAVA多线程之生产者消费者模式

一.什么是生产者消费者模式? 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力. 二.为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产

[Java] 多线程下生产者消费者问题的五种同步方法实现

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题. 生产者消费者模式的优点 - 解耦 - 支持并发 - 支持忙闲不均 解决方法可分为两类: (1)用信号量和锁机制实现生产者和消费者之间的同步: - wait() / notify()方法 - await() / signal()方法 - BlockingQueue阻塞队列方法 - Semaphore方法 (2)在生产者和消费者之间建立一个管道.(一般不使用,缓冲区不易控制.数据不易封装和传输) - PipedInputStream

Java多线程_生产者消费者模式1

生产者消费者模型       具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品.生产消费者模式如下图.(图片来自网络,侵删!) 生产者消费者模型的实现 生产者是一堆线程,消费者是另一堆线程,内存缓冲区可以使用List数组队列,数据类型只需要定义一个简单的类就好.关键是如何处理多线程之间的协作.这其实也是多线程通信的一个范例. 在这个模型中,最关键就是内存缓冲区为空的时候消费者必须等待,而内存缓冲区满的时候,生产者

JAVA多线程之生产者消费者

生产者消费者并发编程: 假设仓库有10个仓位,分别有10个生产者和10个消费者,生产者不断生产产品,放入仓库的仓位中,而消费者则不断从仓库中获取产品, 如果仓库已满,则生产者要等待,等消费者消费后,空出仓位后,再继续放入产品. 反之如果仓库已空,则消费者要等待,等待生产者生产出产品后,再继续消费产品. 关于生产者.消费者有四种实现方式 1,wait,nofity方式 2,ReentrantLock锁的await()和signal() 3,阻塞队列的方式 4,Semaphore 信号量方式 下面分

java多线程解决生产者消费者问题

import java.util.ArrayList; import java.util.List; /** * Created by ccc on 16-4-27. */ public class Test { public static void main(String[] args) { GunClip clip = new GunClip(); Producer p = new Producer(clip); customer c = new customer(clip); p.star

java多线程模拟生产者消费者问题,公司面试常常问的题。。。

package com.cn.test3; //java多线程模拟生产者消费者问题 //ProducerConsumer是主类,Producer生产者,Consumer消费者,Product产品 //Storage仓库 //批注:我把输出结果写在程序以下了,你能够看一下,事实上非常easy的,你想象一下产品从生产,到取出的一个生产线,我们定义两个线程,生产者线程,和消费者线程,一个是生产者不停的生产产品并放入数量有限的指定槽内,而消费者从指定槽依次取出产品,现实中的流水车间也相似于此. publ

java 多线程并发系列之 生产者消费者模式的两种实现

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题.该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度. 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式. 什么是生

多线程之生产者消费者模式

最近在项目中需要使用使用多线程实现一种功能,和生产者消费者模式类似,因此,学习了下生产者消费者模式的多线程实现.在生产者消费者模式中,通常有两类线程, 即若干个生产者线程和若干个消费者线程.生产者线程负责提交用户请求,消费者线程则负责处理生产者提交的任务.生产者和消费者之间则通过共享内存缓冲区进行通信. 在这里我们选择BlockingQueue做为共享内存缓冲区. 首先,我们构建生产者生产的,和消费者需要处理的数据PCData,即相关任务数据. public class PCData { pri

java设计模式之生产者/消费者模式

什么是生产者/消费者模式? 某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者.在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式.结构图如下: 生产者消费者模式有如下几个优点: 1.解耦   由于有缓冲区的存在,生产者和消费者之间不直接依赖,耦合度降低. 2.支持并发   由于生产者与消费