背景
有时我们可以使用缓存进行容灾的处理。场景如下:我们当前有一个专门提供各种数据的应用DataCore,该应用开放多个RFC方法供其他应用使用。
我们平时在读写数据时,会在Cache备份一份(为平时DataCore提高响应速度、降低DB、CPU压力所用),当DB挂掉的时候,Cache还可以用来容灾。使用缓存容灾的好处是:性能足够好,坏处是缓存可比数据库成本高多了。
让我们想象得更猛烈些,当DataCore整个挂掉的时候,A、B、C、D方怎么才能安然的运行下去?
我们可以在A、B、C、D端上提供DataCore的缓存容灾服务,这样,即使在DataCore整个挂掉的情况下,其他应用也不会受影响。
要考虑的几个问题
- 容灾读的部分不必说,对象原本在存入缓存时就根据类型分了区域,读的时候直接在相应的区域取出即可。容灾写的话,针对同一类型对象的写操作,如何将这些对象集合在一块,待DataCore恢复后,再将容灾写过的对象覆盖到DB。
- 缓存的写操作必须是线程安全的
详细设计
缓存容灾写的一种可能策略是:针对每种类型的对象在缓存中开辟一大段储存空间(数组方式或者数组链接结合方式),然后把每个容灾写的对象塞进这段空间内,在要覆写回DB时,直接从头到尾在缓存里把对象取出来即可。示意图如下:
上面这种设计缺点是需要一大片的连续的储存空间,对于缓存来说,这是要命的。缓存的底层储存机制就是基于分散的hash。
上面设计的一种改进方案是,我们只在连续空间中储存UserDO的唯一标示符,比如id或者key什么的。这样我们的就不需要那么大的连续空间了。示意图如下:
更进一步,我们可以把UserDO的id也分散储存。可以利用一个DisasterIndexDO储存每一个类型的容灾写的信息,利用beginIndex以及currentIndex字段为所有容灾写对象打上一个序号,在缓存中储存该序号与对象id的对应关系,然后我们就可以通过序号检索出id,再通过id检索出对象。示意图如下:
在多个调用方在对某一类型的对象进行容灾写操作时,只需要对DisasterIndexDO进行安全的并发访问即可,抢占currentIndex,然后再进行缓存的写操作。这样,我们的容灾写就实现了。
范例实现
//index对象 public class DisasterIndexDO implements Serializable { private static final long serialVersionUID = -8688243351154917184L; public int namespace; public int beginIndex; public int currentIndex; public long expireLockTime; public static final long DEFAULT_EXPIRE_TIME = 50; // 当序列被锁时间超时,防止死锁 public DisasterIndexDO(int namespace, int bIndex, int cIndex, long expireLockTime) { this.namespace = namespace; this.beginIndex = bIndex; this.currentIndex = cIndex; this.expireLockTime = expireLockTime; } }
//容灾实现类 public class DisasterCacheHandler { RemoteCache remoteCache; private final int DS_CACHE_NAMESPACE = 67; private final int DS_WRITE_REPETECOUNT = 3; private final int DISASTER_INDEX = 250; private final String DISASTER_KEYS = "disaster_keys"; //容灾读 public Object dsGetRemoteData(int namespace, String key){ return remoteCache.get(DS_CACHE_NAMESPACE, namespace+key); } //本地同步的namespace private Map<Integer, Object> synNamespace = new HashMap<Integer, Object>(); //容灾写 protected boolean dsWriteRemoteData(int namespace, String key, Serializable value) { //先把数据写入缓存 remoteCache.put(this.DS_CACHE_NAMESPACE, namespace+key, value); //本地同步的NameSpace if (!this.synNamespace.containsKey(Integer.valueOf(namespace))) { this.synNamespace.put(Integer.valueOf(namespace), namespace); } // update the Namespace Index and namespace disaster key queue synchronized (this.synNamespace.get(namespace)){ int count = this.DS_WRITE_REPETECOUNT; DisasterIndexDO index = null; do { count--; // try to lock Namespace Index int rc = remoteCache.lock(this.DISASTER_INDEX, String.valueOf(namespace)); // Namespace Index not exist if ( rc == 2 ) { // Initialize Namespace Index index = new DisasterIndexDO(namespace, 1, 1, System.currentTimeMillis()); remoteCache.put(this.DISASTER_INDEX, String.valueOf(namespace), index); // for each namespace should handle disaster, keep the only index object remoteCache.put(namespace, this.DISASTER_KEYS + index.currentIndex, key); return true; } else if (rc == 0) { // lock failure index = (DisasterIndexDO)remoteCache. get(this.DISASTER_INDEX, String.valueOf(namespace)); // 如果鎖已經超時,則解开,避免在访问缓存时死住的情况 if (System.currentTimeMillis() - index.expireLockTime > DisasterIndexDO.DEFAULT_EXPIRE_TIME) { remoteCache.unLock(this.DISASTER_INDEX, String.valueOf(namespace)); continue; } continue; } else if (rc == 1) { // lock success try { index = (DisasterIndexDO)remoteCache.get(this.DISASTER_INDEX, String.valueOf(namespace)); // update locked Namespace Index int curIdx = index.currentIndex + 1; remoteCache.delete(this.DISASTER_INDEX, String.valueOf(namespace)); remoteCache.put(this.DISASTER_INDEX, String.valueOf(namespace), new DisasterIndexDO(namespace, index.beginIndex, curIdx, System.currentTimeMillis())); // keep key of this Namespace with current index remoteCache.put(namespace, this.DISASTER_KEYS + curIdx, key); return true; } catch (Throwable e ) { } finally { // unlock and handle unlock failure remoteCache.unLock(this.DISASTER_INDEX, String.valueOf(namespace)); } } } while (count >= 0); if (count <= 0) { return false; } } } }