徒手撸框架--实现 RPC 远程调用

微服务,已经是每个互联网开发者必须掌握的一项技术。而 RPC 框架,是构成微服务最重要的组成部分之一。趁最近有时间。又看了看 dubbo 的源码。dubbo 为了做到灵活和解耦,使用了大量的设计模式和 SPI机制,要看懂 dubbo 的代码也不太容易。

按照《徒手撸框架》系列文章的套路,我还是会极简的实现一个 RPC 框架。帮助大家理解 RPC 框架的原理。

广义的来讲一个完整的 RPC 包含了很多组件,包括服务发现,服务治理,远程调用,调用链分析,网关等等。我将会慢慢的实现这些功能,这篇文章主要先讲解的是 RPC 的基石,远程调用 的实现。

相信,读完这篇文章你也一定可以自己实现一个可以提供 RPC 调用的框架。

  1. RPC 的调用过程
    通过一图我们来了解一下 RPC 的调用过程,从宏观上来看看到底一次 RPC 调用经过些什么过程。

当一次调用开始:

client 会调用本地动态代理 proxy
这个代理会将调用通过协议转序列化字节流
通过 netty 网络框架,将字节流发送到服务端
服务端在受到这个字节流后,会根据协议,反序列化为原始的调用,利用反射原理调用服务方提供的方法
如果请求有返回值,又需要把结果根据协议序列化后,再通过 netty 返回给调用方

  1. 框架概览和技术选型
    看一看框架的组件:

clinet就是调用方。servive是服务的提供者。protocol包定义了通信协议。common包含了通用的一些逻辑组件。

技术选型项目使用 maven 作为包管理工具,json 作为序列化协议,使用spring boot管理对象的生命周期,netty作为 nio 的网路组件。所以要阅读这篇文章,你需要对spring boot和netty有基本的了解。

下面就看看每个组件的具体实现:

  1. protocol
    其实作为 RPC 的协议,需要考虑只有一个问题–就是怎么把一次方法的调用,变成能够被网络传输的字节流。

首先我们需要定义方法的调用和返回两个实体:

请求:

@Data
public class RpcRequest {
// 调用编号
private String requestId;
// 类名
private String className;
// 方法名
private String methodName;
// 请求参数的数据类型
private Class<?>[] parameterTypes;
// 请求的参数
private Object[] parameters;
}
结果:

@Data
public class RpcResponse {
// 调用编号
private String requestId;
// 抛出的异常
private Throwable throwable;
// 返回结果
private Object result;
}
确定了,需要序列化的对象,就要确定序列化的协议,实现两个方法,序列化和反序列化两个方法。

public interface Serialization {
<T> byte[] serialize(T obj);
<T> T deSerialize(byte[] data,Class<T> clz);
}
可选用的序列化的协议很多比如:

jdk 的序列化方法。(不推荐,不利于之后的跨语言调用)
json 可读性强,但是序列化速度慢,体积大。
protobuf,kyro,Hessian 等都是优秀的序列化框架,也可按需选择。
为了简单和便于调试,我们就选择 json 作为序列化协议,使用jackson作为 json 解析框架。

/**

public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> clz;
private Serialization serialization;
public RpcDecoder(Class<?> clz,Serialization serialization){
this.clz = clz;
this.serialization = serialization;
br/>}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if(in.readableBytes() < 4){
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = serialization.deSerialize(data, clz);
out.add(obj);
}
}
public class RpcEncoder extends MessageToByteEncoder {
private Class<?> clz;
private Serialization serialization;
public RpcEncoder(Class<?> clz, Serialization serialization){
this.clz = clz;
this.serialization = serialization;
br/>}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if(clz != null){
byte[] bytes = serialization.serialize(msg);
out.writeInt(bytes.length);
out.writeBytes(bytes);
}
}
}
至此,protocol 就实现了,我们就可以把方法的调用和结果的返回,转换为一串可以在网络中传输的 byte[] 数组了。

  1. server
    server 是负责处理客户端请求的组件。在互联网高并发的环境下,使用 Nio 非阻塞的方式可以相对轻松的应付高并发的场景。netty 是一个优秀的 Nio 处理框架。Server 的关键代码如下:

netty 是基于 Recotr 模型的。所以需要初始化两组线程 boss 和 worker 。boss 负责分发请求,worker 负责执行相应的 handler:

@Bean
public ServerBootstrap serverBootstrap() throws InterruptedException {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup(), workerGroup())
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(serverInitializer);
Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions();
Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet();
for (@SuppressWarnings("rawtypes") ChannelOption option : keySet) {
serverBootstrap.option(option, tcpChannelOptions.get(option));
}
return serverBootstrap;
}
netty 的操作是基于 pipeline 的。所以我们需要把在 protocol 实现的几个 coder 注册到 netty 的 pipeline 中。

ChannelPipeline pipeline = ch.pipeline();
// 处理 tcp 请求中粘包的 coder,具体作用可以自行 google
pipeline.addLast(new LengthFieldBasedFrameDecoder(65535,0,4));
// protocol 中实现的 序列化和反序列化 coder
pipeline.addLast(new RpcEncoder(RpcResponse.class,new JsonSerialization()));
pipeline.addLast(new RpcDecoder(RpcRequest.class,new JsonSerialization()));
// 具体处理请求的 handler 下文具体解释
pipeline.addLast(serverHandler);
实现具体的 ServerHandler 用于处理真正的调用。
ServerHandler 继承 SimpleChannelInboundHandler<RpcRequest>。简单来说这个 InboundHandler 会在数据被接受时或者对于的 Channel 的状态发生变化的时候被调用。当这个 handler 读取数据的时候方法 channelRead0() 会被用,所以我们就重写这个方法就够了。

@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(msg.getRequestId());
try{
// 收到请求后开始处理请求
Object handler = handler(msg);
rpcResponse.setResult(handler);
}catch (Throwable throwable){
// 如果抛出异常也将异常存入 response 中
rpcResponse.setThrowable(throwable);
throwable.printStackTrace();
}
// 操作完以后写入 netty 的上下文中。netty 自己处理返回值。
ctx.writeAndFlush(rpcResponse);
}
handler(msg) 实际上使用的是 cglib 的 Fastclass 实现的,其实根本原理,还是反射。学好 java 中的反射真的可以为所欲为。

private Object handler(RpcRequest request) throws Throwable {
Class<?> clz = Class.forName(request.getClassName());
Object serviceBean = applicationContext.getBean(clz);
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
// 根本思路还是获取类名和方法名,利用反射实现调用
FastClass fastClass = FastClass.create(serviceClass);
FastMethod fastMethod = fastClass.getMethod(methodName,parameterTypes);
// 实际调用发生的地方
return fastMethod.invoke(serviceBean,parameters);
}
总体上来看,server 的实现不是很困难。核心的知识点是 netty 的 channel 的使用和 cglib 的反射机制。

  1. client
    future

其实,对于我来说,client 的实现难度,远远大于 server 的实现。netty 是一个异步框架,所有的返回都是基于 Future 和 Callback 的机制。

所以在阅读以下文字前强烈推荐,我之前写的一篇文章 Future 研究。利用经典的 wite 和 notify 机制,实现异步的获取请求的结果。

/**

  • @author zhengxin
    */
    public class DefaultFuture {
    private RpcResponse rpcResponse;
    private volatile boolean isSucceed = false;
    private final Object object = new Object();
    public RpcResponse getResponse(int timeout){
    synchronized (object){
    while (!isSucceed){
    try {
    //wait
    object.wait(timeout);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    return rpcResponse;
    }
    }
    public void setResponse(RpcResponse response){
    if(isSucceed){
    return;
    }
    synchronized (object) {
    this.rpcResponse = response;
    this.isSucceed = true;
    //notiy
    object.notify();
    }
    }
    }
    复用资源

为了能够提升 client 的吞吐量,可提供的思路有以下几种:

使用对象池:建立多个 client 以后保存在对象池中。但是代码的复杂度和维护 client 的成本会很高。
尽可能的复用 netty 中的 channel。
之前你可能注意到,为什么要在 RpcRequest 和 RpcResponse 中增加一个 ID。因为 netty 中的 channel 是会被多个线程使用的。当一个结果异步的返回后,你并不知道是哪个线程返回的。这个时候就可以考虑利用一个 Map,建立一个 ID 和 Future 映射。这样请求的线程只要使用对应的 ID 就能获取,相应的返回结果。

/**

使用 Transporters 对请求进行封装。

public class Transporters {
public static RpcResponse send(RpcRequest request){
NettyClient nettyClient = new NettyClient("127.0.0.1", 8080);
nettyClient.connect(nettyClient.getInetSocketAddress());
RpcResponse send = nettyClient.send(request);
return send;
}
}
动态代理的实现

动态代理技术最广为人知的应用,应该就是 Spring Aop,面向切面的编程实现。动态的在原有方法Before 或者 After 添加代码。而 RPC 框架中动态代理的作用就是彻底替换原有方法,直接调用远程方法。

代理工厂类:

public class ProxyFactory {br/>@SuppressWarnings("unchecked")
public static <T> T create(Class<T> interfaceClass){
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new RpcInvoker<T>(interfaceClass)
);
}
}
当 proxyFactory 生成的类被调用的时候,就会执行 RpcInvoker 方法。

public class RpcInvoker<T> implements InvocationHandler {
private Class<T> clz;
public RpcInvoker(Class<T> clz){
this.clz = clz;
br/>}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest();
String requestId = UUID.randomUUID().toString();
String className = method.getDeclaringClass().getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
request.setRequestId(requestId);
request.setClassName(className);
request.setMethodName(methodName);
request.setParameterTypes(parameterTypes);
request.setParameters(args);
return Transporters.send(request).getResult();
}
}
看到这个 invoke 方法,主要三个作用,

生成 RequestId。
拼装 RpcRequest。
调用 Transports 发送请求,获取结果。
至此终于,整个调用链完整了。我们终于完成了一次 RPC 调用。

与 Spring 集成

为了使我们的 client 能够易于使用我们需要考虑,定义一个自定义注解 @RpcInterface 当我们的项目接入 Spring 以后,Spring 扫描到这个注解之后,自动的通过我们的 ProxyFactory 创建代理对象,并存放在 spring 的 applicationContext 中。这样我们就可以通过 @Autowired 注解直接注入使用了。

@Target({ElementType.TYPE})br/>@Retention(RetentionPolicy.RUNTIME)
public @interface RpcInterface {
}

@Configurationbr/>@Slf4j
public class RpcConfig implements ApplicationContextAware,InitializingBean {
private ApplicationContext applicationContext;
br/>@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
br/>}
@Override
public void afterPropertiesSet() throws Exception {
Reflections reflections = new Reflections("com.xilidou");
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
// 获取 @RpcInterfac 标注的接口
Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcInterface.class);
for (Class<?> aClass : typesAnnotatedWith) {
// 创建代理对象,并注册到 spring 上下文。
beanFactory.registerSingleton(aClass.getSimpleName(),ProxyFactory.create(aClass));
}
log.info("afterPropertiesSet is {}",typesAnnotatedWith);
}
}
终于我们最简单的 RPC 框架就开发完了。下面可以测试一下。

  1. Demo
    api

@RpcInterface
public interface IHelloService {
String sayHi(String name);
}
server

IHelloSerivce 的实现:

@Servicebr/>@Slf4j
public class TestServiceImpl implements IHelloService {
br/>@Override
public String sayHi(String name) {
log.info(name);
return "Hello " + name;
}
}
启动服务:

@SpringBootApplication
public class Application {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(Application.class);
TcpService tcpService = context.getBean(TcpService.class);
tcpService.start();
}
}
`
client

@SpringBootApplication()
public class ClientApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(ClientApplication.class);
IHelloService helloService = context.getBean(IHelloService.class);
System.out.println(helloService.sayHi("doudou"));
}
}
运行以后输出的结果:

Hello doudou

总结
终于我们实现了一个最简版的 RPC 远程调用的模块。

徒手撸框架--实现 RPC 远程调用

原文地址:http://blog.51cto.com/13952955/2287615

时间: 2024-07-30 10:05:02

徒手撸框架--实现 RPC 远程调用的相关文章

测试JSON RPC远程调用(JSON客户端)

#include <string> #include <iostream> #include <curl/curl.h> /* 标题:JSon客户端 Author: Kagula LastUpdateDate:2014-05-17 描述:测试JSON RPC远程调用 测试环境:Windows 8.1.Visual Studio 2013 SP1 curl-7.36.0 CPPCMS 1.0.4(JSON服务端) Java Servlet (JSON服务端) */ sta

測试JSON RPC远程调用(JSONclient)

#include <string> #include <iostream> #include <curl/curl.h> /* 标题:JSonclient Author: Kagula LastUpdateDate:2014-05-17 描写叙述:測试JSON RPC远程调用 測试环境:Windows 8.1.Visual Studio 2013 SP1 curl-7.36.0 CPPCMS 1.0.4(JSON服务端) Java Servlet (JSON服务端) *

rpc远程调用开发

RPC即远程过程调用,适用于集群管理,集群节点就是RPCServer,而我们发起远程调用的web服务器就是RPCClient.所以是少数rpcClient(可能一个)对多个RPCServer(集群节点). 今天讲述的RPC开发希望实现这样一个效果,在RPCClient上(也就是web服务器)执行一条shell命令,要求指定的远程主机执行指定的命令.命令的格式如下 rpc_client address command 比如 ./ssan_client 192.168.1.1 vmstat 希望这条

dubbo集成zookeeper rpc远程调用

注:下面使用dubbo依赖的是zookeeper注册中心,这里没有详细的介绍.在配置之前,请自行准备好zookeeper环境. 后续如果写zookeeper的配置会补放链接 添加Gradle依赖 compile group: 'com.alibaba', name: 'dubbo', version: '2.5.10'//dubbo compile group: 'org.apache.zookeeper', name: 'zookeeper', version: '3.3.3'//zookee

RPC远程调用框架rsf和dubbo

1.rsf(Remote service framework)框架整体的架构 思考点: 1.注册中心使用的zookeeper,多机房部署,各注册中心要求数据一致,如何在一个节点发生异常情况下,不影响其他节点? 服务发现模块会定时的将最新的服务提供方列表刷新到注册中心,如PUMP定时的将提供方的接口列表写入到注册中心.注册中心考虑到 ZK 的优势.局限和 Redis 优势,通过 Pump 定时批量刷新数据到 ZK 集群,减少 ZK 写入压力:通过 Redis 集群管理提供方上下线,由 Pump 订

RPC远程调用概念 &amp;amp;&amp;amp; demo实例

RPC是指远程过程调用,直观说法就是A通过网络调用B的过程方法. 也就是说两台serverA.B,一个应用部署在Aserver上,想要调用Bserver上应用提供的函数/方法,因为不在一个内存空间,不能直接调用.须要通过网络来表达调用的语义和传达调用的数据. 为什么RPC呢?就是无法在一个进程内,甚至一个计算机内通过本地调用的方式完毕的需求,比方比方不同的系统间的通讯,甚至不同的组织间的通讯.因为计算能力须要横向扩展.须要在多台机器组成的集群上部署应用 首先要解决寻址的问题,也就是说,Aserv

RPC远程调用概念 &amp;&amp; demo实例

RPC是指远程过程调用,直观说法就是A通过网络调用B的过程方法.也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据. 为什么RPC呢?就是无法在一个进程内,甚至一个计算机内通过本地调用的方式完成的需求,比如比如不同的系统间的通讯,甚至不同的组织间的通讯.由于计算能力需要横向扩展,需要在多台机器组成的集群上部署应用 首先要解决寻址的问题,也就是说,A服务器上的应用怎么告诉底层的

手动实现RPC远程调用

java中的RPC核心思想就是:socket编程.传输Object.动态代理 package com.lala.rpc; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java

[转]Linux下C语言-RPC远程调用编程rpcgen用法

在查看libc6-dev软件包提供的工具(用 dpkg -L libc6-dev 命令)的时候,发现此软件包提供了一个有用的工具rpcgen命令.通过rpcgen的man手册看到此工具的作用是把RPC源程序编译成C语言源程序,从而轻松实现远程过程调用.下面的例子程序的作用是客户端程序取中心服务器上时间的,编程过程如下:先编写一个 “ RPC 语言 ” ( RPC Language ( Remote Procedure Call Language ) ) 的源文件 test.x ,文件后缀名为 x