并发编程 17—— 使用内置条件队列实现简单的有界缓存

并发编程 01—— ConcurrentHashMap

并发编程 02—— 阻塞队列和生产者-消费者模式

并发编程 03—— 闭锁CountDownLatch 与 栅栏CyclicBarrier

并发编程 04—— Callable和Future

并发编程 05—— CompletionService : Executor 和 BlockingQueue

并发编程 06—— 任务取消

并发编程 07—— 任务取消 之 中断

并发编程 08—— 任务取消 之 停止基于线程的服务

并发编程 09—— 任务取消 之 关闭 ExecutorService

并发编程 10—— 任务取消 之 “毒丸”对象

并发编程 11—— 任务取消与关闭 之 shutdownNow 的局限性

并发编程 12—— 线程池的使用 之 配置ThreadPoolExecutor 和 饱和策略

并发编程 13—— 线程池 之 整体架构

并发编程 14—— 线程池 之 原理一

并发编程 15—— 线程池 之 原理二

并发编程 16—— Lock

并发编程 17—— 使用内置条件队列实现简单的有界缓存

概述

第1 部分 状态依赖

第2 部分 实例

参考

第1 部分 状态依赖与条件队列

  在单线程程序中调用一个方法时,如果某个基于状态的前提条件未得到满足(例如“连接池必须非空”),那么这个条件将永远无法成真。因此,在编写顺序程序中的类时,要使得这些类在它们的前提条件未被满足时就失败。但在并发程序中,基于状态的条件可能会由于其他线程的操作而改变:一个资源池可能在几条指令之前还是空的,单现在却变成非空的,虽然有时候在前提条件不满足不会失败,单通常有一种更好的选择,即等待前条件变成真。

  依赖状态的操作可以一直阻塞直到可以使线程一直阻塞,这比使它们先失败再实现起来要更为方便且更不易出错。内置的条件队列可以使线程继续执行,直到对象进入某个进程可以继续执行的状态,并且当被阻塞的线程可以执行时在唤醒它们。

  可阻塞的状态依赖操作的形式如下所示。这种加锁模式有些不同寻常,因为锁是在操作的执行过程中被释放与重新获取的。构成前提条件的状态变量必须由对象的锁来保护,从而使它们在测试前提条件的同时保持不变。否则,前提条件就永远无法变成真。在再次测试前提条件之前,必须重新获得锁。

acquire lock on object state
    while (precondition does not hold){
        release lock
        wait until precondition might hold
        optionally fail if interrupted or timeout expires
        reacquire lock
    }
    preform action
        release lock

  在线程协作方面经典的案例“生产者-消费者”。在生产者和消费者之间有一个循环的传送带,生产者生产出产品消费者才能消费,消费者将传送带上的产品有消费,生产者才能生产新产品并放到传送带上。对于传送带来讲,它是生产者和消费者共享的,而且有满和空两种状态,如果满了则生产者需要停下来,如果空了消费者也没有可消费的产品。实际上,传送带既然是两者共享,则需要加锁使用保证线程安全和状态一致性,而生产者和消费者又同时依赖于传送带的状态。那么生产者或者消费者发现传送带状态不满足的情况下,需要释放锁,因为只有这样才能让对方来处理,只有这样才能使不满足的状态得到改变,有机会满足所需的状态。

一个简单的解决方案就是循环检查状态是否满足。假如有两个线程P和C,分别代表生产者和消费者,而一个数据结构Q代表传送带。对于P,需要获得Q的锁,循环判断Q是否为满,未满则可以继续执行,否则释放锁,继续循环判断。C也如此,只不过条件是Q未空。

对于处理器来说,循环的判断是十分消耗计算资源的。为了解决这个问题,我们可以让线程P和C每次尝试失败后释放锁并等待一段时间,等计时器到了之后再重新尝试获取锁并判断。这样至少看起来比前一种更有效果,但有至少有如下两个问题:

  • 在通常的实现中,线程不断的重复切换和睡眠唤醒状态的调度是对性能有损耗的
  • 睡眠(等待)时间的长短设定是一个问题,过短则和前一种情况没什么区别,徒增了一些调度开销,而过长会出现响应度低,延迟过长

那么有没有一种更有效的解决方案使得这装状态依赖的线程间协作更有效率呢?那就是条件队列。当一个线程发现自己不满足条件时,将其挂载到某个条件下的队列中,直到条件满足时得到系统的通知。这样的方式就避免了对线程不必要的唤醒、锁获取和检查。当然,要做到这些需要底层的支持。在java.lang.Object中,采用native的方式实现了wait()/notify()/notifyAll()方法。这三个方法结合操作系统的实现给我们使用条件队列提供了方便。wait()所做的就是释放锁,等待条件发生,当前线程阻塞。notify()/notifyAll()则是通知满足条件的队列中的线程,将其唤醒。

第2 部分 实例

  内置锁和内置条件队列一起,一个简单的应用是创建可阻塞的有界缓存区,java并发包的BlockingQueue就是一个利用Lock和显式条件队列实现的可阻塞的有界队列。总结内置锁和内置条件的原理,这里我们用另一种方式实现简单的可阻塞缓存。源码如下:

首先,创建一抽象有界缓存类ABoundedBuffer,提供插入和删除的基本实现。

 1 /**
 2  * 有界缓存抽象类
 3  * @ClassName: ABoundedBuffer
 4  * TODO
 5  * @author xingle
 6  * @date 2015-1-15 下午2:08:13
 7  */
 8 public class ABoundedBuffer<V> {
 9
10     private final V[] buf;
11     private int tail;
12     private int head;
13     private int count;
14
15     protected ABoundedBuffer(int capacity){
16         this.buf = (V[]) new Object[capacity];
17     }
18
19     protected synchronized final void doPut(V v){
20         buf[tail] = v;
21         if(++tail==buf.length){
22             tail = 0;
23         }
24
25         ++count;
26     }
27
28     protected synchronized final V doTake(){
29         V v = buf[head];
30         buf[head] = null;
31         if(++head==buf.length){
32             head = 0;
33         }
34         --count;
35         return v;
36     }
37
38     public synchronized final boolean isFull(){
39         return count == buf.length;
40     }
41
42     public synchronized final boolean isEmpty(){
43         return count == 0;
44     }
45
46 }

  其次,利用内置条件队列,编写子类实现可阻塞的插入和删除操作。插入操作,依赖的条件是缓存非满,当条件不满足时,调用wait方法挂起线程,一旦插入成功,说明缓存非空,则调用notifyAll方法唤醒等待非空的线程。删除操作,依赖的条件是非空,当条件不满足时,同样挂起等待,一旦删除成功,说明缓存非满,唤起等待该条件的线程。简单的源码如下:

 1 /**
 2  *
 3  * @ClassName: InnerConditionQueue
 4  * 使用内置条件队列,实现简单的有界缓存
 5  * 通过对象的wait和notify来实现挂起
 6  * 锁对象是this,调用wait/notify的对象是同一个对象。
 7  * 三元关系(锁、wait/notify、条件谓词)
 8  * 线程从wait中被唤醒时,并不代码条件谓词为真,此时还是需要再判断条件。所以必须在循环中调用wait;每次醒来时都判断谓词的真假。
 9  * @author xingle
10  * @date 2015-1-15 下午2:13:55
11  */
12 public class InnerConditionQueue<V> extends ABoundedBuffer<V> {
13
14     /**
15      * @param capacity
16      */
17     protected InnerConditionQueue(int capacity) {
18         super(capacity);
19         // TODO Auto-generated constructor stub
20     }
21
22     public synchronized void put(V v) throws InterruptedException{
23         while(isFull()){
24             System.out.println(new Date()+" buffer 满了, thread wait:"+Thread.currentThread().getName());
25             wait();
26         }
27         doPut(v);
28         System.out.println(new Date()+" "+Thread.currentThread().getName()+" 放入 :"+v+" ");
29         notifyAll();
30     }
31
32     public synchronized V take() throws InterruptedException {
33         while(isEmpty()){
34             System.out.println(new Date()+" buffer 为空, thread wait:"+Thread.currentThread().getName());
35             wait();
36         }
37
38         notifyAll();
39         //每当在等待一个条件时,一定要确保在条件谓词变为真时,通过某种方式发出通知
40         V v = doTake();
41         System.out.println(new Date()+" "+Thread.currentThread().getName()+" 取出 :"+v);
42         return v;
43     }
44
45 }

  最后,编写测试代码,创建一个大小为2的缓冲区,启动三个线程执行插入操作,当第三个线程执行时会因为缓存已满而挂起,主线程删除两个记录后,等待线程被唤醒成功插入。当缓存空的时候,之后的删除操作将被阻塞直到有新的记录插入为止。测试代码如下:

 1 public class ConditionQueueTest {
 2
 3     public static void main(String[] args){
 4         final InnerConditionQueue<String> buf = new InnerConditionQueue<String>(2);
 5
 6         Thread t1 = new Thread(new Runnable() {
 7
 8             @Override
 9             public void run() {
10                 try {
11                     buf.put("hello1");
12                 } catch (InterruptedException e) {
13                     System.out.println("intercetp1:"+Thread.currentThread().getName());
14                     e.printStackTrace();
15                 }
16
17             }
18         });
19
20         Thread t2 = new Thread(new Runnable() {
21
22             @Override
23             public void run() {
24                 try {
25                     buf.put("hello2");
26                 } catch (InterruptedException e) {
27                      System.out.println("intercetp2:"+Thread.currentThread().getName());
28                     e.printStackTrace();
29                 }
30             }
31         });
32
33         Thread t3 = new Thread(new Runnable() {
34
35             @Override
36             public void run() {
37                 try {
38                     buf.put("hello3");
39
40                     Thread.sleep(5000);
41                     buf.put("last one...");
42                 } catch (InterruptedException e) {
43                     System.out.println("intercetp3:"+Thread.currentThread().getName());
44                     e.printStackTrace();
45                 }
46
47             }
48         });
49
50         t1.start();
51         t2.start();
52         t3.start();
53
54         try {
55             Thread.sleep(5000);
56             buf.take();
57             buf.take();
58             buf.take();
59             buf.take();
60         } catch (InterruptedException e) {
61             e.printStackTrace();
62         }
63
64         System.out.println(new Date()+" main over...");
65     }
66
67 }

执行结果:



参考

1.《java 并发编程实战》 14.1

2.Java线程安全杂谈(下)——锁、状态依赖与协同以及锁优化

时间: 2024-08-27 06:35:34

并发编程 17—— 使用内置条件队列实现简单的有界缓存的相关文章

java并发-使用内置条件队列实现简单的有界缓存

内置锁和内置条件队列一起,一个简单的应用是创建可阻塞的有界缓存区,java并发包的BlockingQueue就是一个利用Lock和显式条件队列实现的可阻塞的有界队列.总结内置锁和内置条件的原理,这里我们用另一种方式实现简单的可阻塞缓存.源码如下: 首先,创建一抽象有界缓存类ABoundedBuffer,提供插入和删除的基本实现. /** * @title :ABoundedBuffer * @description :有界缓存抽象类 * @update :2014-12-30 上午9:29:33

Java并发编程(十一):阻塞队列(转载)

本文转载自:http://www.cnblogs.com/dolphin0520/p/3932906.html 在前面几篇文章中,我们讨论了同步容器(Hashtable.Vector),也讨论了并发容器(ConcurrentHashMap.CopyOnWriteArrayList),这些工具都为我们编写多线程程序提供了很大的方便.今天我们来讨论另外一类容器:阻塞队列. 在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现

python并发编程基础之守护进程、队列、锁

并发编程2 1.守护进程 什么是守护进程? 表示进程A守护进程B,当被守护进程B结束后,进程A也就结束. from multiprocessing import Process import time ? def task(): print('妃子的一生') time.sleep(15) print('妃子死了') ? if __name__ == '__main__': fz = Process(target=task) fz.daemon = True #将子进程作为主进程的守护进程.必须在

并发编程【四】锁和队列及生产者消费模型

1.锁multiprocessing-Lock 锁的应用场景:当多个进程需要操作同一个文件/数据的时候: 当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题. 为保证数据的安全性,多进程中只有去操作一些进程之间可以共享的数据资源的时候才需要进行加锁: 枷锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务进行修改,即串行的修改,没错速度式慢了,但牺牲了速度却保证了数据的安全: 模拟查票抢票: import json import time from multiprocess

17、内置变量、特殊变量、字符操作、声明变量 学习笔记

1.bash的内置变量 $PATH            环境变量路径 $HOSTNAME        系统主机名 $UID             系统当前用户ID $HISTFILE        历史文件存放路径 $HISTSIZE        系统可以保存的历史记录条目数 $HISTFILESIZE    历史文件可以保存的历史记录条目数 $HISTCONTROL     历史命令显示控制 $BASH            bash二进制程序文件的路径 $BASH_SUBSHELL 

17.python内置函数2

python内置函数1:https://www.cnblogs.com/raitorei/p/11813694.html # max,min高级玩法 # l=[1,3,100,-1,2] # print(max(l)) # print(min(l)) # # # print(list(zip(('a','n','c'),(1,2,3)))) # print(list(zip(('a','n','c'),(1,2,3,4)))) # print(list(zip(('a','n','c','d')

JS中的内置对象简介与简单的属性方法

JS中的数组: 1.数组的概念: 数组是在内存中连续存储的多个有序元素的结构,元素的顺序称为下标,通过下标查找对应元素 2.数组的声明: ①通过字面量声明var arr1 = [,,,,] JS中同一数组可以储存多种不同的数据类型(但,同一数组一般只用于存放同种数据类型) 例如var arr1 = [1,"2",true,{"name":"啦啦啦"},[1,2]]; ②通过new关键字声明:var arr2 = new Array(参数); &g

利用PHP内置函数制作一个简单的验证码

因为这两天学习了一些PHP的内置函数,所以今天就用一些内置函数配合数组来简单的制作一个随机验证码的效果. 例如:2dT5     T22c.... 分析:首先分析验证码的组成: 1.验证码是由数字1-9,大写字母A-Z,小写字母a-z 中随机生成的. 2.我先创建一个包含指定范围单元的数组.(这里应该是三个:数字,大写字母,小写字母). 3.我可以将这些数组合并成一个大的数组 4.随机打乱该函数.ps:其实觉得在这里再做一步将数组随机打乱,感觉也没有什么必要啊!因为后面我们做的不也是随机抽取吗?

java并发编程(十八)阻塞队列和阻塞栈

阻塞队列 阻塞队列是Java 5并发新特性中的内容,阻塞队列的接口是java.util.concurrent.BlockingQueue,它有多个实现类:ArrayBlockingQueue.DelayQueue.LinkedBlockingQueue.PriorityBlockingQueue.SynchronousQueue等,用法大同小异,具体可查看JDK文档,这里简单举例看下ArrayBlockingQueue,它实现了一个有界队列,当队列满时,便会阻塞等待,直到有元素出队,后续的元素才