第十四章:构建自定义的同步工具——Java并发编程实战

一、状态依赖性管理

  • 对于单线程程序,某个条件为假,那么这个条件将永远无法成真
  • 在并发程序中,基于状态的条件可能会由于其他线程的操作而改变

 1 acquire lock on object state
 2 while (precondition does not hold)
 3 {
 4     release lock
 5     wait until precondition might hold
 6     optionally fail if interrupted or timeout expires
 7     reacquire lock
 8 }
 9 perform action
10 release lock

可阻塞的状态依赖操作的结构

 1 //有界缓存实现的基类
 2 public abstract class BaseBoundedBuffer<V> {
 3     private final V[] buf;
 4     private int tail;
 5     private int head;
 6     private int count;
 7
 8     protected BaseBoundedBuffer(int capacity){
 9         this.buf = (V[]) new Object[capacity];
10     }
11
12     protected synchronized final void doPut(V v){
13         buf[tail] = v;
14         if (++tail == buf.length){
15             tail = 0;
16         }
17         ++count;
18     }
19
20     protected synchronized final V doTake(){
21         V v = buf[head];
22         buf[head] = null; //let gc collect
23         if (++head == buf.length){
24             head = 0;
25         }
26         --count;
27         return v;
28     }
29
30     public synchronized final boolean isFull(){
31         return count == buf.length;
32     }
33
34     public synchronized final boolean isEmpty(){
35         return count == 0;
36     }
37 }

1、示例:将前提条件的失败传递给调用者

 1 public class GrumyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
 2     public GrumyBoundedBuffer(int size){
 3         super(size);
 4     }
 5
 6     public synchronized void put(V v){
 7         if (isFull()){
 8             throw new BufferFullException();
 9         }
10         doPut(v);
11     }
12
13     public synchronized V take(){
14         if (isEmpty())
15             throw new BufferEmptyExeption();
16         return doTake();
17     }
18 }

当不满足前提条件时,有界缓存不会执行相应的操作

缺点:已满情况不应为异常;调用者自行处理失败;sleep:降低响应性;自旋等待:浪费CPU;yield让出CPU

2、示例:通过轮询与休眠来实现简单的阻塞

 1 public class SleepyBounedBuffer<V> extends BaseBoundedBuffer<V> {
 2     private static long SLEEP_TIME;
 3     public SleepyBounedBuffer(int size) {
 4         super(size);
 5     }
 6
 7     public void put(V v) throws InterruptedException{
 8         while (true){
 9             synchronized(this){
10                 if (!isFull()){
11                     doPut(v);
12                     return;
13                 }
14             }
15             Thread.sleep(SLEEP_TIME);
16         }
17     }
18
19     public V take() throws InterruptedException{
20         while (true){
21             synchronized(this){
22                 if (!isEmpty()){
23                     return doTake();
24                 }
25             }
26             Thread.sleep(SLEEP_TIME);
27         }
28     }
29 }

“轮询与休眠“重试机制

优点:对于调用者,无需处理失败与异常,操作可阻塞,可中断(休眠时候不要持有锁)

缺点:对于休眠时间设置的权衡(响应性与CPU资源)

3、条件队列——使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真(元素是一个个正在等待相关条件的线程)

  • 每个对象都可以作为一个条件队列(API:wait、notify和notifyAll)

    • Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并且修改对象的状态
    • Object.notify/notifyAll通知被挂起的线程可以重新请求资源执行
  • 只有能对状态进行检查时,才能在某个条件上等待,并且只有能修改状态时,才能从条件等待中释放另一个线程
  • 条件队列在CPU效率、上下文切换开销和响应性等进行了优化
  • 如果某个功能无法通过“轮询和休眠”来实现,那么使用条件队列也无法实现
 1 public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
 2
 3     public BoundedBuffer(int capacity) {
 4         super(capacity);
 5     }
 6
 7     public synchronized void put(V v) throws InterruptedException{
 8         while (isFull()){
 9             wait();
10         }
11         doPut(v);
12         notifyAll();
13     }
14
15     public synchronized V take() throws InterruptedException{
16         while (isEmpty()){
17             wait();
18         }
19         V v = doTake();
20         notifyAll();
21         return v;
22     }
23 }

二、使用条件队列

1、条件谓词

  • 条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词
  • 条件谓词是由类中各个状态变量构成的表达式(while)
  • 在测试条件谓词之前必须先持有这个锁
  • 锁对象与条件队列对象(即调用wait和notify等方法所在的对象)必须是同一个对象
  • wait被唤醒后需要重新获得锁,并重新检查条件谓词

2、过早唤醒——一个条件队列与多个条件谓词相关时,wait方法返回不一定线程所等待的条件谓词就变为真了

1 void stateDependentMethod() throws InterruptedException
2 {
3   synchronized(lock)  // 必须通过一个锁来保护条件谓词
4     {
5         while(!condietionPredicate())
6             lock.wait();
7     }
8 }

当使用条件等待时(如Object.wait(), 或Condition.await()):

  • 通常都有一个条件谓词--包括一些对象状态的测试,线程在执行前必须首先通过这些测试
  • 在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试
  • 在一个循环中调用wait
  • 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量
  • 当调用wait, notify或notifyAll等方法时,一定要持有与条件队列相关的锁
  • 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。

3、丢失信号量——线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词

如果线程A通知了一个条件队列,而线程B随后在这个条件队列上等待,那么线程B将不会立即醒来,而是需要另一个通知来唤醒它(导致活跃性下降)

4、通知——确保在条件谓词变为真时通过某种方式发出通知挂起的线程

  • 发出通知的线程持有锁调用notify和notifyAll,发出通知后应尽快释放锁
  • 多个线程可以基于不同的条件谓词在同一个条件队列上等待,使用notify单一的通知很容易导致类似于信号丢失的问题
  • 可以使用notify:同一条件谓词并且单进单出

使用notifyAll有时是低效的:唤醒的所有线程都需要竞争锁,并重新检验,而有时最终只有一个线程能执行

优化:条件通知

1 public synchronized void put(V v) throws InterruptedException
2 {
3     while(isFull())
4         wait();
5     boolean wasEmpty = isEmpty();
6     doPut(v);
7     if(wasEmpty)
8         notifyAll();
9 }

5、示例:阀门类

 1 public class ThreadGate {
 2        private boolean isOpen;
 3        private int generation;
 4
 5        public synchronized void close() {
 6               isOpen = false;
 7        }
 8
 9        public synchronized void open() {
10               ++generation;
11               isOpen = true;
12               notifyAll();
13        }
14
15        public synchronized void await() throws InterruptedException {
16               int arrivalGeneration = generation;
17               while (!isOpen && arrivalGeneration == generation)
18                      wait();
19        }
20 }

可重新关闭的阀门

arrivalGeneration == generation为了保证在阀门打开时又立即关闭时,在打开时通知的线程都可以通过阀门

6、子类的安全问题

  • 如果在实施子类化时违背了条件通知或单词通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类
  • 对于状态依赖的类,要么将其等待和通知等协议完全向子类公开(并且写入正式文档),要么完全阻止子类参与到等待和通知等过程中
  • 完全禁止子类化

7、封装条件队列

8、入口协议和出口协议

  • 入口协议:该操作的条件谓词
  • 出口协议:检查被该操作修改的所有状态变量,并确认它们是否使某个其他的条件谓词变为真,如果是,则通知相关的条件队列

三、显示的Condition对象

内置条件队列的缺点:每个内置锁都只能有一个相关联的条件队列,而多个线程可能在同一条件队列上等待不同的条件谓词,调用notifyAll通知的线程非等待同意谓词

Condition <-> Lock,内置条件队列 <-> 内置锁

  • Lock.newCondition()
  • 在每个锁上可存在多个等待、条件等待可以是可中断的或不可中断的、基于时限的等待,以及公平的或非公平的队列操作
  • Condition对象继承了相关的Lock对象的公平性
  • 与wait、notify和notifyAll方法对应的分别是await、signal和signalAll
  • 将多个条件谓词分开并放到多个等待线程集,Condition使其更容易满足单次通知的需求(signal比signalAll更高效)
  • 锁、条件谓词和条件变量:件谓词中包含的变量必须由Lock来保护,并且在检查条件谓词以及调用await和signal时,必须持有Lock对象
 1 public class ConditionBoundedBuffer<T> {
 2     protected final Lock lock = new ReentrantLock();
 3     private final Condition notFull    = lock.newCondition();//条件:count < items.length
 4     private final Condition notEmpty  = lock.newCondition();//条件:count > 0
 5     private final T[] items = (T[]) new Object[100];
 6     private int tail, head, count;
 7
 8     public void put(T x) throws InterruptedException {
 9         lock.lock();
10         try {
11             while (count == items.length)
12                 notFull.await();//等到条件count < items.length满足
13             items[tail] = x;
14             if (++tail == items.length)
15                 tail = 0;
16             ++count;
17             notEmpty.signal();//通知读取等待线程
18         } finally {
19             lock.unlock();
20         }
21     }
22
23     public T take() throws InterruptedException {
24         lock.lock();
25         try {
26             while (count == 0)
27                 notEmpty.await();//等到条件count > 0满足
28             T x = items[head];
29             items[head] = null;
30             if (++head == items.length)
31                 head = 0;
32             --count;
33             notFull.signal();//通知写入等待线程
34             return x;
35         } finally {
36             lock.unlock();
37         }
38     }
39 }

四、Synchronizer解析

  在ReentrantLock和Semaphore这两个接口之间存在许多共同点。两个类都可以用作一个”阀门“,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用lock或acquire时成功返回),也可以等待(在调用lock或acquire时阻塞),还可以取消(在调用tryLock或tryAcquire时返回”假“,表示在指定的时间内锁是不可用的或者无法获取许可)。而且,这两个接口都支持中断不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。

原因:都实现了同一个基类AbstractQueuedSynchronizer(AQS)

 1 public class SemaphoreOnLock {//基于Lock的Semaphore实现
 2        private final Lock lock = new ReentrantLock();
 3        //条件:permits > 0
 4        private final Condition permitsAvailable = lock.newCondition();
 5        private int permits;//许可数
 6
 7        SemaphoreOnLock(int initialPermits) {
 8               lock.lock();
 9               try {
10                      permits = initialPermits;
11               } finally {
12                      lock.unlock();
13               }
14        }
15
16        //颁发许可,条件是:permits > 0
17        public void acquire() throws InterruptedException {
18               lock.lock();
19               try {
20                      while (permits <= 0)//如果没有许可,则等待
21                             permitsAvailable.await();
22                      --permits;//用一个少一个
23               } finally {
24                      lock.unlock();
25               }
26        }
27
28        //归还许可
29        public void release() {
30               lock.lock();
31               try {
32                      ++permits;
33                      permitsAvailable.signal();
34               } finally {
35                      lock.unlock();
36               }
37        }
38 }

使用Lock实现信号量

 1 public class LockOnSemaphore {//基于Semaphore的Lock实现
 2        //具有一个信号量的Semaphore就相当于Lock
 3        private final Semaphore s = new Semaphore(1);
 4
 5        //获取锁
 6        public void lock() throws InterruptedException {
 7               s.acquire();
 8        }
 9
10        //释放锁
11        public void unLock() {
12               s.release();
13        }
14 }

使用信号量实现Lock

五、AbstractQueuedSynchronizer

最基本的操作:

  • 获取操作是一种依赖状态的操作,并且通常会阻塞(同步器判断当前状态是否允许获得操作,更新同步器的状态)
  • 释放并不是一个可阻塞的操作时,当执行“释放”操作时,所有在请求时被阻塞的线程都会开始执行

状态管理(一个整数状态):

  • 通过getState,setState以及compareAndSetState等protected类型方法来进行操作
  • 这个整数在不同子类表示任意状态。例:剩余的许可数量,任务状态
  • 子类可以添加额外状态

六、java.util.concurrent 同步器类中的AQS

1、ReentrantLock

  ReentrantLock只支持独占方式的获取操作,因此它实现了tryAcquire、tryRelease和isHeldExclusively

  ReentrantLock将同步状态用于保存锁获取操作的次数,或者正要释放锁的时候,才会修改这个变量

2、Semaphore与CountDownLatch

  Semaphore将AQS的同步状态用于保存当前可用许可的数量;CountDownLatch使用AQS的方式与Semaphore很相似,在同步状态中保存的是当前的计数值

3、FutureTask

  在FutureTask中,AQS同步状态被用来保存任务的状态

  FutureTask还维护一些额外的状态变量,用来保存计算结果或者抛出的异常

4、ReentrantReadWriteLock

  • 单个AQS子类将同时管理读取加锁和写入加锁
  • ReentrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数
  • 在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法
  • AQS在内部维护了一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问:写操作独占获取;读操作可使第一个写之前的读都获取
时间: 2024-12-14 08:14:55

第十四章:构建自定义的同步工具——Java并发编程实战的相关文章

第十四章 构建自定义的同步工具

14.1 状态依赖性管理 基于先检查后执行的状态依赖性操作在多线程下常常发生一些我们不希望的结果.因此有必要对状态依赖操作进行管理, 构成前提条件的状态变量必须有对象的锁来保护,从而使他们在测试前提条件的同事保持不变. 如果条件尚未满足, 则必须释放锁. 下次测试前提条件之前则必须重新获取锁.之后再进行前提条件的测试 重试的实现方式 : 自旋等待. 在条件不成立时一直询问, 直到条件成立. 会消耗大量的CPU时间 休眠. 如果条件不成立, 则休眠一段时间, 休眠过后继续测试条件是否成立. 响应性

《Java并发编程实战》第十四章 构建自己的同步工具定义 札记

一.状态依赖性的管理 有界缓存实现的基类 @ ThreadSafe public abstract class BaseBoundedBuffer<E> { @GuardeBy( "this" ) private final E[] buf; @GuardeBy( "this" ) private int tail; @GuardeBy( "this" ) private int head; @GuardeBy( "this

《Java并发编程实战》第十五章 原子变量与非阻塞同步机制 读书笔记

一.锁的劣势 锁定后如果未释放,再次请求锁时会造成阻塞,多线程调度通常遇到阻塞会进行上下文切换,造成更多的开销. 在挂起与恢复线程等过程中存在着很大的开销,并且通常存在着较长时间的中断. 锁可能导致优先级反转,即使较高优先级的线程可以抢先执行,但仍然需要等待锁被释放,从而导致它的优先级会降至低优先级线程的级别. 二.硬件对并发的支持 处理器填写了一些特殊指令,例如:比较并交换.关联加载/条件存储. 1 比较并交换 CAS的含义是:"我认为V的值应该为A,如果是,那么将V的值更新为B,否则不需要修

java并发编程10.构建自定义的同步工具

创建状态依赖类的最简单方法通常是在类库中现有状态依赖类的基础上进行构造.如果类库中没有提供你需要的功能,可以使用java语言和类库提供的底层机制来构造自己的同步机制,包括内置的条件队列.显示地Condition对象以及AbstractQueuedSynchronizer框架. 在单线程程序中调用方法时,如果基于某个状态的前提条件未得到满足,那么这个条件永远无法成真.而在并发程序中,基于状态的条件可能会由于其他线程的操作而改变. 可阻塞的状态依赖操作 acquire lock on object

《Java并发编程实战》第十六章 Java内存模型 读书笔记

Java内存模型是保障多线程安全的根基,这里仅仅是认识型的理解总结并未深入研究. 一.什么是内存模型,为什么需要它 Java内存模型(Java Memory Model)并发相关的安全发布,同步策略的规范.一致性等都来自于JMM. 1 平台的内存模型 在架构定义的内存模型中将告诉应用程序可以从内存系统中获得怎样的保证,此外还定义了一些特殊的指令(称为内存栅栏或栅栏),当需要共享数据时,这些指令就能实现额外的存储协调保证. JVM通过在适当的位置上插入内存栅栏来屏蔽在JVM与底层平台内存模型之间的

【Java并发编程实战】—– AQS(四):CLH同步队列

在[Java并发编程实战]-–"J.U.C":CLH队列锁提过,AQS里面的CLH队列是CLH同步锁的一种变形. 其主要从双方面进行了改造:节点的结构与节点等待机制.在结构上引入了头结点和尾节点,他们分别指向队列的头和尾,尝试获取锁.入队列.释放锁等实现都与头尾节点相关.而且每一个节点都引入前驱节点和后兴许节点的引用:在等待机制上由原来的自旋改成堵塞唤醒. 其结构例如以下: 知道其结构了,我们再看看他的实现.在线程获取锁时会调用AQS的acquire()方法.该方法第一次尝试获取锁假设

《Java并发编程实战》第十一章 性能与可伸缩性 读书笔记

造成开销的操作包括: 1. 线程之间的协调(例如:锁.触发信号以及内存同步等) 2. 增加的上下文切换 3. 线程的创建和销毁 4. 线程的调度 一.对性能的思考 1 性能与可伸缩性 运行速度涉及以下两个指标: 某个指定的任务单元需要"多快"才能处理完成.计算资源一定的情况下,能完成"多少"工作. 可伸缩性: 当增加计算资源时(例如:CPU.内存.存储容器或I/O带宽),程序的吞吐量或者处理能力能相应地增加. 2 评估各种性能权衡因素 避免不成熟的优化.首先使程序正

《Java并发编程实战》第三章 对象的共享 读书笔记

一.可见性 什么是可见性? Java线程安全须要防止某个线程正在使用对象状态而还有一个线程在同一时候改动该状态,并且须要确保当一个线程改动了对象的状态后,其它线程能够看到发生的状态变化. 后者就是可见性的描写叙述即多线程能够实时获取其它线程改动后的状态. *** 待补充   两个工人同一时候记录生产产品总数问题 1. 失效数据 可见性出现故障就是其它线程没有获取到改动后的状态,更直观的描写叙述就是其它线程获取到的数据是失效数据. 2. 非原子64位操作 3. 加锁与可见性 比如在一个变量的读取与

《Java并发编程实战》第二章 线程安全性 读书笔记

一.什么是线程安全性 编写线程安全的代码 核心在于要对状态访问操作进行管理. 共享,可变的状态的访问 - 前者表示多个线程访问, 后者声明周期内发生改变. 线程安全性 核心概念是正确性.某个类的行为与其规范完全一致. 多个线程同时操作共享的变量,造成线程安全性问题. * 编写线程安全性代码的三种方法: 不在线程之间共享该状态变量 将状态变量修改为不可变的变量 在访问状态变量时使用同步 Java同步机制工具: synchronized volatile类型变量 显示锁(Explicit Lock