Java实现生产者和消费者

  生产者和消费者问题是操作系统的经典问题,在实际工作中也常会用到,主要的难点在于协调生产者和消费者,因为生产者的个数和消费者的个数不确定,而生产者的生成速度与消费者的消费速度也不一样,同时还要实现生产者与消费者的解耦,即生产者并不知道有哪些消费者,而消费者也不需要知道产品是哪个生产的,他们之间只与一个交易平台发生关系。

  这是现实世界普遍存在的问题,比如我们去苹果专卖店买IPhone 6,我们属于消费者,而生产商把产品生产出来放在苹果专卖店,如果全世界只有一个苹果专卖店,当专卖店没有IPhone 6时,我们只有等,而当专卖店屯了很多货,以至于专卖店放不下了时,苹果公司比如要生产商暂停生产。生产者与消费者是通过一个缓存仓库来交易的。

  Java里面有LinkedBlockingQueue、ArrayBlockingQueue可以在并发环境实现阻塞插入和删除,非常适合作为生产者和消费者之间的纽带。

  生产者:

/**
 * 生产者
 * @author jiqunpeng
 *
 */
class Producer implements Runnable {

    LinkedBlockingQueue<Integer> buffer;
    //构造生产者,注册仓库
    Producer(LinkedBlockingQueue<Integer> buffer) {
        this.buffer = buffer;
    }
    /**
     * 生产一个产品,当仓库已经满时,等待仓库有空地再放入仓库
     * @param e
     * @throws InterruptedException
     */
    public void produce(Integer e) throws InterruptedException {
        buffer.put(e);
    }

    @Override
    public void run() {
        Random random = new Random(7);
        try {
            while (true) {//一生不息
                Integer product = random.nextInt();
                System.out.println(this + " \tProduct:\t " + product);
                produce(product);
                TimeUnit.MILLISECONDS.sleep(random.nextInt(500));//短暂的休息
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  消费者  

/**
 * 消费者
 * @author jiqunpeng
 *
 */
class Consumer implements Runnable {
    LinkedBlockingQueue<Integer> buffer;
    //注册仓库
    Consumer(LinkedBlockingQueue<Integer> buffer) {
        this.buffer = buffer;
    }
    /**
     * 从仓库中的取出产品消费,当仓库里面没有产品时,会一直等下去
     * @return
     * @throws InterruptedException
     */
    public Integer consume() throws InterruptedException {
        Integer e = buffer.take();
        return e;
    }

    @Override
    public void run() {
        Random random = new Random(7);
        try {
            while (true) {//一生都要吃
                Integer product = consume();
                System.out.println(this + " \tConsume:\t " + product);
                TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));//吃了也要睡会
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  调度运行

public class ProducerConsumer {
    public static void main(String[] args) {
        // 任务调度器
        ExecutorService exec = Executors.newFixedThreadPool(10);
        // 仓库
        final LinkedBlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(5);
        for (int i = 0; i < 2; i++) {
            // 创建生产者
            Producer p = new Producer(buffer);
            // 领到把生产者拉到车间,被迫没日没夜的干活
            exec.execute(p);
            // 消费者出生了
            Consumer c = new Consumer(buffer);
            // 消费者一生都在消费
            exec.execute(c);

        }
        exec.execute(new Runnable() {

            @Override
            public void run() {
                while (true) {
                    // 定时看一下仓库的空间
                    System.out.println("buffer :" + buffer.size());
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

        });
    }

}

  模拟结果:

[email protected]     Product:     -1156638823
[email protected]     Consume:     -1156638823
[email protected]     Product:     -1156638823
[email protected]     Consume:     -1156638823
buffer :0
[email protected]     Product:     -1077308326
[email protected]     Product:     -1077308326
[email protected]     Product:     1495978761
[email protected]     Product:     1495978761
[email protected]     Consume:     -1077308326
[email protected]     Consume:     -1077308326
[email protected]     Product:     -441191359
[email protected]     Product:     -441191359
[email protected]     Product:     -1253369595
[email protected]     Product:     -1253369595
[email protected]     Product:     1511462400
[email protected]     Consume:     1495978761
[email protected]     Consume:     1495978761
[email protected]     Product:     1511462400
[email protected]     Product:     518557417

  当然我们也可以自己定义一个线程安全的有界阻塞缓存队列:

public class BoundedBuffer<E> {
    private Object[] buffer;

    final private ReentrantLock lock;
    final private Condition notEmpty;
    final private Condition notFull;

    private int count;
    private int putIndex;
    private int takeIndex;

    public BoundedBuffer(int size) {
        buffer = new Object[size];
        lock = new ReentrantLock();
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (count == buffer.length)
                notFull.await();
            buffer[putIndex] = e;
            if (++putIndex == buffer.length)// 循环数组
                putIndex = 0;
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lock();
        System.out.println("take()");
        try {
            while (count == 0)
                notEmpty.await();
            @SuppressWarnings("unchecked")
            E item = (E) buffer[takeIndex];
            count--;
            if (++takeIndex == buffer.length)// 循环数组
                takeIndex = 0;
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}
时间: 2024-11-08 17:45:07

Java实现生产者和消费者的相关文章

Java多线程--生产者与消费者问题

说明 Java中,线程之间的通信主要是由java.lang.Object类提供的wait.notify和notifyAll这3个方法来完成: ①对象的wait方法被调用后,线程进入对象的等待队列中,并释放对象锁,其它线程可以竞争使用此对象锁:sleep方法使得一个线程进入睡眠状态,但是线程所占有的资源并没有释放. ②当对象的notify方法被调用,该方法会从对象的等待队列中随机取出一个线程来唤醒:notifyAll是唤醒等待队列中所有线程,这些线程会与其它正在执行的线程共同竞争对象锁. ③wai

java线程 生产者与消费者

package org.rui.thread.block; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 生产者与消费者 * 餐馆 * * @author lenovo * */ public class Restaurant { //Restaurant r=new Restaurant

java之生产者与消费者

package com.produce; import java.util.LinkedList; import java.util.Queue; /*@author shijin * 生产者与消费者模型中,要保证以下几点: * 1 同一时间内只能有一个生产者生产 生产方法加锁sychronized * 2 同一时间内只能有一个消费者消费 消费方法加锁sychronized * 3 生产者生产的同时消费者不能消费 生产方法加锁sychronized * 4 消费者消费的同时生产者不能生产 消费方

java之生产者和消费者问题

package testThread; public class Test3 { public static void main(String[] args) { Clerk c = new Clerk(); //消费时不生产,生产时不消费 //生产者 new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub synchronized (c) { //无限生产 whil

RabbitMQ简单Java示例——生产者和消费者

添加Maven依赖: 使用rabbitmq-client的最新Maven坐标: <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.3.0</ver

java多线程:生产者和消费者模式(wait-notify) : 单生产和单消费

单生产者 package com.example.t.pc; import java.util.List; //生产者 public class P { private List list; public P(){ } public P(List list){ this.list = list; } public void add(){ while(true){ synchronized (list){ try { System.out.println("3s----------------&q

使用JUC并发工具包的Lock和Condition,实现生产者和消费者问题中的有界缓存

JDK5.0之前,用java实现生产者和消费者的唯一方式就是使用synchronized内置锁和wait/notify条件通知机制.JDK5.0之后提供了显示锁Lock和条件队列Condition,与内置锁和内置条件队列相对应,但是显示的锁和条件队列,功能更强大,更灵活.此外JDK5.0之后还提供了大量很有用的并发工具类,如BlockingQueue等,基于这些数据结构,能够方便.快速.高效的构建自己应用需要的效果.这里我们简单使用下显示锁和条件队列,来模拟有界缓存的实现,功能类似于JDK内置的

Java模拟生产者消费者问题

一.Syncronized方法详解 解决生产者消费这问题前,先来了解一下Java中的syncronized关键字. synchronized关键字用于保护共享数据.请大家注意"共享数据",你一定要分清哪些数据是共享数据,如下面程序中synchronized关键字保护的不是共享数据(其实在这个程序中synchronized关键字没有起到任何作用,此程序的运行结果是不可预先确定的).这个程序中的t1,t2是 两个对象(pp1,pp2)的线程.JAVA是面向对象的程序设计语言,不同的对象的数

Java中的生产者、消费者问题

Java中的生产者.消费者问题描述: 生产者-消费者(producer-consumer)问题, 也称作有界缓冲区(bounded-buffer)问题, 两个进程共享一个公共的固定大小的缓冲区(仓库). 其中一个是生产者, 用于将产品放入仓库: 另外一个是消费者, 用于从仓库中取出产品消费. 问题出现在当仓库已经满了, 而此时生产者还想向其中放入一个新的产品的情形, 其解决方法是让生产者此时进行等待, 等待消费者从仓库中取走了一个或者多个产品后再去唤醒它. 同样地, 当仓库已经空了, 而消费者还