带你手写基于 Spring 的可插拔式 RPC 框架(四)代理类的注入与服务启动

上一章节我们已经实现了从客户端往服务端发送数据并且通过反射方法调用服务端的实现类最后返回给客户端的底层协议。
这一章节我们来实现客户端代理类的注入。

承接上一章,我们实现了多个底层协议,procotol 有 netty,http,和 socket 三个实现类,每个实现类都有启动服务端和客户端发送数据两个方法。

问题

  1. 如何实现底层协议的选择那?
    可以通过配置文件来选择协议。
  2. 单独的配置文件还是和 Spring 的配置文件结合起来那?
    我们选择与 Spring 结合的配置文件,自定义一些属性的标签,这样能够更好的利用 Spring 的特性。

自定义 Spring 标签

先看整体的结构:

  1. 写一个 xsd 文件来自定义我们的标签和属性,注意 schema 的 xmlns 和
    targetNamespace 属性, http://paul.com/schema。

    <xsd:schema
             xmlns="http://paul.com/schema"
             xmlns:xsd="http://www.w3.org/2001/XMLSchema"
             targetNamespace="http://paul.com/schema">
         <xsd:complexType name="procotol-type">
             <xsd:attribute name="procotol" type="xsd:string">
                 <xsd:annotation>
                     <xsd:documentation><![CDATA[ The elementname1 name. ]]></xsd:documentation>
                 </xsd:annotation>
             </xsd:attribute>
             <xsd:attribute name="port" type="xsd:int">
                 <xsd:annotation>
                     <xsd:documentation><![CDATA[ The elementname1 age. ]]></xsd:documentation>
                 </xsd:annotation>
             </xsd:attribute>
             <xsd:attribute name="serialize" type="xsd:string">
             <xsd:annotation>
                 <xsd:documentation><![CDATA[ The elementname1 age. ]]></xsd:documentation>
             </xsd:annotation>
             </xsd:attribute>
             <xsd:attribute name="stragety" type="xsd:string">
                 <xsd:annotation>
                     <xsd:documentation><![CDATA[ The elementname1 age. ]]></xsd:documentation>
                 </xsd:annotation>
              </xsd:attribute>
             <xsd:attribute name="role" type="xsd:string">
                 <xsd:annotation>
                     <xsd:documentation><![CDATA[ The elementname1 age. ]]></xsd:documentation>
                 </xsd:annotation>
             </xsd:attribute>
             <xsd:attribute name="address" type="xsd:string">
                 <xsd:annotation>
                     <xsd:documentation><![CDATA[ The elementname1 age. ]]></xsd:documentation>
                 </xsd:annotation>
             </xsd:attribute>
         </xsd:complexType>
    
         <xsd:element name="procotol" type="procotol-type">
             <xsd:annotation>
                 <xsd:documentation><![CDATA[ elementname1的文档 ]]></xsd:documentation>
             </xsd:annotation>
         </xsd:element>
    
         <xsd:complexType name="application-type">
             <xsd:attribute name="name" type="xsd:string">
                 <xsd:annotation>
                     <xsd:documentation><![CDATA[ The elementname1 name. ]]></xsd:documentation>
                 </xsd:annotation>
             </xsd:attribute>
         </xsd:complexType>
    
         <xsd:element name="application" type="application-type">
             <xsd:annotation>
                 <xsd:documentation><![CDATA[ elementname1的文档 ]]></xsd:documentation>
             </xsd:annotation>
         </xsd:element>
    
         <xsd:complexType name="service-type">
             <xsd:attribute name="interfaces" type="xsd:string">
                 <xsd:annotation>
                     <xsd:documentation><![CDATA[ The elementname1 name. ]]></xsd:documentation>
                 </xsd:annotation>
             </xsd:attribute>
             <xsd:attribute name="ref" type="xsd:string">
                 <xsd:annotation>
                     <xsd:documentation><![CDATA[ The elementname1 name. ]]></xsd:documentation>
                 </xsd:annotation>
             </xsd:attribute>
             <xsd:attribute name="timeout" type="xsd:int">
                 <xsd:annotation>
                     <xsd:documentation><![CDATA[ The elementname1 age. ]]></xsd:documentation>
                 </xsd:annotation>
             </xsd:attribute>
         </xsd:complexType>
    
         <xsd:element name="service" type="service-type">
             <xsd:annotation>
                 <xsd:documentation><![CDATA[ elementname1的文档 ]]></xsd:documentation>
             </xsd:annotation>
         </xsd:element>
    
         <xsd:complexType name="provider-type">
             <xsd:attribute name="interf" type="xsd:string">
                 <xsd:annotation>
                     <xsd:documentation><![CDATA[ The elementname1 name. ]]></xsd:documentation>
                 </xsd:annotation>
             </xsd:attribute>
             <xsd:attribute name="impl" type="xsd:string">
                 <xsd:annotation>
                     <xsd:documentation><![CDATA[ The elementname1 name. ]]></xsd:documentation>
                 </xsd:annotation>
             </xsd:attribute>
         </xsd:complexType>
    
         <xsd:element name="provider" type="provider-type">
             <xsd:annotation>
                 <xsd:documentation><![CDATA[ elementname1的文档 ]]></xsd:documentation>
             </xsd:annotation>
         </xsd:element>
    
     </xsd:schema>
  2. 在自定义的 BeanDefinitionParser 来对我们自定义标签的属性进行解析。
    在 BeanDefinitionParser 里面我们可以使用 Spring 的一些组件,也可以只将我们自定的属性解析出来。parse 方法里面传入的两个参数,通过 element 可以获得 xml 中的属性信息,通过 parserContext 可以获取到 BeanDefinitionRegistry,熟悉 Spring 源码的同学应该知道这个类,我们可以通过这个类将我们的类注入到 Spring 容器中。
    构造方法中的 beanClass 我们可以传入自己定义的类,将解析出来的属性赋值到类的属性中。

    rpc:procotol 标签
    这个标签中包含了协议类型,端口,序列化协议,注册中心地址和角色(服务端还是客户端)。这个标签解析中我们将一些属性赋值到了 Configuration 配置类中,根据属性选择了协议类型,如果是客户端,提前初始化出 channel 保存到阻塞队列中,提高并发能力,如果是客户端则启动通信服务器。

    客户端 procotol 标签配置:

    <rpc:procotol procotol="Dubbo" port="3230" serialize="ProtoStuff" address="47.107.56.23:2181"/>

    服务端 procotol 标签配置:

    <rpc:procotol procotol="Dubbo" port="3230" serialize="ProtoStuff" role="provider" address="47.107.56.23:2181"/>

    对应的解析器。

    public class ProcotolBeanDefinitionParser implements BeanDefinitionParser {
    
         private final Class<?> beanClass;
    
         public ProcotolBeanDefinitionParser(Class<?> beanClass) {
             this.beanClass = beanClass;
         }
    
         @Override
         public BeanDefinition parse(Element element, ParserContext parserContext) {
             System.out.println("1");
             String pro = element.getAttribute("procotol");
             int port = Integer.parseInt(element.getAttribute("port"));
             Configuration.getInstance().setProcotol(pro);
             Configuration.getInstance().setPort(port);
             Configuration.getInstance().setSerialize(element.getAttribute("serialize"));
             Configuration.getInstance().setStragety(element.getAttribute("stragety"));
             Configuration.getInstance().setRole(element.getAttribute("role"));
             Configuration.getInstance().setAddress(element.getAttribute("address"));
             if("provider".equals(element.getAttribute("role"))){
                 Procotol procotol = null;
                     if("Dubbo".equalsIgnoreCase(pro)){
                         procotol = new DubboProcotol();
                     }else if("Http".equalsIgnoreCase(pro)){
                         procotol = new HttpProcotol();
                     }else if("Socket".equalsIgnoreCase(pro)){
                         procotol = new SocketProcotol();
                     }else{
                         procotol = new DubboProcotol();
                     }
    
                     try {
                         InetAddress addr = InetAddress.getLocalHost();
                         String ip = addr.getHostAddress();
                         if(port == 0){
                             port = 32115;
                         }
                         URL url = new URL(ip,port);
                         procotol.start(url);
    
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
             }else{
                 //获取服务注册中心
                 ZookeeperRegisterCenter registerCenter4Consumer = ZookeeperRegisterCenter.getInstance();
                 //初始化服务提供者列表到本地缓存
                 registerCenter4Consumer.initProviderMap();
                 //初始化Netty Channel
                 Map<String, List<ServiceProvider>> providerMap = registerCenter4Consumer.getServiceMetaDataMap4Consumer();
                 if (MapUtils.isEmpty(providerMap)) {
                     throw new RuntimeException("service provider list is empty.");
                 }
                 NettyChannelPoolFactory.getInstance().initNettyChannelPoolFactory(providerMap);
             }
             return null;
         }
     }
    

    rpc:provider 标签,这个是服务端服务发布标签。通过这个标签表明服务端想要将哪些服务发布出来。

    <rpc:provider interf="com.paul.service.HelloService" impl="com.paul.service.HelloServiceImpl" />
    <rpc:provider interf="com.paul.service.UserService" impl="com.paul.service.UserServiceImpl" />

    对应的解析器:
    将需要暴露的服务注册中 zookeeper。

    public class ProviderBeanDefinitionParser implements BeanDefinitionParser {
    
         private final Class<?> beanClass;
    
         public ProviderBeanDefinitionParser(Class<?> beanClass) {
             this.beanClass = beanClass;
         }
         @Override
         public BeanDefinition parse(Element element, ParserContext parserContext) {
             System.out.println("15");
             String interfaces = element.getAttribute("interf");
             String impl = element.getAttribute("impl");
    
             int port = Configuration.getInstance().getPort();
             InetAddress addr = null;
             try {
                 addr = InetAddress.getLocalHost();
                 String ip = addr.getHostAddress();
                 if(port == 0) {
                     port = 32115;
                 }
                 List<ServiceProvider> providerList = new ArrayList<>();
                 ServiceProvider providerService = new ServiceProvider();
                 providerService.setProvider(Class.forName(interfaces));
                 providerService.setServiceObject(impl);
                 providerService.setIp(ip);
                 providerService.setPort(port);
                 providerService.setTimeout(5000);
                 providerService.setServiceMethod(null);
                 providerService.setApplicationName("");
                 providerService.setGroupName("nettyrpc");
                 providerList.add(providerService);
    
                 //注册到zk,元数据注册中心
                 RegisterCenter4Provider registerCenter4Provider = ZookeeperRegisterCenter.getInstance();
                 registerCenter4Provider.registerProvider(providerList);
    
             } catch (UnknownHostException e) {
                 e.printStackTrace();
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             }
             return null;
         }
     }

    rpc:service 标签,这个标签表明客户端需要调用哪些服务端的接口,将对应的代理类注入到 Spring 中,在成需中可以直接使用 @Autowired 注入这个代理类,就可以像调用本地服务一样调用远程服务了。

    <rpc:service interfaces="com.paul.service.HelloService" ref="helloService" timeout="5000"/>

    对应的解析器:
    将接口的代理类注入到 Spring 中,并且将消费者也就是客户端注册到注册中心。

    public class ServiceBeanDefinitionParser implements BeanDefinitionParser {
    
         private final Class<?> beanClass;
    
         public ServiceBeanDefinitionParser(Class<?> beanClass) {
             this.beanClass = beanClass;
         }
    
         @Override
         public BeanDefinition parse(Element element, ParserContext parserContext) {
    
             String interfaces = element.getAttribute("interfaces");
             String ref = element.getAttribute("ref");
             Class clazz = null;
             try {
                 clazz = Class.forName(interfaces);
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             }
             BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);
             GenericBeanDefinition definition = (GenericBeanDefinition) builder.getRawBeanDefinition();
    
             definition.getConstructorArgumentValues().addGenericArgumentValue(clazz);
    
             definition.setBeanClass(ProxyFactory.class);
             definition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE);
    
             BeanDefinitionRegistry beanDefinitionRegistry = parserContext.getRegistry();
             beanDefinitionRegistry.registerBeanDefinition(ref,definition);
    
             //获取服务注册中心
             ZookeeperRegisterCenter registerCenter4Consumer = ZookeeperRegisterCenter.getInstance();
    
             //将消费者信息注册到注册中心
             ServiceConsumer invoker = new ServiceConsumer();
             List<ServiceConsumer> consumers = new ArrayList<>();
             consumers.add(invoker);
             invoker.setConsumer(clazz);
             invoker.setServiceObject(interfaces);
             invoker.setGroupName("");
             registerCenter4Consumer.registerConsumer(consumers);
    
             return definition;
         }
     }
  3. 定义一个 NamespaceHandler 来注册对应的标签和 BeanDefinitionParser。
    public class RpcNamespaceHandler extends NamespaceHandlerSupport {
         @Override
         public void init() {
             registerBeanDefinitionParser("procotol", new ProcotolBeanDefinitionParser(Configuration.class));
     //        registerBeanDefinitionParser("register", new RegisterBeanDefinitionParser(Configuration.class));
             registerBeanDefinitionParser("application", new ApplicationBeanDefinitionParser(Configuration.class));
             registerBeanDefinitionParser("provider", new ProviderBeanDefinitionParser(Configuration.class));
      //       registerBeanDefinitionParser("role", new ServerBeanDefinitionParser(Configuration.class));
             registerBeanDefinitionParser("service", new ServiceBeanDefinitionParser(Configuration.class));
         }
     }
  4. 在 Spring 中注册上面的 schema 和 handler。
    spring.handlers, 这里要将 schema 和我们自定义的 handler 类 mapping 起来。

    http\://paul.com/schema=com.paul.spring.RpcNamespaceHandler

    spring.schema,表明 xsd 文件的位置。

    http\://paul.com/schema/rpc.xsd=META-INF/rpc.xsd

    通过上面的配置我们实现了根据配置来做通信协议,序列化协议的选择以及客户端代理类注入到 Spring 中方便我们以后调用,还实现了服务端的启动,以及对应注册到注册中心的功能。

获取接口代理类的实现
我们使用的是 JDK 动态代理。
```java
public class ProxyFactory implements FactoryBean {
private Class interfaceClass;

    private ApplicationContext ctx;

    public ProxyFactory(Class<T> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    @Override
    public T getObject() throws Exception {
        return (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new Handler(interfaceClass));
    }

    @Override
    public Class<?> getObjectType() {
        return interfaceClass;
    }

}
```

Invocation 的实现类 handler, 也就是动态代理类的 invoke 方法的调用,通过 invoke 方法调用对应协议的 send 方法去发送数据。在发送数据前,通过负载均衡策略选择对应的服务端地址,拼装 RpcRequest 调用 proctol 接口实现类的 send 方法发送数据。
```java
public class Handler implements InvocationHandler{

private Class<T> interfaceClass;

public Handler(Class<T> interfaceClass) {
    this.interfaceClass = interfaceClass;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Configuration configuration = Configuration.getInstance();

    Procotol procotol;

    if("Dubbo".equalsIgnoreCase(configuration.getProcotol())){
        procotol = new DubboProcotol();
    }else if("Http".equalsIgnoreCase(configuration.getProcotol())){
        procotol = new HttpProcotol();
    }else if("Socket".equalsIgnoreCase(configuration.getProcotol())){
        procotol = new SocketProcotol();
    }else{
        procotol = new DubboProcotol();
    }

    //服务接口名称
    String serviceKey = interfaceClass.getName();
    //获取某个接口的服务提供者列表
    RegisterCenter4Consumer registerCenter4Consumer = ZookeeperRegisterCenter.getInstance();
    List<ServiceProvider> providerServices = registerCenter4Consumer.getServiceMetaDataMap4Consumer().get(serviceKey);
    //根据软负载策略,从服务提供者列表选取本次调用的服务提供者
    String stragety = configuration.getStragety();
    if(null == stragety || stragety == ""){
        stragety = "random";
    }
    System.out.println("paul:"+ providerServices.get(0).toString());
    LoadStrategy loadStrategyService = LoadBalanceEngine.queryLoadStrategy(stragety);
    ServiceProvider serviceProvider = loadStrategyService.select(providerServices);
    URL url = new URL(serviceProvider.getIp(),serviceProvider.getPort());
    String impl = serviceProvider.getServiceObject().toString();
    int timeout = 20000;
    RpcRequest invocation = new RpcRequest(UUID.randomUUID().toString(),interfaceClass.getName(),method.getName(),args, method.getParameterTypes(),impl,timeout);
    Object res = procotol.send(url, invocation);
    return res;
}

}

```

这样我们完成 rpc-spring 模块的代码。

原文地址:https://www.cnblogs.com/paulwang92115/p/11116948.html

时间: 2024-10-11 18:20:38

带你手写基于 Spring 的可插拔式 RPC 框架(四)代理类的注入与服务启动的相关文章

带你手写基于 Spring 的可插拔式 RPC 框架(二)整体结构

前言 上一篇文章中我们已经知道了什么是 RPC 框架和为什么要做一个 RPC 框架了,这一章我们来从宏观上分析,怎么来实现一个 RPC 框架,这个框架都有那些模块以及这些模块的作用. 总体设计 在我们的整个框架里比较重要的几个模块: rpc-procotol: 既然是可插拔是框架,我们需要支持选择底层协议,这部分是通信协议相关的模块. rpc-spring: 我们的框架是基于 spring 开发的,这个模块是将我们的一些功能和 spring 整合起来,比如自动注入代理 bean,启动服务端 se

带你手写基于 Spring 的可插拔式 RPC 框架(三)通信协议模块

在写代码之前我们先要想清楚几个问题. 我们的框架到底要实现什么功能? 我们要实现一个远程调用的 RPC 协议. 最终实现效果是什么样的? 我们能像调用本地服务一样调用远程的服务. 怎样实现上面的效果? 前面几章已经给大家说了,使用动态代理,在客户端生成接口代理类使用,在代理类的 invoke 方法里面将方法参数等信息组装成 request 发给服务端,服务端需要起一个服务器一直等待接收这种消息,接收之后使用反射调 用对应接口的实现类. 首先我们需要实现底层的通信的服务端和客户端,可以有一下几种实

带你手写基于 Spring 的可插拔式 RPC 框架(一)介绍

概述 首先这篇文章是要带大家来实现一个框架,听到框架大家可能会觉得非常高大上,其实这和我们平时写业务员代码没什么区别,但是框架是要给别人使用的,所以我们要换位思考,怎么才能让别人用着舒服,怎么样才能让我们的框架性能优异.通过自己写一个框架,我们能学到的有很多,能让我们脱离 CURD,在更高的层面上去思考. 目的 写这个框架最主要的目的是要让大家了解整个框架的设计思想和用到的技术,并不是让大家关注代码,当然我实现的代码一定不是完美的,还有很多需要改进的地方,希望大家不吝赐教,一起进步. 提前准备

带你手写基于 Spring 的可插拔式 RPC 框架(五)注册中心

注册中心代码使用 zookeeper 实现,通过图片来看看我们注册中心的架构. 首先说明, zookeeper 的实现思路和代码是参考架构探险这本书上的,另外在 github 和我前面配置文件中的 zookeeper 服务器是用的1个月免费适用的阿里云,大家也可以用它当测试用. 不多说,一次性给出注册中心全部代码. 客户端对应的注册中心接口 public interface RegisterCenter4Consumer { /** * 消费端初始化服务提供者信息本地缓存 */ public v

基于Spring开发的一个BIO-RPC框架(对小白很友好)

PART1:先来整体看下项目的构成 其中bio-rpc-core就是所谓的rpc框架 bio-rpc-example-client即所谓的服务调用方(你的项目中想要调用服务的地方) bio-rpc-example-server即所谓的服务提供方(你的项目中写好服务想要供别人调用的地方) github地址:https://github.com/Luyu05/BioRpcExample PART2:这个框架咋用? 服务使用方 1.首先,在想要调用服务的地方(bio-rpc-example-clien

手写MyBatis,纯手工打造开源框架(第四篇:决胜千里)- 第272篇

说明 MyBatis版本:3.5.1 相关历史文章(阅读本文之前,您可能需要先看下之前的系列) Spring Boot MyBatis最全教程:你值得拥有MyBatis能脱离Spring吗一图纵览MyBatis的工作原理从源码看MyBatis,竟如此简单MyBatis的Mapper是什么`垃圾` 手写MyBatis,纯手工打造开源框架(第一篇:风云再起) 手写MyBatis,纯手工打造开源框架(第二篇:君临天下) 手写MyBatis,纯手工打造开源框架(第三篇:运筹帷幄) 前言        运

携程系统架构师带你手写spring mvc,解读spring核心源码!

讲师简介: James老师 系统架构师.项目经理 十余年Java经验,曾就职于携程.人人网等一线互联网公司,专注于java领域,精通软件架构设计,对于高并发.高性能服务有深刻的见解, 在服务化基础架构和微服务技术有大量的建设和设计经验. 课程内容: 1.为什么读Spring源码? 如果你是一名JAVA开发人员,你一定用过Spring Framework. 作为一款非常经典的开源框架,从2004年发布的1.0版本到现在的5.0版本,经历了14年的洗礼, 持久不衰 与其说现在是JAVA的天下, 不如

手写基于Promise A+规范的Promise

const PENDING = 'pending';//初始态const FULFILLED = 'fulfilled';//初始态const REJECTED = 'rejected';//初始态function Promise(executor){ let self = this;//先缓存当前promise实例 self.status = PENDING;//设置状态 //定义存放成功的回调的数组 self.onResolvedCallbacks = []; //定义存放失败回调的数组 s

58同城架构师带你手写Kafka流处理技术

Kakfa介绍 Kafka是什么 Kafka最初是LinkedIn的内部内部基础设施系统.它被认为是一个流平台,在Kafka上可以发布和订阅流数据,并把它们保存起来.进行处理.但是我们在使用Kafka中,最多的就是将它作为一个消息系统使用,类似于ActiveMQ.RabbitMQ等.但是Kafka与这些传统的消息系统又有着许多的不同点,这些差异使它又不同于消息系统. Kafka是一个分布式系统,以集群(支持自由伸缩)的方式运行.(所以我们总称为分布式消息队列) Kafka可以用来存储数据,数据存