分布式改造剧集三:Ehcache分布式改造

第三集:分布式Ehcache缓存改造

前言

? 好久没有写博客了,大有半途而废的趋势。忙不是借口,这个好习惯还是要继续坚持。前面我承诺的第一期的DIY分布式,是时候上终篇了---DIY分布式缓存。


探索之路

? 在前面的文章中,我给大家大致说过项目背景:项目中的缓存使用的是Ehcache。因为前面使用Ehcache的应用就一台,所以这种单机的Ehcache并不会有什么问题。现在分布式部署之后,如果各个应用之间的缓存不能共享,那么其实各自就是一个孤岛。可能在一个业务跑下来,请求了不同的应用,结果在缓存中取出来的值不一样,

造成数据不一致。所以需要重新设计缓存的实现。

? 因为尽量不要引入新的中间件,所以改造仍然是围绕Ehcache来进行的。搜集了各种资料之后,发现Ehcache实现分布式缓存基本有以下两种思路:

  • 客户端实现分布式算法: 在使用Ehcache的客户端自己实现分布式算法。

算法的基本思路就是取模:即假设有三台应用(编号假设分别为0,1,2),对于一个要缓存的对象,首先计算其key的hash值,然后将hash值模3,得到的余数是几,就将数据缓存到哪台机器。

  • 同步冗余数据: Ehcache是支持集群配置的,集群的各个节点之间支持按照一定的协议进行数据同步。这样每台应用其实缓存了一整份数据,不同节点之间的数据是一致的。

? 虽然冗余的办法显得有点浪费资源,但是我最终还是选择了冗余。具体原因有以下几点:

  • 分布式算法的复杂性: 前面所讲的分布式算法只是最基本的实现。事实上实现要比这个复杂的多。需要考虑增加或者删除节点的情况,需要使用更加复杂的一致性hash算法
  • 可能导致整个应用不可用: 当删除节点之后,如果算法不能够感知进行自动调整,仍然去请求那个已经被删除的节点,可能导致整个系统不可用。

Demo

? 最终我的实现采用RMI的方式进行同步

配置ehcache

? spring-ehcache-cache.xml

<?xml version="1.0" encoding="UTF-8"?>
<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:noNamespaceSchemaLocation="http://ehcache.org/ehcache.xsd" name="businessCaches">

  <diskStore path="java.io.tmpdir/ehcache"/>
  <cache name="business1Cache"
         maxElementsInMemory="10000000"
         eternal="true"
         overflowToDisk="false"
         memoryStoreEvictionPolicy="LRU">
         <cacheEventListenerFactory
            class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"/>
    </cache>

  <cache name="business2Cache"
         maxElementsInMemory="100"
         eternal="true"
         overflowToDisk="false"
         memoryStoreEvictionPolicy="LRU">
         <cacheEventListenerFactory
            class="net.sf.ehcache.distribution.RMICacheReplicatorFactory"/>
    </cache>

  <!-- cache发布信息配置,人工发现peerDiscovery=manual,cacheNames可配置多个缓存名称,以|分割 ) -->
   <cacheManagerPeerProviderFactory
        class="com.rampage.cache.distribute.factory.DisRMICacheManagerPeerProviderFactory"
        properties="peerDiscovery=manual, cacheNames=business1Cache|business2Cache" />

  <!-- 接收同步cache信息的地址 -->
 <cacheManagerPeerListenerFactory
        class="com.rampage.cache.distribute.factory.DisRMICacheManagerPeerListenerFactory"
        properties="socketTimeoutMillis=2000" />
</ehcache>

? spring-cache.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:cache="http://www.springframework.org/schema/cache"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd"
        default-autowire="byName">
    <!-- 包扫描 -->
    <context:component-scan base-package="com.rampage.cache" />
    <!-- 启用Cache注解 -->
    <cache:annotation-driven cache-manager="cacheManager"
        key-generator="keyGenerator" proxy-target-class="true" />
    <!-- 自定义的缓存key生成类,需实现org.springframework.cache.interceptor.KeyGenerator接口 -->
    <bean id="keyGenerator" class="com.rampage.cache.support.CustomKeyGenerator" />
    <!-- 替换slite的ehcache实现 -->
    <bean id="ehCacheManagerFactory" class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean">
        <property name="configLocation" value="classpath:spring/cache/sppay-ehcache-cache.xml"/>
        <!-- value对应前面ehcache文件定义的manager名称 -->
        <property name="cacheManagerName" value="businessCaches" />
    </bean>
    <bean id="ehCacheManager" class="org.springframework.cache.ehcache.EhCacheCacheManager">
        <property name="cacheManager" ref="ehCacheManagerFactory"/>
    </bean>
    <bean id="cacheManager" class="org.springframework.cache.support.CompositeCacheManager">
        <property name="cacheManagers">
            <list>
                <ref bean="ehCacheManager" />
            </list>
        </property>
        <property name="fallbackToNoOpCache" value="true" />
    </bean>
</beans>

实现自定义转发和监听

? 细心的读者应该不难发现,前面xml配置中cacheManagerPeerProviderFactorycacheManagerPeerListenerFactory我使用的都是自定义的类。之所以使用自定义的类,是为了在初始化的时候发布的地址和端口,监听的地址端口可以在配置文件配置。具体类的实现如下:

/**
 * 分布式EhCache监听工厂
 * @author secondWorld
 *
 */
public class DisRMICacheManagerPeerListenerFactory extends RMICacheManagerPeerListenerFactory {

    private static final Logger LOGGER = LoggerFactory.getLogger(DisRMICacheManagerPeerListenerFactory.class);

    /**
     * 配置文件中配置的监听地址,可以不配置,默认为本机地址
     */
    private static final String LISTEN_HOST = "distribute.ehcache.listenIP";

    /**
     * 配置文件中配置的监听端口
     */
    private static final String LISTEN_PORT = "distribute.ehache.listenPort";

    @Override
    protected CacheManagerPeerListener doCreateCachePeerListener(String hostName, Integer port,
            Integer remoteObjectPort, CacheManager cacheManager, Integer socketTimeoutMillis) {
        // xml中hostName为空,则读取配置文件(app-config.properties)中的值
        if (StringUtils.isEmpty(hostName)) {
            String propHost = AppConfigPropertyUtils.get(LISTEN_HOST);
            if (StringUtils.isNotEmpty(propHost)) {
                hostName = propHost;
            }
        }

        // 端口采用默认端口0,则去读取配置文件(app-config.properties)中的值
        if (port != null && port == 0) {
            Integer propPort = null;
            try {
                propPort = Integer.parseInt(AppConfigPropertyUtils.get(LISTEN_PORT));
            } catch (NumberFormatException e) {
            }
            if (propPort != null) {
                port = propPort;
            }
        }

        LOGGER.info(
                "Initiliazing DisRMICacheManagerPeerListenerFactory:cacheManager[{}], hostName[{}], port[{}], remoteObjectPort[{}], socketTimeoutMillis[{}]......",
                cacheManager, hostName, port, remoteObjectPort, socketTimeoutMillis);

        return super.doCreateCachePeerListener(hostName, port, remoteObjectPort, cacheManager, socketTimeoutMillis);
    }
}

/**
 * 分布式EhCache发布工厂
 *
 * @author secondWorld
 *
 */
public class DisRMICacheManagerPeerProviderFactory extends RMICacheManagerPeerProviderFactory {

    private static final Logger LOGGER = LoggerFactory.getLogger(DisRMICacheManagerPeerProviderFactory.class);

    private static final String CACHENAME_DELIMITER = "|";

    private static final String PROVIDER_ADDRESSES = "distribute.ehcache.providerAddresses";

    private static final String CACHE_NAMES = "cacheNames";

    /**
     * rmi地址格式: //127.0.0.1:4447/Cache1|//127.0.0.1:4447/Cache2
     */
    @Override
    protected CacheManagerPeerProvider createManuallyConfiguredCachePeerProvider(Properties properties) {
        // 从app-config.properties中读取发布地址列表
        String providerAddresses = AppConfigPropertyUtils.get(PROVIDER_ADDRESSES, StringUtils.EMPTY);
        // 从ehcache配置文件读取缓存名称
        String cacheNames = PropertyUtil.extractAndLogProperty(CACHE_NAMES, properties);

        // 参数校验,这里发布地址和缓存名称都不能为空
        if (StringUtils.isEmpty(providerAddresses) || StringUtils.isEmpty(cacheNames)) {
            throw new IllegalArgumentException("Elements \"providerAddresses\" and \"cacheNames\" are needed!");
        }

        // 解析地址列表
        List<String> cachesNameList = getCacheNameList(cacheNames);
        List<String> providerAddressList = getProviderAddressList(providerAddresses);

        // 注册发布节点
        RMICacheManagerPeerProvider rmiPeerProvider = new ManualRMICacheManagerPeerProvider();
        StringBuilder sb = new StringBuilder();
        for (String cacheName : cachesNameList) {
            for (String providerAddress : providerAddressList) {
                sb.setLength(0);
                sb.append("//").append(providerAddress).append("/").append(cacheName);
                rmiPeerProvider.registerPeer(sb.toString());
                LOGGER.info("Registering peer provider [{}]", sb);
            }
        }

        return rmiPeerProvider;
    }

    /**
     * 得到发布地址列表
     * @param providerAddresses 发布地址字符串
     * @return 发布地址列表
     */
    private List<String> getProviderAddressList(String providerAddresses) {
        StringTokenizer stringTokenizer = new StringTokenizer(providerAddresses,
                AppConfigPropertyUtils.APP_ITEM_DELIMITER);

        List<String> ProviderAddressList = new ArrayList<String>(stringTokenizer.countTokens());
        while (stringTokenizer.hasMoreTokens()) {
            String providerAddress = stringTokenizer.nextToken();
            providerAddress = providerAddress.trim();
            ProviderAddressList.add(providerAddress);
        }

        return ProviderAddressList;
    }

    /**
     * 得到缓存名称列表
     * @param cacheNames 缓存名称字符串
     * @return 缓存名称列表
     */
    private List<String> getCacheNameList(String cacheNames) {
        StringTokenizer stringTokenizer = new StringTokenizer(cacheNames, CACHENAME_DELIMITER);

        List<String> cacheNameList = new ArrayList<String>(stringTokenizer.countTokens());
        while (stringTokenizer.hasMoreTokens()) {
            String cacheName = stringTokenizer.nextToken();
            cacheName = cacheName.trim();
            cacheNameList.add(cacheName);
        }

        return cacheNameList;
    }

    @Override
    protected CacheManagerPeerProvider createAutomaticallyConfiguredCachePeerProvider(CacheManager cacheManager,
            Properties properties) throws IOException {
        throw new UnsupportedOperationException("Not supported automatic distribute cache!");
    }
}

配置

? 假设有三台机器,则他们分别得配置如下:

#应用1,在4447端口监听
#缓存同步消息发送地址(如果同步到多台需要配置多台地址,多台地址用英文逗号分隔)
distribute.ehcache.providerAddresses=127.0.0.1:4446,127.0.0.1:4448
#缓存同步监听端口和IP
distribute.ehache.listenPort=4447
distribute.ehcache.listenIP=localhost

#应用2,在4448端口监听
#缓存同步消息发送地址(如果同步到多台需要配置多台地址,多台地址用英文逗号分隔)
distribute.ehcache.providerAddresses=127.0.0.1:4446,127.0.0.1:4447
#缓存同步监听端口和IP
distribute.ehache.listenPort=4448
distribute.ehcache.listenIP=localhost

#应用3,在4446端口监听
#缓存同步消息发送地址(如果同步到多台需要配置多台地址,多台地址用英文逗号分隔)
distribute.ehcache.providerAddresses=127.0.0.1:4447,127.0.0.1:4448
#缓存同步监听端口和IP
distribute.ehache.listenPort=4446
distribute.ehcache.listenIP=localhost

使用

? 使用的时候直接通过Spring的缓存注解即可。简单的示例如下:

@CacheConfig("business1Cache")
@Component
public class Business1 {
    @Cacheable
    public String getData(String key) {
        // TODO:...
    }
}

说明

? 前面的实现是通过RMI的方式来实现缓存同步的,相对来说RMI的效率还是很快的。所以如果不需要实时的缓存一致性,允许少许延迟,那么这种方式的实现足够。


总结

? 到这篇完成,分布式改造的第一章算是告一段落了。对于分布式,如果可以选择,必然要选择现在成熟的框架。但是项目有很多时候,由于各种历史原因,必须要在原来的基础上改造。这个时候,希望我写的这个系列对大家有所帮助。造轮子有时候就是这么简单。


相关链接

原文地址:https://www.cnblogs.com/Kidezyq/p/9243878.html

时间: 2024-10-17 01:32:47

分布式改造剧集三:Ehcache分布式改造的相关文章

EhCache 分布式缓存/缓存集群

开发环境: System:Windows JavaEE Server:tomcat5.0.2.8.tomcat6 JavaSDK: jdk6+ IDE:eclipse.MyEclipse 6.6 开发依赖库: JDK6. JavaEE5.ehcache-core-2.5.2.jar Email:[email protected] Blog:http://blog.csdn.net/IBM_hoojo http://hoojo.cnblogs.com/ http://hoojo.blogjava.

分布式机器学习的集群方案介绍之HPC实现

机器学习的基本概念 机器学习方法是计算机利用已有的数据(经验),得出了某种模型(迟到的规律),并利用此模型预测未来(是否迟到)的一种方法.目前机器学习广泛应用于广告投放.趋势预测.图像识别.语音识别.自动驾驶和产品推荐等众多领域. 在确定了问题模型之后,根据已知数据寻找模型参数的过程就是训练,训练过程就是不断依据训练数据来调整参数的迭代,从而使依据模型作出的预测结果更加准确. HPC的基本概念 HPC就是高性能计算或高性能计算集群的简写.为了追求高性能,HPC的工作负载一般直接运行在Linux系

Apache Spark探秘:三种分布式部署方式比较

目前Apache Spark支持三种分布式部署方式,分别是standalone.spark on mesos和 spark on YARN,其中,第一种类似于MapReduce 1.0所采用的模式,内部实现了容错性和资源管理,后两种则是未来发展的趋势,部分容错性和资源管理交由统一的资源管理系统完成:让Spark运行在一个通用的资源管理系统之上,这样可以与其他计算框架,比如MapReduce,公用一个集群资源,最大的好处是降低运维成本和提高资源利用率(资源按需分配).本文将介绍这三种部署方式,并比

从集中式到分布式

前言 随着计算机系统规模变得越来越大,将所有业务单元集中部署在一个或者若干个大型机 上的体系结构物,已经越来越不能满足当今计算机系统,尤其是大型互联网系统的快速发展,各种灵活多变的系统架构模型层出不穷.同时,随着微型计算机的出 现,越来越多廉价的PC机成为了各大IT企业架构的首选,分布式的处理方式越来越受到业界的青睐----计算机系统正在经历一场前所未有的从集中式到分布 式架构的变革. 从集中式到分布式 自从20世纪60年代大型主机被发明出来以后,凭借其超强的计算和I/O处理能力 以及在稳定性和

集群与分布式 如何理解(自己的理解)

集群 与 分布式 如何理解 一. 概念理解 集群的理解:就是一台计算机处理不了太多的事情 ,需要大家组合在一起来完成一件事情. 分布式的理解:就是把一件事情分成好几个部分 ,每个人做其中的一部分. 二.图解 分布式 负责均衡集群 三.集群的种类: 负载均衡集群(Load Balance):Nginx反向代理  .DNS 轮训巡.LVS(Nat 模式 .隧道模式.直连路由) 高可用集群(High Availabe):Keepalive .Heartbeat 并行计算集群(HPC):不了解 申明:这

Hadoop学习笔记_4_实施Hadoop集群 --伪分布式安装

实施Hadoop集群 --伪分布式安装 准备与配置安装环境 安装虚拟机和linux,虚拟机推荐使用vmware,PC可以使用workstation,服务器可以使用ESXi,在管理上比较方便.ESXi还可以通过拷贝镜像文件复制虚拟机,复制后自动修改网卡号和ip,非常快捷.如果只是实验用途,硬盘大约预留20-30G空间. 以Centos为例,分区可以选择默认[如果想要手动分区,请参考博客:http://blog.csdn.net/zjf280441589/article/details/175485

彩讯重磅推出6000系列处理器--成功将集中式和分布式两种处理器有机融为一体

大屏拼接系统作为一种高端显示系统,目前被广泛应用于社会各行各业,每个行业对大屏的显示方式及实现功能的要求并不一致,因此大屏显示系统就需要采用不同的图像拼接处理器来满足这些不同要求.处理器和拼接屏.控制系统有机组成了大屏显示系统.而大屏显示系统功能的最终差异则越来越多地取决于图像处理控制器系统(包括硬件和软件)的性能.    早在2000年北京彩讯科技股份有限公司(TRIOLION)成立之初,公司的三位原始股东就研制出了国内第一台PC架构的拼接控制器,从那时起彩讯就成为了以大屏幕拼接控制器的研发生

(一)从集中式到分布式

前言 随着计算机系统规模变得越来越大,将所有业务单元集中部署在一个或者若干个大型机上的体系结构物,已经越来越不能满足当今计算机系统,尤其是大型互联网系统的快速发展,各种灵活多变的系统架构模型层出不穷.同时,随着微型计算机的出现,越来越多廉价的PC机成为了各大IT企业架构的首选,分布式的处理方式越来越受到业界的青睐----计算机系统正在经历一场前所未有的从集中式到分布式架构的变革. 从集中式到分布式 自从20世纪60年代大型主机被发明出来以后,凭借其超强的计算和I/O处理能力以及在稳定性和安全性方

源代码管理——git(分布式版本控制和集中式版本控制对比,git和SVN对比,git常用指令,搭建GitHub远程仓库,搭建oschina远程仓库 )

一.git简介 什么是git? git是一款开源的分布式版本控制工具 在世界上所有的分布式版本控制工具中,git是最快.最简单.最流行的 git的起源 作者是Linux之父:Linus Benedict Torvalds 当初开发git仅仅是为了辅助Linux内核的开发(管理源代码) git的现状 在国外已经非常普及,国内并未普及(在慢慢普及) 越来越多的开源项目已经转移到git CVS 最早的开源.免费的集中式版本控制工具 自身设计有问题,会造成提交文件不完整,版本库莫名其妙损坏的情况 SVN