//此文基于《Java并发编程实践》
我们都知道在应用程序中合理地使用缓存,能更快的访问我们之前的计算结果,从而提高吞吐量。例如Redis和Memcached基于内存的数据存储系统等。此篇文章介绍如何实现简单缓存。
首先定义一个Computable接口A是输入,V是输出。
1 package simplecache; 2 3 /** 4 * Created by yulinfeng on 12/25/16. 5 */ 6 public interface Computable<A, V> { 7 V compute(A arg) throws InterruptedException; 8 }
实现这个接口,也即是在ExpensiveFunction做具体的计算过程。
1 package simplecache; 2 3 /** 4 * Created by yulinfeng on 12/25/16. 5 */ 6 public class ExpensiveFunction implements Computable<String, Integer> { 7 @Override 8 public Integer compute(String arg) throws InterruptedException { 9 //计算 10 return new Integer(arg); 11 } 12 }
接着将创建一个Computable包装器,帮助记住之前的计算结果,并将缓存过程封装起来(Memoization)。
1.利用简单HashMap实现缓存
1 package simplecache; 2 3 import java.util.HashMap; 4 import java.util.Map; 5 6 /** 7 * Created by yulinfeng on 12/25/16. 8 */ 9 public class Memoizer1<A, V> implements Computable<A, V> { 10 private final Map<A, V> cache = new HashMap<A, V>(); 11 private final Computable<A, V> c; 12 13 public Memoizer1(Computable<A, V> c){ 14 this.c = c; 15 } 16 17 @Override 18 public synchronized V compute(A arg) throws InterruptedException { 19 V result = cache.get(arg); 20 if (null == result){ 21 result = c.compute(arg); 22 cache.put(arg, result); 23 } 24 return result; 25 } 26 }
我们首先利用最简单的HashMap实现缓存,由于HashMap并不是线程安全的,所以在compute方法使用synchronized关键字,同步以实现线程安全。可见使用synchronized同步方法如此大粒度的同步必然会带来并发性的降低,因为每次只有一个线程执行compute方法,其余线程只能排队等待。
2.利用并发容器ConcurrentHashMap
第1种方法能实现缓存,且能实现线程安全的缓存,不过带来的问题就是并发性降低。我们使用并发包中的ConcurrentHashMap并发容器。
1 package simplecache; 2 3 import java.util.Map; 4 import java.util.concurrent.ConcurrentHashMap; 5 6 /** 7 * Created by yulinfeng on 12/25/16. 8 */ 9 public class Memoizer2<A, V> implements Computable<A, V> { 10 private final Map<A, V> cache = new ConcurrentHashMap<A, V>(); 11 private final Computable<A, V> c; 12 13 public Memoizer2(Computable<A, V> c){ 14 this.c = c; 15 } 16 17 @Override 18 public V compute(A arg) throws InterruptedException { 19 V result = cache.get(arg); 20 if (null == result){ 21 result = c.compute(arg); 22 cache.put(arg, rsult); 23 } 24 return result; 25 } 26 }
毫无疑问,利用ConcurrentHashMap会比简单HashMap带来更好的并发性,同时它也是线程安全的。不过在有一种条件下,这种方式会带来一个新的问题,当这个计算过程比较复杂,计算时间比较长时,线程T1正在计算没有结束,此时线程T2并不知道此时T1已经在计算了,所以它同样会再次进行计算,这种条件下相当于一个值被计算了2次。我们应该想要达到的效果应该是T1正在计算,而此时T2能发现T1正在计算相同值,此时应该阻塞等待T1计算完毕返回计算结果,而不是T2也去做一次计算。FutureTask表示一个计算过程,这个计算过程可能已经计算完成,也有可能正在计算。如果有结果可用,那么FutureTask.get将立即返回结果,否则会一直阻塞直到计算结束返回结果。这正好我们想要达到的效果。
3.缓存的最佳实践——ConcurrentHashMap+FutureTask
1 package simplecache; 2 3 import java.util.Map; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.Future; 6 7 /** 8 * Created by yulinfeng on 12/25/16. 9 */ 10 public class Memoizer3<A, V> implements Computable<A, V> { 11 private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); 12 private final Computable<A, V> c; 13 14 public Memoizer3(Computable<A, V> c) { 15 this.c = c; 16 } 17 18 @Override 19 public V compute(final A arg) throws InterruptedException { 20 Future<V> f = cache.get(arg); 21 if (null == f){ 22 Callable<V> eval = new Callable<V>() { 23 @Override 24 public V call() throws InterruptedException { 25 return c.compute(arg); 26 } 27 }; 28 FutureTask<V> ft = new FutureTask<V>(eval); 29 cache.put(arg, ft); 30 ft.run(); //调用执行c.compute 31 } 32 try { 33 return f.get(); 34 } catch (ExecutionException e) { 35 e.printStackTrace(); 36 } 37 } 38 }
不了解FutureTask可以去补补了,但记住上面所说“FutureTask表示一个计算过程,这个计算过程可能已经计算完成,也有可能正在计算。如果有结果可用,那么FutureTask.get将立即返回结果,否则会一直阻塞直到计算结束返回结果。”,但这并不算是最完美的实现,在compute方法中出现了if的复合操作,也就是说在期间还是很有可能出现如同ConcurrentHashMap一样的重复计算,只是概率降低了而已。幸好,ConcurrentHashMap为我们提供了putIfAbsent的原子方法,从而完美的避免了这个问题。
1 package simplecache; 2 3 import java.util.concurrent.*; 4 5 /** 6 * Created by yulinfeng on 12/25/16. 7 */ 8 public class Memoizer<A, V> implements Computable<A, V> { 9 private final ConcurrentHashMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>(); 10 private final Computable<A, V> c; 11 12 public Memoizer(Computable<A, V> c){ 13 this.c = c; 14 } 15 16 @Override 17 public V compute(final A arg) throws InterruptedException { 18 while (true) { 19 Future<V> f = cache.get(arg); 20 if (null == f) { 21 Callable<V> eval = new Callable<V>() { 22 @Override 23 public V call() throws Exception { 24 return c.compute(arg); 25 } 26 }; 27 FutureTask<V> ft = new FutureTask<V>(eval); 28 f = cache.putIfAbsent(arg, ft); 29 if (null == f){ 30 f = ft; 31 ft.run(); 32 } 33 } 34 try { 35 return f.get(); 36 } catch (CancellationException e){ 37 e.printStackTrace(); 38 } catch (ExecutionException e) { 39 e.printStackTrace(); 40 } 41 } 42 } 43 }
这样我们利用ConcurrentHashMap的并发性已经putIfAbsent原子性,以及FutureTask的特性实现了一个简单缓存。