dubbo源码阅读-Filter默认实现(十一)之CacheFiler

API文档

http://dubbo.apache.org/zh-cn/docs/user/demos/result-cache.html

缓存使用例子

可参考:https://blog.csdn.net/hardworking0323/article/details/81293402

CacheFilter

/**
 * CacheFilter
 * group为consumer或者provider 同时 含有cache=的配置的时候 返回此过滤器
 */
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {

    /**
     * CacheFactory$Adaptive 对象。
     * <p>
     * X * 通过 Dubbo SPI 机制,调用 {@link #setCacheFactory(CacheFactory)} 方法,进行注入
     */
    private CacheFactory cacheFactory;

    public void setCacheFactory(CacheFactory cacheFactory) {
        this.cacheFactory = cacheFactory;
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //方法开启 Cache 功能 判断method是否配置了cache
        if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
            //通过cacheFactory获取cache对象
            Cache cache = cacheFactory.getCache(invoker.getUrl(), invocation);
            if (cache != null) {
                String key = StringUtils.toArgumentString(invocation.getArguments());
                Object value = cache.get(key);
                if (value != null) {
                    return new RpcResult(value);
                }
                Result result = invoker.invoke(invocation);
                //没发生异常 存入cache
                if (!result.hasException() && result.getValue() != null) {
                    cache.put(key, result.getValue());
                }
                return result;
            }
        }
        return invoker.invoke(invocation);
    }

}

CacheFactory

接口定义

//默认是lru
@SPI("lru")
public interface CacheFactory {

    //是否带SPI扩展根据cache参数获取
    @Adaptive("cache")
    Cache getCache(URL url, Invocation invocation);

}

AbstractCacheFactory

/**
 * AbstractCacheFactory
 */
public abstract class AbstractCacheFactory implements CacheFactory {

    private final ConcurrentMap<String, Cache> caches = new ConcurrentHashMap<String, Cache>();

    /**
     * 模板方法模式 父类通过将cacheKey存入缓存 子类负责创建Cache
     * @param url
     * @param invocation
     * @return
     */
    @Override
    public Cache getCache(URL url, Invocation invocation) {
        url = url.addParameter(Constants.METHOD_KEY, invocation.getMethodName());
        //获取Key 并从缓存获取
        String key = url.toFullString();
        Cache cache = caches.get(key);
        if (cache == null) {
            //交给子类创建缓存
            caches.put(key, createCache(url));
            cache = caches.get(key);
        }
        return cache;
    }

    protected abstract Cache createCache(URL url);

}

Lru实现

基于最近最少使用原则删除多余缓存,保持最热的数据被缓存。

LruCacheFactory

/**
 * LruCacheFactory
 */
public class LruCacheFactory extends AbstractCacheFactory {

    @Override
    protected Cache createCache(URL url) {
        return new LruCache(url);
    }

}

LruCache

public class LruCache implements Cache {

    /**
     * 缓存集合
     */
    private final Map<Object, Object> store;

    public LruCache(URL url) {
        // `"cache.size"` 配置项,设置缓存大小
        final int max = url.getParameter("cache.size", 1000);
        // 创建 LRUCache 对象 内部采用likendMap实现
        this.store = new LRUCache<Object, Object>(max);
    }

    @Override
    public void put(Object key, Object value) {
        store.put(key, value);
    }

    @Override
    public Object get(Object key) {
        return store.get(key);
    }

}

ThreadLocal实现

ThreadLocalCacheFactory

/**
 * ThreadLocalCacheFactory
 */
public class ThreadLocalCacheFactory extends AbstractCacheFactory {

    @Override
    protected Cache createCache(URL url) {
        return new ThreadLocalCache(url);
    }

}

ThreadLocalCache

/**
 * ThreadLocalCache
 */
public class ThreadLocalCache implements Cache {

    private final ThreadLocal<Map<Object, Object>> store;

    public ThreadLocalCache(URL url) {
        this.store = new ThreadLocal<Map<Object, Object>>() {
            @Override
            protected Map<Object, Object> initialValue() {
                return new HashMap<Object, Object>();
            }
        };
    }

    @Override
    public void put(Object key, Object value) {
        store.get().put(key, value);
    }

    @Override
    public Object get(Object key) {
        return store.get().get(key);
    }

}

JCache实现

与 JSR107 集成,可以桥接各种缓存实现。

https://www.cnblogs.com/MagicAsa/p/10756331.html

JCacheFactory

/**
 * JCacheFactory
 */
public class JCacheFactory extends AbstractCacheFactory {

    @Override
    protected Cache createCache(URL url) {
        return new JCache(url);
    }

}

JCache

public class JCache implements com.alibaba.dubbo.cache.Cache {

    private final Cache<Object, Object> store;

    public JCache(URL url) {
        String method = url.getParameter(Constants.METHOD_KEY, "");
        String key = url.getAddress() + "." + url.getServiceKey() + "." + method;
        // jcache parameter is the full-qualified class name of SPI implementation
        String type = url.getParameter("jcache");

        CachingProvider provider = type == null || type.length() == 0 ? Caching.getCachingProvider() : Caching.getCachingProvider(type);
        CacheManager cacheManager = provider.getCacheManager();
        Cache<Object, Object> cache = cacheManager.getCache(key);
        if (cache == null) {
            try {
                //configure the cache
                MutableConfiguration config =
                        new MutableConfiguration<Object, Object>()
                                .setTypes(Object.class, Object.class)
                                .setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, url.getMethodParameter(method, "cache.write.expire", 60 * 1000))))
                                .setStoreByValue(false)
                                .setManagementEnabled(true)
                                .setStatisticsEnabled(true);
                cache = cacheManager.createCache(key, config);
            } catch (CacheException e) {
                // concurrent cache initialization
                cache = cacheManager.getCache(key);
            }
        }

        this.store = cache;
    }

    @Override
    public void put(Object key, Object value) {
        store.put(key, value);
    }

    @Override
    public Object get(Object key) {
        return store.get(key);
    }

}

总结

我们可以通过JCCache扩展我们的缓存 比如redis缓存

原文地址:https://www.cnblogs.com/LQBlog/p/12503450.html

时间: 2024-11-09 06:08:29

dubbo源码阅读-Filter默认实现(十一)之CacheFiler的相关文章

【Dubbo源码阅读系列】服务暴露之远程暴露

引言 什么叫 远程暴露 ?试着想象着这么一种场景:假设我们新增了一台服务器 A,专门用于发送短信提示给指定用户.那么问题来了,我们的 Message 服务上线之后,应该如何告知调用方服务器,服务器 A 提供了 Message 功能?那么我们是不是可以把目前已提供的服务暴露在一个地方,让调用方知道某台机器提供了某个特定功能?带着这样的假设,我们今天就来聊聊 Dubbo 服务暴露之远程暴露!! 服务远程暴露 先回顾一下上篇文章,上篇文章我们聊到了 ServiceConfig 的 export() 方

【Dubbo源码阅读系列】之远程服务调用(上)

今天打算来讲一讲 Dubbo 服务远程调用.笔者在开始看 Dubbo 远程服务相关源码的时候,看的有点迷糊.后来慢慢明白 Dubbo 远程服务的调用的本质就是动态代理模式的一种实现.本地消费者无须知道远程服务具体的实现,消费者和提供者通过代理类来进行交互!! 一.JAVA 动态代理 简单看一段代码回顾一下动态代理: public class MyInvocationHandler implements InvocationHandler{ private Object object; publi

dubbo源码阅读笔记--服务调用时序

上接dubbo源码阅读笔记--暴露服务时序,继续梳理服务调用时序,下图右面红线流程. 整理了调用时序图 分为3步,connect,decode,invoke. 连接 AllChannelHandler.connected(Channel) line: 38 HeartbeatHandler.connected(Channel) line: 47 MultiMessageHandler(AbstractChannelHandlerDelegate).connected(Channel) line:

dubbo源码阅读-阅读前的准备(一)

说明 之前自己看了一篇dubbo源码,但是对整体还是没有清晰的了解所以重新跟着别人的博文整理一遍 获取源码 github代码fork 1.打开https://github.com/alibaba/dubbo fork到自己仓库,为了后续自己看的过程中会写一些自己的注释 从自己仓库将项目拉取到本地 模块划分 项目结构一览 官方dubbo框架设计文档地址 dubbo-common 提供 Util 类和通用模型 dubbo-remoting 远程通信模块:提供通用的客户端和服务端的通讯功能. 原文地址

dubbo源码阅读之服务导出

dubbo服务导出 常见的使用dubbo的方式就是通过spring配置文件进行配置.例如下面这样 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns

dubbo源码阅读-ProxyFactory(十一)之StubProxyFactoryWrapper本地存根

说明 Wrapper调用时机可以看:https://www.cnblogs.com/LQBlog/p/12470179.html#autoid-2-0-0 /** * StubProxyFactoryWrapper */ public class StubProxyFactoryWrapper implements ProxyFactory { private static final Logger LOGGER = LoggerFactory.getLogger(StubProxyFactor

Dubbo源码阅读笔记3

### 扩展点加载(ExtensionLoader) 每一种类型的扩展点都有一个ExtensionLoader实例 变量说明 public class ExtensionLoader<T> { // dubbo服务扫描目录 private static final String SERVICES_DIRECTORY = "META-INF/services/"; // dubbo扩展点配置扫描目录(自定义扩展时使用此目录) private static final Stri

dubbo源码阅读之负载均衡

负载均衡 在之前集群的文章中,我们分析了通过监听注册中心可以获取到多个服务提供者,并创建多个Invoker,然后通过集群类如FailoverClusterInvoker将多个Invoker封装在一起,而外部的调用者以这个封装的Invoker为入口调用内部的多个Invoker,但是我们一次调用实际只能调用一个真实的Invoker(这里的真实的Invoker对应一个提供者),所以怎么在多个Invoker中选择出一个Invoker来,使得整体的服务调用的性能最大化,这就是负载均衡策略.另外,除了负载均

Dubbo源码阅读笔记2

### 消费方初始化 消费方初始化的入口在ReferenceConfig类的get方法 前面基本和服务方的初始化一致 public class ReferenceConfig<T> extends AbstractReferenceConfig { private static final long serialVersionUID = -5864351140409987595L; private static final Protocol refprotocol = ExtensionLoa