基于Netty的高性能JAVA的RPC框架

前言

今年7月份左右报名参加了阿里巴巴组织的高性能中间件挑战赛,这次比赛不像以往的比赛,是从一个工程的视角来比赛的。
这个比赛有两个赛题,第一题是实现一个RPC框架,第二道题是实现一个Mom消息中间件。
RPC题目如下

一个简单的RPC框架
RPC(Remote Procedure Call )——远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
框架——让编程人员便捷地使用框架所提供的功能,由于RPC的特性,聚焦于应用的分布式服务化开发,所以成为一个对开发人员无感知的接口代理,显然是RPC框架优秀的设计。
题目要求
1.要成为框架:对于框架的使用者,隐藏RPC实现。
2.网络模块可以自己编写,如果要使用IO框架,要求使用netty-4.0.23.Final。
3.支持异步调用,提供future、callback的能力。
4.能够传输基本类型、自定义业务类型、异常类型(要在客户端抛出)。
5.要处理超时场景,服务端处理时间较长时,客户端在指定时间内跳出本次调用。
6.提供RPC上下文,客户端可以透传数据给服务端。
7.提供Hook,让开发人员进行RPC层面的AOP。
注:为了降低第一题的难度,RPC框架不需要注册中心,客户端识别-DSIP的JVM参数来获取服务端IP。
衡量标准
满足所有要求。 性能测试。
测试时会运行rpc-use-demo中的测试用例,测试的demo包由测试工具做好。
参赛者必须以com.alibaba.middleware.race.rpc.api.impl.RpcConsumerImpl为全类名,继承com.alibaba.middleware.race.rpc.api.RpcConsumer,并覆写所有的public方法。
参赛者必须以com.alibaba.middleware.race.rpc.api.impl.RpcProviderImpl为全类名,继承com.alibaba.middleware.race.rpc.api.RpcProvider,并覆写所有的public方法。
参赛者依赖公共maven中心库上的三方包,即可看到一个示例的demo,按照对应的包名,在自己的工程中建立对应的类(包名、类名一致)。
三方库里的代码起到提示的作用,可以作为参考,不要在最终的pom中依赖。
所以最终参赛者需要打出一个rpc-api的jar包,供测试工程调用。 (注意,参考完rpc-api的示例后,请从pom依赖中将其删除,避免依赖冲突)
测试Demo工程请参考Taocode SVN上的代码。

RPC的实现

题目中推荐的网络框架使用Netty4来实现,这个RPC框架中需要实现的有

  1. RPC客户端
  2. RPC服务端

RPC客户端的实现

RPC客户端和RPC服务器端需要一个相同的接口类,RPC客户端通过一个代理类来调用RPC服务器端的函数

RpcConsumerImpl的实现
......
package com.alibaba.middleware.race.rpc.api.impl;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.middleware.race.rpc.aop.ConsumerHook;
import com.alibaba.middleware.race.rpc.api.RpcConsumer;
import com.alibaba.middleware.race.rpc.async.ResponseCallbackListener;
import com.alibaba.middleware.race.rpc.context.RpcContext;
import com.alibaba.middleware.race.rpc.model.RpcRequest;
import com.alibaba.middleware.race.rpc.model.RpcResponse;
import com.alibaba.middleware.race.rpc.netty.RpcConnection;
import com.alibaba.middleware.race.rpc.netty.RpcNettyConnection;
import com.alibaba.middleware.race.rpc.tool.Tool;

public class RpcConsumerImpl extends RpcConsumer implements InvocationHandler {

private static AtomicLong callTimes = new AtomicLong(0L);
private RpcConnection connection;
private List<RpcConnection> connection_list;
private Map<String,ResponseCallbackListener> asyncMethods;
private Class<?> interfaceClass;

private String version;

private int timeout;

private ConsumerHook hook;

public Class<?> getInterfaceClass() {
    return interfaceClass;
}
public String getVersion() {
    return version;
}
public int getTimeout() {
    this.connection.setTimeOut(timeout);
    return timeout;
}
public ConsumerHook getHook() {
    return hook;
}
RpcConnection select()
{
    //Random rd=new Random(System.currentTimeMillis());
    int d=(int) (callTimes.getAndIncrement()%(connection_list.size()+1));
    if(d==0)
        return connection;
    else
    {
        return connection_list.get(d-1);
    }
}
public RpcConsumerImpl()
{
    //String ip=System.getProperty("SIP");
    String ip="127.0.0.1";
    this.asyncMethods=new HashMap<String,ResponseCallbackListener>();
    this.connection=new RpcNettyConnection(ip,8888);
    this.connection.connect();
    connection_list=new ArrayList<RpcConnection>();
    int num=Runtime.getRuntime().availableProcessors()/3 -2;
    for (int i = 0; i < num; i++) {
        connection_list.add(new RpcNettyConnection(ip, 8888));
    }
    for (RpcConnection conn:connection_list)
    {
        conn.connect();
    }

}
public void destroy() throws Throwable {
    if (null != connection) {
        connection.close();
    }
}

@SuppressWarnings("unchecked")
public <T> T proxy(Class<T> interfaceClass) throws Throwable {
    if (!interfaceClass.isInterface()) {
        throw new IllegalArgumentException(interfaceClass.getName()
                + " is not an interface");
    }
    return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] { interfaceClass }, this);
}
@Override
public RpcConsumer interfaceClass(Class<?> interfaceClass) {
    // TODO Auto-generated method stub
    this.interfaceClass=interfaceClass;
    return this;
}

@Override
public RpcConsumer version(String version) {
    // TODO Auto-generated method stub
    this.version=version;
    return this;
}

@Override
public RpcConsumer clientTimeout(int clientTimeout) {
    // TODO Auto-generated method stub
    this.timeout=clientTimeout;
    return this;
}

@Override
public RpcConsumer hook(ConsumerHook hook) {
    // TODO Auto-generated method stub
    this.hook=hook;
    return this;
}

@Override
public Object instance() {
    // TODO Auto-generated method stub
    try {
        return proxy(this.interfaceClass);
    }
    catch (Throwable e)
    {
        e.printStackTrace();
    }
    return null;
}

@Override
public void asynCall(String methodName) {
    // TODO Auto-generated method stub
     asynCall(methodName, null);
}

@Override
public <T extends ResponseCallbackListener> void asynCall(
        String methodName, T callbackListener) {

    this.asyncMethods.put(methodName, callbackListener);
    this.connection.setAsyncMethod(asyncMethods);

    for (RpcConnection conn:connection_list)
    {
        conn.setAsyncMethod(asyncMethods);
    }
}

@Override
public void cancelAsyn(String methodName) {
    // TODO Auto-generated method stub
    this.asyncMethods.remove(methodName);
    this.connection.setAsyncMethod(asyncMethods);
    for (RpcConnection conn:connection_list)
    {
        conn.setAsyncMethod(asyncMethods);
    }
}

@Override
public Object invoke(Object proxy, Method method, Object[] args)
        throws Throwable {
    // TODO Auto-generated method stub
    List<String> parameterTypes = new LinkedList<String>();
    for (Class<?> parameterType : method.getParameterTypes()) {
        parameterTypes.add(parameterType.getName());
    }
    RpcRequest request = new RpcRequest();
    request.setRequestId(UUID.randomUUID().toString());
    request.setClassName(method.getDeclaringClass().getName());
    request.setMethodName(method.getName());
    request.setParameterTypes(method.getParameterTypes());
    request.setParameters(args);
    if(hook!=null)
        hook.before(request);
    RpcResponse response = null;
    try
    {
        request.setContext(RpcContext.props);
        response = (RpcResponse) select().Send(request,asyncMethods.containsKey(request.getMethodName()));
        if(hook!=null)
            hook.after(request);

        if(!asyncMethods.containsKey(request.getMethodName())&&response.getExption()!=null)
        {

            Throwable e=(Throwable) Tool.deserialize(response.getExption(),response.getClazz());
            throw e.getCause();
        }
    }
    catch (Throwable t)
    {
        //t.printStackTrace();
        //throw new RuntimeException(t);
        throw t;
    }
    finally
    {

// if(asyncMethods.containsKey(request.getMethodName())&&asyncMethods.get(request.getMethodName())!=null)
// {
// cancelAsyn(request.getMethodName());
// }
}
if(response==null)
{
return null;
}
else if (response.getErrorMsg() != null)
{
throw response.getErrorMsg();
}
else
{
return response.getAppResponse();
}

}

}

RpcConsumer consumer;
consumer = (RpcConsumer) getConsumerImplClass().newInstance();
consumer.someMethod();123

因为consumer对象是通过代理生成的,所以当consumer调用的时候,就会调用invoke函数,我们就可以把这次本地的函数调用的信息通过网络发送到RPC服务器然后等待服务器返回的信息后再返回。

服务器实现

RPC服务器主要是在收到RPC客户端之后解析出RPC调用的接口名,函数名以及参数。

package com.alibaba.middleware.race.rpc.api.impl;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

import net.sf.cglib.reflect.FastClass;
import net.sf.cglib.reflect.FastMethod;
import com.alibaba.middleware.race.rpc.context.RpcContext;
import com.alibaba.middleware.race.rpc.model.RpcRequest;
import com.alibaba.middleware.race.rpc.model.RpcResponse;
import com.alibaba.middleware.race.rpc.serializer.KryoSerialization;
import com.alibaba.middleware.race.rpc.tool.ByteObjConverter;
import com.alibaba.middleware.race.rpc.tool.ReflectionCache;
import com.alibaba.middleware.race.rpc.tool.Tool;

/**

// Method method = ReflectionCache.getMethod(clazz.getName(),methodName, parameterTypes);
// method.setAccessible(true);

     //System.out.println(className+":"+methodName+":"+parameters.length);
     if(methodCacheName!=null&&methodCacheName.equals(request))
     {
         return methodCacheValue;
     }
     else
     {
         try
         {
             methodCacheName=request;
             if(methodMap.containsKey(methodName))
             {
                 methodCacheValue= methodMap.get(methodName).invoke(classimpl, parameters);
                 return methodCacheValue;
             }
             else
             {
                 FastClass serviceFastClass = FastClass.create(clazz);
                 FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
                 methodMap.put(methodName, serviceFastMethod);
                 methodCacheValue= serviceFastMethod.invoke(classimpl, parameters);
                 return methodCacheValue;
             }
             //return method.invoke(classimpl, parameters);
         }
         catch (Throwable e)
         {
             throw e.getCause();
         }
     }
 }
  private Map<String,FastMethod> methodMap=new HashMap<String, FastMethod>();
  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
  {
      //ctx.close();
      //cause.printStackTrace();
      ctx.close();
  }
}

handel函数通过Java的反射机制,找到要调用的接口类然后调用对应函数然后执行,然后返回结果到客户端,本次RPC调用结束。

RPC主要的实现类在我的github上可以看见,我的这套RPC框架虽说不上完美,但是性能还是挺好的在服务器上测试时TPC有9w+。
主要的优化就是使用Neety4这个框架以及对数据包的处理,数据序列化与反序列化的速度

基于Netty的高性能JAVA的RPC框架

原文地址:http://blog.51cto.com/13952955/2298477

时间: 2024-10-10 09:23:03

基于Netty的高性能JAVA的RPC框架的相关文章

谷歌发布的首款基于HTTP/2和protobuf的RPC框架:GRPC

Google 刚刚开源了grpc,  一个基于HTTP2 和 Protobuf 的高性能.开源.通用的RPC框架.Protobuf 本身虽然提供了RPC  的定义语法,但是一直以来,Google 只开源了Protobuf 序列化反序列化的代码,而没有开源RPC 的实现,于是存在着众多良莠不齐的第三方RPC 实现,不过我在项目中采用WCF搭配Protobuf是一个很不错的RPC实现,Google这个框架是是基于HTTP2的,这是他有特色的地方,带来诸如双向流.流控.头部压缩.单TCP连接上的多复用

基于Netty构建高性能的部标808协议的GPS服务器

使用Java语言开发一个高质量和高性能的jt808 协议的GPS通信服务器,并不是一件简单容易的事情,开发出来一段程序和能够承受数十万台车载接入是两码事,除去开发部标808协议的固有复杂性和几个月长周期的协议Bug调试,作为大批量794车载终端接入的服务端,需要能够处理网络的闪断.客户端的重连.安全认证和消息的编解码.半包处理等.如果没有足够的网络编程经验积累和深入了解部标808协议文档,自研的GPS服务器往往需要半年甚至数年的时间才能最终稳定下来,这种成本即便对一个大公司而言也是个严重的挑战.

Netty自娱自乐之类Dubbo RPC 框架设计构想 【上篇】

之前在前一篇的<Netty自娱自乐之协议栈设计>,菜鸟我已经自娱自乐了设计协议栈,gitHub地址为https://github.com/vOoT/ncustomer-protocal.先这一篇中,准备接着自娱去实现一个RPC框架,现在公司共的是Dubbo,那么先不看其代码,先自行实现一下吧. dubbo 包括 注册和服务调用,细节我们先不管,然后,我先先实现一个如下的简单模型 哈哈哈,第一个版本就是这么简单,粗暴.说到自定义配置,首先想到的是Spring 自定义标签,利用标签进行配置服务.而

JAVA中几种常用的RPC框架介绍

RPC是远程过程调用的简称,广泛应用在大规模分布式应用中,作用是有助于系统的垂直拆分,使系统更易拓展.Java中的RPC框架比较多,各有特色,广泛使用的有RMI.Hessian.Dubbo等.RPC还有一个特点就是能够跨语言,本文只以JAVA语言里的RPC为例. 对于RPC有一个逻辑关系图,以RMI为例: 其他的框架结构也类似,区别在于对象的序列化方法,传输对象的通讯协议,以及注册中心的管理与failover设计(利用zookeeper). 客户端和服务端可以运行在不同的JVM中,Client只

Java实现简单的RPC框架

一.RPC简介 RPC,全称为Remote Procedure Call,即远程过程调用,它是一个计算机通信协议.它允许像调用本地服务一样调用远程服务.它可以有不同的实现方式.如RMI(远程方法调用).Hessian.Http invoker等.另外,RPC是与语言无关的. RPC示意图 如上图所示,假设Computer1在调用sayHi()方法,对于Computer1而言调用sayHi()方法就像调用本地方法一样,调用 –>返回.但从后续调用可以看出Computer1调用的是Computer2

从零开始手写 dubbo rpc 框架

rpc rpc 是基于 netty 实现的 java rpc 框架,类似于 dubbo. 主要用于个人学习,由渐入深,理解 rpc 的底层实现原理. 前言 工作至今,接触 rpc 框架已经有很长时间. 但是对于其原理一直只是知道个大概,从来没有深入学习过. 以前一直想写,但由于各种原因被耽搁. 技术准备 Java 并发实战学习 TCP/IP 协议学习笔记 Netty 权威指南学习 这些技术的准备阶段,花费了比较长的时间. 也建议想写 rpc 框架的有相关的知识储备. 其他 rpc 框架使用的经验

Apache thrift - 使用,内部实现及构建一个可扩展的RPC框架

本文首先介绍了什么是Apache Thrift,接着介绍了Thrift的安装部署及如何利用Thrift来实现一个简单的RPC应用,并简单的探究了一下Thrift的内部实现原理,最后给出一个基于Thrift的可扩展的分布式RPC调用框架,在中小型项目中是一个常见的SOA实践. Thrift介绍 Apache Thrift是Facebook 开发的远程服务调用框架,它采用接口描述语言(IDL)定义并创建服务,支持可扩展的跨语言服务开发,所包含的代码生成引擎可以在多种语言中,如 C++, Java,

venus java高并发框架

http://www.iteye.com/topic/1118484 因为有 netty.mima等远程框架.包括spring jboss等remoting框架 和阿里的dubbo相比, 没有亮点... dubbo都要出2.0.9了. 监控系统界面都出来了..重要是dubbo是SOA的思想, 而且提供了很多自治的一些功能. 还在阿里内部大范围使用(久经考验, 最近的阿里社区网站也是用dubbo做的)(http://code.alibabatech.com/wiki/display/dubbo/H

基于netty轻量的高性能分布式RPC服务框架forest&lt;下篇&gt;

基于netty轻量的高性能分布式RPC服务框架forest<上篇> 文章已经简单介绍了forest的快速入门,本文旨在介绍forest用户指南. 基本介绍 Forest是一套基于java开发的RPC框架,除了常规的点对点调用外,Motan还提供服务治理功能,包括服务节点的自动发现.摘除.高可用和负载均衡等. 架构概述 Forest中分为服务提供方(RPC Server),服务调用方(RPC Client)和服务注册中心(Registry)三个角色. Server提供服务,向Registry注册