Java 多线程学习笔记:生产者消费者问题

前言:最近在学习Java多线程,看到ImportNew网上有网友翻译的一篇文章《阻塞队列实现生产者消费者模式》。在文中,使用的是Java的concurrent包中的阻塞队列来实现。在看完后,自行实现阻塞队列。

(一)准备

  在多线程中,生产者-消费者问题是一个经典的多线程同步问题。简单来说就是有两种线程(在这里也可以做进程理解)——生产者和消费者,他们共享一个固定大小的缓存区(如一个队列)。生产者负责产生放入新数据,消费者负责取出缓存区的数据。具体介绍请参考 Producer-consumer problem

(二)设计概述

  详细的代码可以到我的 Github 仓库下载(master分支为详细的代码,输出所有的必要信息;simplification分支为简化的代码)。

  项目代码,分为四个类:Consumer类为消费者线程,Producer类为生产者线程,ProducerConsumer测试的启动类,ProducerConsumerQueue类为阻塞队列。

  在多线程中,生产者不断向队列放入新数据,当队列已满时,生产者将被阻塞,直到消费者从队列中取出部分数据并唤醒生产者;消费者不断从队列取出数据,当队列为空时,消费者将被阻塞,直到生产者再次放入新数据并唤醒消费者。在JAVA的concurrent包中,提供了一系列阻塞队列帮我们完成这一工作。这一工作也恰好是生产者消费者的问题的关键所在。在这里,我将自行实现阻塞队列:ProducerConsumerQueue;

(三)详细实现

  (1)ProducerConsumerQueue类

 1 public final class ProducerConsumerQueue<E> {
 2     //声明一个重入锁
 3     private final Lock lock = new ReentrantLock();
 4     private final Condition notEmpty = lock.newCondition();
 5     private final Condition notFull = lock.newCondition();
 6     //缓存大小
 7     private final int CAPACITY;
 8     //存储数据的缓存队列
 9     private Queue<E> queue;
10
11     public ProducerConsumerQueue(int CAPACITY) {
12         this.CAPACITY = CAPACITY;
13         this.queue = new LinkedList<E>();
14     }
15
16     //put the e into the queue
17     public void put(E e) throws InterruptedException {
18         lock.lock();
19
20         try {
21             while (queue.size() == this.CAPACITY)
22                 this.notFull.await();
23             queue.offer(e);
24             this.notEmpty.signalAll();
25         } finally {
26             lock.unlock();
27         }
28     }
29
30     //get e from queue.
31     public E take() throws InterruptedException {
32         lock.lock();
33
34         try {
35             while (queue.size() == 0)
36                 this.notEmpty.await();
37             this.notFull.signalAll();
38             return queue.poll();
39         } finally {
40             lock.unlock();
41         }
42     }
43 }

  该队列提供put()和take()方法,前者用于生产者放入数据,后者用于消费者取出数据。

  在put方法中,首先要获得锁。然后检测是否队列已满,如果是,阻塞生产者线程,并释放锁;当再次获得锁时,这里使用了while语句,使得线程再次检查队列是否已满,如果未满,放入数据到缓存,并唤醒可能阻塞的消费者线程。在这里使用while,目的是防止其他生产者线程已经唤醒,并且成功放入新数据到队列,然后阻塞,而该线程又再次放入,导致可能缓存溢出。

  在take方法中,同样要先获得锁。检测队列是否已空,如果是,阻塞消费者线程,释放锁;当被唤醒并再次获得锁时,使用while再次检查队列是否仍为空,如果非空,则取出缓存数据。while同样为了防止其他消费者对一个空队列取数据。

  (2)Consumer类

 1 public class Consumer implements Runnable {
 2     private final ProducerConsumerQueue sharedQueue;
 3
 4     public Consumer(ProducerConsumerQueue sharedQueue) {
 5         this.sharedQueue = sharedQueue;
 6     }
 7
 8     @Override
 9     public void run() {
10         while (true) {
11             try {
12                 synchronized (this){
13                     System.out.println("\tConsumer: " + sharedQueue.take());
14                 }
15             } catch (InterruptedException ex) {
16                 System.out.println(Thread.currentThread().getName() + " throw a interrupexception.");
17                 //do something.
18             }
19         }
20     }
21 }

  消费者线程持续从队列中取出数据。

  (3)Producer类

 1 public class Producer implements Runnable {
 2     private final ProducerConsumerQueue sharedQueue;
 3
 4     public Producer(ProducerConsumerQueue sharedQueue) {
 5         this.sharedQueue = sharedQueue;
 6     }
 7
 8     @Override
 9     public void run() {
10         for (int i = 0; i < 20; i++) {
11             try {
12                 synchronized (this){
13                     System.out.println("Producer: "+i);
14                     sharedQueue.put(i);
15                 }
16             } catch (InterruptedException ex) {
17                 System.out.println(Thread.currentThread().getName() + " throw a interrupexception. ");
18                 //do something.
19             }
20         }
21     }
22 }

  生产者线程产生数据并放入队列中。

  (4)ProducerConsumer测试类

public class ProducerConsumer {
    public static void main(String[] args) {
        ProducerConsumerQueue sharedQueue = new ProducerConsumerQueue(4);

        Thread prodThread = new Thread(new Producer(sharedQueue));
        Thread consThread1 = new Thread(new Consumer(sharedQueue));
        Thread consThread2 = new Thread(new Consumer(sharedQueue));

        prodThread.start();
        consThread1.start();
        consThread2.start();
    }
}

  创建一个生产者线程和两个消费者线程。

扩展:

1、Wikipedia:http://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem

2、《聊聊并发——生产者和消费者模式》http://www.infoq.com/cn/articles/producers-and-consumers-mode

时间: 2024-10-23 10:13:04

Java 多线程学习笔记:生产者消费者问题的相关文章

Java多线程学习笔记--生产消费者模式

实际开发中,我们经常会接触到生产消费者模型,如:Android的Looper相应handler处理UI操作,Socket通信的响应过程.数据缓冲区在文件读写应用等.强大的模型框架,鉴于本人水平有限目前水平只能膜拜,本次只能算学习笔记,为了巩固自己对Java多线程常规知识点的理解,路过大神还望能指导指导.下面一段代码是最常规的生产者消费者的例子: package com.zhanglei.demo; import java.util.ArrayList; import java.util.List

java 多线程 22 :生产者/消费者模式 进阶 利用await()/signal()实现

java多线程15 :wait()和notify() 的生产者/消费者模式 在这一章已经实现了  wait/notify 生产消费模型 利用await()/signal()实现生产者和消费者模型 一样,先定义一个缓冲区: public class ValueObject { public static String value = ""; } 换种写法,生产和消费方法放在一个类里面: public class ThreadDomain41 extends ReentrantLock {

Java 多线程学习笔记:wait、notify、notifyAll的阻塞和恢复

前言:昨天尝试用Java自行实现生产者消费者问题(Producer-Consumer Problem),在coding时,使用到了Condition的await和signalAll方法,然后顺便想起了wait和notify,在开发中遇到了一个问题:wait.notify等阻塞和恢复的时机分别是什么?在网上Google了很久各种博文后,发现几乎没有人提到这个点.最后在官方文档中才找到了相应的介绍. (一)准备 按照惯例应该是要先介绍一下wait.notify和notifyAll的基础知识.我找到了

Java多线程学习笔记(一)

一 概述 一个进程只有一个至少会运行一个线程,Java中同样存在这样,在调用main方法的时候,线程又JVM所创建. 1 package link.summer7c.test; 2 3 public class Test{ 4 public static void main(String[] args){ 5 System.out.println(Thread.currentThread().getName()); 6 } 7 } 运行结果:main 叫做main的线程正在执行main()方法中

Java多线程14:生产者/消费者模型

什么是生产者/消费者模型 一种重要的模型,基于等待/通知机制.生产者/消费者模型描述的是有一块缓冲区作为仓库,生产者可将产品放入仓库,消费者可以从仓库中取出产品,生产者/消费者模型关注的是以下几个点: 1.生产者生产的时候消费者不能消费 2.消费者消费的时候生产者不能生产 3.缓冲区空时消费者不能消费 4.缓冲区满时生产者不能生产 生产者/模型作为一种重要的模型,它的优点在于: 1.解耦.因为多了一个缓冲区,所以生产者和消费者并不直接相互调用,这一点很容易想到,这样生产者和消费者的代码发生变化,

java多线程--“朴素版”生产者消费者问题

1. 生产/消费者模型 生产/消费者问题是个非常典型的多线程问题,涉及到的对象包括"生产者"."消费者"."仓库"和"产品".他们之间的关系如下: (01) 生产者仅仅在仓储未满时候生产,仓满则停止生产. (02) 消费者仅仅在仓储有产品时候才能消费,仓空则等待. (03) 当消费者发现仓储没产品可消费时候会通知生产者生产. (04) 生产者在生产出可消费产品时候,应该通知等待的消费者去消费. 2. 生产/消费者实现 下面通过

Java多线程学习笔记——从Java JVM对多线程数据同步的一些理解

   我们知道在多线程编程中,我们很大的一部分内容是为了解决线程间的资源同步问题和线程间共同协作解决问题.线程间的同步,通俗我们理解为僧多粥少,在粥有限情况下,我们怎么去防止大家有秩序的喝到粥,不至于哄抢都没得喝.线程讲协作,我们可以理解为我们在医院看病的时候,我们要先挂号,才能看病.现在医院有很多病人排队,怎么协调病人都有秩序的先挂号,后看病.本篇文章的重点不在此,也不是在此一下子能分析完,我们先从Java JVM的角度来理解多线程的一些方面. 我们知道多线程间的数据同步,我们是通过加锁的操作

java多线程学习笔记——简单

进程:程序(任务)的执行过程——动态性. 持有资源(共享内存,共享文件)和线程. 线程:线程是系统中最小的执行单元,统一进程中有多个线程,线程共享进程的资源. 线程交互:互斥与同步. 注意:多线程是异步的,所以千万不要把Eclipse里代码的顺序当成线程执行的顺序,线程被调用的时机是随机的. java对线程的支持: class Thread    interface Runnable    共同的run方法 线程的创建和启动: 线程常用方法: 如何停止线程: java中有三种方法可以终止正在运行

Java多线程学习笔记1

1.线程的基本概念 一个关于计算机的简化的视图是: 它有一个执行计算的处理机. 包含处理机所执行的程 序的 ROM(只读存储器). 包含程序所要操作的数据的 RAM(只读存储器).线程,被认为是带有自己的程 序代码和数据的拟处理机的封装.线程的三个部分处理机,代码,数据. 代码可以或不可以由多个线程共享, 这和数据是独立的. 两个线程如果执行同一个类的 实例代码,则它们可以共享相同的代码. 类似地,数据可以或不可以由多个线程共享, 这和代码是独立的. 两个线程如果共享对 一个公共对象的存取,则它