Dubbo源码阅读笔记3

### 扩展点加载(ExtensionLoader)

每一种类型的扩展点都有一个ExtensionLoader实例

  1. 变量说明

    public class ExtensionLoader<T> {
    // dubbo服务扫描目录
    private static final String SERVICES_DIRECTORY = "META-INF/services/";
    // dubbo扩展点配置扫描目录(自定义扩展时使用此目录)
    private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
    // dubbo内部扩展点配置扫描目录
    private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
    
    private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");
    
    // 缓存ExtensionLoader
    private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
    // 缓存扩展点实例
    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
    
    // 前面是常量,以下是变量
    // ==============================
    
    // 当前扩展点的接口类型
    private final Class<?> type;
    
    // 对象工厂
    private final ExtensionFactory objectFactory;
    
    private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
    
    // 该扩展点类型所有配置的实现类类型
    private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
    
    // 配置中自适应扩展的注解信息
    private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
    
    // 扩展点实例
    private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();
    
    // 自适应扩展点实例
    private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();
    
    // 自适应扩展点类型
    private volatile Class<?> cachedAdaptiveClass = null;
    
    // 默认扩展点的名
    private String cachedDefaultName;
    
    // 包装类类型
    private Set<Class<?>> cachedWrapperClasses;
    
    private volatile Throwable createAdaptiveInstanceError;
    
    private Map<String, IllegalStateException> exceptions = new ConcurrentHashMap<String, IllegalStateException>();
    
    // ...
    }
  2. 初始化

先从全局缓存里面取,如果取不到则新建

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null)
        throw new IllegalArgumentException("Extension type == null");
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
    }
    if (!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type(" + type +
                ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
    }

    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}

ExtensionLoader构建方法,保存扩展点接口类型和对象工厂

扩展点对象工厂也是从通过ExtensionLoader加载出来的

private ExtensionLoader(Class<?> type) {
    this.type = type;
    objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
  1. 获取扩展点实例

先从缓存中取,如果没有则开始创建

Holder对象主要是上同步锁的时候用,锁在Holder级别,保证之后get和set方法原子性

public T getExtension(String name) {
    if (name == null || name.length() == 0)
        throw new IllegalArgumentException("Extension name == null");
    if ("true".equals(name)) {
        return getDefaultExtension();
    }
    Holder<Object> holder = cachedInstances.get(name);
    if (holder == null) {
        cachedInstances.putIfAbsent(name, new Holder<Object>());
        holder = cachedInstances.get(name);
    }
    Object instance = holder.get();
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                instance = createExtension(name);
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}

createExtension是在同步块中调用的,所以不需要加synchroneized,是线程安全的

private T createExtension(String name) {
    // 取出对应类型
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
        throw findException(name);
    }
    try {
        // 从缓存的实例取出,如果没有则新建
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {
            EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }

        // 给实例注入属性
        injectExtension(instance);

        // 如果有配置包装类,则实例化包装类并注入属性
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (wrapperClasses != null && wrapperClasses.size() > 0) {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                type + ")  could not be instantiated: " + t.getMessage(), t);
    }
}

// 获取所有扩展点类型的map,如果缓存中没有就从配置文件中取出
private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}
private T injectExtension(T instance) {
    try {
        if (objectFactory != null) {
            for (Method method : instance.getClass().getMethods()) {
                // 只处理set开头,只有一个参数且是public的方法
                if (method.getName().startsWith("set")
                        && method.getParameterTypes().length == 1
                        && Modifier.isPublic(method.getModifiers())) {
                    Class<?> pt = method.getParameterTypes()[0];
                    try {
                        String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                        // 从对象工厂中获取属性值,对象工厂中会递归注入值
                        Object object = objectFactory.getExtension(pt, property);
                        if (object != null) {
                            method.invoke(instance, object);
                        }
                    } catch (Exception e) {
                        logger.error("fail to inject via method " + method.getName()
                                + " of interface " + type.getName() + ": " + e.getMessage(), e);
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}

默认的对象工厂实现是AdaptiveExtensionFactory,其实就是SpringExtensionFactory和SpiExtensionFactory两个一起用。

主要看SpiExtensionFactory实现

可以看出这里进入了递归,直到相关扩展点全部加载完成

public <T> T getExtension(Class<T> type, String name) {
    if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
        ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
        if (loader.getSupportedExtensions().size() > 0) {
            return loader.getAdaptiveExtension();
        }
    }
    return null;
}

前面的代码是返回普通扩展点,接下来的是返回自适应扩展点,AdaptiveExtension

自适应扩展点不同的地方在于,不是直接返回扩展点实现,而是通过字节码技术生成一个代理类,

代理类会根据调用时的参数不同,再去选择不同的扩展点实现。也就是调用了获取扩展点的方法getExtension(name)

// 和普通扩展点基本一致
public T getAdaptiveExtension() {
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
        if (createAdaptiveInstanceError == null) {
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        } else {
            throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
        }
    }

    return (T) instance;
}

// 这里类型不是从getExtensionClasses中取而是getAdaptiveExtensionClass
private T createAdaptiveExtension() {
    try {
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
    }
}

private Class<?> getAdaptiveExtensionClass() {
    getExtensionClasses();
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

// 这里使用字节码技术,生成了代理类
private Class<?> createAdaptiveExtensionClass() {
    String code = createAdaptiveExtensionClassCode();
    ClassLoader classLoader = findClassLoader();
    com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    return compiler.compile(code, classLoader);
}

createAdaptiveExtensionClassCode代码太长就不贴出来了

这是其中一个扩展点生成的源代码,可以看出代码里根据url中的参数选择合适的扩展点实现

这些用反射用动态代理也是可以做的,不过效率肯定没字节码好,这个可以学习下。

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();

        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );

        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null)
            throw new IllegalArgumentException("url == null");

        com.alibaba.dubbo.common.URL url = arg1;
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

为了方便理解下面上个小例子

新建一个maven项目,结构如下:

|--pom.xml
|--src
    |--main
        |--java
            |--com
                |--serviceloader
                    |--service
                        |--Book.java
                        |--Car.java
                        |--English.java
                        |--Honda.java
                        |--Human.java
                        |--Man.java
                    |--ServiceLoader.java
                    |--SPI.java
        |--resources
            |--config.properties

SPI注解,用来指定实现者

// SPI.java
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SPI {
    String value() default "";
}

建3个接口,并加上注解,设置默认实现者

@SPI("english")
public interface Book {
    String read();
}

@SPI("honda")
public interface Car {
    void driver(String name);
}

// 也可以是woman
@SPI("man")
public interface Human {
    String sayHello();
}

以及实现者

public class Man implements Human {
    private Car car;
    private Book book;

    @Override
    public String sayHello() {
        return "hello man";
    }

    public Car getCar() {
        return car;
    }

    public void setCar(Car car) {
        this.car = car;
    }

    public Book getBook() {
        return book;
    }

    public void setBook(Book book) {
        this.book = book;
    }
}

public class Woman implements Human {
    @Override
    public String sayHello() {
        return "hello man";
    }
}

public class Honda implements Car {
    private Book book;

    @Override
    public void driver(String name) {
        System.out.println("i am " + name);
    }

    public Book getBook() {
        return book;
    }

    public void setBook(Book book) {
        this.book = book;
    }
}

public class English implements Book {
    @Override
    public String read() {
        return "hello my name is denis";
    }
}

配置文件,用来配置实现者的类型

man=com.serviceloader.service.Man
woman=com.serviceloader.service.Woman
english=com.serviceloader.service.English
honda=com.serviceloader.service.Honda

最后是服务加载器

public class ServiceLoader {
    private static ConcurrentMap<Class<?>, Object> SERVICE_INSTANCES = new ConcurrentHashMap<>();
    private static ConcurrentMap<String, Class<?>> SERVICE_CLASS;

    @SuppressWarnings("unchecked")
    public static <T> T get(Class<T> clazz) {
        if (SERVICE_CLASS == null) {
            SERVICE_CLASS = getServiceClass();
        }

        SPI spi = clazz.getAnnotation(SPI.class);
        if (spi == null) {
            throw new RuntimeException("不是SPI接口");
        }

        Class<?> targetClass = SERVICE_CLASS.get(spi.value());  // 这里可以根据其它配置更换实现者
        if (targetClass == null) {
            throw new RuntimeException("没有配置实现类型");
        }

        try {
            T instance = (T) SERVICE_INSTANCES.get(clazz);
            if (instance == null) {
                SERVICE_INSTANCES.putIfAbsent(clazz, targetClass.newInstance());
                instance = (T) SERVICE_INSTANCES.get(clazz);
            }

            injectExtension(instance);

            return instance;
        } catch (InstantiationException | IllegalAccessException e) {
            throw new RuntimeException(e.getMessage());
        }
    }

    /**
     * 注入属性
     *
     * @param instance
     * @param <T>
     */
    private static <T> void injectExtension(T instance) {
        Method[] methods = instance.getClass().getMethods();
        for (Method method : methods) {
            if (method.getName().startsWith("set") && method.getName().length() > 3
                    && method.getParameterTypes().length == 1
                    && Modifier.isPublic(method.getModifiers())) {
                try {
                    Class<?> pt = method.getParameterTypes()[0];
                    Object object = get(pt);  // 递归

                    if (object != null) {
                        method.invoke(instance, object);
                    }
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 从配置文件中取出实现者名称与类型对应map
     *
     * @return
     */
    private static ConcurrentMap<String,Class<?>> getServiceClass() {
        try {
            if (SERVICE_CLASS == null) {
                synchronized (ServiceLoader.class) {
                    if (SERVICE_CLASS == null) {
                        SERVICE_CLASS = new ConcurrentHashMap<>();

                        InputStream is = ServiceLoader.class.getClassLoader().getResourceAsStream("config.properties");
                        Properties p = new Properties();
                        p.load(is);

                        Set<String> keys = p.stringPropertyNames();
                        for (String key : keys) {
                            Class<?> clazz = Class.forName(String.valueOf(p.get(key)));
                            SERVICE_CLASS.putIfAbsent(key, clazz);
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        return SERVICE_CLASS;
    }

    public static void main(String[] args) {
        Human human = ServiceLoader.get(Human.class);
        System.out.println("class : " + human.getClass().getName());
        System.out.println(human.sayHello());

        Car car = ServiceLoader.get(Car.class);
        System.out.println("class : " + car.getClass().getName());
        car.driver("大卡车");

        Book book = ServiceLoader.get(Book.class);
        System.out.println("class : " + book.getClass().getName());
        System.out.println(book.read());
    }
}

运行后输出:

class : com.serviceloader.service.Man
hello man
class : com.serviceloader.service.Honda
i am 大卡车
class : com.serviceloader.service.English
hello my name is denis

原文地址:https://www.cnblogs.com/amwyyyy/p/8353523.html

时间: 2024-11-06 07:44:13

Dubbo源码阅读笔记3的相关文章

dubbo源码阅读笔记--服务调用时序

上接dubbo源码阅读笔记--暴露服务时序,继续梳理服务调用时序,下图右面红线流程. 整理了调用时序图 分为3步,connect,decode,invoke. 连接 AllChannelHandler.connected(Channel) line: 38 HeartbeatHandler.connected(Channel) line: 47 MultiMessageHandler(AbstractChannelHandlerDelegate).connected(Channel) line:

dubbo源码阅读笔记--暴露服务时序

本文许多内容来源于http://dubbo.io/Developer+Guide-zh.htm#DeveloperGuide-zh-%E8%B0%83%E7%94%A8%E9%93%BE,感谢作者的分享. dubbo的整体架构图中包含了很多内容,这里只分析服务暴露部分. 针对服务暴露,对上面的图做了些简化 时序图 分成六个阶段,下面分别列出每个阶段的调用堆栈. 1 初始化接口invoker JavassistProxyFactory.getInvoker(T, Class<T>, URL) l

Dubbo源码阅读笔记2

### 消费方初始化 消费方初始化的入口在ReferenceConfig类的get方法 前面基本和服务方的初始化一致 public class ReferenceConfig<T> extends AbstractReferenceConfig { private static final long serialVersionUID = -5864351140409987595L; private static final Protocol refprotocol = ExtensionLoa

Dubbo源码阅读笔记4

### 发布服务到本地 发布本地服务的代码在ServiceConfig.doExportUrlsFor1Protocol方法里 主要代码如下 // 通过动态代理工厂生成实现类调用器Invoker, Invoker相当于动态代理类 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); // 只是将代理和配置放到一起 DelegateProviderMetaDataInvoker wrappe

CI框架源码阅读笔记3 全局函数Common.php

从本篇开始,将深入CI框架的内部,一步步去探索这个框架的实现.结构和设计. Common.php文件定义了一系列的全局函数(一般来说,全局函数具有最高的加载优先权,因此大多数的框架中BootStrap引导文件都会最先引入全局函数,以便于之后的处理工作). 打开Common.php中,第一行代码就非常诡异: if ( ! defined('BASEPATH')) exit('No direct script access allowed'); 上一篇(CI框架源码阅读笔记2 一切的入口 index

源码阅读笔记 - 1 MSVC2015中的std::sort

大约寒假开始的时候我就已经把std::sort的源码阅读完毕并理解其中的做法了,到了寒假结尾,姑且把它写出来 这是我的第一篇源码阅读笔记,以后会发更多的,包括算法和库实现,源码会按照我自己的代码风格格式化,去掉或者展开用于条件编译或者debug检查的宏,依重要程度重新排序函数,但是不会改变命名方式(虽然MSVC的STL命名实在是我不能接受的那种),对于代码块的解释会在代码块前(上面)用注释标明. template<class _RanIt, class _Diff, class _Pr> in

CI框架源码阅读笔记5 基准测试 BenchMark.php

上一篇博客(CI框架源码阅读笔记4 引导文件CodeIgniter.php)中,我们已经看到:CI中核心流程的核心功能都是由不同的组件来完成的.这些组件类似于一个一个单独的模块,不同的模块完成不同的功能,各模块之间可以相互调用,共同构成了CI的核心骨架. 从本篇开始,将进一步去分析各组件的实现细节,深入CI核心的黑盒内部(研究之后,其实就应该是白盒了,仅仅对于应用来说,它应该算是黑盒),从而更好的去认识.把握这个框架. 按照惯例,在开始之前,我们贴上CI中不完全的核心组件图: 由于BenchMa

CI框架源码阅读笔记2 一切的入口 index.php

上一节(CI框架源码阅读笔记1 - 环境准备.基本术语和框架流程)中,我们提到了CI框架的基本流程,这里这次贴出流程图,以备参考: 作为CI框架的入口文件,源码阅读,自然由此开始.在源码阅读的过程中,我们并不会逐行进行解释,而只解释核心的功能和实现. 1.       设置应用程序环境 define('ENVIRONMENT', 'development'); 这里的development可以是任何你喜欢的环境名称(比如dev,再如test),相对应的,你要在下面的switch case代码块中

Apache Storm源码阅读笔记

欢迎转载,转载请注明出处. 楔子 自从建了Spark交流的QQ群之后,热情加入的同学不少,大家不仅对Spark很热衷对于Storm也是充满好奇.大家都提到一个问题就是有关storm内部实现机理的资料比较少,理解起来非常费劲. 尽管自己也陆续对storm的源码走读发表了一些博文,当时写的时候比较匆忙,有时候衔接的不是太好,此番做了一些整理,主要是针对TridentTopology部分,修改过的内容采用pdf格式发布,方便打印. 文章中有些内容的理解得益于徐明明和fxjwind两位的指点,非常感谢.