[dubbo 源码之 ]2. 服务消费方如何启动服务

启动流程

消费者在启动之后,会通过ReferenceConfig#get()来生成远程调用代理类。在get方法中,会启动一系列调用函数,我们来一个个解析。

配置同样包含2种:

  • XML
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <dubbo:application name="first-consumer-xml"/>
    <dubbo:registry address="zookeeper://127.0.0.1:2181/"/>
    <dubbo:reference proxy="javassist" scope="remote"
                     id="demoService"
                     generic="false"
                     check="false"
                     async="false"
                     group="dubbo-sxzhongf-group"
                     version="1.0.0"
                     interface="com.sxzhongf.deep.in.dubbo.api.service.IGreetingService">
        <dubbo:method name="sayHello" retries="3" timeout="5000" mock="false" />
        <dubbo:method name="testGeneric" retries="3" timeout="5000" mock="false" />
    </dubbo:reference>
</beans>
  • Java API
public class ApiConsumerApplication {
    public static void main(String[] args) {
        // 1. 创建服务引用对象实例
        ReferenceConfig<IGreetingService> referenceConfig = new ReferenceConfig<IGreetingService>();
        // 2. 设置应用程序信息
        referenceConfig.setApplication(new ApplicationConfig("deep-in-dubbo-first-consumer"));
        // 3. 设置注册中心
        referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181/"));
        // 4. 设置服务接口和超时时间
        referenceConfig.setInterface(IGreetingService.class);
        // 默认重试3次
        referenceConfig.setTimeout(5000);
        // 5 设置服务分组和版本
        referenceConfig.setGroup("dubbo-sxzhongf-group");
        referenceConfig.setVersion("1.0.0");
        // 6. 引用服务
        IGreetingService greetingService = referenceConfig.get();
        // 7. 设置隐式参数
        RpcContext.getContext().setAttachment("company", "sxzhongf");
        // 获取provider端传递的隐式参数(FIXME: 需要后续追踪)
//        System.out.println("年龄是:" + RpcContext.getContext().getAttachment("age"));
        //8. 调用服务
        System.out.println(greetingService.sayHello("pan"));
    }
}
1. new ReferenceConfig

在此阶段,会初始化org.apache.dubbo.config.AbstractConfig & org.apache.dubbo.config.ReferenceConfig的静态变量以及静态代码块。

2. ReferenceConfig#get
  • ReferenceConfig#init

    1. 通过DubboBootstrap启动dubbo。
    2. 继而初始化服务的元数据信息,URL.buildKey(interfaceName, group, version)这段用来生成唯一服务的key,所以我们之前说dubbo的唯一标识是通过全路径和group以及version来决定的。
    3. 接下来通过org.apache.dubbo.config.utils.ConfigValidationUtils#checkMock来检查我们mock是否设置正确。
    4. 设置一系列要用的参数(系统运行参数、是否为consumer、是否为泛型调用等等),检查dubbo的注册地址,默认为当前主机IP
  • ReferenceConfig#createProxy 创建调用代理开始
  1. ReferenceConfig#shouldJvmRefer首先判断是否为Injvm调用
  2. 如果不为injvm,判断是否为peer to peer端对端设置,如果为p2p,那么就直连url
  3. 检查注册中心是否存在(注册中心有可能有多个)
  4. 循环检查注册中心是否有效
  5. 配置转换URL
    json registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=deep-in-dubbo-first-consumer&dubbo=2.0.2&pid=9780&refer=application%3Ddeep-in-dubbo-first-consumer%26dubbo%3D2.0.2%26group%3Ddubbo-sxzhongf-group%26interface%3Dcom.sxzhongf.deep.in.dubbo.api.service.IGreetingService%26methods%3DsayHello%2CtestGeneric%26pid%3D9780%26register.ip%3D192.168.14.99%26release%3D2.7.5%26revision%3D1.0.0%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D5000%26timestamp%3D1582959441066%26version%3D1.0.0&registry=zookeeper&release=2.7.5&timestamp=1582961922459
  6. 如果只有一个注册中心,执行REF_PROTOCOL.refer(interfaceClass, urls.get(0));来将URL转为Invoker对象,因为private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();是扩展是Adaptive,因此在这里会执行Protocol$Adaptive#refer方法,又由于protocol参数值为registry,因此会只是RegistryProtocol#refer,又由于被Wrapper类装配,因此会先执行三个Wrapper类,最后才能执行到RegistryProtocol#refer -> RegistryProtocol#doRefer,在doRefer方法中会订阅服务提供者地址,最后返回Invoker对象。!

那么究竟是如何生成的Invoker对象呢?我们来看下具体代码:
java private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 1.可以查寻RegistryDirectory & StaticDirectory 区别 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); //2. 封装订阅所用URL URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); registry.register(directory.getRegisteredConsumerUrl()); } //3.build路由规则链 directory.buildRouterChain(subscribeUrl); //4.订阅服务提供者地址 directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); //5.封装集群策略到虚拟invoker Invoker invoker = cluster.join(directory); return invoker; }
上述代码中,步骤1根据URL生成了一个RegistryDirectory(关于Directory接口的作用,可以自行查询一些,直白一些就是将一堆Invoker对象封装成一个List,只有2种实现RegistryDirectory & StaticDirectory,从命名可看出一个是动态可变,一个不可变),代码2 封装了订阅所要使用的参数信息,代码3则是封装绑定路由规则链,代码4订阅。代码5调用 Cluster$Adaptive#join方法生成Invoker对象。

在代码2种从zk获取服务提供者信息:

一旦zk返回服务提供者列表之后,就会调用RegistryDirectory#notify,如下:

org.apache.dubbo.common.utils.UrlUtils#isMatch中对provider和consumer的api进行匹配校验。继续跟踪:RegistryDirectory#notify -> RegistryDirectory#refreshOverrideAndInvoker -> RegistryDirectory#refreshInvoker -> RegistryDirectory#toInvokerstoInvokers正式将URL转换为Invoker,通过invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl); 在这里protocol#refer同样执行顺序如:

(dubbo 2.7.5) protocol#refer -> protocol$Adaptive#refer -> QosProtocolWrapper#refer -> ProtocolListenerWrapper#refer -> ProtocolFilterWrapper#refer ->AbstractProtocol#refer->DubboProtocol#protocolBindingRefer,调用代码如下:

```java
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}
```

关注getClients,其中执行了DubboProtocol#getSharedClient -> DubboProtocol#initClient 创建netty client进行连接。

因为这里使用的是明确的DubboInvoker,在回调的时候,Wrapper会对该Invoker进行包装,执行效果如下:

因此,会执行到ProtocolFilterWrapper#buildInvokerChain,该函数会对服务进行调用链跟踪:

```java
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    // 获取所有在MATA-INF文件中配置的激活的责任链接口
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {

                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    Result asyncResult;
                    try {
                        asyncResult = filter.invoke(next, invocation);
                    } catch (Exception e) {
                        if (filter instanceof ListenableFilter) {// Deprecated!
                            Filter.Listener listener = ((ListenableFilter) filter).listener();
                            if (listener != null) {
                                listener.onError(e, invoker, invocation);
                            }
                        } else if (filter instanceof Filter.Listener) {
                            Filter.Listener listener = (Filter.Listener) filter;
                            listener.onError(e, invoker, invocation);
                        }
                        throw e;
                    } finally {

                    }
                    return asyncResult.whenCompleteWithContext((r, t) -> {
                        if (filter instanceof ListenableFilter) {// Deprecated!
                            Filter.Listener listener = ((ListenableFilter) filter).listener();
                            if (listener != null) {
                                if (t == null) {
                                    listener.onMessage(r, invoker, invocation);
                                } else {
                                    listener.onError(t, invoker, invocation);
                                }
                            }
                        } else if (filter instanceof Filter.Listener) {
                            Filter.Listener listener = (Filter.Listener) filter;
                            if (t == null) {
                                listener.onMessage(r, invoker, invocation);
                            } else {
                                listener.onError(t, invoker, invocation);
                            }
                        } else {// Deprecated!
                            filter.onResponse(r, invoker, invocation);
                        }
                    });
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }

    return last;
}
```

所有的负载均衡、容错策略等都是在这里绑定的。
7.如果有多个注册中心,将会循环执行步骤6,将URL转换为Invoker对象,然后添加到一个List,分别进行注册之后,然后判断最后一个注册中心url是否有效,针对多订阅的场景,URL中添加cluster参数,默认使用org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster策略,使用org.apache.dubbo.rpc.cluster.Cluster#join将多个Invoker对象封装一个虚拟的Invoker中,否则如果最后一个注册中心是无效的,直接封装Invoker对象。
8.创建服务代理ProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker<T>),因为ProxyFactory是一个适配类。那么同样这里会调用ProxyFactory$Adaptive#getProxy,这里最终就是返回一个代理服务的Invoker对象。

至此,我们的消费端的绑定远程zk的服务就已经结束了。
那么,我们在调用服务器方法的时候服务器端和客户端都是如何处理的呢?下节我们将继续分析。

原文地址:https://www.cnblogs.com/zhangpan1244/p/12394033.html

时间: 2024-11-05 23:37:51

[dubbo 源码之 ]2. 服务消费方如何启动服务的相关文章

dubbo源码学习(一)基础知识及使用的相关技术

初学dubbo的源码,只做尝试性的去学习,做为自己学习的一个记录,各位看官如果觉得写的有错误或理解的不对,请在留言区告诉我,互相学习.本人能力有限,有大神进入 时请指点. Dubbo是Alibaba开源的分布式服务框架,它最大的特点是按照分层的方式来架构,使用这种方式可以使各个层之间解耦合(或者最大限度地松耦合),我们可以非常容易地通过Dubbo来构建分布式服务,并根据自己实际业务应用场景来选择合适的集群容错模式,这个对于很多应用都是迫切希望的,只需要通过简单的配置就能够实现分布式服务调用,也就

Dubbo源码分析(一):概览

Dubbo是一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案,是阿里巴巴的一个开源项目. 笔者认为阿里巴巴的Dubbo的user guide写得非常好,下面是笔者摘抄的一部分内容: 随着互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式服务架构以及流动计算架构势在必行,亟需一个治理系统确保架构有条不紊的演进. 单一应用架构 当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本. 此时,用于简化增删改查工作

Dubbo 源码分析 - 服务导出全过程解析

1.服务导出过程 本篇文章,我们来研究一下 Dubbo 导出服务的过程.Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑.整个逻辑大致可分为三个部分,第一是前置工作,主要用于检查参数,组装 URL.第二是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程.第三是向注册中心注册服务,用于服务发现.本篇文章将会对这三个部分代码进行详细的分析,在分析之前,我们先来了解一下服务的导出过程. Dubbo 支持两种服务导出方式,

Dubbo源码分析系列-服务的发布

RPC简化类图 RPC模块核心接口和抽象实现 默认实现Dubbo协议的接口和抽象实现 服务发布过程 调用过程 上图是服务提供者暴露服务的主过程: 首先ServiceConfig类拿到对外提供服务的实际类ref(如:HelloWorldImpl),然后通过ProxyFactory类的getInvoker方法使用ref生成一个AbstractProxyInvoker实例,到这一步就完成具体服务到Invoker的转化.接下来就是Invoker转换到Exporter的过程. Dubbo处理服务暴露的关键

dubbo源码之四——服务发布二

dubbo版本:2.5.4 2. 服务提供者暴露一个服务的详细过程 上图是服务提供者暴露服务的主过程: 首先ServiceConfig类拿到对外提供服务的实际类ref(如:HelloWorldImpl),然后通过ProxyFactory类的getInvoker方法使用ref生成一个AbstractProxyInvoker实例,到这一步就完成具体服务到Invoker的转化.接下来就是Invoker转换到Exporter的过程. Dubbo处理服务暴露的关键就在Invoker转换到Exporter的

dubbo源码学习(五)dubbo暴露服务的过程

初学dubbo的源码,只做尝试性的去学习,做为自己学习的一个记录,各位看官如果觉得写的有错误或理解的不对,请在留言区告诉我,互相学习.本人能力有限,有大神进入 时请指点. dubbo采用的nio异步的通信,通信协议默认为 netty,当然也可以选择 mina,grizzy.在服务端(provider)在启动时主要是开启netty监听,在zookeeper上注册服务节点,处理消费者请求,返回处理后的消息给消费者,消费者使用服务时主要是订阅服务的节点,监听zookeeper节点目录,服务端的变化时z

【Dubbo源码阅读系列】服务暴露之远程暴露

引言 什么叫 远程暴露 ?试着想象着这么一种场景:假设我们新增了一台服务器 A,专门用于发送短信提示给指定用户.那么问题来了,我们的 Message 服务上线之后,应该如何告知调用方服务器,服务器 A 提供了 Message 功能?那么我们是不是可以把目前已提供的服务暴露在一个地方,让调用方知道某台机器提供了某个特定功能?带着这样的假设,我们今天就来聊聊 Dubbo 服务暴露之远程暴露!! 服务远程暴露 先回顾一下上篇文章,上篇文章我们聊到了 ServiceConfig 的 export() 方

dubbo源码阅读笔记--服务调用时序

上接dubbo源码阅读笔记--暴露服务时序,继续梳理服务调用时序,下图右面红线流程. 整理了调用时序图 分为3步,connect,decode,invoke. 连接 AllChannelHandler.connected(Channel) line: 38 HeartbeatHandler.connected(Channel) line: 47 MultiMessageHandler(AbstractChannelHandlerDelegate).connected(Channel) line:

dubbo源码学习(四)初始化过程细节:解析服务

初学dubbo的源码,只做尝试性的去学习,做为自己学习的一个记录,各位看官如果觉得写的有错误或理解的不对,请在留言区告诉我,互相学习.本人能力有限,有大神进入 时请指点. 前面大概介绍了一下关于学习dubbo源码的一些基本知识,今天将真正去看dubbo内部的实现过程,看dubbo的源码前我先把dubbo的用户指南和开发指指南大概的看了一遍,然后从上面找到相应的切入点去看源码,今天将介绍的是dubbo的初始化解析bean的过程.从之前使用过dubbo一些经验,加上http://dubbo.io/的