生产者和消费者问题是操作系统的经典问题,在实际工作中也常会用到,主要的难点在于协调生产者和消费者,因为生产者的个数和消费者的个数不确定,而生产者的生成速度与消费者的消费速度也不一样,同时还要实现生产者与消费者的解耦,即生产者并不知道有哪些消费者,而消费者也不需要知道产品是哪个生产的,他们之间只与一个交易平台发生关系。
这是现实世界普遍存在的问题,比如我们去苹果专卖店买IPhone 6,我们属于消费者,而生产商把产品生产出来放在苹果专卖店,如果全世界只有一个苹果专卖店,当专卖店没有IPhone 6时,我们只有等,而当专卖店屯了很多货,以至于专卖店放不下了时,苹果公司比如要生产商暂停生产。生产者与消费者是通过一个缓存仓库来交易的。
Java里面有LinkedBlockingQueue、ArrayBlockingQueue可以在并发环境实现阻塞插入和删除,非常适合作为生产者和消费者之间的纽带。
生产者:
/** * 生产者 * @author jiqunpeng * */ class Producer implements Runnable { LinkedBlockingQueue<Integer> buffer; //构造生产者,注册仓库 Producer(LinkedBlockingQueue<Integer> buffer) { this.buffer = buffer; } /** * 生产一个产品,当仓库已经满时,等待仓库有空地再放入仓库 * @param e * @throws InterruptedException */ public void produce(Integer e) throws InterruptedException { buffer.put(e); } @Override public void run() { Random random = new Random(7); try { while (true) {//一生不息 Integer product = random.nextInt(); System.out.println(this + " \tProduct:\t " + product); produce(product); TimeUnit.MILLISECONDS.sleep(random.nextInt(500));//短暂的休息 } } catch (InterruptedException e) { e.printStackTrace(); } } }
消费者
/** * 消费者 * @author jiqunpeng * */ class Consumer implements Runnable { LinkedBlockingQueue<Integer> buffer; //注册仓库 Consumer(LinkedBlockingQueue<Integer> buffer) { this.buffer = buffer; } /** * 从仓库中的取出产品消费,当仓库里面没有产品时,会一直等下去 * @return * @throws InterruptedException */ public Integer consume() throws InterruptedException { Integer e = buffer.take(); return e; } @Override public void run() { Random random = new Random(7); try { while (true) {//一生都要吃 Integer product = consume(); System.out.println(this + " \tConsume:\t " + product); TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));//吃了也要睡会 } } catch (InterruptedException e) { e.printStackTrace(); } } }
调度运行
public class ProducerConsumer { public static void main(String[] args) { // 任务调度器 ExecutorService exec = Executors.newFixedThreadPool(10); // 仓库 final LinkedBlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(5); for (int i = 0; i < 2; i++) { // 创建生产者 Producer p = new Producer(buffer); // 领到把生产者拉到车间,被迫没日没夜的干活 exec.execute(p); // 消费者出生了 Consumer c = new Consumer(buffer); // 消费者一生都在消费 exec.execute(c); } exec.execute(new Runnable() { @Override public void run() { while (true) { // 定时看一下仓库的空间 System.out.println("buffer :" + buffer.size()); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } } }); } }
模拟结果:
[email protected] Product: -1156638823 [email protected] Consume: -1156638823 [email protected] Product: -1156638823 [email protected] Consume: -1156638823 buffer :0 [email protected] Product: -1077308326 [email protected] Product: -1077308326 [email protected] Product: 1495978761 [email protected] Product: 1495978761 [email protected] Consume: -1077308326 [email protected] Consume: -1077308326 [email protected] Product: -441191359 [email protected] Product: -441191359 [email protected] Product: -1253369595 [email protected] Product: -1253369595 [email protected] Product: 1511462400 [email protected] Consume: 1495978761 [email protected] Consume: 1495978761 [email protected] Product: 1511462400 [email protected] Product: 518557417
当然我们也可以自己定义一个线程安全的有界阻塞缓存队列:
public class BoundedBuffer<E> { private Object[] buffer; final private ReentrantLock lock; final private Condition notEmpty; final private Condition notFull; private int count; private int putIndex; private int takeIndex; public BoundedBuffer(int size) { buffer = new Object[size]; lock = new ReentrantLock(); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { lock.lock(); try { while (count == buffer.length) notFull.await(); buffer[putIndex] = e; if (++putIndex == buffer.length)// 循环数组 putIndex = 0; count++; notEmpty.signal(); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lock(); System.out.println("take()"); try { while (count == 0) notEmpty.await(); @SuppressWarnings("unchecked") E item = (E) buffer[takeIndex]; count--; if (++takeIndex == buffer.length)// 循环数组 takeIndex = 0; notFull.signal(); return item; } finally { lock.unlock(); } } }
时间: 2024-11-08 17:45:07