【杂谈】再看生产-消费模式

生产者和消费者之间为什么隔着一个队列?

首先,生产者与消费者由于速度的不一致,所以需要一个空间用于缓冲。这可以将生产者与消费者解耦,生产者产出数据的时候,不需要把数据交到消费者手上才行,只要把数据丢入缓冲区就好。这样就可以各做各的。

为什么缓冲区是一个队列?

通常情况下,这个缓冲区的数据结构是一个有序的队列。实际上如果对处理顺序没啥要求,其实也不一定要用队列。插空都可以。

为什么访问的缓冲区的时候要获得锁?

缓冲区这个数据结构会被多线程并发访问(生产者、消费者线程),所以需要加锁,一方面保护它的结构不被破坏,另一方面保证代码的正确性。

这样是不是就可以用了?

是,用是可以用了,但是性能可能会比较差。

为什么性能会比较差?

考虑这样一个场景:缓冲区已满。生产者会一直尝试往里面丢东西,所以就一直"获得锁-释放锁-获得锁-释放锁"。一方面,生产者空转,浪费CPU时间片,就会影响其他线程的调度。这时候如果有一个消费者处理完手头的数据,想再拿一个出来处理,那这时候生产者和消费者就会进行不必要的竞争,因为这个时候生产者抢到了锁也没用。

这,这可如何是好啊?

简单,分两种情况,一种是当缓冲区满的时候,如果生产者再尝试往里面丢东西,就把它挂起。同理,当缓冲区为空的时候,如果消费者再尝试往里面,也把它挂起。

那什么时候唤醒呢?

不是一样的吗,当缓冲区来数据的时候(从无到有),就唤醒消费者线程。当缓冲区有空闲空间的时候(从满到不满),就唤醒生产者线程。

那代码该怎么写呢?

首先我们先简单实现一个"锁",就是下面这样。

public class Lock {
    /**
     * 等待锁的线程队列
     */
    private List<Thread> waitThreads = new ArrayList<>();
    /**
     * 守卫
     */
    private AtomicInteger guard = new AtomicInteger(0);
    /**
     * 锁标志
     */
    private AtomicInteger lockFlag = new AtomicInteger(0);
    /**
     * 当前线程的拥有者
     */
    private Thread holder;

    public void lock() {
        if(Objects.equals(holder, Thread.currentThread())) //如果线程已经获得锁,则直接返回
            return;
        while(!guard.compareAndSet(0, 1)) //尝试获得守卫允许
            ;
        if(lockFlag.intValue() == 0) {
            lockFlag.set(1); //将锁标记为"已被占用"
            holder = Thread.currentThread(); //将锁的拥有者设为当前线程
            guard.set(0); //释放守卫
        } else {
            waitThreads.add(Thread.currentThread()); //加入到等待队列
            guard.set(0); //释放守卫
            LockSupport.park(); //将当前线程挂起
            holder = Thread.currentThread(); //当线程从上一行恢复执行的时候,就说明此线程获得了锁
        }
    }

    public void unlock() {
        if(!Objects.equals(holder, Thread.currentThread())) //如果不是锁的拥有者就没资格释放锁
            return;
        while(!guard.compareAndSet(0, 1))
            ;
        if(waitThreads.size() == 0) { //判断是否有线程正在等待
            lockFlag.set(0); //如果没有,就将锁标记为"空闲"
            holder = null;
            guard.set(0); //释放守卫
        } else {
            LockSupport.unpark(waitThreads.remove(0)); //如果有线程在等待,则唤醒队列中的第一个
            guard.set(0); //释放守卫
        }
    }
}

然后,我们再来实现一下缓冲区的类。

public class BufferCache {
    /**
     * 缓冲数组,用于保存数据
     */
    private Object[] data;
    /**
     * 读索引 => 下一个要消费的数据从哪里拿
     */
    private int readIndex;
    /**
     * 写索引 => 下一个进来的数据要放哪里
     */
    private int writeIndex;
    /**
     * 当前缓冲区内数据的数量
     */
    private int count;
    /**
     * 生产者线程等待队列
     */
    private List<Thread> waitProducers = new ArrayList<>();
    /**
     * 消费者线程等待队列
     */
    private List<Thread>  waitConsumers = new ArrayList<>();
    /**
     * 前面实现的锁
     */
    private Lock lock = new Lock();

    public BufferCache(int initial) {
        this.data = new Object[initial];
    }

    public void put(Object e) {
        lock.lock(); //获得锁
        while(count == data.length) { //如果已满,就将生产线程挂起
            waitProducers.add(Thread.currentThread()); //放入等待队列,这样要唤醒的时候可以找得到
            lock.unlock(); //释放锁
            LockSupport.park(); //挂起当前线程
            lock.lock(); //醒来的时候,要再获得锁

        }

        data[writeIndex] = e;  //存入数据
        count++;
        if(++writeIndex == data.length) { //循环利用存储空间
            writeIndex = 0;
        }
        while(waitConsumers.size() != 0) { //唤醒消费线程
            LockSupport.unpark(waitConsumers.remove(0));
        }
        lock.unlock(); //释放锁
    }

    public Object take() { //同理
        lock.lock();
        Object e = null;
        while (count == 0) {
            waitConsumers.add(Thread.currentThread());
            lock.unlock();
            LockSupport.park();
            lock.lock();
        }

        e = data[readIndex];
        count--;
        if(++readIndex == data.length) {
            readIndex = 0;
        }
        while(waitProducers.size() != 0) {
            LockSupport.unpark(waitProducers.remove(0));
        }
        lock.unlock();
        return e;
    }

    private static class Task1 implements Runnable { //生产任务
        private int num;
        private BufferCache cache;
        private String name;

        public Task1(BufferCache cache, String index) {
            this.name = "producer-" + index;
            this.num = 0;
            this.cache = cache;
        }

        @Override
        public void run() {
            String data;
            while(true) { //每隔一秒往队列丢一个数据
                data = num + " from " + name;
                cache.put(data);
                System.out.println(name + "放入:" + data);
                num++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static class Task2 implements Runnable {
        private BufferCache cache;

        public Task2( BufferCache cache) {
            this.cache = cache;
        }

        @Override
        public void run() {
            while(true) { //不断从队列中抓取数据
                Object e = cache.take();
                System.out.println("消费到:" + e);
            }
        }
    }
    public static void main(String[] args) { //跑个案例
        BufferCache cache = new BufferCache(20);
        Thread producer;
        Thread consumer;
        for(int i = 0; i < 5; i++) { //开5个生产者
            producer = new Thread(new Task1(cache, i + ""));
            producer.start();
        }

        for(int i = 0; i < 3; i++) { //开3个消费者
            consumer = new Thread(new Task2(cache));
            consumer.start();
        }
    }
}

为什么条件判断要用while循环,if不行吗?

假如唤醒的是生产者线程A,可能它恢复执行的时候,缓冲区已经被生产者线程B再次填满了,所以它需要再判断一次。

为什么线程恢复的时候还要再获得锁?

获得锁是为了在判断和执行期间,条件不会发生变化。这样代码执行起来才是正确的。再详细点就是,当生产者线程A获得锁的时候,其他生产者线程不能改变缓冲区的状态(因为其他生产者线程如果要改变的话,也要先获得锁),在线程A获得锁到释放锁期间,它看到的状态是不会发生变化的。

这两个等待队列好像跟条件变量很像,这跟条件变量有什么关系?

其实这就是条件变量,条件变量的本质就是一个队列,当条件不满足的时候,就把线程放入这个队列;当条件满足的时候,可以唤醒一个或多个线程,让他们继续执行。

你可以参考JDK中,BlockingQueue的一个实现类ArrayBlockingQueue,看看是不是跟上面的代码很像。

锁和条件变量的关系

一方面,由于条件变量是一个队列,当多线程访问的时候,肯定要保证它的线程安全,所以它一般都会跟一个锁对象有关联。要访问这个队列,必须先获得锁。

另一方面,进行条件判断的时候也离不开锁(保证在判断和执行期间,条件不会发生变化)

所以,条件变量和锁是绑在一块的,或者说条件变量离不开锁。这样看来,JDK中,Condition对象由Lock对象生成就很容易理解了。

Condition x = lock.newCondition();

原文地址:https://www.cnblogs.com/longfurcat/p/11494441.html

时间: 2024-10-18 11:56:12

【杂谈】再看生产-消费模式的相关文章

【重磅干货】看了此文,Oracle SQL优化文章不必再看!

听“俊”一席话,胜读十年书.看了这篇由DBA+社群联合发起人丁俊大师(网名:dingjun123)分享的SQL优化大作,其他Oracle SQL优化文章都不必再看了! 专家简介 丁俊 网名:dingjun123 DBA+社群联合发起人 性能优化专家,Oracle ACEA,ITPUB开发版资深版主.8年电信行业从业经验,在某大型电信系统提供商工作7年,任资深工程师,从事过系统开发与维护.业务架构和数据分析.系统优化等工作.擅长基于ORACLE的系统优化,精通SQL.PL/SQL.JAVA等.电子

Java的多线程实现生产/消费模式

Java的多线程实现生产/消费模式 在Java的多线程中,我们经常使用某个Java对象的wait(),notify()以及notifyAll() 方法实现多线程的通讯,今天就使用Java的多线程实现生产/消费模式,需求如下: 线程A ProductThread 继承Thread 实现生产数据 若线程共享的数据不为NULL,则生产线程进入等待状态 线程B CustomThread 继承Thread 实现消费数据(输出到控制台) 当线程共享数据为NULL的时候,进入等待状态 线程B 消费完数据之后,

透过现象看本质——回头再看Nginx(进程模型、异步非阻塞、源码目录结构)

透过现象看本质--回头再看Nginx Nginx的进程模型 ? 使用过nginx的朋友都知道nginx的性能很高,而其原因可能少有人知.首先,nginx的架构就奠定了其高性能的基础.那么就先来看看nginx的基础架构吧,如下图所示:(不能完全理清楚所有内容也没关系,因为本小节讲述的主要内容是Nginx的进程模型) ? 本小节先来说说Nginx基础架构中的进程模型: ? 所谓进程模型,即Nginx响应请求或服务时程序运行(机器执行指令集)的方式,一般在nginx服务启动后,在Unix系统中会以da

机房收费系统合作——再看数据库设计

机房合作我负责了最简单的D层,接口层,工厂层.反正D层是我来写,于是数据库索性也就顺便设计了.已经是第三次敲机房收费系统了,每次都是相隔半年左右吧.需求搞得透透的了,数据库也就好设计了.基本跟第二次没什么大的区别,就是把Student表和Card表分开了. 重构的时候,我的数据库几乎什么都用到了:事务,存储过程,触发器,视图,联合查询等等.所以,这次设计数据库还是SO Easy的..并且,为了让婵婵和牛迁迁师哥写的方便,我把组合查询都写成了存储过程!!!!费了一番功夫,但是D层简单了不少.还记得

再看GS线程

再看GS线程 void GameServer::ProcessThreadTry() { int nCount = 0; packet rcvPkt; rcvPkt.data = new char[1024 * 100]; //该事件工厂主要创建了两个定时器1.livemgr的检测(即是否是有效的连接)2.道具帮会差异更新数据的获取即定时从道具帮会容器中获取差异需要保存到数据库的这个数据 //3.释放队列的处理,现在玩家下线不是直接把channel删除掉而是放到释放队列中等没有数据库访问时在删除

再看数据库——(2)视图

概念 *是从用户使用数据库的观点来说的. *从一个或多个表(视图)中导出来的 *一个虚表,或者说查询表 为什么要用视图呢? 一是简单,看到的就是需要的.视图不仅可以简化用户对数据的理解,也可以简化他们的操作.那些被经常使用的查询可以被定义为视图,从而使得用户不必为以后的操作每次指定全部的条件. 二是 安全,通过视图用户只能查询和修改他们所能见到的数据,但不能授权到数据库特定行和特定的列上.通过视图,用户可以被限制在数据的不同子集上:使用权限可被限制在另一视图的一个子集上,或是一些视图和基表合并后

再看ftp上传文件

前言 去年在项目中用到ftp上传文件,用FtpWebRequest和FtpWebResponse封装一个帮助类,这个在网上能找到很多,前台使用Uploadify控件,然后在服务器上搭建Ftp服务器,在本地测试程序上传到ftp服务器一点问题都没有,奇怪的是当发布Web和ftp到同一个IIS下,上传文件时程序直接卡死,然后页面卡死,后来我又发现把Web和ftp分开发布在两台机器上问题又得到解决,所以当时放弃了这个方案. 再看ftp上传文件 前几天偶然看到Wolfy写到一个项目总结,其中提到了用Ser

2015第43周六再看长征

今天从网上看到习大大推荐看的十三本书信息,然后先听了里面的长征,然后再看了<长征>电视,把思绪带回了70多年前,也更真切的感受到长征的重要意义.截取里面的几个事件分析下自己的感受: 1.李德博古专权导致了第五次反围剿和湘江战役的惨败,尤其是血染湘江的悲壮场景,红军从8w多损失到了3w多,多少革命战士英勇牺牲.这里有几个问题(1)为什么李德博古会专政,其它人会让他们专政?他们是共产国际派来的,代表苏联成功经验,受过正规军事训练,代表理论文化先进,所以被推崇.(2)为什么会有这两次的惨败.血流成河

再看Ajax

再回顾Ajax相关的内容,再次梳理学习还是很有必要的,尤其是实际的开发中,ajax更是必不可少,仔细学习以便避免不必要的错误. 文章导读: --1.使用XMLHttpRequest---------- 1.1 必备知识点 1.2 send()方法 1.3  再看CORS --2.HTTP请求和响应---------------- 2.1 Request Header中的参数 2.2 Response Header中的参数 2.3 GET请求和POST请求的区别 --3.jQuery中的Ajax-