从生产者消费者窥探线程同步(上)

欢迎转载,转载请注明出处。尊重他人的一丢丢努力,谢谢啦!

阅读本篇之后,如果你觉得说得还有点道理,那不妨先戳一下从生产者消费者窥探线程同步(下) ,两篇一起嚼才更好呢。

最近复习了下生产者消费者模式,虽然对它不太陌生,但要说认认真真地实现,还真从来没有过,这里将它总结一下,有不妥或者见识不到之处,欢迎留言指出。

为什么要使用

大概基于以下2点:

(1)可以实现解耦

大多数设计模式,都会创造出一个第三者来担任解耦角色。比如末班模式的模板类,工厂模式的工厂类等。而消费者观察者模式则是使用拥塞队列来给两者解耦的。解耦之后,生产者和消费者就是两个相对独立的个体,他们之间不再进行直接的交互,而是通过拥塞队列来中转完成。

(2)线程安全

既然提到了拥塞队列,肯定就少不了并发问题,就少不了线程安全。更具体一点来说,整个安全控制要做到以下6点:

1 同一时间只有一个生产者生产;
2 同一时间只有一个消费者消费;
3 生产的同时不能消费;
4 消费的同时不能生产;
5 拥塞队列为空,不能消费;
6 拥塞队列为满,不能生产;

总之,就是一个时间点,只能进行一种活动。

实现方法

从它的特点来看,要想通过不同的实现方式,必然要在线程安全这一块花点心思。代码层面的线程同步,主要有三种实现方式:阻塞,非阻塞和一些不需要同步方案的代码(本身就是安全的)。而在这三种方式中,使用最多的恐怕要数阻塞方式,也就是互斥同步,网上一些博文对这个概念似乎有偏颇之嫌,这里先明确一下两个概念:互斥和同步。

所谓的互斥,就是互斥同步(下文简称互斥),它是实现同步的一种阻塞方案,互斥是方法,同步是目的。它们两个并不是并列关系,而应该算是一种因果关系。

互斥的实现方式包括临界区Critical Section,互斥量Metux,信号量Semaphore,当然还有伟大的synchronized、以及Java 5以后提供的Lock等等。

网上大多数实现都是synchronized、Lock、BlockingQueue这三总方式,毋庸置疑,这三种方式确实用的比较多。值得指出的是,通过Semaphore和Metux的PV操作,同样可以达到目的。

实现

前面已经说过,设计模式大多数都是奔着解耦去的,能使一团糟糕的代码变得条理清晰。在上代码不妨先来看一下程序结构:

主要四个文件构成,各自的作用如名字所示:

(1)首先得有产品吧,不然生产毛线,对应Product;

(2)有了产品,自然就有生产者和消费者,对应Producer和Consumer,实质是两个线程;

(3)有了生产者消费者,如何实现他们之间的交互,也就是怎么解决何时生产何时该消费呢,这就用到了前面说过的拥塞队列,对应StorageQueue。

惯例,到了能上代码就不说话环节。

//定义产品
public class Product {
    long id;
    String name;

    public Product(long id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "产品 详情:[id= " + id + " , name= " + name + "]";
    }
}

//消费者
public class Consumer extends Thread {
    int num;

    public Consumer(int num) {
        super();
        this.num = num;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        super.run();
        //注意,不同的实现,这里需要更改为对应的仓库
        StorageLock.consume(num);
    }
}

//生产者
public class Producer extends Thread {
    int num;
    public Producer(int num) {
        // TODO Auto-generated constructor stub
        this.num = num;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        super.run();
        //注意,不同的实现,这里需要更改为对应的仓库
        StorageLock.produce(num);
    }
}

(1)首先用最简单的BlockQueue实现

核心:拥塞控制完全交给BlockQueue来实现,这个队列内部用到了可重入锁的await()和singal()方法,队列满时,再存放就阻塞;队列空时,再取就阻塞。

值得注意,BlockQueue有两套(实际上是三套)存取的方法,分别是put()和take()、offer()和poll()。它们对着应不同的处理策略,说白了就是当队列满时,调用put()方法会阻塞,一直等到队列有空闲然后将元素放进去。而后者offer()不会等待,而是直接丢弃,返回false,它看起来更像是add()方法的线程安全版!!!自己动手写的时候,一定要注意。

public class StorageQueue {
    public static Integer MAX = 50;
    public static ArrayBlockingQueue<Product> list = new ArrayBlockingQueue<>(
            MAX);

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ExecutorService s = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            s.submit(new Producer(2));
            s.submit(new Consumer(1));
        }
    }

    public static void produce(Integer num) {
        if (list.size() == MAX) {
            System.out.println(Thread.currentThread().getName()
                    + " 我是生产,我在等待... ");
        }
        try {
            for (int i = 0; i < num; i++) {
                list.put(new Product(i, ""));
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " 库存: "
                + list.size());
    }

    public static void consume(Integer num) {
        if (list.size() == 0) {
            System.out.println(Thread.currentThread().getName()
                    + " 我是消费,我在等待... ");
        }
        try {
            for (int i = 0; i < num; i++) {
                list.take();
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " 库存: "
                + list.size());
    }
}

这是部分输出,考虑到篇幅,就不全部贴上了,后文附有源码下载链接,感兴趣的可自行下载运行:

pool-1-thread-1 库存: 2
pool-1-thread-2 库存: 1
pool-1-thread-4 库存: 0
pool-1-thread-5 库存: 3
pool-1-thread-1 库存: 1
pool-1-thread-2 库存: 2
pool-1-thread-6 库存: 2
...

(2)其次是经典的synchronized实现

核心:没什么好说的,主要是使用对象的notify()和wait()来实现线程间通信。可以用同步方法或者同步代码块,这里采用的是同步代码块。

public class StorageSync {
    public static Integer MAX = 50;
    public static List<Product> list = new ArrayList<>();

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ExecutorService s = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            s.submit(new Producer(50));
            s.submit(new Consumer(5));
        }
    }

    public static void produce(Integer num) {
        synchronized (list) {
            AtomicInteger m = new AtomicInteger(0);
            while (list.size() + num > MAX) {
                // if (list.size() + num > MAX) {
                try {
                    m.addAndGet(1);
                    System.out.println(Thread.currentThread().getName() + " 阻塞"
                            + " m: " + m);
                    System.out.println("要生产的数量:" + num + "\t库存量:" + list.size()
                            + "\t暂时不能执行生产任务!");
                    list.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // else {
            for (int i = 0; i < num; i++) {
                list.add(new Product(i, ""));
            }
            System.out.println(Thread.currentThread().getName() + " m: " + m);
            System.out.println("已经生产数:" + num + "\t现仓储量为:" + list.size());
            list.notify();
            // }
        }
    }

    public static void consume(Integer num) {
        synchronized (list) {
            // if (num > list.size()) {
            while (num > list.size()) {
                try {
                    System.out
                            .println(Thread.currentThread().getName() + " 阻塞");
                    System.out.println("要消费的数量:" + num + "\t库存量:" + list.size()
                            + "\t暂时不能执行消费任务!");
                    list.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // } else {
            for (int i = 0; i < num; i++) {
                list.remove(0);
            }
            System.out.println(Thread.currentThread().getName());
            System.out.println("已经消费数:" + num + "\t现仓储量为:" + list.size());
            list.notifyAll();
            // }
        }
    }
}

部分运行结果:

pool-1-thread-1 m: 0
已经生产数:50    现仓储量为:50
pool-1-thread-1 阻塞 m: 1
要生产的数量:50   库存量:50  暂时不能执行生产任务!
pool-1-thread-2
已经消费数:5 现仓储量为:45
pool-1-thread-1 阻塞 m: 2
要生产的数量:50   库存量:45  暂时不能执行生产任务!
pool-1-thread-2
已经消费数:5 现仓储量为:40
pool-1-thread-1 阻塞 m: 3
要生产的数量:50   库存量:40  暂时不能执行生产任务!
pool-1-thread-2 阻塞 m: 1
要生产的数量:50   库存量:40  暂时不能执行生产任务!
pool-1-thread-65
已经消费数:5 现仓储量为:35
...

注意代码中的AtomicInteger m是我用来跟踪线程状态的变量,表次该线程阻塞的次数,完全可以删去。

不知道你有没有这样的疑问:代码中使用了这样的循环语句while (list.size() + num > MAX),为什么不用if(list.size() + num > MAX)来判断呢?这里怎么看都应该是个顺序控制,而不应该是个循环呀?再说了,在执行到list.wait();之后,线程不是阻塞了吗?后面的for循环语句自然就不会执行,为什么还要用while()来循环判断,岂不多余?

乍一听,上面的分析确实”蛮有道理”,而且我相信,大多数人第一次写的时候,很容易就想到if上来了。我们不妨先顺着这思路写一下,看看有什么后果。将代码中的if语句屏蔽去掉。

public static void produce(Integer num) {
        synchronized (list) {
            AtomicInteger m = new AtomicInteger(0);
            // while (list.size() + num > MAX) {
            if (list.size() + num > MAX) {
                try {
                    m.addAndGet(1);
                    System.out.println(Thread.currentThread().getName() + " 阻塞"
                            + " m: " + m);
                    System.out.println("要生产的数量:" + num + "\t库存量:" + list.size()
                            + "\t暂时不能执行生产任务!");
                    list.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // else {
            for (int i = 0; i < num; i++) {
                list.add(new Product(i, ""));
            }
            System.out.println(Thread.currentThread().getName() + " m: " + m);
            System.out.println("已经生产数:" + num + "\t现仓储量为:" + list.size());
            list.notify();
            // }
        }
    }

    public static void consume(Integer num) {
        synchronized (list) {
            if (num > list.size()) {
                // while (num > list.size()) {
                try {
                    System.out
                            .println(Thread.currentThread().getName() + " 阻塞");
                    System.out.println("要消费的数量:" + num + "\t库存量:" + list.size()
                            + "\t暂时不能执行消费任务!");
                    list.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // } else {
            for (int i = 0; i < num; i++) {
                list.remove(0);
            }
            System.out.println(Thread.currentThread().getName());
            System.out.println("已经消费数:" + num + "\t现仓储量为:" + list.size());
            list.notifyAll();
            // }
        }
    }

部分运行结果:

pool-1-thread-1 m: 0
已经生产数:50    现仓储量为:50
pool-1-thread-2
已经消费数:5 现仓储量为:45
pool-1-thread-2 阻塞 m: 1
要生产的数量:50   库存量:45  暂时不能执行生产任务!
pool-1-thread-1 阻塞 m: 1
要生产的数量:50   库存量:45  暂时不能执行生产任务!
pool-1-thread-3 阻塞 m: 1
要生产的数量:50   库存量:45  暂时不能执行生产任务!**
pool-1-thread-4
已经消费数:5 现仓储量为:40
pool-1-thread-3 m: 1
已经生产数:50    现仓储量为:90**
pool-1-thread-1 m: 1
已经生产数:50    现仓储量为:140
pool-1-thread-2 m: 1
已经生产数:50    现仓储量为:190
...

what?丑旦,你的仓库容量都飙到190啦?这还了得!汗…

冷静下来,先分析一下原因。不妨从有代表性的pool-1-thread-3入手,根据输出的结果来看,第一下pool-1-thread-3阻塞的时候,它的输出是正常的,即打印了要生产的数量:50 库存量:45 暂时不能执行生产任务!,然而第二次阻塞的时候,明知道仓库容不下,Mr pool-1-thread-3 先生还是不听话地生产了50个,一下子就爆仓了。

为什会这样呢?让我们暂且回到代码的结构上看一下。

根据这个图,再来重现一下上面的情况。

我们的Mr pool-1-thread-3 先生运气不太好,仓库已经满了,不能大展拳脚进行生产,只得乖乖滴执行了list.wait();,放弃手中所持有的同步锁,目前处于阻塞状态(第一次阻塞)。注意虽然Mr pool-1-thread-3 先生已经放弃了同步锁,但他此时仍然停留在方法内部,只是暂时丧失了获得锁的权利,直到一个notify来临。得,那Mr pool-1-thread-3 先生,你先凉快一会儿去吧。

接下来,该我们的Miss pool-1-thread-4 小姐登场。仓库已满,只是消费的时候。Miss pool-1-thread-4 小姐毫不客气地消费了5个资源之后,爽快地抛了一个notifyAll()。

处于阻塞状态的每一双眼睛立刻闪现出欲望的光芒。原来救世主是你,我们的Mr pool-1-thread-3 先生心下大喜,立刻伸出双手。哎,你别说,再搓比的屌丝也有春天,再倒霉的线程也有狗屎运的时候。托总管JVM的福,Mr pool-1-thread-3 先生拿到了这把锁,他立刻从原地(也就是list.wait()这一行)出发,高歌猛进。

不好,一下子就进到了for循环里面,仓库就崩塌了,悲剧就发生了…

更可怕的时,Mr pool-1-thread-3 先生完全没有意识到自己闯下的大祸,在一顿愉快的生产之后,竟然也大手一挥,扮演起救世主的角色,继续抛出notify(),后面的情况可想而知。

反应快的同学可能要说了,为什么不在for循环前面加上一个else呢?

这是另外一个问题了。

不妨先把代码修改一下,看看有什么现象。

确实,一切”正常”,并没有爆仓,所有的生产、消费好像都没什么问题。但是细心的话,你会发现我这里打印的”m”的值,要么是0要么是1,永远不会有别的值。而使用while的则不一样。

至于这种想现象,还要回到代码上来看。

同样的,我们Mr pool-1-thread-3 先生拿到了Miss pool-1-thread-4 小姐释放的同步锁,他立刻从原地(也就是list.wait()这一行)出发,高歌猛进。

不好,下面是else分支,我进不去啊,而下面也没有可让我执行的语句了,Mr pool-1-thread-3 先生暗暗叫苦,吐血三升,倒地而亡,当然在蹬腿之前,他还要把自己辛苦拿到的锁释放出来…

这样也没什么问题呃,Mr pool-1-thread-3 先生生产不了,还有下一个Mr pool-1-thread-3 先生呢?

真的是这样吗?

Miss pool-1-thread-4 小姐发出的唤醒,本意是给那些阻塞在生产线,希望继续生产的优质男们,结果优质男们拿到锁之后并不能生产,而是直接挂了。对,就是挂了。而最终执行生产的都是那些一开始并没有被阻塞起来的线程。

这样看起来,有点类似于非阻塞的同步控制但并不是(遗忘的请自行回到篇首复习,说白了,就是一个线程先进行操作,如果没有发生竞争,那就成功了;如果发生竞争,这个线程就不断地重试,直到成功)。事实上,这种做法,选择的是一种抛弃策略,就是一个线程无法生产,那就放弃它,让下一个线程来尝试生产,直到仓库存满为止。造成的后果,就是资源浪费,想生产的不能保证都能生产到,该消费的也不保证都能消费到。

反观,while()则巧妙地化解了这个问题。

依然是,我们Mr pool-1-thread-3 先生拿到了Miss pool-1-thread-4 小姐释放的同步锁,他立刻从原地(也就是list.wait()这一行)出发,高歌猛进。

不好,Mr pool-1-thread-3 先生发现自己依然处在循环之中,要想出去,必须得满足判断条件。于是,他开始计算list.size() + num > MAX是否成立,运气好的话(不成立),跳出循环,开始愉快地生产。运气不好(成立),依然被圈在while()里,只得再次执行了list.wait();,释放同步锁,等待下一个救世主的到来…

所以我们也能看到,使用while()的程序输出的m值是不确定的,而且一个线程对应的m值,会呈现出增长的态势,也说它的状态是唤醒–等待–唤醒–等待…想生产而不得,委屈ing…也从侧面吻合了我们的分析。JDK源码中类似的并发控制,也都是用的while(),所有以后就放心地使用它吧。

如果你对这个地方的写法还有疑问,一定要自己把程序跑起来,对着输出分析一下。

后两种实现,我们放在下一篇讲。

时间: 2025-01-16 14:40:59

从生产者消费者窥探线程同步(上)的相关文章

从生产者消费者窥探线程同步(下)

欢迎转载,转载请注明出处.尊重他人的一丢丢努力,谢谢啦! 阅读本篇之前,如果你还没有看过从生产者消费者窥探线程同步(上) ,那不妨先戳一下,两篇一起嚼才更好呢. 上一篇分析了使用BlockQueue和synchronized来实现生产者消费者模式.这一篇来看一下其他的实现,闲言少叙. (3)Lock实现 核心:Lock的用法中规中矩,有点类似于非静态同步方法,只是前者是对lock对象显式加锁,而后者是对当前对象隐式加锁. 我相信大多数人在第一次接触Lock锁的时候,内心都会有这样的疑惑:明明提供

Windows 互斥对象在线程同步上的运用

互斥对象在线程同步时的使用 1 多线程在资源共享的时候出现的问题 在程序中如果不同线程对同一个对象进行操作的话就有可能出现因为线程切换而导致的问题.例如下面的程序 #include <stdio.h> #include <WinSock2.h> #include <iostream> using namespace std; #pragma comment(lib,"ws2_32.lib") DWORD WINAPIfun1Proc(LPVOID l

c++11 条件变量 生产者-消费者 并发线程

http://baptiste-wicht.com/posts/2012/04/c11-concurrency-tutorial-advanced-locking-and-condition-variables.html ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 struc

进程,互斥锁,生产者消费者,线程

进程,互斥锁,生产者消费者,线程 一.僵尸进程与孤儿进程 代码演示 ''' 僵尸进程(有坏处): - 在子进程结束后,主进程没有正常结束,子进程的PID不会被回收. 缺点: - 操作系统中PID号是有限的,比如子进程PID号无法正常回收,则会占用PID号. - 资源浪费 - 若PID号满了,则无法创建新的进程. 孤儿进程(没有坏处): - 在子进程没有结束时,主进程没有"正常结束",子进程PID不会被回收. - 操作系统优化机制(孤儿院): 当主进程意外终止,操作系统会检测是否有正在运

VB.net学习笔记(二十七)线程同步上

X夫妇二人试图同时从同一账户(总额1000)中支取1000.由于余额有1000,夫妇各自都满足条件,于是银行共支付2000.结果是银行亏了1000元.这种两个或更多线程试图在同一时刻访问同一资源来修改其状态,并产生不良后果的情况被称做竞争条件. 为避免竞争条件,需要使Withdraw()方法具有线程安全性,即在任一时刻只有一个线程可以访问到该方法. 一.线程同步 多个线程读或写同一资源,就会造成错漏状况,这时就需要线程同步.同步就是协同步调,按预定的先后次序进行运行.如:你说完,我再说.线程A与

生产者消费者模型 线程池

1.生产者消费者模型 主要是为解耦 借助队列来实现生产者消费这模型 栈:先进后出(First In Last Out 简称:FILO) 队列:先进先出(FIFO) import queue from multiprocessing import Queue 借助Queue解决生产者消费这模型队列是安全的 q=Queue(m) q = Queue(num) num : 队列的最大长度 q.get()# 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待 q.put()# 阻塞,如果可以继

Operating System Concepts 项目:生产者-消费者问题 线程

一. 实验目的 实现一个c程序,该程序能模拟解决有限缓冲问题,其中消费者和生产者产生和消耗随机数 二.实验内容 缓冲区 元数据类型为buffer_item,大小为1000的数组,按环形队列处理 生产者和消费者线程 生产者不断执行如下两个操作:消费一个随机数,生产两个随机数 消费者不断执行如下两个操作:生产一个随机数,消费两个随机数 3.Pthead线程创建 使用pthread_create创建5个生产者线程,5个消费者线程,主程序等待所有线程退出 三. 实验环境 Ubuntu Gnome 14.

OS 多个生产者--消费者间线程通信

class ProducerConsumeerDemo { public static void main(String[] args) { Resource r = new Resource(); Producer pro = new Producer(r); Consumer con = new Consumer(r); Producer pro2 = new Producer(r); Consumer con2 = new Consumer(r); Thread t1 = new Thre

Java线程与并发编程实践----等待通知(生产者消费者问题)线程

Java提供了一套API来支持线程之间的交互.在Object类中提供了一套等待通知的API  wait()     notify()     notifyAll()     此处要注意的是,绝不要在循环外面调用wait()方法.(单独开一片文章来讨论)     下面使用消费者与生产者问题来展示以上API的使用: package xiancheng; public class PC { public static void main(String[] args) { Shared s = new