线程安全的生产者消费者四种实现方法

问题描述

在IT技术面试过程中,我们经常会遇到生产者消费者问题(Producer-consumer problem), 这是多线程并发协作问题的经典案例。场景中包含三个对象,生产者(Producer),消费者(Consumer)以及一个固定大小的缓冲区(Buffer)。生产者的主要作用是不断生成数据放到缓冲区,消费者则从缓冲区不断消耗数据。该问题的关键是如何线程安全的操作共享数据块,保证生产者线程和消费者线程可以正确的更新数据块,主要考虑 1. 生产者不会在缓冲区满时加入数据. 2. 消费者应当停止在缓冲区时消耗数据. 3. 在同一时间应当只允许一个生产者或者消费者访问共享缓冲区(这一点是对于互斥操作访问共享区块的要求)。

解决方案

解决问题以上问题通常有信号量,wait & notify, 管道或者阻塞队列等几种思路。本文以Java语言为例一一进行举例讲解。

信号量

信号量(Semaphore)也称信号灯,是用来控制资源被同时访问的个数,比如控制访问数据库最大连接数的数量,线程通过acquire()获得连接许可,完成数据操作后,通过release()释放许可。对于生产者消费者问题来说,为了满足线程安全操作的要求,同一时间我们只允许一个线程访问共享数据区,因此需要一个大小为1的信号量mutex来控制互斥操作。注意到我们还定义了notFull 和 notEmpty 信号量,notFull用于标识当前可用区块的空间大小,当notFull size 大于0时表明"not full", producer 可以继续生产,等于0时表示空间已满,无法继续生产;同样,对于notEmpty信号量来说,大于0时表明 "not empty", consumer可以继续消耗,等于0 时表明没有产品,无法继续消耗。notFull初始size 为5 (5个available空间可供生产),notEmpty初始为0(没有产品可供消耗)。

   /***
     数据仓储class,所有的producer和consumer共享这个class对象
   **/
    static class DataWareHouse {
       //共享数据区
        private final Queue<String> data = new LinkedList();
        //非满锁
        private final Semaphore notFull;
        //非空锁
        private final Semaphore notEmpty;
        //互斥锁
        private final Semaphore mutex;

        public DataWareHouse(int capacity) {
            this.notFull = new Semaphore(capacity);
            this.notEmpty = new Semaphore(0);
            mutex = new Semaphore(1);
        }
        public void offer(String x) throws InterruptedException {
            notFull.acquire(); //producer获取信号,notFull信号量减一
            mutex.acquire(); //当前进程获得信号,mutex信号量减1,其他线程被阻塞操作共享区块data
            data.add(x);
            mutex.release(); //mutex信号量+1, 其他线程可以继续信号操作共享区块data
            notEmpty.release(); //成功生产数据,notEmpty信号量加1
        }
        public String poll() throws InterruptedException {
            notEmpty.acquire(); //notEmpty信号减一
            mutex.acquire();
            String result = data.poll();
            mutex.release();
            notFull.release(); //成功消耗数据, notFull信号量加1
            return result;
        }
    }
   /**Producer线程**/
    static class Producer implements Runnable {
        private final DataWareHouse dataWareHouse;

        public Producer(final DataWareHouse dataWareHouse) {
            this.dataWareHouse = dataWareHouse;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(100); //生产的速度慢于消耗的速率
                    String s = UUID.randomUUID().toString();
                    System.out.println("put  data " + s);
                    dataWareHouse.offer(s);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
   /**Consumer线程**/
    static class Consumer implements Runnable {
        private final DataWareHouse dataWareHouse;

        public Consumer(final DataWareHouse dataWareHouse) {
            this.dataWareHouse = dataWareHouse;
        }

        @Override
        public void run() {
            while (true) {
                while (true) {
                    try {
                        System.out.println("get data " + dataWareHouse.poll());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    //测试代码
    public static void main(String[] args) {
        final DataWareHouse dataWareHouse = new DataWareHouse(5);
        //三个producer 持续生产
        for (int i = 0; i < 3; i++) {
            Thread t = new Thread(new Producer(dataWareHouse));
            t.start();
        }
        //三个consumer 持续消耗
        for (int i = 0; i < 3; i++) {
            Thread t = new Thread(new Consumer(dataWareHouse));
            t.start();
        }
    }

Wait 和 Notify 机制

Java Object对象类中包含三个final methods来允许线程之间进行通信,告知资源的状态。它们分别是wait(), notify(), 和notifyAll()。

wait(): 顾名思义告诉当前线程释放锁,陷入休眠状态(waiting状态),等待资源。wait 方法本身是一个native method,它在Java中的使用语法如下所示:

synchronized(lockObject )
{
    while( ! condition )
    {
        lockObject.wait();
    }
    //take the action here;
}

notify(): 用于唤醒waiting状态的线程, 同时释放锁,被唤醒的线程可以重新获得锁访问资源。它的基本语法 如下

synchronized(lockObject)
{
    //establish_the_condition;
    lockObject.notify();
    //any additional code if needed
}

notifyAll(): 不同于notify(),它用于唤醒所有处于waiting状态的线程。语法如下:

synchronized(lockObject)
{
    establish_the_condition;
    lockObject.notifyAll();
}

说完了这三个方法,来看下如何使用wait & notify(All) 来解决我们的问题。新的DataWareHouse 类如下所示:

    //producer类和consumer共享对象
    static class DataWareHouse {
        //共享数据区
        private final Queue<String> data = new LinkedList();
        private int capacity;
        private int size = 0;

        public DataWareHouse(int capacity) {
            this.capacity = capacity;
        }

        public synchronized void offer(String x) throws InterruptedException {
            while (size == capacity) { //当buffer满时,producer进入waiting 状态
                this.wait(); //使用this对象来加锁
            }
            data.add(x);
            size++;
            notifyAll(); //当buffer 有数据时,唤醒所有等待的consumer线程
        }

        public synchronized String poll() throws InterruptedException {
            while (size == 0) {//当buffer为空时,consumer 进入等待状态
                this.wait();
            }
            String result = data.poll();
            size--;
            notifyAll(); //当数据被消耗,空间被释放,通知所有等待的producer。
            return result;
        }
    }

Note: 在方法上使用synchronized 等价于在方法体内使用synchronized(this),两者都是使用this对象作为锁。

生产者和消费者类,以及测试代码和 信号量相同,不做重复列举了。

管道

管道Pipe是实现进程或者线程(线程之间通常通过共享内存实现通讯,而进程则通过scoket,管道,消息队列等技术)之间通信常用方式,它连接输入流和输出流,基于生产者- 消费者模式构建的一种技术。具体实现可以通过创建一个管道输入流对象和管道输出流对象,然后将输入流和输出流就行链接,生产者通过往管道中写入数据,而消费者在管道数据流中读取数据,通过这种方式就实现了线程之间的互相通讯。

具体实现代码如下所示

public class PipeSolution {
    static class DataWareHouse implements Closeable {
        private final PipedInputStream pis;
        private final PipedOutputStream pos;

        public DataWareHouse() throws IOException {
            pis = new PipedInputStream();
            pos = new PipedOutputStream();
            pis.connect(pos); //连接管道
        }
        //向管道中写入数据
        public void offer(int val) throws IOException {
            pos.write(val);
            pos.flush();
        }
        //从管道中取数据.
        public int poll() throws IOException {
             //当管道中没有数据,方法阻塞
            return pis.read();
        }
        //关闭管道
        @Override
        public void close() throws IOException {
            if (pis != null) {
                pis.close();
            }
            if (pos != null) {
                pos.close();
            }
        }
    }
    //consumer类
    static class Consumer implements Runnable {
        private final DataWareHouse dataWareHouse;

        Consumer(DataWareHouse dataWareHouse) {
            this.dataWareHouse = dataWareHouse;
        }

        @Override
        public void run() {
            try {
                //消费者不断从管道中读取数据
                while (true) {
                    int num = dataWareHouse.poll();
                    System.out.println("get data +" + num);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
    static class Producer implements Runnable {
        private final DataWareHouse dataWareHouse;
        private final Random random = new Random();

        Producer(DataWareHouse dataWareHouse) {
            this.dataWareHouse = dataWareHouse;
        }

        @Override
        public void run() {
            try {
                //生产者不断向管道中写入数据
                while (true) {
                    int num = random.nextInt(256);
                    dataWareHouse.offer(num);
                    System.out.println("put data +" + num);
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public static void main(String[] args) throws IOException {
            DataWareHouse dataWareHouse = new DataWareHouse();
            new Thread(new Producer(dataWareHouse)).start();
            new Thread(new Consumer(dataWareHouse)).start();
        }
    }

阻塞队列

阻塞队列(BlockingQueue),具有1. 当队列满了的时候阻塞入队列操作 2. 当队列空了的时候阻塞出队列操作 3. 线程安全 的特性,因而阻塞队列通常被视为实现生产消费者模式最便捷的工具,其中DataWareHouse类实现代码如下:

  static class DataWareHouse {
        //共享数据区
        private final BlockingQueue<String> blockingQueue;

        public DataWareHouse(int capacity) {
            this.blockingQueue = new ArrayBlockingQueue<>(capacity);
        }

        public void offer(String x) {
            blockingQueue.offer(x);
        }
        public String poll() {
            return blockingQueue.poll();
        }
    }

生产者和消费者类,以及测试代码和 信号量 相同,在此不做重复列举了。

总结

生产者消费者问题是面试中经常会遇到的题目,本文总结了几种常见的实现方式,面试过程中通常不必要向面试官描述过多实现细节,说出每种实现方式的特点即可。希望能给大家带来帮助。

Reference

  1. https://howtodoinjava.com/java/multi-threading/wait-notify-and-notifyall-methods/

原文地址:https://www.cnblogs.com/jun-ma/p/11843394.html

时间: 2024-12-10 20:46:53

线程安全的生产者消费者四种实现方法的相关文章

Linux线程编程之生产者消费者问题

前言 本文基于顺序循环队列,给出Linux生产者/消费者问题的多线程示例,并讨论编程时需要注意的事项.文中涉及的代码运行环境如下: 本文假定读者已具备线程同步的基础知识. 一  顺序表循环队列 1.1 顺序循环队列定义 队列是一种运算受限的先进先出线性表,仅允许在队尾插入(入队),在队首删除(出队).新元素入队后成为新的队尾元素,元素出队后其后继元素就成为队首元素. 队列的顺序存储结构使用一个数组和两个整型变量实现,其结构如下: 1 struct Queue{ 2 ElemType elem[M

由ORACLE_SID想到脚本的四种运行方法

以前学习脚本知道一个概念,关于脚本运行方式的问题,我们熟知的脚本运行方式有以下几种: (1)../script.sh (点斜线脚本) (2).sh script.sh (sh空格脚本) (3).source script.sh (source空格脚本) (4).. script.sh (点空格脚本) 先写个简单的脚本,把执行结果贴在下面,然后分别对这几种脚本运行方式作解释. script.sh内容如下: ------------------------------- #!/bin/bash ec

JAVA中运用数组的四种排序方法

JAVA中在运用数组进行排序功能时,一般有四种方法:快速排序法.冒泡法.选择排序法.插入排序法. 快速排序法主要是运用了Arrays中的一个方法Arrays.sort()实现. 冒泡法是运用遍历数组进行比较,通过不断的比较将最小值或者最大值一个一个的遍历出来. 选择排序法是将数组的第一个数据作为最大或者最小的值,然后通过比较循环,输出有序的数组. 插入排序是选择一个数组中的数据,通过不断的插入比较最后进行排序.下面我就将他们的实现方法一一详解供大家参考. <1>利用Arrays带有的排序方法快

C#四种深拷贝方法

//四种深拷贝方法 public static T DeepCopyByReflect<T>(T obj) { //如果是字符串或值类型则直接返回 if (obj is string || obj.GetType().IsValueType) return obj; object retval = Activator.CreateInstance(obj.GetType()); FieldInfo[] fields = obj.GetType().GetFields(BindingFlags.

并查集类的c++封装,比较union_find algorithm四种实现方法之间的性能差别

问题描述: 在计算机科学中,并查集是一种树型的数据结构,其保持着用于处理一些不相交集合(Disjoint Sets)的合并及查询问题.有一个联合-查找算法(union-find algorithm)定义了两个操作用于此数据结构: Find:确定元素属于哪一个子集.它可以被用来确定两个元素是否属于同一子集: Union:将两个子集合并成同一个集合: 实现并查集的关键是实现union-find algorithm, 本文根据常用的四种算法,实现了这个类,具体算法实现请参看维基百科: 制造测试数据集,

数据库的四种操纵方法——增、删、改、查

数据库的四种操纵方法——增.删.改.查 增——一种可视化增加就是在设计页面右键点开已经存在的表 进行内容的增加. 另一种是在查询页面,创建查询在代码界面进行代码添加.书写形式:insert into xxx(zzz,ccc)values('aaa','sss')——insert是插入的意思,into是进哪去,xxx代表要插入的表名,zzz,ccc表示表内的列名,values的意思是‘值’,后面的xxx和ccc是插入所内容的列名.总体的意思就是:在xxx表内的zzz列和ccc列插入aaa和sss

并查集类的c++封装,比較union_find algorithm四种实现方法之间的性能区别

问题描写叙述: 在计算机科学中,并查集是一种树型的数据结构,其保持着用于处理一些不相交集合(Disjoint Sets)的合并及查询问题.有一个联合-查找算法(union-find algorithm)定义了两个操作用于此数据结构: Find:确定元素属于哪一个子集.它能够被用来确定两个元素是否属于同一子集: Union:将两个子集合并成同一个集合: 实现并查集的关键是实现union-find algorithm, 本文依据经常使用的四种算法,实现了这个类,详细算法实现请參看维基百科: 制造測试

关于Facebook、twitter、google、pinterest四种分享方法,附带微博、QQ、微信、豆瓣等等

常用的国外网站分享推过,Facebook.twitter.google.pinterest这四种社交平台最常见,下面就简单介绍一下分享的方法. 首先,必须使用分享插件,share.js 获取方法: https://github.com/chenpenggood/share.js 这有两种分享文件:1.关于Facebook.twitter.google.pinterest四种分享方法, 简洁版,容量小 2.jQuery分享插件jquery.share.js享到QQ.微信.微博.google.in.

U盘强制拔出丢失数据的恢复方法(U盘写保护的四种解决方法)

● U盘强制拔出丢失数据的恢复方法 U盘从出现以来,小巧便携容量大深受人们的喜爱,不用像云盘一样需要下载,所以重要的文件我们都喜欢用U盘来传递数据,但是很多人使用U盘拔出时都没有使用"弹出U盘"功能,取出过程中电脑正在对U盘读写,这时候拔出很容易造成数据丢失或者导致U盘中的文件损坏无法打开,那么这种情况下我们应该怎么解决呢?下面小编就教大家如何恢复,一起来看看吧. 首先,下载一个数据恢复软件,比较好用的就有[迷你兔数据恢复软件](minitool中文版本),可以恢复U盘中被删除的数据,