Dubbo的SPI机制与JDK机制的不同及原理分析

从今天开始,将会逐步介绍关于DUbbo的有关知识。首先先简单介绍一下DUbbo的整体概述。

概述

Dubbo是SOA(面向服务架构)服务治理方案的核心框架。用于分布式调用,其重点在于分布式的治理。
简单的来说,可以把它分为四个角色。服务提供方(Provider)、服务消费方(Consumer)、注册中心和监控中心。通过注册中心对服务进行注册和订阅,通过监控中心对服务进行监控。
核心功能

  • Remoting:远程通讯,提供对多种NIO框架抽象封装,包括“同步转异步”和“请求-响应”模式的信息交换方式。
  • Cluster: 服务框架,提供基于接口方法的透明远程过程调用,包括多协议支持,以及软负载均衡,失败容错,地址路由,动态配置等集群支持。
  • Registry: 服务注册,基于注册中心目录服务,使服务消费方能动态的查找服务提供方,使地址透明,使服务提供方可以平滑增加或减少机器。

Dubbo组件角色

Provider: 暴露服务的服务提供方
Consumer: 调用远程服务的服务消费方
Registry: 服务注册与发现的注册中心
Monitor: 统计服务的调用次数和调用时间的监控中心
Container: 服务运行容器,常见的容器有Spring容器

调用关系:

  1. 服务容器负责启动,加载,运行服务提供者
  2. 服务提供者在启动时,向注册中心注册自己提供的服务。
  3. 服务消费者在启动时,向注册中心订阅自己所需的服务。
  4. 注册中心返回服务提供者地址列表消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
  5. 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
  6. 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心Monitor。

SPI(Service Provider Interfaces)

它是Java提供的一套用来被第三方实现或者扩展的API,它可以用来启用框架扩展和替换组件。在JDK文档中,它这样解释道:

A service is a well-known set of interfaces and (usually abstract) classes. A service provider is a specific implementation of a service.

在面向对象的设计里面,模块之间推荐是基于接口编程,而不是对实现类进行硬编码,这样做也是为了模块设计的可拔插原则。为了在模块装配的时候不再程序里指明是那个实现,就需要一种服务发现的机制,jDK的SPI就是为某个接口寻找服务实现。

Java SPI实际上就是基于接口的编程+策略模式+配置文件组合实现的动态加载机制。
它为某个接口寻找服务实现的机制。有点类似IOC的思想,就是将装配的控制权移到程序之外,在模式化设计中这个机制尤其重要,所以它的核心思想是解耦

使用场景

  • 数据库驱动加载接口实现类的加载
    JDBC加载不同类型数据库的驱动
  • 日志门面接口实现类加载
    SLF4J加载不同提供商的日志实现类
  • Spring
  • Dubbo

使用说明

  1. 当服务提供者提供了接口的一种具体实现后,在jar包的META-INF/service目录下创建一个以"接口全限定名"为命名的文件,内容为实现类的全限定名。
  2. 接口实现类所在的jar包放在主程序的classpath中
  3. 主程序通过java.util.ServiceLoader动态加载实现模板,它通过扫描META-INF/services目录下的配置文件找到实现类的全限定名,把类加载到JVM
  4. SPI的实现类必须携带一个不带参数的构造方法
public final class ServiceLoader<S> implements Iterable<S>
{

    private static final String PREFIX = "META-INF/services/";

    // 代表被加载的类或者接口
    private final Class<S> service;
    // 用于定位、加载和实例化providers的类加载器
    private final ClassLoader loader;
    // 创建ServiceLoader时采用的访问控制上下文
    private final AccessControlContext acc;
    // 缓存providers,按照实例化的顺序排序
    private LinkedHashMap<String,S> providers = new LinkedHashMap<>();
    // 懒查找迭代器
    private LazyIterator lookupIterator;

    //重新加载,就相当于重新创建ServiceLoader了,用于新的服务提供者安装到正在运行的Java虚拟机
    public void reload() {
        //清空缓存中所有已实例化的服务提供者
        providers.clear();
        //新建一个迭代器,该迭代器会从头查找和实例化服务提供者。
        lookupIterator = new LazyIterator(service, loader);
    }

    /**
    ** 私有构造器
    ** 使用指定的类加载器和服务创建服务加载器
    ** 如果没有指定类加载器,使用系统类加载器,就是应用类加载器
    **/
    private ServiceLoader(Class<S> svc, ClassLoader cl) {
        service = Objects.requireNonNull(svc, "Service interface cannot be null");
        loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
        acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
        reload();
    }

   //解析失败处理的方法
    private static void fail(Class<?> service, String msg, Throwable cause)
        throws ServiceConfigurationError
    {
        throw new ServiceConfigurationError(service.getName() + ": " + msg,
                                            cause);
    }

    private static void fail(Class<?> service, String msg)
        throws ServiceConfigurationError
    {
        throw new ServiceConfigurationError(service.getName() + ": " + msg);
    }

    private static void fail(Class<?> service, URL u, int line, String msg)
        throws ServiceConfigurationError
    {
        fail(service, u + ":" + line + ": " + msg);
    }

    //解析服务提供者配置文件中的一行
    //首先去掉注释检验,然后保存
    //返回下一行行号
    //重复的配置项不会被保存
    private int parseLine(Class<?> service, URL u, BufferedReader r, int lc,
                          List<String> names)
        throws IOException, ServiceConfigurationError
    {
        String ln = r.readLine();
        if (ln == null) {
            return -1;
        }
        int ci = ln.indexOf('#');
        if (ci >= 0) ln = ln.substring(0, ci);
        ln = ln.trim();
        int n = ln.length();
        if (n != 0) {
            if ((ln.indexOf(' ') >= 0) || (ln.indexOf('\t') >= 0))
                fail(service, u, lc, "Illegal configuration-file syntax");
            int cp = ln.codePointAt(0);
            if (!Character.isJavaIdentifierStart(cp))
                fail(service, u, lc, "Illegal provider-class name: " + ln);
            for (int i = Character.charCount(cp); i < n; i += Character.charCount(cp)) {
                cp = ln.codePointAt(i);
                if (!Character.isJavaIdentifierPart(cp) && (cp != '.'))
                    fail(service, u, lc, "Illegal provider-class name: " + ln);
            }
            if (!providers.containsKey(ln) && !names.contains(ln))
                names.add(ln);
        }
        return lc + 1;
    }

  //解析配置文件,解析指定的url配置文件
  //使用parseLine方法进行解析,未被实例化的服务提供者会被保存到缓存中。
    private Iterator<String> parse(Class<?> service, URL u)
        throws ServiceConfigurationError
    {
        InputStream in = null;
        BufferedReader r = null;
        ArrayList<String> names = new ArrayList<>();
        try {
            in = u.openStream();
            r = new BufferedReader(new InputStreamReader(in, "utf-8"));
            int lc = 1;
            while ((lc = parseLine(service, u, r, lc, names)) >= 0);
        } catch (IOException x) {
            fail(service, "Error reading configuration file", x);
        } finally {
            try {
                if (r != null) r.close();
                if (in != null) in.close();
            } catch (IOException y) {
                fail(service, "Error closing configuration file", y);
            }
        }
        return names.iterator();
    }

    //服务提供者查找的迭代器
    private class LazyIterator implements Iterator<S>
    {
        //服务提供者接口
        Class<S> service;
        //类加载器
        ClassLoader loader;
        //保存实现类的url
        Enumeration<URL> configs = null;
        //保存实现类的全名
        Iterator<String> pending = null;
        //迭代器中下一个实现类的全名
        String nextName = null;

        private LazyIterator(Class<S> service, ClassLoader loader) {
            this.service = service;
            this.loader = loader;
        }

        private boolean hasNextService() {
            if (nextName != null) {
                return true;
            }
            if (configs == null) {
                try {
                    String fullName = PREFIX + service.getName();
                    if (loader == null)
                        configs = ClassLoader.getSystemResources(fullName);
                    else
                        configs = loader.getResources(fullName);
                } catch (IOException x) {
                    fail(service, "Error locating configuration files", x);
                }
            }
            while ((pending == null) || !pending.hasNext()) {
                if (!configs.hasMoreElements()) {
                    return false;
                }
                pending = parse(service, configs.nextElement());
            }
            nextName = pending.next();
            return true;
        }

        private S nextService() {
            if (!hasNextService())
                throw new NoSuchElementException();
            String cn = nextName;
            nextName = null;
            Class<?> c = null;
            try {
                c = Class.forName(cn, false, loader);
            } catch (ClassNotFoundException x) {
                fail(service,
                     "Provider " + cn + " not found");
            }
            if (!service.isAssignableFrom(c)) {
                fail(service,
                     "Provider " + cn  + " not a subtype");
            }
            try {
                S p = service.cast(c.newInstance());
                providers.put(cn, p);
                return p;
            } catch (Throwable x) {
                fail(service,
                     "Provider " + cn + " could not be instantiated",
                     x);
            }
            throw new Error();          // This cannot happen
        }

        public boolean hasNext() {
            if (acc == null) {
                return hasNextService();
            } else {
                PrivilegedAction<Boolean> action = new PrivilegedAction<Boolean>() {
                    public Boolean run() { return hasNextService(); }
                };
                return AccessController.doPrivileged(action, acc);
            }
        }

        public S next() {
            if (acc == null) {
                return nextService();
            } else {
                PrivilegedAction<S> action = new PrivilegedAction<S>() {
                    public S run() { return nextService(); }
                };
                return AccessController.doPrivileged(action, acc);
            }
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }

    }
    //获取迭代器
    //返回遍历服务提供者的迭代器
    //以懒加载的方式加载可用的服务提供者
    //懒加载的实现是:解析配置文件和实例化服务提供者的工作由迭代器本身完成
    public Iterator<S> iterator() {
        return new Iterator<S>() {

            Iterator<Map.Entry<String,S>> knownProviders
                = providers.entrySet().iterator();

            public boolean hasNext() {
                if (knownProviders.hasNext())
                    return true;
                return lookupIterator.hasNext();
            }

            public S next() {
                if (knownProviders.hasNext())
                    return knownProviders.next().getValue();
                return lookupIterator.next();
            }

            public void remove() {
                throw new UnsupportedOperationException();
            }

        };
    }

   //为指定的服务使用指定的类加载器来创建一个ServiceLoader
    public static <S> ServiceLoader<S> load(Class<S> service,
                                            ClassLoader loader)
    {
        return new ServiceLoader<>(service, loader);
    }

   //使用线程上下文的类加载器来创建一个ServiceLoader
    public static <S> ServiceLoader<S> load(Class<S> service) {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        return ServiceLoader.load(service, cl);
    }

   //使用扩展类加载器为指定的服务创建ServiceLoader
   //只能找到并加载已经安装到当前Java虚拟机中的服务提供者,应用程序类路径中的服务提供者将被忽略
    public static <S> ServiceLoader<S> loadInstalled(Class<S> service) {
        ClassLoader cl = ClassLoader.getSystemClassLoader();
        ClassLoader prev = null;
        while (cl != null) {
            prev = cl;
            cl = cl.getParent();
        }
        return ServiceLoader.load(service, prev);
    }

    /**
     * Returns a string describing this service.
     *
     * @return  A descriptive string
     */
    public String toString() {
        return "java.util.ServiceLoader[" + service.getName() + "]";
    }

}

ServiceLoader不是实例化以后,就去读取文件的具体实现。而是等到使用迭代器去遍历的时候,才会加载对应的配置文件去解析,调用hasNext方法时就去加载配置文件进行解析,调用Next方法的时候进行实例化并缓存。

优点
使用Java SPI机制的优势是实现解耦,使得第三方服务模块的装配控制的逻辑与调用者的业务代码分离,而不是耦合在一起。应用程序可以根据实际业务情况启用框架扩展或替代框架组件。

缺点
虽然ServiceLoader也算是使用的延迟加载,但是基本只能通过遍历全部获取,也就是接口的实现类全部加载并实例化一遍。如果你并不想用某些实现类,它也被加载并实例化了,这就造成了浪费。获取某个实现类的方式不够灵活,只能通过Iterator形式获取,不能根据某个参数来获取对应的实现类。
多个并发多线程使用ServiceLoader类的实例是不安全的。

Dubbo的SPI机制

从图中可以看出,Dubbo进行各个模块的扩展时,是通过ExtensionLoader与扩展点进行关联的。
在Dubbo中的扩展点需要满足以下几个特点:

  1. 扩展点必须是Interface类型,必须被@SPI注释
  2. 配置文件存储在META-INF/services/META-INF/dubbo/META-INF/dubbo/internal,这些路径下定义的文件名为扩展点接口的全类名,文件中以键值对的形式配置扩展点的扩展实现,这与JDk SPI的存储形式有很大不同,所以在Dubbo中无法直接使用ServiceLoader, 而是使用ExtensionLoader,可用于载入Dubbo中的各种可配置组件,比如动态代理方式(ProxyFactory)、负载均衡策略(LoadBalance)、RCP协议(Protocol)、拦截器(Filter)、容器类型(Container)、集群方式(Cluster)和注册中心类型等。
    META-INF/dubbo/internal/com.alibaba.dubbo.common.extension.ExtensionFactory 中定义的扩展 :
adaptive = com.alibaba.dubbo.common.extension.factory.AdaptiveExtensionFactory
spi = com.alibaba.dubbo.common.extension.factory.SpiExtensionFactory
spring = com.alibaba.dubbo.config.spring.extension.SpringExtensionFactor

在标识扩展点时会用到这几个标识,@SPI 、 @Adaptive、 @Activate

@SPI (注解在类上):该注解标识了接口是一个扩展点,属性value用来指定默认适配扩展点的名称。
@Activate(注解在类型和方法上):@Activate注解在扩展点的实现类上,表示了一个扩展类被获取到的条件,符合条件就被获取,不符合条件就不获取,根据@Activate中的group、value属性来过滤。
@Adaptive(注解在类型和方法上):如果注解在类上,这个类就是缺省的适配扩展。注解在扩展点Interface的方法上时,dubbo会动态的生成一个这个扩展点的适配扩展类(生成代码,动态编译实例化Class),名称为扩展点Interface的简单类名+$Adaptive,这样做的目的是为了在运行时去适配不同的扩展实例,在运行时通过传入的URL类型的参数或者内部含有获取URL方法的参数,从URL中获取到要使用的扩展类的名称,再去根据名称加载对应的扩展实例,用这个扩展实例对象调用相同的方法。如果运行时没有适配到运行的扩展实例,那么就使用@SPI注解缺省指定的扩展。通过这种方式就实现了运行时去适配到对应的扩展。
我们随机找一个源码中定义的接口: Transporter

@SPI("netty")
public interface Transporter {
    // 绑定一个服务器
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    // 连接一个服务器,即创建一个客户端
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

ExtensionLoader会通过createAdaptiveExtensionClassCode方法动态生成一个Transporter$Adaptive类,生成的代码如下:

package com.alibaba.dubbo.remoting;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Transporter$Adaptive implements com.alibaba.dubbo.remoting.Transporter{

public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException {
        //URL参数为空则抛出异常。
        if (arg0 == null)
            throw new IllegalArgumentException("url == null");

        com.alibaba.dubbo.common.URL url = arg0;
        String extName = url.getParameter("client", url.getParameter("transporter", "netty"));
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])");
        com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader

        (com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
        return extension.connect(arg0, arg1);
    }
    public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException {
        if (arg0 == null)
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
        com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader
        (com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);

        return extension.bind(arg0, arg1);
    }
}

这些代码都是模板代码,最核心的代码只有一行,是为了去获取指定名称的扩展实例对象。
com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);

扩展加载器 ExtensionLoader

它控制着所有扩展点的初始化、加载扩展的过程。
ExtensionLoader中会存储两个静态属性,EXTENSION_LOADERS保存内核开放的扩展点对应的ExtensionLoader实例对象;EXTENSION_INSTANCES保存了扩展类型(Class)和扩展类型的实例对象

private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);

    //这是jdk的SPI扩展机制中配置文件路径,dubbo为了兼容jdk的SPI
    private static final String SERVICES_DIRECTORY = "META-INF/services/";

    //用于用户自定义的扩展实现配置文件存放路径
    private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";

    //用于dubbo内部提供的扩展实现配置文件存放路径
    private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";

    private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");

    //扩展加载器集合,key为扩展接口,例如Protocol等
    private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();

    //扩展实现集合,key为扩展实现类,value为扩展对象
    //例如key为Class<DubboProtocol>,value为DubboProtocol对象
    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
    //扩展接口,例如Protocol等
    private final Class<?> type;

    //对象工厂,获得扩展实现的实例,用于injectExtension方法中将扩展实现类的实例注入到相关的依赖属性。
    //比如StubProxyFactoryWrapper类中有Protocol protocol属性,就是通过set方法把Protocol的实现类实例赋值
    private final ExtensionFactory objectFactory;

    //以下提到的扩展名就是在配置文件中的key值,类似于“dubbo”等

    //缓存的扩展名与扩展类映射,和cachedClasses的key和value对换。
    private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();

    //缓存的扩展实现类集合
    private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();

    //扩展名与加有@Activate的自动激活类的映射
    private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();

    //缓存的扩展对象集合,key为扩展名,value为扩展对象
    //例如Protocol扩展,key为dubbo,value为DubboProcotol
    private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();

    //缓存的自适应( Adaptive )扩展对象,例如例如AdaptiveExtensionFactory类的对象
    private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();

    //缓存的自适应扩展对象的类,例如AdaptiveExtensionFactory类
    private volatile Class<?> cachedAdaptiveClass = null;

    //缓存的默认扩展名,就是@SPI中设置的值
    private String cachedDefaultName;

    //创建cachedAdaptiveInstance异常
    private volatile Throwable createAdaptiveInstanceError;

    //拓展Wrapper实现类集合
    private Set<Class<?>> cachedWrapperClasses;

    //拓展名与加载对应拓展类发生的异常的映射
    private Map<String, IllegalStateException> exceptions = new ConcurrentHashMap<String, IllegalStateException>();

ExtensionLoader没有提供public的构造方法,有一个私有的构造方法,获取ExtensionLoader实例的工厂方法,但是提供了一个public static的getExtensionLoader。其public成员方法中有三个比较重要的方法:
getActiveExtension: 根据条件获取当前扩展可自动激活的实现
getExtension: 根据名称获取当前扩展的指定实现
getAdaptiveExtension: 获取当前扩展的自适应实现

 private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }
@SPI
public interface ExtensionFactory {
    <T> T getExtension(Class<T> type, String name);

}

从上可以看出ExtensionFactory也是一个扩展点,有两个实现类:SpiExtensionFactoryAdaptiveExtensionFactory,实际上还有一个SpringExtensionFactory,不同的实现类可以用不同的方式来完成扩展点实现的加载。如果要加载的扩展点类型是ExtensionFactory,那么object设置为null。
默认的ExtensionFactory实现中,AdaptiveExtensionFactory被@Adaptive注解注释,也就是说它是ExtensionFactory对应的自适应扩展实现(每个扩展点最多只能有一个自适应实现,如果所有实现中没有被@Adaptive注释的,那么dubbo会动态生成一个自适应实现类)

@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {

    //扩展对象的集合,默认的可以分为dubbo 的SPI中接口实现类对象或者Spring bean对象
    private final List<ExtensionFactory> factories;

    public AdaptiveExtensionFactory() {
        ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
        //遍历所有支持的扩展名
        for (String name : loader.getSupportedExtensions()) {
            //扩展对象加入到集合中
            list.add(loader.getExtension(name));
        }
        //返回一个不可修改的集合
        factories = Collections.unmodifiableList(list);
    }

    @Override
    public <T> T getExtension(Class<T> type, String name) {
        for (ExtensionFactory factory : factories) {
            //通过扩展接口和扩展名获得扩展对象
            T extension = factory.getExtension(type, name);
            if (extension != null) {
                return extension;
            }
        }
        return null;
    }

}

上述代码中调用到了ExtensionLoader类中的getSupportedExtensions方法,所以接下来再分析ExtensionLoader类。

 public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        //扩展点接口为空,抛出异常
        if (type == null)
            throw new IllegalArgumentException("Extension type == null");
        //判断type是否是一个接口类
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        }
        //判断是否为可扩展的接口
        if (!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type(" + type +
                    ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
        }

        //从扩展加载器集合中取出扩展接口对应的扩展加载器
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);

        //如果为空,则创建该扩展接口的扩展加载器,并且添加到EXTENSION_LOADERS
        if (loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }
public T getAdaptiveExtension() {
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if (createAdaptiveInstanceError == null) {
                synchronized (cachedAdaptiveInstance) {
                    instance = cachedAdaptiveInstance.get();
                    if (instance == null) {
                        try {
                            //创建适配器对象
                            instance = createAdaptiveExtension();
                            cachedAdaptiveInstance.set(instance);
                        } catch (Throwable t) {
                            createAdaptiveInstanceError = t;
                            throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                        }
                    }
                }
            } else {
                throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
            }
        }

        return (T) instance;
    }

在ExtensionLoader的私有构造方法中可以看出,在选择ExtensionFactory的时候,并不是用getExtension(name)来获取某个具体的实现类,而是调用getAdaptiveExtension来获取一个自适应的实现。
首先检查缓存的adaptiveInstance是否存在,如果存在则直接使用,否则的话调用createAdaptiveExtension方法来创建新的adaptiveInstance并且缓存起来,也就是说对于某个扩展点,每次调用ExtensionLoader.getAdaptiveExtension获取到的都是同一个实例。
在调用getAdaptiveExtensionClass中首先调用getExtensionClasses()
在getAdaptiveExtensionClass()中,调用getExtensionClasses()获取扩展实现类数组,并存放在cachedClasses属性中。
再从getExtensionClasses()看,当cachedClasses为空时,调用loadExtensionClasses()
getExtensionClasses()会加载当前Extension的所有实现,如果有@Adaptive类型,则会赋值给cachedAdaptiveClass属性缓存起来,如果没有找到@Adaptive类型实现,则动态创建一个AdaptiveExtensionClass。

首先会获取到该扩展点类的注解中的值,获取默认值,然后从特定目录下读取配置文件中的信息,
最后通过loadClass,将有关类放到extensionClasses变量中

 private T createAdaptiveExtension() {
        try {
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }
 private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        //缓存的自适应扩展对象
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }
private Map<String, Class<?>> getExtensionClasses() {
        Map<String, Class<?>> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
    }

private Map<String, Class<?>> loadExtensionClasses() {
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if (defaultAnnotation != null) {
            //@SPI内的默认值
            String value = defaultAnnotation.value();
            if ((value = value.trim()).length() > 0) {
                String[] names = NAME_SEPARATOR.split(value);
                //只允许有一个默认值
                if (names.length > 1) {
                    throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                            + ": " + Arrays.toString(names));
                }
                if (names.length == 1) cachedDefaultName = names[0];
            }
        }

        //从配置文件中加载实现类数组
        Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
        loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
        loadDirectory(extensionClasses, DUBBO_DIRECTORY);
        loadDirectory(extensionClasses, SERVICES_DIRECTORY);
        return extensionClasses;
    }

private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir) {
        //拼接接口全限定名,得到完整的文件名
        String fileName = dir + type.getName();
        try {
            Enumeration<java.net.URL> urls;
            //获取ExtensionLoader类信息
            ClassLoader classLoader = findClassLoader();
            if (classLoader != null) {
                urls = classLoader.getResources(fileName);
            } else {
                urls = ClassLoader.getSystemResources(fileName);
            }
            if (urls != null) {
                //遍历文件
                while (urls.hasMoreElements()) {
                    java.net.URL resourceURL = urls.nextElement();
                    loadResource(extensionClasses, classLoader, resourceURL);
                }
            }
        } catch (Throwable t) {
            logger.error("Exception when load extension class(interface: " +
                    type + ", description file: " + fileName + ").", t);
        }
    }

private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), "utf-8"));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    //跳过被#注释的内容
                    final int ci = line.indexOf('#');
                    if (ci >= 0) line = line.substring(0, ci);
                    line = line.trim();
                    if (line.length() > 0) {
                        try {
                            String name = null;
                            int i = line.indexOf('=');
                            if (i > 0) {
                                //根据"="拆分key跟value
                                name = line.substring(0, i).trim();
                                line = line.substring(i + 1).trim();
                            }
                            if (line.length() > 0) {
                                //加载扩展类
                                loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
                            }
                        } catch (Throwable t) {
                            IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
                            exceptions.put(line, e);
                        }
                    }
                }
            } finally {
                reader.close();
            }
        } catch (Throwable t) {
            logger.error("Exception when load extension class(interface: " +
                    type + ", class file: " + resourceURL + ") in " + resourceURL, t);
        }
    }

private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
        //该类是否实现扩展接口
        if (!type.isAssignableFrom(clazz)) {
            throw new IllegalStateException("Error when load extension class(interface: " +
                    type + ", class line: " + clazz.getName() + "), class "
                    + clazz.getName() + "is not subtype of interface.");
        }
        //判断该类是否为扩展接口的适配器
        if (clazz.isAnnotationPresent(Adaptive.class)) {
            if (cachedAdaptiveClass == null) {
                cachedAdaptiveClass = clazz;
            } else if (!cachedAdaptiveClass.equals(clazz)) {
                throw new IllegalStateException("More than 1 adaptive class found: "
                        + cachedAdaptiveClass.getClass().getName()
                        + ", " + clazz.getClass().getName());
            }
        } else if (isWrapperClass(clazz)) {
            Set<Class<?>> wrappers = cachedWrapperClasses;
            if (wrappers == null) {
                cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                wrappers = cachedWrapperClasses;
            }
            wrappers.add(clazz);
        } else {
            //通过反射获得构造器对象
            clazz.getConstructor();
            //未配置扩展名,自动生成,例如DemoFilter为 demo,主要用于兼容java SPI的配置。
            if (name == null || name.length() == 0) {
                name = findAnnotationName(clazz);
                if (name.length() == 0) {
                    throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
                }
            }
            // 获得扩展名,可以是数组,有多个拓扩展名。
            String[] names = NAME_SEPARATOR.split(name);
            if (names != null && names.length > 0) {
                Activate activate = clazz.getAnnotation(Activate.class);
                //如果是自动激活的实现类,则加入到缓存
                if (activate != null) {
                    cachedActivates.put(names[0], activate);
                }
                for (String n : names) {
                    if (!cachedNames.containsKey(clazz)) {
                        cachedNames.put(clazz, n);
                    }
                    //缓存扩展实现类
                    Class<?> c = extensionClasses.get(n);
                    if (c == null) {
                        extensionClasses.put(n, clazz);
                    } else if (c != clazz) {
                        throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                    }
                }
            }
        }
    }

上述代码完成了自适应扩展点类型的实现和实例化,下面方法是扩展点自动注入的实现,它会获取处理当前实例的所有set方法对应的参数类型和property名称,根据这两个条件从ExtensionFactory中查询,如果有返回扩展点实例,那么就进行注入操作。

private T injectExtension(T instance) {
        try {
            if (objectFactory != null) {
                //反射获得该类中所有的方法
                for (Method method : instance.getClass().getMethods()) {
                    //如果是set方法
                    if (method.getName().startsWith("set")
                            && method.getParameterTypes().length == 1
                            && Modifier.isPublic(method.getModifiers())) {
                        /**
                         * Check {@link DisableInject} to see if we need auto injection for this property
                         */
                        if (method.getAnnotation(DisableInject.class) != null) {
                            continue;
                        }
                        Class<?> pt = method.getParameterTypes()[0];
                        try {
                            //获得属性,比如StubProxyFactoryWrapper类中有Protocol protocol属性,
                            String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                            //获得属性值,比如Protocol对象,也可能是Bean对象
                            Object object = objectFactory.getExtension(pt, property);
                            if (object != null) {
                                //注入依赖属性
                                method.invoke(instance, object);
                            }
                        } catch (Exception e) {
                            logger.error("fail to inject via method " + method.getName()
                                    + " of interface " + type.getName() + ": " + e.getMessage(), e);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }

文章参考:
dubbo源码一:ExtensionLoader及获取适配类过程解析
Dubbo扩展点加载机制 - ExtensionLoader
【Dubbo】Adaptive

原文地址:https://www.cnblogs.com/maratong/p/12333717.html

时间: 2024-10-28 13:16:08

Dubbo的SPI机制与JDK机制的不同及原理分析的相关文章

jdk和dubbo的SPI机制

前言:开闭原则一直是软件开发领域中所追求的,开闭原则中的"开"是指对于组件功能的扩展是开放的,是允许对其进行功能扩展的,“闭”,是指对于原有代码的修改是封闭的,即不应该修改原有的代码.对于一个高度集成化的.成熟.稳健的系统来讲,永远不是封闭.固守的,它需要向外提供一定的可扩展的能力,外部的实现类或者jar包都可以调用它.在面向对象的开发领域中,接口是对系统功能的高度抽象,因为SPI可谓是"应运而生",本篇博客就开始走进SPI,探究java自身的SPI和Dubbo的S

Dubbo的SPI机制(1)

插件机制是Dubbo用于可插拔地扩展底层的一些实现而定制的一套机制,比如dubbo底层的RPC协议.注册中心的注册方式等等.具体的实现方式是参照了JDK的SPI思想,在dubbo的jar内部的/META-INF/dubbo/internal下会有许多以接口名命名的文件,如图: 这些文件正是用于扩展用途的,我们称之为SPI描述文件.每个文件就代表一种扩展,文件打开后,在文件中会以行为单位配置该扩展接口的具体实现类,每个扩展可以有多个实现,以key-value的形式进行配置.例如打开/META-IN

Dubbo中SPI扩展机制解析

dubbo的SPI机制类似与Java的SPI,Java的SPI会一次性的实例化所有扩展点的实现,有点显得浪费资源. dubbo的扩展机制可以方便的获取某一个想要的扩展实现,每个实现都有自己的name,可以通过name找到具体的实现. 每个扩展点都有一个@Adaptive实例,用来注入到依赖这个扩展点的某些类中,运行时通过url参数去动态判断最终选择哪个Extension实例用. dubbo的SPI扩展机制增加了对扩展点自动装配(类似IOC)和自动包装(类似AOP)的支持. 标注了@Activat

类加载机制与jdk智能调优命令

类加载机制与JDK调优监控工具 类的生命周期 1.加载将.class文件从磁盘读到内存2.连接2.1 验证验证字节码文件的正确性2.2 准备给类的静态变量分配内存,并赋予默认值2.3 解析类装载器装入类所引用的其它所有类 3.初始化为类的静态变量赋予正确的初始值,上述的准备阶段为静态变量赋予的是虚拟机默认的初始值,此处赋予的才是程序编写者为变量分配的真正的初始值,执行静态代码块4.使用5.卸载 类加载器的种类 启动类加载器(Bootstrap ClassLoader)负责加载JRE的核心类库,如

分布式的几件小事(五)dubbo的spi思想是什么

1.什么是SPI机制 SPI 全称为 Service Provider Interface,是一种服务发现机制. SPI 的本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件,加载实现类.这样可以在运行时,动态为接口替换实现类. 正因此特性,我们可以很容易的通过 SPI 机制为我们的程序提供拓展功能.SPI 机制在第三方框架中也有所应用,比如jdbc. java定义了一套jdbc的接口,但是java是没有提供jdbc的实现类. 但是实际上项目跑的时候,要使用jdbc接口的哪些实

ActiveMQ讯息传送机制以及ACK机制

http://blog.csdn.net/lulongzhou_llz/article/details/42270113 ActiveMQ消息传送机制以及ACK机制详解 AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的. 一. ActiveMQ消息传送机制 Producer客户端使用来发送消息的, Consumer客户端用来消费消息:它们的协同中心就是ActiveMQ br

【转载】Android 的 Handler 机制实现原理分析

handler在安卓开发中是必须掌握的技术,但是很多人都是停留在使用阶段.使用起来很简单,就两个步骤,在主线程重写handler的handleMessage( )方法,在工作线程发送消息.但是,有没有人想过这种技术是怎么实现的呢?下面我们一起探讨下. 先上图,让大家好理解下handler机制: handler机制示例图 上面一共出现了几种类,ActivityThread,Handler,MessageQueue,Looper,msg(Message),对这些类作简要介绍: ActivityThr

同步机制与异步机制的理解

同步机制与异步机制的理解 同步机制即在进行输入输出时,必须等待输入输出完毕后,才能进行后面的操作. 异步传输机制就不必等待完毕就可进行其它操作. 网络上有一个比较通俗的例子:请吃饭 同步就是我叫你吃饭,你听到了就立刻跟我去,若你没有反应,那我就不停的叫你,直到你回应.(同步的特点是我不能做其他任何的事情,专心等你...个人觉得女生肯定希望遇到此类的男生吧...) 异步就是我叫了你,然后我就去吃饭了,不管你听没听见.(异步的特点是不会等待正在执行的事件结束,他就可以执行其它的事件). 看了别人的帖

Redis数据持久化机制AOF原理分析二

Redis数据持久化机制AOF原理分析二 分类: Redis 2014-01-12 15:36  737人阅读  评论(0)  收藏  举报 redis AOF rewrite 目录(?)[+] 本文所引用的源码全部来自Redis2.8.2版本. Redis AOF数据持久化机制的实现相关代码是redis.c, redis.h, aof.c, bio.c, rio.c, config.c 在阅读本文之前请先阅读Redis数据持久化机制AOF原理分析之配置详解文章,了解AOF相关参数的解析,文章链