Java多线程学习笔记--生产消费者模式

  实际开发中,我们经常会接触到生产消费者模型,如:Android的Looper相应handler处理UI操作,Socket通信的响应过程、数据缓冲区在文件读写应用等。强大的模型框架,鉴于本人水平有限目前水平只能膜拜,本次只能算学习笔记,为了巩固自己对Java多线程常规知识点的理解,路过大神还望能指导指导。下面一段代码是最常规的生产者消费者的例子:

package com.zhanglei.demo;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class ResourceBuffer {

    private final int DEFAULT_BUFFER_SIZE = 100;
    private  int size;
    private Random rnd;
    private List<Integer> bufferList = new ArrayList<Integer>();
    public ResourceBuffer(int size){
        rnd = new Random();
        if(size >0)
            this.size = size;
        else
            this.size = DEFAULT_BUFFER_SIZE;
    }

    public synchronized void product(){
        if(bufferList.size() == size){
              try {
                  wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        int num = rnd.nextInt(100);
        bufferList.add(num);
        System.out.println("生产商品编号"+num);
        notifyAll();
    }

    public synchronized void consumer(){
        if(bufferList.size() == 0){
            try {
                 wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        int index = bufferList.size() -1;
        System.out.println("消费商品编号"+bufferList.get(index));
        bufferList.remove(index);
        notifyAll();
    }
}
package com.zhanglei.demo;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Program {

    /**
     * @param args
     */
    public static void main(String[] args) {
        ResourceBuffer buffer = new ResourceBuffer(10);
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(new ProductTask(buffer));
        executor.execute(new ConsumerTask(buffer));
    }
}

class ConsumerTask implements Runnable{

    private ResourceBuffer buffer;
    public ConsumerTask(ResourceBuffer buffer){
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while(true){
            buffer.consumer();
        }
    }
}

class ProductTask implements Runnable{
    private ResourceBuffer buffer;
    public ProductTask(ResourceBuffer buffer){
        this.buffer = buffer;
    }

    @Override
    public void run() {
        while(true){
            buffer.product();
        }
    }
}

以上代码通过实现对ResourceBuffer类的对象生产和消费来实现同步和协作,实际上就是对资源互斥访问实现同步。我们同样可以用java.util.concurrent包下的Lock接口实现同样的效果,代码如下:

package com.zhanglei.demo;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ResourceBuffer {

    private final int DEFAULT_BUFFER_SIZE = 100;
    private  int size;
    private Random rnd;
    private List<Integer> bufferList = new ArrayList<Integer>();
    private Lock  lock = new ReentrantLock();
    private  Condition notEmpty = lock.newCondition();//不为空条件
    private  Condition notFill = lock.newCondition();//不为满条件
    public ResourceBuffer(int size){
        rnd = new Random();
        if(size >0)
            this.size = size;
        else
            this.size = DEFAULT_BUFFER_SIZE;
    }

    public void product(){
        lock.lock();
        try{
        if(bufferList.size() == size){
              try {
                  notFill.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        int num = rnd.nextInt(100);
        bufferList.add(num);
        System.out.println("生产商品编号"+num);
          notEmpty.signalAll();
        }
        finally{
            lock.unlock();
        }
    }

    public  void consumer(){
        lock.lock();
        try{
        if(bufferList.size() == 0){
            try {
                    notEmpty.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        int index = bufferList.size() -1;
        System.out.println("消费商品编号"+bufferList.get(index));
        bufferList.remove(index);
         notFill.signalAll();
        }
        finally{
             lock.unlock();
        }
    }
}

  通过以上代码实现的对生产者和消费者模式的同步,也只是实现对资源互斥访问实现同步,这种同步方式的并发并不高。如果说这种方式的生产者和消费者模式有什么优势的话,我个人觉得唯一的优势,即使发生了异常,也能保证锁一定能被释放。这种方式只是解决了同步问题,还有并发还有提高的空间。我们通过同步方法,我们本来目的只是为了保证生产和消费互斥操作,但是我们本来可以多个生产者一起生产的情况也被禁止了,这样让我们的并发度降低不少。

  由此,我们可以改进我们的生产者和消费者模式,下面我们通过引入读写锁来解决不能多个生产者同时生产或者多个消费者同时消费的问题。改进后的代码如下:

package com.zhanglei.demo;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ResourceBuffer {
    private final int DEFAULT_BUFFER_SIZE = 100;
    private  int size;
    private Random rnd;
    private List<Integer> bufferList = new ArrayList<Integer>();
    private ReadWriteLock rwLock = new ReentrantReadWriteLock();
    public ResourceBuffer(int size){
        rnd = new Random();
        if(size >0)
            this.size = size;
        else
            this.size = DEFAULT_BUFFER_SIZE;
    }

    public void product(){
        rwLock.writeLock().lock();
        try{
        if(bufferList.size() == size){
              try {
                  Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        int num = rnd.nextInt(100);
        bufferList.add(num);
        System.out.println("生产商品编号"+num);
        }
        finally{
            rwLock.writeLock().unlock();
        }
    }

    public  void consumer(){
        rwLock.readLock().lock();
        try{
        if(bufferList.size() == 0){
        try {
              Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        int index = bufferList.size() -1;
        System.out.println("消费商品编号"+bufferList.get(index));
        bufferList.remove(index);
        }
        finally{
            rwLock.readLock().unlock();
        }
    }
}

本着不重复造轮子的原则,生产者和消费者模式中的缓存区,在我们java类库已经做了相当好的封装,我们下面引入java.util.concurrent下的ArrayBlockingQueue来实现我们的生产者和消费者模式的代码如下:

  

package com.zhanglei.demo;

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;

public class ResourceBuffer {
    private final int DEFAULT_BUFFER_SIZE = 100;
    private  int size;
    private Random rnd;
    private ArrayBlockingQueue<Integer> arrayQueue;

    public ResourceBuffer(int size){
         if(size >0)
                this.size = size;
            else
                this.size = DEFAULT_BUFFER_SIZE;
        rnd = new Random();
        arrayQueue = new ArrayBlockingQueue<Integer>(size);
        //此处指定数组的队列的初始容量大小
    }

    public void product() {
         int num = rnd.nextInt(100);
         System.out.println("生产商品编号"+num);
         try {
            arrayQueue.put(num);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void consumer(){
        int num;
        try {
            num = arrayQueue.take();
            System.out.println("消费商品编号"+num);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  我们通过查看put和take方法的源码,我们知道ArrayBlockingQueue已经实现我们以上可阻塞的队列。关于offer和poll的源码如下:

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }

  通过ArrayBlockingQueue源码,我们看到ArrayBlockingQueue的读/取的方法跟以上生产者和消费者方法实现基本一致。

时间: 2024-10-27 12:09:18

Java多线程学习笔记--生产消费者模式的相关文章

Java 多线程学习笔记:生产者消费者问题

前言:最近在学习Java多线程,看到ImportNew网上有网友翻译的一篇文章<阻塞队列实现生产者消费者模式>.在文中,使用的是Java的concurrent包中的阻塞队列来实现.在看完后,自行实现阻塞队列. (一)准备 在多线程中,生产者-消费者问题是一个经典的多线程同步问题.简单来说就是有两种线程(在这里也可以做进程理解)——生产者和消费者,他们共享一个固定大小的缓存区(如一个队列).生产者负责产生放入新数据,消费者负责取出缓存区的数据.具体介绍请参考 Producer-consumer

java 多线程 22 :生产者/消费者模式 进阶 利用await()/signal()实现

java多线程15 :wait()和notify() 的生产者/消费者模式 在这一章已经实现了  wait/notify 生产消费模型 利用await()/signal()实现生产者和消费者模型 一样,先定义一个缓冲区: public class ValueObject { public static String value = ""; } 换种写法,生产和消费方法放在一个类里面: public class ThreadDomain41 extends ReentrantLock {

Java多线程学习笔记(一)

一 概述 一个进程只有一个至少会运行一个线程,Java中同样存在这样,在调用main方法的时候,线程又JVM所创建. 1 package link.summer7c.test; 2 3 public class Test{ 4 public static void main(String[] args){ 5 System.out.println(Thread.currentThread().getName()); 6 } 7 } 运行结果:main 叫做main的线程正在执行main()方法中

Java多线程学习笔记——从Java JVM对多线程数据同步的一些理解

   我们知道在多线程编程中,我们很大的一部分内容是为了解决线程间的资源同步问题和线程间共同协作解决问题.线程间的同步,通俗我们理解为僧多粥少,在粥有限情况下,我们怎么去防止大家有秩序的喝到粥,不至于哄抢都没得喝.线程讲协作,我们可以理解为我们在医院看病的时候,我们要先挂号,才能看病.现在医院有很多病人排队,怎么协调病人都有秩序的先挂号,后看病.本篇文章的重点不在此,也不是在此一下子能分析完,我们先从Java JVM的角度来理解多线程的一些方面. 我们知道多线程间的数据同步,我们是通过加锁的操作

java多线程学习笔记——简单

进程:程序(任务)的执行过程——动态性. 持有资源(共享内存,共享文件)和线程. 线程:线程是系统中最小的执行单元,统一进程中有多个线程,线程共享进程的资源. 线程交互:互斥与同步. 注意:多线程是异步的,所以千万不要把Eclipse里代码的顺序当成线程执行的顺序,线程被调用的时机是随机的. java对线程的支持: class Thread    interface Runnable    共同的run方法 线程的创建和启动: 线程常用方法: 如何停止线程: java中有三种方法可以终止正在运行

Java多线程学习笔记1

1.线程的基本概念 一个关于计算机的简化的视图是: 它有一个执行计算的处理机. 包含处理机所执行的程 序的 ROM(只读存储器). 包含程序所要操作的数据的 RAM(只读存储器).线程,被认为是带有自己的程 序代码和数据的拟处理机的封装.线程的三个部分处理机,代码,数据. 代码可以或不可以由多个线程共享, 这和数据是独立的. 两个线程如果执行同一个类的 实例代码,则它们可以共享相同的代码. 类似地,数据可以或不可以由多个线程共享, 这和代码是独立的. 两个线程如果共享对 一个公共对象的存取,则它

Java多线程学习笔记——信号量的使用

Java中在控制多线程访问资源的时候使用了信号量可以控制多个线程同时访问一个资源. 有两个构造方法: public Semaphore(int permits) public Semaphore(int permits,boolean fair) 第二个参数和重入锁一样,是指定是否公平.(公平是要牺牲性能的) 1 public class SignalNum implements Runnable { 2 Semaphore semaphore=new Semaphore(2); 3 4 5 @

Java 多线程学习笔记:wait、notify、notifyAll的阻塞和恢复

前言:昨天尝试用Java自行实现生产者消费者问题(Producer-Consumer Problem),在coding时,使用到了Condition的await和signalAll方法,然后顺便想起了wait和notify,在开发中遇到了一个问题:wait.notify等阻塞和恢复的时机分别是什么?在网上Google了很久各种博文后,发现几乎没有人提到这个点.最后在官方文档中才找到了相应的介绍. (一)准备 按照惯例应该是要先介绍一下wait.notify和notifyAll的基础知识.我找到了

java 多线程学习笔记(一) -- 计算密集型任务

最近在看<Java虚拟机并发编程>,在此记录一些重要的东东. 线程数的确定:1. 获取系统可用的处理器核心数:int numOfCores = Runtime.getRuntime().availableProcessors()2. 如果任务是计算密集型的,则线程数 = numOfCores        如果任务是IO密集型的,则线程数 = numOfCores / (1 - 阻塞系数), 其中阻塞系数在0~1之间.注:如果任务被阻塞的时间大于执行时间, 则这些任务是IO密集型的,我们就需要