线程安全的缓存机制 - AOP设计与实现

最近几天由于工作原因,需要设计实现一个线程安全的缓存机制,拿出来和大家分享交流一下。

应用背景:

缓存是在实际工作中经常用到的,主要作用呢?1. 提高响应速度 2. 降低cpu压力或者数据库压力。

在此,我的应用背景是拦截一些RFC请求(不要求获取实时数据),以降低数据库及自身应用的访问压力。

目标:

高可扩展性:可以方便配置需要使用缓存的方法。

线程安全性:在并发情况下,要求线程安全,且尽可能高效。

使用技术:

  • 使用AOP的插件性质来降低缓存与原系统的耦合性,即在切面层做缓存的处理。
  • 使用Annotation来对需要做缓存处理的函数进行标记,并可以对缓存时间个性化
  • 针对缓存过期问题,对放入缓存的数据封装一层,并打上时间戳

示意图:

     

设计难点:

针对某一时刻并发数较多且缓存失效的情况下,我们应该保证的是只有一个线程会去执行数据的读取并设置的操作,那么其他线程应该是等待该线程完成操作再一起返回还是直接返回null值?

答:在并发数较多且数据准备时间过长的情况下,如果线程采取等待策略,那么将引起很大的资源浪费:占用RFC连接(一般数量是有限制的),占用服务器cpu时间等等问题。所以,最好是代码中提供两种策略,对于执行时间较长的读数据操作,我们应该将线程直接返回,而非一直等待。

代码实现:   

Annotation:

/**
 * 表示一个方法是否启用本地缓存,可以指定本地缓存的时间间隔,默认为一个小时
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LocalCacheOperation {

    long localCacheInterval() default 1000 * 60 * 60;
    String localCacheKey() default "";
}

CacheObject:

//为缓存对象包上一层时间戳
public class CacheObject implements Serializable {
    private static final long serialVersionUID = 4873268779348802945L;
    private long timestamp;
    private Object object;
    .....
}          

标记要使用缓存的方法:

public interface ConfigService {
     public BloomFilter<String> getAllPassengerNames();
     public BloomFilter<Long> getAllTrades();
}

public class ConfigServiceImpl implements ConfigService {
     private PassengerManager passengerManager;
     private TradeManager tradeManager;

     @LocalCacheOperation(localCacheInterval=1000*60*60*24)
     public BloomFilter<String> getAllPassengerNames() {
          return passengerManager.getAllPassengerNames();
     }
     @LocalCacheOperation
     public BloomFilter<Long> getAllTrades() {
          return tradeManager.getAllTrades();
     }
}

核心:切面实现

我在这里使用了加锁和不加锁两种方式来实现对应的两种策略:缓存不存在线程直接返回或等待。

/**
 * 本地缓存切面实现
 */
@Aspect
public class LocalCacheAspect {

    private static final Log log = LogFactory.getLog(LocalCacheAspect.class);

    private final ConcurrentMap<String, Future<CacheObject>>
     localCache = new ConcurrentHashMap<String, Future<CacheObject>>();

    //控制台
    private ConsoleBean consoleBean;

    private final ConcurrentMap<String, SoftReference<CacheObject>>
     concurrentLocalCache = new ConcurrentHashMap<String, SoftReference<CacheObject>>();

    /**
     * 对于执行时间较长的读数据操作,需要在这里相应的添加锁,对于操作添加锁后的函数的线程
     * 如果本地缓存为空,且读数据的锁已被其他线程占据,将直接返回null
     */

    private final static Map<String, Lock> localCacheLocks = new HashMap<String, Lock>();
    static {
        localCacheLocks.put("getAllTrades", new ReentrantLock());
    }

    /**
     * Advice aound audit operations
     *
     * @param pjpParam
     * @return
     */
    @Around("execution(@LocalCacheOperation * *(..))")
    public Object doCache(ProceedingJoinPoint pjpParam) throws Throwable {
        if(!getConsoleBean().isLocalCacheSwitchOn()) {
            log.warn("localCache not switch on, please pay attention");
            return pjpParam.proceed(pjpParam.getArgs());
        }
        final ProceedingJoinPoint pjp = pjpParam;
        Signature sig = pjp.getSignature();
        if (sig instanceof MethodSignature) {
            MethodSignature mSig = (MethodSignature) sig;
            LocalCacheOperation co = mSig.getMethod().getAnnotation(
             LocalCacheOperation.class);
            long localCacheInterval = 0;
            String localCacheKey = null;
            /**
             * AOP在拦截子类的Annotataion时,无法获取该Annotation,导致co可能为空
             * @author chenlei.cl
             */
            if( co == null ){
             localCacheInterval = consoleBean.getLocalCacheInterval();
             localCacheKey = mSig.getName();
            } else {
             localCacheInterval = co.localCacheInterval();
             localCacheKey = StringUtils.isNotBlank(co.localCacheKey()) ?
             co.localCacheKey() : mSig.getName();
            }
            if (localCacheLocks.containsKey(mSig.getMethod().getName())) { //使用本地互斥锁
             return doConcurrentLocalCache(pjp, localCacheInterval, localCacheKey);
            }

            while (true) {// 等待某个线程将数据获取到本地缓存
                Future<CacheObject> f = localCache.get(localCacheKey);
                try {
                    long currentTime = System.currentTimeMillis();
                    if (f != null && f.get() != null && currentTime - f.get().getTimestamp()
                    > localCacheInterval) {
                     localCache.remove(localCacheKey, f);
                        f = null;
                    }

                    if (f == null) {
                        Callable<CacheObject> eval = new Callable<CacheObject>() {
                            public CacheObject call() throws InterruptedException {
                                Object res;
                                try {
                                    res = pjp.proceed(pjp.getArgs());
                                }
                                catch (Throwable e) {
                                    log.error("Fail to process method", e);
                                    throw new  ServiceException(e.getMessage());
                                }
                                CacheObject cacheObject = new CacheObject();
                                cacheObject.setObject(res);
                                cacheObject.setTimestamp(System.currentTimeMillis());
                                return cacheObject;
                            }
                        };

                        FutureTask<CacheObject> ft =
                         new FutureTask<CacheObject>(eval);
                        f = localCache.putIfAbsent(localCacheKey, ft);
                        if (f == null) {
                            f = ft;
                            ft.run();
                        }
                    }

                    CacheObject obj = f.get();
                    if (obj != null)
                        return obj.getObject();
                }
                catch (CancellationException e) {
                    localCache.remove(localCacheKey, f);
                }
                catch (ExecutionException e) {
                    throw new ServiceException(e.getMessage());
                }
            }
        }
        return pjp.proceed(pjp.getArgs());
    }

    @SuppressWarnings("static-access")
    public Object doConcurrentLocalCache(ProceedingJoinPoint pjp,
        long localCacheInterval, String localCacheKey) throws Throwable {
        try {
            long currentTime = System.currentTimeMillis();
            SoftReference<CacheObject> weakRefCacheObj =
            concurrentLocalCache.get(localCacheKey);
            if (weakRefCacheObj != null && weakRefCacheObj.get() != null &&
                currentTime - weakRefCacheObj.get().getTimestamp() > localCacheInterval) {
                // 缓存过期
                weakRefCacheObj.get().setObject(null); // 清空引用
                concurrentLocalCache.remove(localCacheKey, weakRefCacheObj);
                weakRefCacheObj = null;
            } else if (weakRefCacheObj != null && weakRefCacheObj.get() != null) {
                return weakRefCacheObj.get().getObject();
           }

            if (this.localCacheLocks.get(localCacheKey).tryLock()) {
             weakRefCacheObj = concurrentLocalCache.get(localCacheKey);
             if (weakRefCacheObj != null && weakRefCacheObj.get() != null) {
             // double check
                    return weakRefCacheObj.get().getObject();
             }
             try {
                 Object res = pjp.proceed(pjp.getArgs());
                 CacheObject cacheObject = new CacheObject();
                 cacheObject.setObject(res);
                 cacheObject.setTimestamp(System.currentTimeMillis());
                 weakRefCacheObj = new SoftReference<CacheObject>(
                 cacheObject);
                 concurrentLocalCache.put(localCacheKey, weakRefCacheObj);
                 return res;
             } finally {
                 this.localCacheLocks.get(localCacheKey).unlock();
             }
            } else {
             return null; // make the other part wait
            }
        } catch (Exception e) {
            throw new ServiceException(e.getMessage(), e);
        }
    }
}
时间: 2025-01-04 15:13:57

线程安全的缓存机制 - AOP设计与实现的相关文章

《深入理解mybatis原理》 MyBatis缓存机制的设计与实现

本文主要讲解MyBatis非常棒的缓存机制的设计原理,给读者们介绍一下MyBatis的缓存机制的轮廓,然后会分别针对缓存机制中的方方面面展开讨论. MyBatis将数据缓存设计成两级结构,分为一级缓存.二级缓存: 一级缓存是Session会话级别的缓存,位于表示一次数据库会话的SqlSession对象之中,又被称之为本地缓存.一级缓存是MyBatis内部实现的一个特性,用户不能配置,默认情况下自动支持的缓存,用户没有定制它的权利(不过这也不是绝对的,可以通过开发插件对它进行修改): 二级缓存是A

《深入理解mybatis原理4》 MyBatis缓存机制的设计与实现

<深入理解mybatis原理> MyBatis缓存机制的设计与实现 本文主要讲解MyBatis非常棒的缓存机制的设计原理,给读者们介绍一下MyBatis的缓存机制的轮廓,然后会分别针对缓存机制中的方方面面展开讨论. MyBatis将数据缓存设计成两级结构,分为一级缓存.二级缓存:   一级缓存是Session会话级别的缓存,位于表示一次数据库会话的SqlSession对象之中,又被称之为本地缓存.一级缓存是MyBatis内部实现的一个特性,用户不能配置,默认情况下自动支持的缓存,用户没有定制它

针对增量请求的缓存机制实现 - AOP

背景: 在web应用中,我们经常使用黑白名单,在http://blog.csdn.net/troy__/article/details/39320699中我们实现了一个线程安全的针对全量请求的缓存机制,这种技术主要是用于黑白名单的全量更新.但是我们不能经常请求全量吧,网络和数据库都会累死,所以在此我们设计实现一个针对增量请求的缓存机制.全量请求提供黑白名单低频度的更新,增量请求提供黑白名单高频度的更新.当然,全量和增量的应用场景也并不只是黑白名单. 设计概述: 使用缓存的关键之一是:缓存并不能保

设计一个移动应用的本地缓存机制

在手机应用程序开发中,为了降低与服务端的交互次数,加快用户的响应速度,一般都会在iOS设备中加一个缓存的机制,前面一篇文章介绍了iOS设备的内存缓存.这篇文章将设计一个本地缓存的机制. 功能需求 这个缓存机制满足以下这些功能. 1.能够将数据缓存到本地磁盘. 2.能够推断一个资源是否已经被缓存.假设已经被缓存.在请求同样的资源.先到本地磁盘搜索. 3.能够推断文件缓存什么时候过期.这里为了简单起见这里,我们在请求url资源的时候.给每次请求的文件设定一个过期的时间. 4.能够实现:假设文件已经被

atitit。浏览器缓存机制 and 微信浏览器防止缓存的设计 attilax 总结

atitit.浏览器缓存机制 and 微信浏览器防止缓存的设计 attilax 总结 1. 缓存的一些机制 1 1.1. http 304 1 1.2. 浏览器刷新的处理机制 1 1.3. Expires 2 1.4. Cache-Control 2 1.5. Last-Modified/E-tag 3 1.6. Etag 主要为了解决 Last-Modified 无法解决的一些问题. 4 2. 不同的页面打开方式产生的请求区别 5 3. html  meta法 5 4. http head 法

Varnish缓存机制详细介绍及简单配置

Varnish是一款高性能的开源HTTP加速器,其主要用来做为反向代理中的缓存服务器使用,但其实Varnish本身也是具有反向代理功能的,但在创建连接和维持连接上,与Nginx相比差距很大,现在有一个很流行的架构就是前端用Nginx作为反向代理,后面加Varnish缓存服务器为Web服务加速 在将Varnish前先谈谈我们的浏览器缓存机制,现在的浏览器基本都具有缓存功能,它能将我们以前访问过的静态内容和可进行缓存的动态内容缓存再本地,而后在下次访问相同资源时,如果可以确认Server端的资源未发

9大浏览器端缓存机制分析

浏览器缓存(Browser Caching)是浏览器端保存数据用于快速读取或避免重复资源请求的优化机制,有效的缓存使用可以避免重复的网络请求和浏览器快速地读取本地数据,整体上加速网页展示给用户.浏览器端缓存的机制种类较多,总体归纳为九种,这里详细分析下这九种缓存机制的原理和使用场景.打开浏览器的调试模式->resources左侧就有浏览器的8种缓存机制. 一.http缓存 http缓存是基于HTTP协议的浏览器文件级缓存机制.即针对文件的重复请求情况下,浏览器可以根据协议头判断从服务器端请求文件

Java缓存学习之二:浏览器缓存机制

浏览器端的九种缓存机制介绍 浏览器缓存是浏览器端保存数据用于快速读取或避免重复资源请求的优化机制,有效的缓存使用可以避免重复的网络请求和浏览器快速地读取本地数据,整体上加速网页展示给用户.浏览器端缓存的机制种类较多,总体归纳为九种,这里详细分析下这九种缓存机制的原理和使用场景.打开浏览器的调试模式->resources左侧就有浏览器的8种缓存机制. 一.http缓存 http缓存是基于HTTP协议的浏览器文件级缓存机制.即针对文件的重复请求情况下,浏览器可以根据协议头判断从服务器端请求文件还是从

Android缓存机制&amp;一个缓存框架推荐

1.先推荐一个轻量级缓存框架--ACache(ASimpleCache) ACache介绍: ACache类似于SharedPreferences,但是比SharedPreferences功能更加强大,SharedPreferences只能保存一些基本数据类型.Serializable.Bundle等数据, 而Acache可以缓存如下数据: 普通的字符串.JsonObject.JsonArray.Bitmap.Drawable.序列化的java对象,和 byte数据. 主要特色: 1:轻,轻到只