【Dubbo 源码解析】03_Dubbo Protocol&Filter

Protocol & Filter

Dubbo 服务暴露和服务引用都是通过的 com.alibaba.dubbo.rpc.Protocol 来实现的。它是一个 SPI 扩展。

@SPI("dubbo")
public interface Protocol {
    int getDefaultPort();

    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    void destroy();
}

SPI 扩展文件 META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol 内容:

filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
rest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
qos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapper

根据前面 Dubbo SPI 的分析,我们可以知道 Dubbo 会默认使用名称为 "dubbo" 的 Protocol 协议(可以通过配置去 override)。 如果我们通过 Protocol extension = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension();去获取 Protocol 的话,则 SPI 扩展文件中的 wrapper 类会包装在 DubboProtocol 实例上。所以,我们获取到的 extension 实际上是一个 Wrappered Extention。

ProtocolFilterWrapper

在 Protocol 的 Extension 中, ProtocolFilterWrapper 为 Protocol 的 export() 和 refer() 方法添加了一层 Filter 扩展,它会在服务引用和服务消费时,为 Invoker 实体域上包装一层层的 Filter 来做代码增强(AOP)。也就是在 Invoker 被执行之前会经过一堆的 Filter 的处理。

如果说 SPI 是 Dubbo 的静态扩展能力的话,那么 Filter 就是 Dubbo 的动态扩展能力。它能通过 URL 中的参数,来动态拼装 filter。

ProtocolFilterWrapper 会获取所有被激活的 Filter(@Activate),然后为 Invoker 构建一条 filter 链。阅读源码,我们会发现,export() 和 refer() 过程中都会构建 filter 链,也就是说,在服务被调用时 consumer 端会先经过一层 filter 的调用,然后通过 netty 调用到 provider 端;然后 provider 端再经过一层filter 之后,再去调用真正的服务接口实现。

public class ProtocolFilterWrapper implements Protocol {
    private final Protocol protocol;
    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }
    // 构建 Invoker Chain
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        // 获取被激活的 Filter 扩展
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }
                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }
                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }
                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }
                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
?
    @Override
    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }
?
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
?
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }
?
    @Override
    public void destroy() {
        protocol.destroy();
    }
?
}

SPI 扩展文件 META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter 内容:

cache=com.alibaba.dubbo.cache.filter.CacheFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter

我们可以添加自定义的 filter 扩展。

DubboProtocol#export(Invoker<T> invoker)

Dubbo 服务暴露时,首先会创建一个 DubboExporter,然后再通过 netty 开启服务端口监听。

DubboExporter 的作用是缓存 Invoker,方便后续操作获取 Invoker。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);
    ......
    // 开启服务监听
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

DubboProtocol#refer(Class<T> type,URL url)

Dubbo 服务引用时,首先创建一条与 provider 的 tcp 连接,然后再创建一个 DubboInvoker。

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // create rpc invoker. 同时,创建一条与 provider 的 tcp 连接
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

创建连接:ExchangeClient[] getClients(@NotNull URL url)

原文地址:https://www.cnblogs.com/kevin-yuan/p/10346550.html

时间: 2024-10-10 11:35:20

【Dubbo 源码解析】03_Dubbo Protocol&Filter的相关文章

dubbo源码解析-zookeeper创建节点

前言 在之前dubbo源码解析-本地暴露中的前言部分提到了两道高频的面试题,其中一道dubbo中zookeeper做注册中心,如果注册中心集群都挂掉,那发布者和订阅者还能通信吗?在上周的dubbo源码解析-zookeeper连接中已经讲到,这周解析的是另一道,即服务提供者能实现失效踢出是根据什么原理? 上周就有朋友问到我,为什么我的源码解析总是偏偏要和面试题挂上钩呢?原因很简单 1.dubbo源码这么多,试问你从哪里做为切入点?也就是说该从哪里看起?所以以面试题为切入点,你可以理解为我是在回答"

Dubbo源码解析之SPI(一):扩展类的加载过程

Dubbo是一款开源的.高性能且轻量级的Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用.智能容错和负载均衡,以及服务自动注册和发现. Dubbo最早是阿里公司内部的RPC框架,于 2011 年开源,之后迅速成为国内该类开源项目的佼佼者,2018年2月,通过投票正式成为 Apache基金会孵化项目.目前宜信公司内部也有不少项目在使用Dubbo. 本系列文章通过拆解Dubbo源码,帮助大家了解Dubbo,做到知其然,并且知其所以然. 一.JDK SPI 1.1 什么是SPI? S

【Dubbo 源码解析】06_Dubbo 服务调用

Dubbo 服务调用 根据上图,可以看出,服务调用过程为: Consumer 端的 Proxy 调用 Cluster 层选择集群中的某一个 Invoker(负载均衡) Invoker 最终会调用 Protocol 层进行 RPC 通讯(netty,tcp 长连接),将服务调用信息和配置信息进行传递 Provider 端 Protocol 层接收到服务调用信息后,最终会调用真实的服务实现 Consumer 端调用过程 通过前面 Dubbo 服务发现&引用 的学习,我们知道,Consumer 端的调

【Dubbo 源码解析】02_Dubbo SPI

Dubbo SPI:(version:2.6.*) Dubbo 微内核 + 插件 模式,得益于 Dubbo SPI .其中 ExtentionLoader是 Dubbo SPI 最核心的类,它负责扩展点的加载和生命周期管理. ExtensionLoader ExtensionLoader 类似于 Java SPI 的 ServiceLoader,负责扩展的加载和生命周期维护,它是 Dubbo SPI 最核心的类. 使用最频繁的 API 有如下几个: public static <T> Exte

【Dubbo 源码解析】07_Dubbo 重试机制

Dubbo 重试机制 通过前面 Dubbo 服务发现&引用 的分析,我们知道,Dubbo 的重试机制是通过 com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker 来实现的: public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcExce

【Dubbo 源码解析】01_Dubbo 设计简介

Dubbo 设计简介 Dubbo 采用 Microkernel + Plugin (微内核 + 插件)模式,Microkernel 只负责组装 Plugin,Dubbo 自身的功能也是通过扩展点实现的,也就是 Dubbo 的所有功能点都可被用户自定义扩展所替换. Dubbo 的核心领域模型 Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理. Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代

【Dubbo 源码解析】08_Dubbo与Spring结合

Dubbo 与 Spring 结合 基于 dubbo.jar 内的 META-INF/spring.handlers 配置,Spring 在遇到 dubbo 名称空间时,会回调 DubboNamespaceHandler. 所有 dubbo 的标签,都统一用 DubboBeanDefinitionParser 进行解析,基于一对一属性映射,将 XML 标签解析为 Bean 对象. 在 ServiceConfig.export() 或 ReferenceConfig.get() 初始化时,将 Be

dubbo源码学习(四)初始化过程细节:解析服务

初学dubbo的源码,只做尝试性的去学习,做为自己学习的一个记录,各位看官如果觉得写的有错误或理解的不对,请在留言区告诉我,互相学习.本人能力有限,有大神进入 时请指点. 前面大概介绍了一下关于学习dubbo源码的一些基本知识,今天将真正去看dubbo内部的实现过程,看dubbo的源码前我先把dubbo的用户指南和开发指指南大概的看了一遍,然后从上面找到相应的切入点去看源码,今天将介绍的是dubbo的初始化解析bean的过程.从之前使用过dubbo一些经验,加上http://dubbo.io/的

Dubbo 源码分析 - 服务导出全过程解析

1.服务导出过程 本篇文章,我们来研究一下 Dubbo 导出服务的过程.Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑.整个逻辑大致可分为三个部分,第一是前置工作,主要用于检查参数,组装 URL.第二是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程.第三是向注册中心注册服务,用于服务发现.本篇文章将会对这三个部分代码进行详细的分析,在分析之前,我们先来了解一下服务的导出过程. Dubbo 支持两种服务导出方式,