【Dubbo 源码解析】06_Dubbo 服务调用

Dubbo 服务调用

根据上图,可以看出,服务调用过程为:

  1. Consumer 端的 Proxy 调用 Cluster 层选择集群中的某一个 Invoker(负载均衡)
  2. Invoker 最终会调用 Protocol 层进行 RPC 通讯(netty,tcp 长连接),将服务调用信息和配置信息进行传递
  3. Provider 端 Protocol 层接收到服务调用信息后,最终会调用真实的服务实现

Consumer 端调用过程

通过前面 Dubbo 服务发现&引用 的学习,我们知道,Consumer 端的调用过程大体如下:

  1. 执行 FailoverClusterInvoker#invoke(Invocation invocation) (负载均衡。当有多个 provider 时走这一步,没有的话,就跳过)
  2. 执行 Filter#invoke(Invoker<?> invoker, Invocation invocation) (所有 group="provider" 的 Filter)
  3. 执行 DubboInvoker#invoke(Invocation inv)

所以,最终的通讯过程是由 DubboInvoker 来完成的:

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);
    // 选择 ExchangeClient(一般情况下只有一个)
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) { // dubbo 异步调用
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else { // 服务调用分支
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

阅读源码可以发现,获取 Rpc 调用结果的方法是: ExchangeClient#request(inv, timeout).get(), 最终会调用 NettyChannel#send(Object message, false) 去写消息。而消息的编解码是通过 com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec 来处理的。

// 对 Request 消息进行编码(DubboCodec.class)
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;
    out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
    out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
    out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
    out.writeUTF(inv.getMethodName()); // 写出调用方法的名称
    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); // 写出调用方法的参数
    Object[] args = inv.getArguments();
    if (args != null)
        for (int i = 0; i < args.length; i++) {
            out.writeObject(encodeInvocationArgument(channel, inv, i)); // 写出调用的实参
        }
    out.writeObject(inv.getAttachments()); // 写出 attachments
}

Provider 端调用过程

Provider 端接收到 Consumer 端的消息后,会通过 DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)处理,将消息转化为 RpcInvocation。最终通过预先设置好的 netty 的 handler 来将 RpcInvocation 转化为 Invoker 的调用。

通过 Dubbo 服务发现&引用 的学习,我们知道,这个 handler 最终会调用 DubboProtocol#requestHandler 来处理,最终将 Invocation 转化为 Invoker 的调用。

调用完成后,返回结果通过DubboCodec#encodeResponseData(Channel channel, ObjectOutput out, Object data)进行编码,最终写回到 Consumer 端:

// 对返回结果进行编码
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
    Result result = (Result) data;
    Throwable th = result.getException();
    if (th == null) {
        Object ret = result.getValue();
        if (ret == null) {
            out.writeByte(RESPONSE_NULL_VALUE);
        } else {
            out.writeByte(RESPONSE_VALUE);
            out.writeObject(ret);
        }
    } else {
        out.writeByte(RESPONSE_WITH_EXCEPTION);
        out.writeObject(th);
    }
}

Consumer 端接收调用返回

Conmuser 端接收到调用的返回数据后,会由 DubboCodec#decodeBody(Channel channel, InputStream is, byte[] header)将数据转化成 RpcResult。

ExchangeClient#request(inv, timeout).get() 最终会调用 DefaultFuture#get() 来获取 Response 中的返回数据。DefaultFuture#get() 使用了同步阻塞的方式来等待 provider 端的返回数据。

官方如是说:

满眼都是 Invoker

由于 Invoker 是 Dubbo 领域模型中非常重要的一个概念,很多设计思路都是向它靠拢。这就使得 Invoker渗透在整个实现代码里,对于刚开始接触 Dubbo 的人,确实容易给搞混了。 下面我们用一个精简的图来说明最重要的两种 Invoker:服务提供 Invoker 和服务消费 Invoker

服务提供 Invoker:DubboInvoker(默认)、HessianRpcInvoker等(视协议而定)

服务消费 Invoker:AbstractProxyInvoker

原文地址:https://www.cnblogs.com/kevin-yuan/p/10346566.html

时间: 2024-08-01 08:00:47

【Dubbo 源码解析】06_Dubbo 服务调用的相关文章

dubbo源码阅读笔记--服务调用时序

上接dubbo源码阅读笔记--暴露服务时序,继续梳理服务调用时序,下图右面红线流程. 整理了调用时序图 分为3步,connect,decode,invoke. 连接 AllChannelHandler.connected(Channel) line: 38 HeartbeatHandler.connected(Channel) line: 47 MultiMessageHandler(AbstractChannelHandlerDelegate).connected(Channel) line:

dubbo源码解析-zookeeper创建节点

前言 在之前dubbo源码解析-本地暴露中的前言部分提到了两道高频的面试题,其中一道dubbo中zookeeper做注册中心,如果注册中心集群都挂掉,那发布者和订阅者还能通信吗?在上周的dubbo源码解析-zookeeper连接中已经讲到,这周解析的是另一道,即服务提供者能实现失效踢出是根据什么原理? 上周就有朋友问到我,为什么我的源码解析总是偏偏要和面试题挂上钩呢?原因很简单 1.dubbo源码这么多,试问你从哪里做为切入点?也就是说该从哪里看起?所以以面试题为切入点,你可以理解为我是在回答"

Dubbo源码分析系列-服务的发布

RPC简化类图 RPC模块核心接口和抽象实现 默认实现Dubbo协议的接口和抽象实现 服务发布过程 调用过程 上图是服务提供者暴露服务的主过程: 首先ServiceConfig类拿到对外提供服务的实际类ref(如:HelloWorldImpl),然后通过ProxyFactory类的getInvoker方法使用ref生成一个AbstractProxyInvoker实例,到这一步就完成具体服务到Invoker的转化.接下来就是Invoker转换到Exporter的过程. Dubbo处理服务暴露的关键

Dubbo源码解析之SPI(一):扩展类的加载过程

Dubbo是一款开源的.高性能且轻量级的Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用.智能容错和负载均衡,以及服务自动注册和发现. Dubbo最早是阿里公司内部的RPC框架,于 2011 年开源,之后迅速成为国内该类开源项目的佼佼者,2018年2月,通过投票正式成为 Apache基金会孵化项目.目前宜信公司内部也有不少项目在使用Dubbo. 本系列文章通过拆解Dubbo源码,帮助大家了解Dubbo,做到知其然,并且知其所以然. 一.JDK SPI 1.1 什么是SPI? S

dubbo源码分析8——服务暴露概述

从上文中可知,com.alibaba.dubbo.config.spring.ServiceBean类是负责解析<dubbo:service/>的配置的,下面是它的类图 从类图上可知它继承了ServiceConfig类,并实现了5个接口,在这5个接口中有两个接口比较重要,其中InitializingBean是进行bean的初始化工作的,ApplicationListener接口是监听spring容器事件的.先通过outline视图观察一下ServiceBean类:    从outline视图中

dubbo源码阅读之服务导出

dubbo服务导出 常见的使用dubbo的方式就是通过spring配置文件进行配置.例如下面这样 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns

Dubbo源码学习之-服务导出

前言 忙的时候,会埋怨学习的时间太少,缺少个人的空间,于是会争分夺秒的工作.学习.而一旦繁忙的时候过去,有时间了之后,整个人又会不自觉的陷入一种懒散的状态中,时间也显得不那么重要了,随便就可以浪费掉几个小时.可见普通人的学习之路要主动地去克服掉很多阻碍,最主要的阻碍还是来自于自身,周期性的不想学习.不自觉的懒散.浅尝辄止的态度.好高骛远贪多的盲目...哎,学习之路,还是要时刻提醒自己,需勤勉致知. 闲话少叙,今天的学习目标是要尽量的了解清楚Dubbo框架中的服务导出功能,先附上Dubbo官网上的

【Dubbo 源码解析】02_Dubbo SPI

Dubbo SPI:(version:2.6.*) Dubbo 微内核 + 插件 模式,得益于 Dubbo SPI .其中 ExtentionLoader是 Dubbo SPI 最核心的类,它负责扩展点的加载和生命周期管理. ExtensionLoader ExtensionLoader 类似于 Java SPI 的 ServiceLoader,负责扩展的加载和生命周期维护,它是 Dubbo SPI 最核心的类. 使用最频繁的 API 有如下几个: public static <T> Exte

【Dubbo 源码解析】03_Dubbo Protocol&amp;Filter

Protocol & Filter Dubbo 服务暴露和服务引用都是通过的 com.alibaba.dubbo.rpc.Protocol 来实现的.它是一个 SPI 扩展. @SPI("dubbo") public interface Protocol { int getDefaultPort(); @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcExceptio