并发编程 03—— 闭锁CountDownLatch 与 栅栏CyclicBarrier
并发编程 05—— CompletionService : Executor 和 BlockingQueue
并发编程 09—— 任务取消 之 关闭 ExecutorService
并发编程 11—— 任务取消与关闭 之 shutdownNow 的局限性
并发编程 12—— 线程池的使用 之 配置ThreadPoolExecutor 和 饱和策略
概述
第1 部分 状态依赖
第2 部分 实例
参考
第1 部分 状态依赖与条件队列
在单线程程序中调用一个方法时,如果某个基于状态的前提条件未得到满足(例如“连接池必须非空”),那么这个条件将永远无法成真。因此,在编写顺序程序中的类时,要使得这些类在它们的前提条件未被满足时就失败。但在并发程序中,基于状态的条件可能会由于其他线程的操作而改变:一个资源池可能在几条指令之前还是空的,单现在却变成非空的,虽然有时候在前提条件不满足不会失败,单通常有一种更好的选择,即等待前条件变成真。
依赖状态的操作可以一直阻塞直到可以使线程一直阻塞,这比使它们先失败再实现起来要更为方便且更不易出错。内置的条件队列可以使线程继续执行,直到对象进入某个进程可以继续执行的状态,并且当被阻塞的线程可以执行时在唤醒它们。
可阻塞的状态依赖操作的形式如下所示。这种加锁模式有些不同寻常,因为锁是在操作的执行过程中被释放与重新获取的。构成前提条件的状态变量必须由对象的锁来保护,从而使它们在测试前提条件的同时保持不变。否则,前提条件就永远无法变成真。在再次测试前提条件之前,必须重新获得锁。
acquire lock on object state while (precondition does not hold){ release lock wait until precondition might hold optionally fail if interrupted or timeout expires reacquire lock } preform action release lock
在线程协作方面经典的案例“生产者-消费者”。在生产者和消费者之间有一个循环的传送带,生产者生产出产品消费者才能消费,消费者将传送带上的产品有消费,生产者才能生产新产品并放到传送带上。对于传送带来讲,它是生产者和消费者共享的,而且有满和空两种状态,如果满了则生产者需要停下来,如果空了消费者也没有可消费的产品。实际上,传送带既然是两者共享,则需要加锁使用保证线程安全和状态一致性,而生产者和消费者又同时依赖于传送带的状态。那么生产者或者消费者发现传送带状态不满足的情况下,需要释放锁,因为只有这样才能让对方来处理,只有这样才能使不满足的状态得到改变,有机会满足所需的状态。
一个简单的解决方案就是循环检查状态是否满足。假如有两个线程P和C,分别代表生产者和消费者,而一个数据结构Q代表传送带。对于P,需要获得Q的锁,循环判断Q是否为满,未满则可以继续执行,否则释放锁,继续循环判断。C也如此,只不过条件是Q未空。
对于处理器来说,循环的判断是十分消耗计算资源的。为了解决这个问题,我们可以让线程P和C每次尝试失败后释放锁并等待一段时间,等计时器到了之后再重新尝试获取锁并判断。这样至少看起来比前一种更有效果,但有至少有如下两个问题:
- 在通常的实现中,线程不断的重复切换和睡眠唤醒状态的调度是对性能有损耗的
- 睡眠(等待)时间的长短设定是一个问题,过短则和前一种情况没什么区别,徒增了一些调度开销,而过长会出现响应度低,延迟过长
那么有没有一种更有效的解决方案使得这装状态依赖的线程间协作更有效率呢?那就是条件队列。当一个线程发现自己不满足条件时,将其挂载到某个条件下的队列中,直到条件满足时得到系统的通知。这样的方式就避免了对线程不必要的唤醒、锁获取和检查。当然,要做到这些需要底层的支持。在java.lang.Object中,采用native的方式实现了wait()/notify()/notifyAll()方法。这三个方法结合操作系统的实现给我们使用条件队列提供了方便。wait()所做的就是释放锁,等待条件发生,当前线程阻塞。notify()/notifyAll()则是通知满足条件的队列中的线程,将其唤醒。
第2 部分 实例
内置锁和内置条件队列一起,一个简单的应用是创建可阻塞的有界缓存区,java并发包的BlockingQueue就是一个利用Lock和显式条件队列实现的可阻塞的有界队列。总结内置锁和内置条件的原理,这里我们用另一种方式实现简单的可阻塞缓存。源码如下:
首先,创建一抽象有界缓存类ABoundedBuffer,提供插入和删除的基本实现。
1 /** 2 * 有界缓存抽象类 3 * @ClassName: ABoundedBuffer 4 * TODO 5 * @author xingle 6 * @date 2015-1-15 下午2:08:13 7 */ 8 public class ABoundedBuffer<V> { 9 10 private final V[] buf; 11 private int tail; 12 private int head; 13 private int count; 14 15 protected ABoundedBuffer(int capacity){ 16 this.buf = (V[]) new Object[capacity]; 17 } 18 19 protected synchronized final void doPut(V v){ 20 buf[tail] = v; 21 if(++tail==buf.length){ 22 tail = 0; 23 } 24 25 ++count; 26 } 27 28 protected synchronized final V doTake(){ 29 V v = buf[head]; 30 buf[head] = null; 31 if(++head==buf.length){ 32 head = 0; 33 } 34 --count; 35 return v; 36 } 37 38 public synchronized final boolean isFull(){ 39 return count == buf.length; 40 } 41 42 public synchronized final boolean isEmpty(){ 43 return count == 0; 44 } 45 46 }
其次,利用内置条件队列,编写子类实现可阻塞的插入和删除操作。插入操作,依赖的条件是缓存非满,当条件不满足时,调用wait方法挂起线程,一旦插入成功,说明缓存非空,则调用notifyAll方法唤醒等待非空的线程。删除操作,依赖的条件是非空,当条件不满足时,同样挂起等待,一旦删除成功,说明缓存非满,唤起等待该条件的线程。简单的源码如下:
1 /** 2 * 3 * @ClassName: InnerConditionQueue 4 * 使用内置条件队列,实现简单的有界缓存 5 * 通过对象的wait和notify来实现挂起 6 * 锁对象是this,调用wait/notify的对象是同一个对象。 7 * 三元关系(锁、wait/notify、条件谓词) 8 * 线程从wait中被唤醒时,并不代码条件谓词为真,此时还是需要再判断条件。所以必须在循环中调用wait;每次醒来时都判断谓词的真假。 9 * @author xingle 10 * @date 2015-1-15 下午2:13:55 11 */ 12 public class InnerConditionQueue<V> extends ABoundedBuffer<V> { 13 14 /** 15 * @param capacity 16 */ 17 protected InnerConditionQueue(int capacity) { 18 super(capacity); 19 // TODO Auto-generated constructor stub 20 } 21 22 public synchronized void put(V v) throws InterruptedException{ 23 while(isFull()){ 24 System.out.println(new Date()+" buffer 满了, thread wait:"+Thread.currentThread().getName()); 25 wait(); 26 } 27 doPut(v); 28 System.out.println(new Date()+" "+Thread.currentThread().getName()+" 放入 :"+v+" "); 29 notifyAll(); 30 } 31 32 public synchronized V take() throws InterruptedException { 33 while(isEmpty()){ 34 System.out.println(new Date()+" buffer 为空, thread wait:"+Thread.currentThread().getName()); 35 wait(); 36 } 37 38 notifyAll(); 39 //每当在等待一个条件时,一定要确保在条件谓词变为真时,通过某种方式发出通知 40 V v = doTake(); 41 System.out.println(new Date()+" "+Thread.currentThread().getName()+" 取出 :"+v); 42 return v; 43 } 44 45 }
最后,编写测试代码,创建一个大小为2的缓冲区,启动三个线程执行插入操作,当第三个线程执行时会因为缓存已满而挂起,主线程删除两个记录后,等待线程被唤醒成功插入。当缓存空的时候,之后的删除操作将被阻塞直到有新的记录插入为止。测试代码如下:
1 public class ConditionQueueTest { 2 3 public static void main(String[] args){ 4 final InnerConditionQueue<String> buf = new InnerConditionQueue<String>(2); 5 6 Thread t1 = new Thread(new Runnable() { 7 8 @Override 9 public void run() { 10 try { 11 buf.put("hello1"); 12 } catch (InterruptedException e) { 13 System.out.println("intercetp1:"+Thread.currentThread().getName()); 14 e.printStackTrace(); 15 } 16 17 } 18 }); 19 20 Thread t2 = new Thread(new Runnable() { 21 22 @Override 23 public void run() { 24 try { 25 buf.put("hello2"); 26 } catch (InterruptedException e) { 27 System.out.println("intercetp2:"+Thread.currentThread().getName()); 28 e.printStackTrace(); 29 } 30 } 31 }); 32 33 Thread t3 = new Thread(new Runnable() { 34 35 @Override 36 public void run() { 37 try { 38 buf.put("hello3"); 39 40 Thread.sleep(5000); 41 buf.put("last one..."); 42 } catch (InterruptedException e) { 43 System.out.println("intercetp3:"+Thread.currentThread().getName()); 44 e.printStackTrace(); 45 } 46 47 } 48 }); 49 50 t1.start(); 51 t2.start(); 52 t3.start(); 53 54 try { 55 Thread.sleep(5000); 56 buf.take(); 57 buf.take(); 58 buf.take(); 59 buf.take(); 60 } catch (InterruptedException e) { 61 e.printStackTrace(); 62 } 63 64 System.out.println(new Date()+" main over..."); 65 } 66 67 }
执行结果:
参考
1.《java 并发编程实战》 14.1
2.Java线程安全杂谈(下)——锁、状态依赖与协同以及锁优化