【原创】自己动手实现RPC服务调用框架

自己动手实现rpc服务调用框架

本文利用java自带的socket编程实现了一个简单的rpc调用框架,由两个工程组成分别名为battercake-provider(服务提供者)、battercake-consumer(服务调用者)。

服务提供者

本部分的工程为battercake-provider,项目结构图如下图所示

先上rpc框架调用部分的代码,RpcProvider,该部分代码可以总结为两步

  1. 将需要发布的服务存储在一个内存变量serviceList中
  2. 启动socket,server.accept()方法阻塞在那,监听输入
  3. 针对每一个请求,单独启动一个线程处理
package com.rjzheng.rpc;

import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * RPC服务提供器
 * @author zhengrongjun
 *
 */
public class RpcProvider {

    //存储注册的服务列表
    private static List<Object> serviceList;

    /**
     * 发布rpc服务
     * @param object
     * @param port
     * @throws Exception
     */
    public static void export(int port,Object... services) throws Exception {
        serviceList=Arrays.asList(services);
        ServerSocket server = new ServerSocket(port);
        Socket client = null;
        while (true) {
            //阻塞等待输入
            client = server.accept();
            //每一个请求,启动一个线程处理
            new Thread(new ServerThread(client,serviceList)).start();
        }
    }
}

接下来ServerThread线程处理类的代码,ServerThread主要做以下几个步骤

  1. 读取客户端发送的服务名
  2. 判断服务是否发布
  3. 如果发布,则走反射逻辑,动态调用,返回结果
  4. 如果未发布,则返回提示通知
package com.rjzheng.rpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.Socket;
import java.util.List;

public class ServerThread implements Runnable {

    private Socket client = null;

    private List<Object> serviceList = null;

    public ServerThread(Socket client, List<Object> service) {
        this.client = client;
        this.serviceList = service;
    }

    @Override
    public void run() {
        ObjectInputStream input = null;
        ObjectOutputStream output = null;
        try {
            input = new ObjectInputStream(client.getInputStream());
            output = new ObjectOutputStream(client.getOutputStream());
            // 读取客户端要访问那个service
            Class serviceClass = (Class) input.readObject();
            // 找到该服务类
            Object obj = findService(serviceClass);
            if (obj == null) {
                output.writeObject(serviceClass.getName() + "服务未发现");
            } else {
                //利用反射调用该方法,返回结果
                try {
                    String methodName = input.readUTF();
                    Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
                    Object[] arguments = (Object[]) input.readObject();
                    Method method = obj.getClass().getMethod(methodName, parameterTypes);
                    Object result = method.invoke(obj, arguments);
                    output.writeObject(result);
                } catch (Throwable t) {
                    output.writeObject(t);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.close();
                input.close();
                output.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    private Object findService(Class serviceClass) {
        // TODO Auto-generated method stub
        for (Object obj : serviceList) {
            boolean isFather = serviceClass.isAssignableFrom(obj.getClass());
            if (isFather) {
                return obj;
            }
        }
        return null;
    }

}

接下来是使用的部分

先创建一个微服务,接口如下

package com.rjzheng.service;

public interface BatterCakeService {
    /**
     * 卖煎饼的服务
     * @param name
     * @return
     */
    public String sellBatterCake(String name);
}

实现类如下

package com.rjzheng.service.impl;

import com.rjzheng.service.BatterCakeService;

public class BatterCakeServiceImpl implements BatterCakeService {

    @Override
    public String sellBatterCake(String name) {
        // TODO Auto-generated method stub
        return name+"煎饼,卖的特别好";
    }

}

接下来就是发布服务

package com.rjzheng.start;

import com.rjzheng.rpc.RpcProvider;
import com.rjzheng.service.BatterCakeService;
import com.rjzheng.service.impl.BatterCakeServiceImpl;

public class RpcBootStrap {
    public static void main(String[] args) throws Exception {
        BatterCakeService batterCakeService =new BatterCakeServiceImpl();
        //发布卖煎饼的服务,注册在20006端口
        RpcProvider.export(20006,batterCakeService);
    }
}

服务消费者

本部分的工程为battercake-consumer,项目结构图如下图所示

先上rpc框架调用部分的代码RpcConsumer,步骤分两步

  1. 封装一个代理类处理器
  2. 返回service的代理类对象
package com.rjzheng.rpc;

import java.lang.reflect.Proxy;

public class RpcConsumer {

    public static <T> T getService(Class<T> clazz,String ip,int port) {
        ProxyHandler proxyHandler =new ProxyHandler(ip,port);
        return (T)Proxy.newProxyInstance(RpcConsumer.class.getClassLoader(), new Class<?>[] {clazz}, proxyHandler);
    }
}

接下来上代理类处理器的代码,代理类处理步骤分以下几步

  1. 建立socket连接
  2. 封装请求数据,发送给服务提供者
  3. 返回结果
package com.rjzheng.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.net.Socket;

import com.rjzheng.service.BatterCakeService;

public class ProxyHandler implements InvocationHandler {

    private String ip;
    private int port;

    public ProxyHandler(String ip, int port) {
        // TODO Auto-generated constructor stub
        this.ip = ip;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // TODO Auto-generated method stub
        Socket socket = new Socket(this.ip, this.port);
        ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
        ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
        try {
            output.writeObject(BatterCakeService.class);
            output.writeUTF(method.getName());
            output.writeObject(method.getParameterTypes());
            output.writeObject(args);
            output.flush();
            Object result = input.readObject();
            if(result instanceof Throwable) {
                throw (Throwable) result;
            }
                return result;
        } finally {
            socket.shutdownOutput();
        }
    }

}

接下来建立一个测试类RpcTest如下(跑该测试类前,记得运行在battercake-provider端的RpcBootstrap类发布BatterCakeService服务)

package com.rjzheng.start;

import com.rjzheng.rpc.RpcConsumer;
import com.rjzheng.service.BatterCakeService;
public class RpcTest {

    public static void main(String[] args) {
        BatterCakeService batterCakeService=RpcConsumer.getService(BatterCakeService.class, "127.0.0.1", 20006);
        String result=batterCakeService.sellBatterCake("双蛋");
        System.out.println(result);
    }
}

输出结果如下

双蛋煎饼,卖的特别好

至此,我们就实现了一个简易的rpc服务调用框架

原文地址:https://www.cnblogs.com/rjzheng/p/8798556.html

时间: 2024-10-08 04:27:27

【原创】自己动手实现RPC服务调用框架的相关文章

服务调用框架DataStrom

根据以前的命名服务,从新构建了下服务框架: 结构模式:c-center-s; 1.服务端: 服务端启动,讲自己的IP,端口注册到注册中心节点(master),然后注册自己的处理类(需要继承对应接口); 同时需要设置服务类型(是否是主从服务,如果是主从服务还需要设置自己是否是master); 如果不是主从服务则中心采用hash负载均衡进行服务调度: 同时有心跳给注册中心: 2.注册中心 启动注册中心,注册中心是主从方式存在,启动时选举自己为master;然后接收其余注册中心回应,2秒没有收到则认为

RPC远程调用框架rsf和dubbo

1.rsf(Remote service framework)框架整体的架构 思考点: 1.注册中心使用的zookeeper,多机房部署,各注册中心要求数据一致,如何在一个节点发生异常情况下,不影响其他节点? 服务发现模块会定时的将最新的服务提供方列表刷新到注册中心,如PUMP定时的将提供方的接口列表写入到注册中心.注册中心考虑到 ZK 的优势.局限和 Redis 优势,通过 Pump 定时批量刷新数据到 ZK 集群,减少 ZK 写入压力:通过 Redis 集群管理提供方上下线,由 Pump 订

Thrift 个人实战--Thrift RPC服务框架日志的优化

前言: Thrift作为Facebook开源的RPC框架, 通过IDL中间语言, 并借助代码生成引擎生成各种主流语言的rpc框架服务端/客户端代码. 不过Thrift的实现, 简单使用离实际生产环境还是有一定距离, 本系列将对Thrift作代码解读和框架扩充, 使得它更加贴近生产环境. 本文讲述RPC服务框架中, 日志的重要性, 以及logid的引入. 日志不仅包含丰富的数据(就看是否会挖掘), 而且还是线上服务问题追踪和排查错误最好的方式. 日志级别 采用大家喜闻乐见的log4j作为该RPC服

自研发RPC调用框架

自主研发设计RPC远程调用框架,实现服务自动注册,服务发现,远程RPC调用,后续实现服务负载均衡 主要包括:客户端服务,服务端,服务发现,服务注册 github地址:https://github.com/btshoutn/rpc_project 原文地址:https://www.cnblogs.com/shoutn/p/8297345.html

Pegasus学习三:服务通信框架介绍

Pegasus来源于点评的开源框架pigeon:https://github.com/dianping/pigeon 什么是Pegasus Pegasus是一个高性能的分布式服务框架,致力于提供RPC远程服务调用方案,并可作为精简版SOA服务治理的方案.实现基本的RPC,路由,负载均衡,容错,统计报表,服务注册订阅功能. 其核心模块包括以下几类: 远程通信:基于NIO的Socket通信,"底层异步上层同步" + "请求|响应" 模式的信息交互方式,可支持多种序列化协

自己动手写RPC框架到dubbo的服务动态注册,服务路由,负载均衡功能实现

RPC即远程过程调用,它的实现方式有很多,比如webservice等.框架调多了,烦了,没激情了,我们就该问自己,这些框架的作用到底是什么,来找回当初的激情. 一般来说,我们写的系统就是一个单机系统,一个web服务器一个数据库服务,但是当这单台服务器的处理能力受硬件成本的限制,是不能无限的提升处理性能的.这个时候我们使用RPC将原来的本地调用转变为调用远端的服务器上的方法,给系统的处理能力和吞吐量带来了提升. RPC的实现包括客户端和服务端,即服务的调用方和服务的提供方.服务调用方发送rpc请求

基于netty轻量的高性能分布式RPC服务框架forest&lt;下篇&gt;

基于netty轻量的高性能分布式RPC服务框架forest<上篇> 文章已经简单介绍了forest的快速入门,本文旨在介绍forest用户指南. 基本介绍 Forest是一套基于java开发的RPC框架,除了常规的点对点调用外,Motan还提供服务治理功能,包括服务节点的自动发现.摘除.高可用和负载均衡等. 架构概述 Forest中分为服务提供方(RPC Server),服务调用方(RPC Client)和服务注册中心(Registry)三个角色. Server提供服务,向Registry注册

基于开源Dubbo分布式RPC服务框架的部署整合

一.前言 Dubbo 作为SOA服务化治理方案的核心框架,用于提高业务逻辑的复用.整合.集中管理,具有极高的可靠性(HA)和伸缩性,被应用于阿里巴巴各成员站点,同时在包括JD.当当在内的众多互联网项目中有着广泛应用.dubbo 通过高性能 RPC 实现服务的输出和输入功能,框架基于 Spring Framework 进行无缝集成,使用过程中基本看不到 Dubbo API的直接调用,Dubbo服务支持RMI.Hessian.Dubbo.WebService等众多通信协议,同时提供了对服务的监控和管

【Rpc】基于开源Dubbo分布式RPC服务框架的部署整合

一.前言 Dubbo 作为SOA服务化治理方案的核心框架,用于提高业务逻辑的复用.整合.集中管理,具有极高的可靠性(HA)和伸缩性,被应用于阿里巴巴各成员站点,同时在包括JD.当当在内的众多互联网项目中有着广泛应用.dubbo 通过高性能 RPC 实现服务的输出和输入功能,框架基于 Spring Framework 进行无缝集成,使用过程中基本看不到 Dubbo API的直接调用,Dubbo服务支持RMI.Hessian.Dubbo.WebService等众多通信协议,同时提供了对服务的监控和管