最近几天由于工作原因,需要设计实现一个线程安全的缓存机制,拿出来和大家分享交流一下。
应用背景:
缓存是在实际工作中经常用到的,主要作用呢?1. 提高响应速度 2. 降低cpu压力或者数据库压力。
在此,我的应用背景是拦截一些RFC请求(不要求获取实时数据),以降低数据库及自身应用的访问压力。
目标:
高可扩展性:可以方便配置需要使用缓存的方法。
线程安全性:在并发情况下,要求线程安全,且尽可能高效。
使用技术:
- 使用AOP的插件性质来降低缓存与原系统的耦合性,即在切面层做缓存的处理。
- 使用Annotation来对需要做缓存处理的函数进行标记,并可以对缓存时间个性化
- 针对缓存过期问题,对放入缓存的数据封装一层,并打上时间戳
示意图:
设计难点:
针对某一时刻并发数较多且缓存失效的情况下,我们应该保证的是只有一个线程会去执行数据的读取并设置的操作,那么其他线程应该是等待该线程完成操作再一起返回还是直接返回null值?
答:在并发数较多且数据准备时间过长的情况下,如果线程采取等待策略,那么将引起很大的资源浪费:占用RFC连接(一般数量是有限制的),占用服务器cpu时间等等问题。所以,最好是代码中提供两种策略,对于执行时间较长的读数据操作,我们应该将线程直接返回,而非一直等待。
代码实现:
Annotation:
/** * 表示一个方法是否启用本地缓存,可以指定本地缓存的时间间隔,默认为一个小时 */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface LocalCacheOperation { long localCacheInterval() default 1000 * 60 * 60; String localCacheKey() default ""; }
CacheObject:
//为缓存对象包上一层时间戳 public class CacheObject implements Serializable { private static final long serialVersionUID = 4873268779348802945L; private long timestamp; private Object object; ..... }
标记要使用缓存的方法:
public interface ConfigService { public BloomFilter<String> getAllPassengerNames(); public BloomFilter<Long> getAllTrades(); } public class ConfigServiceImpl implements ConfigService { private PassengerManager passengerManager; private TradeManager tradeManager; @LocalCacheOperation(localCacheInterval=1000*60*60*24) public BloomFilter<String> getAllPassengerNames() { return passengerManager.getAllPassengerNames(); } @LocalCacheOperation public BloomFilter<Long> getAllTrades() { return tradeManager.getAllTrades(); } }
核心:切面实现
我在这里使用了加锁和不加锁两种方式来实现对应的两种策略:缓存不存在线程直接返回或等待。
/** * 本地缓存切面实现 */ @Aspect public class LocalCacheAspect { private static final Log log = LogFactory.getLog(LocalCacheAspect.class); private final ConcurrentMap<String, Future<CacheObject>> localCache = new ConcurrentHashMap<String, Future<CacheObject>>(); //控制台 private ConsoleBean consoleBean; private final ConcurrentMap<String, SoftReference<CacheObject>> concurrentLocalCache = new ConcurrentHashMap<String, SoftReference<CacheObject>>(); /** * 对于执行时间较长的读数据操作,需要在这里相应的添加锁,对于操作添加锁后的函数的线程 * 如果本地缓存为空,且读数据的锁已被其他线程占据,将直接返回null */ private final static Map<String, Lock> localCacheLocks = new HashMap<String, Lock>(); static { localCacheLocks.put("getAllTrades", new ReentrantLock()); } /** * Advice aound audit operations * * @param pjpParam * @return */ @Around("execution(@LocalCacheOperation * *(..))") public Object doCache(ProceedingJoinPoint pjpParam) throws Throwable { if(!getConsoleBean().isLocalCacheSwitchOn()) { log.warn("localCache not switch on, please pay attention"); return pjpParam.proceed(pjpParam.getArgs()); } final ProceedingJoinPoint pjp = pjpParam; Signature sig = pjp.getSignature(); if (sig instanceof MethodSignature) { MethodSignature mSig = (MethodSignature) sig; LocalCacheOperation co = mSig.getMethod().getAnnotation( LocalCacheOperation.class); long localCacheInterval = 0; String localCacheKey = null; /** * AOP在拦截子类的Annotataion时,无法获取该Annotation,导致co可能为空 * @author chenlei.cl */ if( co == null ){ localCacheInterval = consoleBean.getLocalCacheInterval(); localCacheKey = mSig.getName(); } else { localCacheInterval = co.localCacheInterval(); localCacheKey = StringUtils.isNotBlank(co.localCacheKey()) ? co.localCacheKey() : mSig.getName(); } if (localCacheLocks.containsKey(mSig.getMethod().getName())) { //使用本地互斥锁 return doConcurrentLocalCache(pjp, localCacheInterval, localCacheKey); } while (true) {// 等待某个线程将数据获取到本地缓存 Future<CacheObject> f = localCache.get(localCacheKey); try { long currentTime = System.currentTimeMillis(); if (f != null && f.get() != null && currentTime - f.get().getTimestamp() > localCacheInterval) { localCache.remove(localCacheKey, f); f = null; } if (f == null) { Callable<CacheObject> eval = new Callable<CacheObject>() { public CacheObject call() throws InterruptedException { Object res; try { res = pjp.proceed(pjp.getArgs()); } catch (Throwable e) { log.error("Fail to process method", e); throw new ServiceException(e.getMessage()); } CacheObject cacheObject = new CacheObject(); cacheObject.setObject(res); cacheObject.setTimestamp(System.currentTimeMillis()); return cacheObject; } }; FutureTask<CacheObject> ft = new FutureTask<CacheObject>(eval); f = localCache.putIfAbsent(localCacheKey, ft); if (f == null) { f = ft; ft.run(); } } CacheObject obj = f.get(); if (obj != null) return obj.getObject(); } catch (CancellationException e) { localCache.remove(localCacheKey, f); } catch (ExecutionException e) { throw new ServiceException(e.getMessage()); } } } return pjp.proceed(pjp.getArgs()); } @SuppressWarnings("static-access") public Object doConcurrentLocalCache(ProceedingJoinPoint pjp, long localCacheInterval, String localCacheKey) throws Throwable { try { long currentTime = System.currentTimeMillis(); SoftReference<CacheObject> weakRefCacheObj = concurrentLocalCache.get(localCacheKey); if (weakRefCacheObj != null && weakRefCacheObj.get() != null && currentTime - weakRefCacheObj.get().getTimestamp() > localCacheInterval) { // 缓存过期 weakRefCacheObj.get().setObject(null); // 清空引用 concurrentLocalCache.remove(localCacheKey, weakRefCacheObj); weakRefCacheObj = null; } else if (weakRefCacheObj != null && weakRefCacheObj.get() != null) { return weakRefCacheObj.get().getObject(); } if (this.localCacheLocks.get(localCacheKey).tryLock()) { weakRefCacheObj = concurrentLocalCache.get(localCacheKey); if (weakRefCacheObj != null && weakRefCacheObj.get() != null) { // double check return weakRefCacheObj.get().getObject(); } try { Object res = pjp.proceed(pjp.getArgs()); CacheObject cacheObject = new CacheObject(); cacheObject.setObject(res); cacheObject.setTimestamp(System.currentTimeMillis()); weakRefCacheObj = new SoftReference<CacheObject>( cacheObject); concurrentLocalCache.put(localCacheKey, weakRefCacheObj); return res; } finally { this.localCacheLocks.get(localCacheKey).unlock(); } } else { return null; // make the other part wait } } catch (Exception e) { throw new ServiceException(e.getMessage(), e); } } }
时间: 2025-01-04 15:13:57