背景
有时我们能够使用缓存进行容灾的处理。场景例如以下:我们当前有一个专门提供各种数据的应用DataCore,该应用开放多个RFC方法供其它应用使用。
我们平时在读写数据时,会在Cache备份一份(为平时DataCore提高响应速度、减少DB、CPU压力所用),当DB挂掉的时候。Cache还能够用来容灾。使用缓存容灾的优点是:性能足够好,坏处是缓存可比数据库成本高多了。
让我们想象得更猛烈些,当DataCore整个挂掉的时候,A、B、C、D方怎么才干安然的执行下去?
我们能够在A、B、C、D端上提供DataCore的缓存容灾服务。这样。即使在DataCore整个挂掉的情况下,其它应用也不会受影响。
要考虑的几个问题
- 容灾读的部分不必说。对象原本在存入缓存时就依据类型分了区域,读的时候直接在对应的区域取出就可以。
容灾写的话,针对同一类型对象的写操作,怎样将这些对象集合在一块,待DataCore恢复后,再将容灾写过的对象覆盖到DB。 - 缓存的写操作必须是线程安全的
具体设计
缓存容灾写的一种可能策略是:针对每种类型的对象在缓存中开辟一大段储存空间(数组方式或者数组链接结合方式),然后把每一个容灾写的对象塞进这段空间内,在要覆写回DB时,直接从头到尾在缓存里把对象取出来就可以。示意图例如以下:
上面这样的设计缺点是须要一大片的连续的储存空间,对于缓存来说,这是要命的。
缓存的底层储存机制就是基于分散的hash。
上面设计的一种改进方案是。我们仅仅在连续空间中储存UserDO的唯一标示符。比方id或者key什么的。
这样我们的就不须要那么大的连续空间了。示意图例如以下:
更进一步,我们能够把UserDO的id也分散储存。能够利用一个DisasterIndexDO储存每个类型的容灾写的信息,利用beginIndex以及currentIndex字段为全部容灾写对象打上一个序号。在缓存中储存该序号与对象id的相应关系,然后我们就能够通过序号检索出id,再通过id检索出对象。
示意图例如以下:
在多个调用方在对某一类型的对象进行容灾写操作时,仅仅须要对DisasterIndexDO进行安全的并发訪问就可以,抢占currentIndex,然后再进行缓存的写操作。这样。我们的容灾写就实现了。
范例实现
//index对象 public class DisasterIndexDO implements Serializable { private static final long serialVersionUID = -8688243351154917184L; public int namespace; public int beginIndex; public int currentIndex; public long expireLockTime; public static final long DEFAULT_EXPIRE_TIME = 50; // 当序列被锁时间超时,防止死锁 public DisasterIndexDO(int namespace, int bIndex, int cIndex, long expireLockTime) { this.namespace = namespace; this.beginIndex = bIndex; this.currentIndex = cIndex; this.expireLockTime = expireLockTime; } }
//容灾实现类 public class DisasterCacheHandler { RemoteCache remoteCache; private final int DS_CACHE_NAMESPACE = 67; private final int DS_WRITE_REPETECOUNT = 3; private final int DISASTER_INDEX = 250; private final String DISASTER_KEYS = "disaster_keys"; //容灾读 public Object dsGetRemoteData(int namespace, String key){ return remoteCache.get(DS_CACHE_NAMESPACE, namespace+key); } //本地同步的namespace private Map<Integer, Object> synNamespace = new HashMap<Integer, Object>(); //容灾写 protected boolean dsWriteRemoteData(int namespace, String key, Serializable value) { //先把数据写入缓存 remoteCache.put(this.DS_CACHE_NAMESPACE, namespace+key, value); //本地同步的NameSpace if (!this.synNamespace.containsKey(Integer.valueOf(namespace))) { this.synNamespace.put(Integer.valueOf(namespace), namespace); } // update the Namespace Index and namespace disaster key queue synchronized (this.synNamespace.get(namespace)){ int count = this.DS_WRITE_REPETECOUNT; DisasterIndexDO index = null; do { count--; // try to lock Namespace Index int rc = remoteCache.lock(this.DISASTER_INDEX, String.valueOf(namespace)); // Namespace Index not exist if ( rc == 2 ) { // Initialize Namespace Index index = new DisasterIndexDO(namespace, 1, 1, System.currentTimeMillis()); remoteCache.put(this.DISASTER_INDEX, String.valueOf(namespace), index); // for each namespace should handle disaster, keep the only index object remoteCache.put(namespace, this.DISASTER_KEYS + index.currentIndex, key); return true; } else if (rc == 0) { // lock failure index = (DisasterIndexDO)remoteCache. get(this.DISASTER_INDEX, String.valueOf(namespace)); // 假设鎖已經超時。則解开,避免在訪问缓存时死住的情况 if (System.currentTimeMillis() - index.expireLockTime > DisasterIndexDO.DEFAULT_EXPIRE_TIME) { remoteCache.unLock(this.DISASTER_INDEX, String.valueOf(namespace)); continue; } continue; } else if (rc == 1) { // lock success try { index = (DisasterIndexDO)remoteCache.get(this.DISASTER_INDEX, String.valueOf(namespace)); // update locked Namespace Index int curIdx = index.currentIndex + 1; remoteCache.delete(this.DISASTER_INDEX, String.valueOf(namespace)); remoteCache.put(this.DISASTER_INDEX, String.valueOf(namespace), new DisasterIndexDO(namespace, index.beginIndex, curIdx, System.currentTimeMillis())); // keep key of this Namespace with current index remoteCache.put(namespace, this.DISASTER_KEYS + curIdx, key); return true; } catch (Throwable e ) { } finally { // unlock and handle unlock failure remoteCache.unLock(this.DISASTER_INDEX, String.valueOf(namespace)); } } } while (count >= 0); if (count <= 0) { return false; } } } }