源码分析---SOFARPC客户端服务调用

我们首先看看BoltClientProxyInvoker的关系图

所以当我们用BoltClientProxyInvoker#invoke的时候实际上是调用了父类的invoke方法
ClientProxyInvoker#invoke

    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        SofaResponse response = null;
        Throwable throwable = null;
        try {
            RpcInternalContext.pushContext();
            RpcInternalContext context = RpcInternalContext.getContext();
            context.setProviderSide(false);
            // 包装request请求
            decorateRequest(request);
            try {
                // 产生开始调用事件
                if (EventBus.isEnable(ClientStartInvokeEvent.class)) {
                    EventBus.post(new ClientStartInvokeEvent(request));
                }
                // 得到结果
                response = cluster.invoke(request);
            } catch (SofaRpcException e) {
                throwable = e;
                throw e;
            } finally {
                // 产生调用结束事件
                if (!request.isAsync()) {
                    if (EventBus.isEnable(ClientEndInvokeEvent.class)) {
                        EventBus.post(new ClientEndInvokeEvent(request, response, throwable));
                    }
                }
            }
            // 包装响应
            decorateResponse(response);
            return response;
        } finally {
            RpcInternalContext.removeContext();
            RpcInternalContext.popContext();
        }
    }

这个方法主要做了几件事:

  1. 包装request请求,设置必要的参数
  2. 调用FailOverCluster的invoke方法,将reques请求发送出去,并得到response相应
  3. 包装response响应

我们在调用FailOverCluster的时候实际上是调用的父类AbstractCluster的invoker方法,FailOverCluster关系图如下:

所以我们进入到AbstractCluster的invoker方法中:

    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        SofaResponse response = null;
        try {
            // 做一些初始化检查,例如未连接可以连接
            checkClusterState();
            // 开始调用
            countOfInvoke.incrementAndGet(); // 计数+1
            response = doInvoke(request);
            return response;
        } catch (SofaRpcException e) {
            // 客户端收到异常(客户端自己的异常)
            throw e;
        } finally {
            countOfInvoke.decrementAndGet(); // 计数-1
        }
    }

checkClusterState方法主要是用来校验是否已销毁了,或是调用了init方法进行初始化了。
然后会在调用之前记一下数。
然后我们进入到doInvoke方法中:

    public SofaResponse doInvoke(SofaRequest request) throws SofaRpcException {
        String methodName = request.getMethodName();
        int retries = consumerConfig.getMethodRetries(methodName);
        int time = 0;
        SofaRpcException throwable = null;// 异常日志
        List<ProviderInfo> invokedProviderInfos = new ArrayList<ProviderInfo>(retries + 1);
        do {
            //负载均衡
            ProviderInfo providerInfo = select(request, invokedProviderInfos);
            try {
                //调用过滤器链
                SofaResponse response = filterChain(providerInfo, request);
                if (response != null) {
                    if (throwable != null) {
                        if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
                            LOGGER.warnWithApp(consumerConfig.getAppName(),
                                LogCodes.getLog(LogCodes.WARN_SUCCESS_BY_RETRY,
                                    throwable.getClass() + ":" + throwable.getMessage(),
                                    invokedProviderInfos));
                        }
                    }
                    return response;
                } else {
                    throwable = new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
                        "Failed to call " + request.getInterfaceName() + "." + methodName
                            + " on remote server " + providerInfo + ", return null");
                    time++;
                }
            } catch (SofaRpcException e) { // 服务端异常+ 超时异常 才发起rpc异常重试
                if (e.getErrorType() == RpcErrorType.SERVER_BUSY
                    || e.getErrorType() == RpcErrorType.CLIENT_TIMEOUT) {
                    throwable = e;
                    time++;
                } else {
                    throw e;
                }
            } catch (Exception e) { // 其它异常不重试
                throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
                    "Failed to call " + request.getInterfaceName() + "." + request.getMethodName()
                        + " on remote server: " + providerInfo + ", cause by unknown exception: "
                        + e.getClass().getName() + ", message is: " + e.getMessage(), e);
            } finally {
                if (RpcInternalContext.isAttachmentEnable()) {
                    RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_INVOKE_TIMES,
                        time + 1); // 重试次数
                }
            }
            invokedProviderInfos.add(providerInfo);
        } while (time <= retries);

        throw throwable;
    }

这个方法里面主要做了这这件事:

  1. 如果失败的话就循环调用
  2. 负载均衡,选取provider
  3. 通过过滤器链调用服务端,并返回结果
  4. 异常处理

接着我们进入到filterChain方法中,根据过滤器链最后会跳到ConsumerInvoker中的invoke方法

    @Override
    public SofaResponse invoke(SofaRequest sofaRequest) throws SofaRpcException {
        // 设置下服务器应用
        ProviderInfo providerInfo = RpcInternalContext.getContext().getProviderInfo();
        String appName = providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_APP_NAME);
        if (StringUtils.isNotEmpty(appName)) {
            sofaRequest.setTargetAppName(appName);
        }

        // 目前只是通过client发送给服务端
        return consumerBootstrap.getCluster().sendMsg(providerInfo, sofaRequest);
    }

consumerBootstrap.getCluster()会返回FailOverCluster实例,然后调用父类AbstractCluster的sendMsg方法

    public SofaResponse sendMsg(ProviderInfo providerInfo, SofaRequest request) throws SofaRpcException {
        ClientTransport clientTransport = connectionHolder.getAvailableClientTransport(providerInfo);
        if (clientTransport != null && clientTransport.isAvailable()) {
            return doSendMsg(providerInfo, clientTransport, request);
        } else {
            throw unavailableProviderException(request.getTargetServiceUniqueName(), providerInfo.getOriginUrl());
        }
    }

    protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport transport,
                                     SofaRequest request) throws SofaRpcException {
        RpcInternalContext context = RpcInternalContext.getContext();
        // 添加调用的服务端远程地址
        RpcInternalContext.getContext().setRemoteAddress(providerInfo.getHost(), providerInfo.getPort());
        try {
            checkProviderVersion(providerInfo, request); // 根据服务端版本特殊处理
            String invokeType = request.getInvokeType();
            int timeout = resolveTimeout(request, consumerConfig, providerInfo);

            SofaResponse response = null;
            // 同步调用
            if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) {
                long start = RpcRuntimeContext.now();
                try {
                    response = transport.syncSend(request, timeout);
                } finally {
                    if (RpcInternalContext.isAttachmentEnable()) {
                        long elapsed = RpcRuntimeContext.now() - start;
                        context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed);
                    }
                }
            }
            // 单向调用
            else if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) {
                long start = RpcRuntimeContext.now();
                try {
                    transport.oneWaySend(request, timeout);
                    response = buildEmptyResponse(request);
                } finally {
                    if (RpcInternalContext.isAttachmentEnable()) {
                        long elapsed = RpcRuntimeContext.now() - start;
                        context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed);
                    }
                }
            }
            // Callback调用
            else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
                // 调用级别回调监听器
                SofaResponseCallback sofaResponseCallback = request.getSofaResponseCallback();
                if (sofaResponseCallback == null) {
                    SofaResponseCallback methodResponseCallback = consumerConfig
                        .getMethodOnreturn(request.getMethodName());
                    if (methodResponseCallback != null) { // 方法的Callback
                        request.setSofaResponseCallback(methodResponseCallback);
                    }
                }
                // 记录发送开始时间
                context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
                // 开始调用
                transport.asyncSend(request, timeout);
                response = buildEmptyResponse(request);
            }
            // Future调用
            else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
                // 记录发送开始时间
                context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
                // 开始调用
                ResponseFuture future = transport.asyncSend(request, timeout);
                // 放入线程上下文
                RpcInternalContext.getContext().setFuture(future);
                response = buildEmptyResponse(request);
            } else {
                throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType);
            }
            return response;
        } catch (SofaRpcException e) {
            throw e;
        } catch (Throwable e) { // 客户端其它异常
            throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, e);
        }
    }

sendMsg方法最后会调用到doSendMsg。
soSendMsg里面主要做了如下几件事:

  1. 如果是同步调用,则直接返回封装好的参数
  2. 如果是单向调用,则调用buildEmptyResponse方法,返回一个空的response
  3. 如果是callback调用asyncSend,RPC在获取到服务端的结果后会自动执行该回调实现。
  4. 服务端返回响应结果被 RPC 缓存,当客户端需要响应结果的时候需要主动获取结果,获取结果的过程阻塞线程。

原文地址:https://www.cnblogs.com/luozhiyun/p/11261303.html

时间: 2024-08-02 16:21:42

源码分析---SOFARPC客户端服务调用的相关文章

源码分析---SOFARPC可扩展的机制SPI

这几天离职在家,正好没事可以疯狂的输出一下,本来想写DUBBO的源码解析的,但是发现写DUBBO源码的太多了,所以找一个写的不那么多的框架,所以就选中SOFARPC这个框架了. SOFARPC是蚂蚁金服开源的一个RPC框架,相比DUBBO它没有这么多历史的包袱,代码更加简洁,设计思路更加清晰,更加容易去理解其中的代码. 那么为什么要去重写原生的SPI呢?官方给出了如下解释: 按需加载 可以有别名 可以有优先级进行排序和覆盖 可以控制是否单例 可以在某些场景下使用编码 可以指定扩展配置位置 可以排

【Android】从源码分析PagerAdapter/FragmentPagerAdapter调用notifydataSetChanged()刷新的原理

相信用过viewpager的同学都会遇到调用notifydataSetChanged()后不刷新或者不符合预期的问题,今天就来分析分析这里的来龙去脉.这一切还得从viewpager的setAdapter说起: /** * Set a PagerAdapter that will supply views for this pager as needed. * * @param adapter Adapter to use */ public void setAdapter(PagerAdapte

8.源码分析---从设计模式中看SOFARPC中的EventBus?

我们在前面分析客户端引用的时候会看到如下这段代码: // 产生开始调用事件 if (EventBus.isEnable(ClientStartInvokeEvent.class)) { EventBus.post(new ClientStartInvokeEvent(request)); } 这里用EventBus调用了一下post方法之后就什么也没做了,就方法名来看是发送了一个post请求,也不知道发给谁,到底有什么用. 所以这一节我们来分析一下EventBus这个类的作用. 首先我们来看一下

PopupWindow源码分析

目录介绍 1.最简单的创建方法 1.1 PopupWindow构造方法 1.2 显示PopupWindow 1.3 最简单的创建 1.4 注意问题宽和高属性 2.源码分析 2.1 setContentView(View contentView) 2.2 showAsDropDown()源码 2.3 dismiss()源码分析 2.4 PopupDecorView源码分析 3.经典总结 3.1 PopupWindow和Dialog有什么区别? 3.2 创建和销毁的大概流程 3.3 为何弹窗点击一下

android 从源码分析为什么Listview初次显示时没滚动却自动调用onScroll方法的原因

我们做Listview的分批加载时,需要为Listview调用setOnScrollListener(具体代码可见我上一篇博客) 可是,我们会发现,当运行程序时,listview明明没有滚动,那为什么系统会调用onScroll方法呢?(补充:此时onScrollStateChanged并不会调用) 我们先看setOnScrollListener源码: public void setOnScrollListener(OnScrollListener l) { mOnScrollListener =

7、SpringMVC源码分析(2):分析HandlerAdapter.handle方法,了解handler方法的调用细节以及@ModelAttribute注解

从上一篇 SpringMVC源码分析(1) 中我们了解到在DispatcherServlet.doDispatch方法中会通过 mv = ha.handle(processedRequest, response, mappedHandler.getHandler()) 这样的方式来执行request的handler方法. 先来分析一下ha.handle方法的调用过程:HandlerAdapter接口有一个抽象实现类AbstractHandlerMethodAdapter,在该抽象类中通过具体方法

vlc源码分析之调用live555接收RTSP数据

首先了解RTSP/RTP/RTCP相关概念,尤其是了解RTP协议:RTP与RTCP协议介绍(转载). vlc使用模块加载机制调用live555,调用live555的文件是live555.cpp. 一.几个重要的类 以下向左箭头("<-")为继承关系. 1. RTPInterface RTPInterface是RTPSource的成员变量,其成员函数handleRead会读取网络数据存入BufferedPacket内,该类最终会调到UDP的发送接收函数. Boolean RTPIn

SpringCloud(4)---Ribbon服务调用,源码分析

SpringCloud(4)---Ribbon 本篇模拟订单服务调用商品服务,同时商品服务采用集群部署. 注册中心服务端口号7001,订单服务端口号9001,商品集群端口号:8001.8002.8003. 各服务的配置文件这里我这边不在显示了,和上篇博客配置一样.博客地址:SpringCloud(3)---Eureka服务注册与发现 一.商品中心服务端 1.pom.xml <?xml version="1.0" encoding="UTF-8"?> &l

Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)

我们知道,如果想要在Yarn上运行MapReduce作业,仅需实现一个ApplicationMaster组件即可,而MRAppMaster正是MapReduce在Yarn上ApplicationMaster的实现,由其控制MR作业在Yarn上的执行.如此,随之而来的一个问题就是,MRAppMaster是如何控制MapReduce作业在Yarn上运行的,换句话说,MRAppMaster上MapReduce作业处理总流程是什么?这就是本文要研究的重点. 通过MRAppMaster类的定义我们就能看出