SOFA 源码分析 —— 服务发布过程

前言

SOFA 包含了 RPC 框架,底层通信框架是 bolt ,基于 Netty 4,今天将通过 SOFA—RPC 源码中的例子,看看他是如何发布一个服务的。

示例代码

下面的代码在 com.alipay.sofa.rpc.quickstart.QuickStartServer 类下。

ServerConfig serverConfig = new ServerConfig()
    .setProtocol("bolt") // 设置一个协议,默认bolt
    .setPort(9696) // 设置一个端口,默认12200
    .setDaemon(false); // 非守护线程

ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
    .setInterfaceId(HelloService.class.getName()) // 指定接口
    .setRef(new HelloServiceImpl()) // 指定实现
    .setServer(serverConfig); // 指定服务端

providerConfig.export(); // 发布服务

首先,创建一个 ServerConfig ,包含了端口,协议等基础信息,当然,这些都是手动设定的,在该类加载的时候,会自动加载很多配置文件中的服务器默认配置。比如 RpcConfigs 类,RpcRuntimeContext 上下文等。

然后呢,创建一个 ProviderConfig,也是个 config,不过多继承了一个 AbstractInterfaceConfig 抽象类,该类是接口级别的配置,而 ServerConfig 是 服务器级别的配置。虽然都继承了 AbstractIdConfig。

ProviderConfig 包含了接口名称,接口指定实现类,还有服务器的配置。

最后,ProviderConfig 调用 export 发布服务。

展示给我的 API 很简单,但内部是如何实现的呢?

在看源码之前,我们思考一下:如果我们自己来实现,怎么弄?

RPC 框架简单一点来说,就是使用动态代理和 Socket。

SOFA 使用 Netty 来做网络通信框架,我们之前也写过一个简单的 Netty RPC,主要是通过 handler 的 channelRead 方法来实现。

SOFA 是这么操作的吗?

一起来看看。

# 源码分析

上面的示例代码其实就是 3 个步骤,创建 ServerConfig,创建 ProviderConfig,调用 export 方法。

先看第一步,还是有点意思的。

虽然是空构造方法,但 ServerConfig 的属性都是自动初始化的,而他的父类 AbstractIdConfig 更有意思了,父类有 1 个地方值得注意:

static {
    RpcRuntimeContext.now();
}

熟悉类加载的同学都知道,这是为了主动加载 RpcRuntimeContext ,看名字是 RPC 运行时上下文,所谓上下文,大约就是我们人类聊天中的 "老地方" 的意思。

这个上下文会在静态块中加载 Module(基于扩展点实现),注册 JVM 关闭钩子(类似 Tomcat)。还有很多配置信息。

然后呢?创建 ProviderConfig 对象。这个类比上面的那个类多继承了一个 AbstractInterfaceConfig,接口级别的配置。比如有些方法我不想发布啊,比如权重啊,比如超时啊,比如具体的实现类啊等等,当然还需要一个 ServerConfig 的属性(注册到 Server 中啊喂)。

最后就是发布了。export 方法。

ProviderCofing 拥有一个 export 方法,但并不是直接就在这里发布的,因为他是一个 config,不适合在config 里面做这些事情,违背单一职责。

SOFA 使用了一个 Bootstrap 类来进行操作。和大部分服务器类似,这里就是启动服务器的地方。因为这个类会多线程使用,比如并发的发布服务。而不是一个一个慢慢的发布服务。所以他不是单例的,而是和 Config 一起使用的,并缓存在 map 中。

ProviderBootstrap 目前有 3 个实现:Rest,Bolt,Dubbo。Bolt 是他的默认实现。

export 方法默认有个实现(Dubbo 的话就要重写了)。主要逻辑是执行 doExport 方法,其中包括延迟加载逻辑。

而 doExport 方法中,就是 SOFA 发布服务的逻辑所在了。

楼主将方法的异常处理逻辑去除,整体如下:

 private void doExport() {
        if (exported) {
            return;
        }
        String key = providerConfig.buildKey();
        String appName = providerConfig.getAppName();
        // 检查参数
        checkParameters();
        // 注意同一interface,同一uniqleId,不同server情况
        AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
        if (cnt == null) { // 没有发布过
            cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
        }
        int c = cnt.incrementAndGet();
        int maxProxyCount = providerConfig.getRepeatedExportLimit();
        if (maxProxyCount > 0) {
          // 超过最大数量,直接抛出异常
        }
        // 构造请求调用器
        providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
        // 初始化注册中心
        if (providerConfig.isRegister()) {
            List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
            if (CommonUtils.isNotEmpty(registryConfigs)) {
                for (RegistryConfig registryConfig : registryConfigs) {
                    RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
                }
            }
        }
        // 将处理器注册到server
        List<ServerConfig> serverConfigs = providerConfig.getServer();
        for (ServerConfig serverConfig : serverConfigs) {
            Server server = serverConfig.buildIfAbsent();
            // 注册序列化接口
            server.registerProcessor(providerConfig, providerProxyInvoker);
            if (serverConfig.isAutoStart()) {
                server.start();
            }
        }

        // 注册到注册中心
        providerConfig.setConfigListener(new ProviderAttributeListener());
        register();

        // 记录一些缓存数据
        RpcRuntimeContext.cacheProviderConfig(this);
        exported = true;
    }

主要逻辑如下:

  1. 根据 providerConfig 创建一个 key 和 AppName。
  2. 检验同一个服务多次发布的次数。
  3. 创建一个 ProviderProxyInvoker, 其中包含了过滤器链,而过滤器链的最后一链就是对接口实现类的调用。
  4. 初始化注册中心,创建 Server(会有多个Server,因为可能配置了多个协议)。
  5. 将 config 和 invoker 注册到 Server 中。内部是将其放进了一个 Map 中。
  6. 启动 Server。启动 Server 其实就是启动 Netty 服务,并创建一个 RpcHandler,也就是 Netty 的 Handler,这个 RpcHandler 内部含有一个数据结构,包含接口级别的 invoker。所以,当请求进入的时候,RpcHandler 的 channelRead 方法会被调用,然后间接的调用 invoker 方法。
  7. 成功启动后,注册到注册中心。将数据缓存到 RpcRuntimeContext 的一个 Set 中。

一起来详细看看。

Invoker 怎么构造的?很简单,最主要的就是过滤器。关于过滤器,我们之前已经写过一篇文章了。不再赘述。

关键看看 Server 是如何构造的。

关键代码 serverConfig.buildIfAbsent(),类似 HashMap 的 putIfAbsent。如果不存在就创建。

Server 接口目前有 2 个实现,bolt 和 rest。当然,Server 也是基于扩展的,所以,不用怕,可以随便增加实现。

关键代码在 ServerFactory 的 getServer 中,其中会获取扩展点的 Server,然后,执行 Server 的 init 方法,我们看看默认 bolt 的 init 方法。

    @Override
    public void init(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
        // 启动线程池
        bizThreadPool = initThreadPool(serverConfig);
        boltServerProcessor = new BoltServerProcessor(this);
    }

保存了 serverConfig 的引用,启动了一个业务线程池,创建了一个 BoltServerProcessor 对象。

第一:这个线程池会在 Bolt 的 RpcHandler 中被使用,也就是说,复杂业务都是在这个线程池执行,不会影响 Netty 的 IO 线程。

第二:BoltServerProcessor 非常重要,他的构造方法包括了当前的 BoltServer,所以他俩是互相依赖的。关键点来了:

BoltServerProcessor 实现了 UserProcessor 接口,而 Bolt 的 RpcHandler 持有一个 Map<String, UserProcessor<?>>,所以,当 RpcHandler 被执行 channelRead 方法的时候,一定会根据接口名称找到对应的 UserProcessor,并执行他的 handlerRequest 方法。

那么,RpcHandler 是什么时候创建并放置到 RpcHandler 中的呢?

具体是这样的:在 server.start() 执行的时候,该方法会初始化 Netty 的 Server,在 SOFA 中,叫 RpcServer,将 BoltServerProcessor 放置到名叫 userProcessors 的 Map 中。然后,当 RpcServer 启动的时候,也就是 start 方法,会执行一个 init 方法,该方法内部就是设置 Netty 各种属性的地方,包括 Hander,其中有 2 行代码对我们很重要:

final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);
pipeline.addLast("handler", rpcHandler);

创建了一个 RpcHandler,并添加到 pipeline 中,这个 Handler 的构造参数就是包含所有 BoltServerProcessor 的 Map。

所以,总的流程就是:

每个接口都会创建一个 providerConfig 对象,这个对象会创建对应的 invoker 对象(包含过滤器链),这两个对象都会放到 BoltServer 的 invokerMap 中,而 BoltServer 还包含其他对象,比如 BoltServerProcessor(继承 UserProcessor), RpcServer(依赖 RpcHandler)。当初始化 BoltServerProcessor 的时候,会传入 this(BoltServer),当初始化 RpcServer 的时候,会传入 BoltServerProcessor 到 RpcServer 的 Map 中。在 RpcHandler 初始化的时候,又会将 RpcServer 的 Map 传进自己的内部。完成最终的依赖。
当请求进入,RpcHandler 调用对应的 UserProcessor 的 handlerRequest 方法,而该方法中,会调用对应的 invoker,invoker 调用过滤器链,知道调用真正的实现类。

而大概的 UML 图就是下面这样的:

红色部分是 RPC 的核心,包含 Solt 的 Server,实现 UserProcessor 接口的 BoltServerProcessor,业务线程池,存储所有接口实现的 Map。

绿色部分是 Bolt 的接口和类,只要实现了 UserProcessor 接口,就能将具体实现替换,也既是处理具体数据的逻辑。

最后,看看关键类 BoltServerProcessor ,他是融合 RPC 和 Bolt 的胶水类。

该类会注册一个序列化器替代 Bolt 默认的。handleRequest 方法是这个类的核心方法。有很多逻辑,主要看这里:

// 查找服务
Invoker invoker = boltServer.findInvoker(serviceName);
// 真正调用
response = doInvoke(serviceName, invoker, request);

/**
 * 找到服务端Invoker
 *
 * @param serviceName 服务名
 * @return Invoker对象
 */
public Invoker findInvoker(String serviceName) {
    return invokerMap.get(serviceName);
}

根据服务名称,从 Map 中找到服务,然后调用 invoker 的 invoker 方法。

再看看 Netty 到 BoltServerProcessor 的 handlerRequest 的调用链,使用 IDEA 的 Hierarchy 功能,查看该方法,最后停留在 ProcessTast 中,一个 Runnable.

根据经验,这个类肯定是被放到线程池了。什么时候放的呢?看看他的构造方法的 Hierarchy。

从图中可以看到 ,Bolt 的 RpcHandler 的 channelRead 最终会调用 ProcessTask 的 构造方法。

那么 BoltServer 的用户线程池什么时候使用呢?还是使用 IDEA 的 Hierarchy 功能。

其实也是在这个过程中,当用户没有设置线程池,则使用系统线程池。

总结

好了,关于 SOFA 的服务发布和服务的接收过程,就介绍完了,可以说,整个框架还是非常轻量级的。基本操作就是:内部通过在 Netty的 Handler 中保存一个存储服务实现的 Map 完成远程调用。

其实和我们之前用 Netty 写的小 demo 类似。

原文地址:https://www.cnblogs.com/stateis0/p/8975294.html

时间: 2024-08-06 08:49:35

SOFA 源码分析 —— 服务发布过程的相关文章

SOFA 源码分析 —— 服务引用过程

前言 在前面的 SOFA 源码分析 -- 服务发布过程 文章中,我们分析了 SOFA 的服务发布过程,一个完整的 RPC 除了发布服务,当然还需要引用服务. So,今天就一起来看看 SOFA 是如何引用服务的.实际上,基础逻辑和我们之前用 Netty 写的 RPC 小 demo 类似.有兴趣可以看看这个 demo-- 自己用 Netty 实现一个简单的 RPC. 示例代码 ConsumerConfig<HelloService> consumerConfig = new ConsumerCon

MyBatis 源码分析 - 配置文件解析过程

* 本文速览 由于本篇文章篇幅比较大,所以这里拿出一节对本文进行快速概括.本篇文章对 MyBatis 配置文件中常用配置的解析过程进行了较为详细的介绍和分析,包括但不限于settings,typeAliases和typeHandlers等,本文的篇幅也主要在对这三个配置解析过程的分析上.下面,我们来一起看一下本篇文章的目录结构. 从目录上可以看出,2.3节.2.5节和2.8节的内容比较多.其中2.3节是关于settings配置解析过程的分析,除了对常规的 XML 解析过程分析,本节额外的分析了元

源码分析HotSpot GC过程(一)

«上一篇:源码分析HotSpot GC过程(一)»下一篇:源码分析HotSpot GC过程(三):TenuredGeneration的GC过程 原文地址:https://www.cnblogs.com/WCFGROUP/p/9743676.html

netty 5 alph1源码分析(服务端创建过程)

参照<Netty系列之Netty 服务端创建>,研究了netty的服务端创建过程.至于netty的优势,可以参照网络其他文章.<Netty系列之Netty 服务端创建>是 李林锋撰写的netty源码分析的一篇好文,绝对是技术干货.但抛开技术来说,也存在一些瑕疵. 缺点如下 代码衔接不连贯,上下不连贯. 代码片段是截图,对阅读代理不便(可能和阅读习惯有关) 本篇主要内容,参照<Netty系列之Netty 服务端创建>,梳理出自己喜欢的阅读风格. 1.整体逻辑图 整体将服务

Dubbo 源码分析 - 服务导出全过程解析

1.服务导出过程 本篇文章,我们来研究一下 Dubbo 导出服务的过程.Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑.整个逻辑大致可分为三个部分,第一是前置工作,主要用于检查参数,组装 URL.第二是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程.第三是向注册中心注册服务,用于服务发现.本篇文章将会对这三个部分代码进行详细的分析,在分析之前,我们先来了解一下服务的导出过程. Dubbo 支持两种服务导出方式,

SOFA 源码分析 —— 过滤器设计

前言 通常 Web 服务器在处理请求时,都会使用过滤器模式,无论是 Tomcat ,还是 Netty,过滤器的好处是能够将处理的流程进行分离和解耦,比如一个 Http 请求进入服务器,可能需要解析 http 报头,权限验证,国际化处理等等,过滤器可以很好的将这些过程隔离,并且,过滤器可以随时卸载,安装. 每个 Web 服务器的过滤器思想都是类似的,只是实现方式略有不同. 比如 Tomcat,Tomcat 使用了一个 FilterChain 对象保存了所有的 filter,通过循环所有 filte

深入理解 spring 容器,源码分析加载过程

Spring框架提供了构建Web应用程序的全功能MVC模块,叫Spring MVC,通过Spring Core+Spring MVC即可搭建一套稳定的Java Web项目.本文通过Spring MVC源码分析介绍它的核心实现原理. Tomcat服务器启动入口文件是web.xml,通过在其中配置相关的Listener和Servlet即可加载Spring MVC所需数据.基于Spring MVC最简单的配置如下. <!-- 加载Spring配置文件 --> <context-param>

MapReduce阶段源码分析以及shuffle过程详解

MapReducer工作流程图: 1. MapReduce阶段源码分析 1)客户端提交源码分析 解释:   - 判断是否打印日志   - 判断是否使用新的API,检查连接   - 在检查连接时,检查输入输出路径,计算切片,将jar.配置文件复制到HDFS   - 计算切片时,计算最小切片数(默认为1,可自定义)和最大切片数(默认是long的最大值,可以自定义)   - 查看给定的是否是文件,如果是否目录计算目录下所有文件的切片   - 通过block大小和最小切片数.最大切片数计算出切片大小  

深入源码分析SpringMVC执行过程

本文主要讲解 SpringMVC 执行过程,并针对相关源码进行解析. 首先,让我们从 Spring MVC 的四大组件:前端控制器(DispatcherServlet).处理器映射器(HandlerMapping).处理器适配器(HandlerAdapter)以及视图解析器(ViewResolver) 的角度来看一下 Spring MVC 对用户请求的处理过程,过程如下图所示: SpringMVC 执行过程 用户请求发送到前端控制器 DispatcherServlet. 前端控制器 Dispat