用Golang手写一个RPC,理解RPC原理

代码结构

.
├── client.go
├── coder.go
├── coder_test.go
├── rpc_test.go
├── server.go
├── session.go
└── session_test.go

代码

client.go

package rpc

import (
	"net"
	"reflect"
)

// rpc 客户端实现

// 抽象客户端方法
type Client struct {
	conn net.Conn
}

// client构造方法
func NewClient(conn net.Conn) *Client {
	return &Client{conn: conn}
}

// 客户端调用服务端rpc实现
// client.RpcCall("login", &req)
func (c *Client) RpcCall(name string, fpr interface{}) {
	// 反射获取函数原型
	fn := reflect.ValueOf(fpr).Elem()
	// 客户端逻辑的实现
	f := func(args []reflect.Value) (results []reflect.Value) {
		// 从匿名函数中构建请求参数
		inArgs := make([]interface{}, 0, len(args))
		for _, v := range args {
			inArgs = append(inArgs, v.Interface())
		}
		// 组装rpc data请求数据
		reqData := RpcData{Name: name, Args: inArgs}
		// 进行数据编码
		reqByteData, err := encode(reqData)
		if err != nil {
			return
		}
		// 创建session 对象
		session := NewSession(c.conn)
		// 客户端发送数据
		err = session.Write(reqByteData)
		if err != nil {
			return
		}
		// 读取客户端数据
		rspByteData, err := session.Read()
		if err != nil {
			return
		}
		// 数据进行解码
		rspData, err := decode(rspByteData)
		if err != nil {
			return
		}
		// 处理服务端返回的数据结果
		outArgs := make([]reflect.Value, 0, len(rspData.Args))
		for i, v := range rspData.Args {
			// 数据特殊情况处理
			if v == nil {
				// reflect.Zero() 返回某类型的零值的value
				// .Out()返回函数输出的参数类型
				// 得到具体第几个位置的参数的零值
				outArgs = append(outArgs, reflect.Zero(fn.Type().Out(i)))
				continue
			}
			outArgs = append(outArgs, reflect.ValueOf(v))
		}

		return outArgs
	}

	// 函数原型到调用的关键,需要2个参数
	// 参数1:函数原型,是Type类型
	// 参数2:返回类型是Value类型
	// 简单理解:参数1是函数原型,参数2是客户端逻辑
	v := reflect.MakeFunc(fn.Type(), f)
	fn.Set(v)
}

coder.go

package rpc

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// 对传输的数据进行编解码
// 使用Golang自带的一个数据结构序列化编码/解码工具 gob

// 定义rpc数据交互式数据传输格式
type RpcData struct {
	Name string        // 调用方法名
	Args []interface{} // 调用和返回的参数列表
}

// 编码
func encode(data RpcData) ([]byte, error) {
	// gob进行编码
	var buf bytes.Buffer
	// 得到字节编码器
	encoder := gob.NewEncoder(&buf)
	// 进行编码
	if err := encoder.Encode(data); err != nil {
		fmt.Printf("gob encode failed, err: %v\n", err)
		return nil, err
	}
	return buf.Bytes(), nil
}

// 解码
func decode(data []byte) (RpcData, error) {
	// 得到字节解码器
	buf := bytes.NewBuffer(data)
	decoder := gob.NewDecoder(buf)
	// 解码数据
	var rd RpcData
	if err := decoder.Decode(&rd); err != nil {
		fmt.Printf("gob decode failed, err: %v\n", err)
		return rd, err
	}
	return rd, nil
}

server.go

package rpc

import (
	"net"
	"reflect"
)

// rpc 服务端实现

// 抽象服务端
type Server struct {
	add   string                   // 连接地址
	funcs map[string]reflect.Value // 存储方法名和方法的对应关系,服务注册
}

// server 构造方法
func NewServer(addr string) *Server {
	return &Server{add: addr, funcs: make(map[string]reflect.Value)}
}

// 注册接口
func (s *Server) Register(name string, fc interface{}) {
	if _, ok := s.funcs[name]; ok {
		return
	}
	s.funcs[name] = reflect.ValueOf(fc)
}

func (s *Server) Run() (err error) {
	listener, err := net.Listen("tcp", s.add)
	if err != nil {
		return
	}
	for {
		// 监听连接
		conn, err := listener.Accept()
		if err != nil {
			conn.Close()
			continue
		}
		// 创建会话
		session := NewSession(conn)
		// 读取会话请求数据
		reqData, err := session.Read()
		if err != nil {
			conn.Close()
			continue
		}
		// 数据解码
		rpcReqData, err := decode(reqData)
		// 获取客户端要调用的方法
		fc, ok := s.funcs[rpcReqData.Name];
		if !ok {
			conn.Close()
			continue
		}
		// 获取请求的参数列表
		args := make([]reflect.Value, 0, len(rpcReqData.Args))
		for _, v := range rpcReqData.Args {
			args = append(args, reflect.ValueOf(v))
		}
		// 调用
		callReslut := fc.Call(args)
		// 处理调用返回的数据结果
		rargs := make([]interface{}, 0, len(callReslut))
		for _, rv := range callReslut {
			rargs = append(rargs, rv.Interface())
		}
		// 构建返回的rpc数据
		rpcRspData := RpcData{Name: rpcReqData.Name, Args: rargs}
		// 返回数据进行编码
		rspData, err := encode(rpcRspData)
		if err != nil {
			conn.Close()
			continue
		}
		err = session.Write(rspData)
		if err != nil {
			conn.Close()
			continue
		}
	}
	return
}

session.go

package rpc

import (
	"encoding/binary"
	"fmt"
	"io"
	"net"
)

// 处理连接会话

// 会话对象结构体
type Session struct {
	conn net.Conn
}

// 传输数据存储方式
// 字节数组, 添加4个字节的头,用来存储数据的长度

// 会话构造函数
func NewSession(conn net.Conn) *Session {
	return &Session{conn: conn}
}

// 从连接中读取数据
func (s *Session) Read() (data []byte, err error) {
	// 读取数据header数据
	header := make([]byte, 4)
	_, err = s.conn.Read(header)
	if err != nil {
		fmt.Printf("read conn header data failed, err: %v\n", err)
		return
	}
	// 读取body数据
	hlen := binary.BigEndian.Uint32(header)
	data = make([]byte, hlen)
	_, err = io.ReadFull(s.conn, data)
	if err != nil {
		fmt.Printf("read conn body data failed, err: %v\n", err)
		return
	}
	return
}

// 向连接中写入数据
func (s *Session) Write(data []byte) (err error) {
	// 创建数据字节切片
	buf := make([]byte, 4+len(data))
	// 向header写入数据长度
	binary.BigEndian.PutUint32(buf[:4], uint32(len(data)))
	// 写入body内容
	copy(buf[4:], data)
	// 写入连接数据
	_, err = s.conn.Write(buf)
	if err != nil {
		fmt.Printf("write conn data failed, err: %v\n", err)
		return
	}
	return
}

coder_test.go

package rpc

import (
	"testing"
)

func TestCoder(t *testing.T) {
	rd := RpcData{
		Name: "login",
		Args: []interface{}{"zhangsan", "zs123"},
	}

	eData, err := encode(rd)
	if err != nil {
		t.Error(err)
		return
	}
	t.Logf("gob 编码后数据长度: %d\n", len(eData))

	dData, err := decode(eData)
	if err != nil {
		t.Error(err)
		return
	}
	t.Logf("%#v\n", dData)
}

session_test.go

package rpc

import (
	"net"
	"sync"
	"testing"
)

func TestSession(t *testing.T) {
	addr := ":8080"
	test_data := "my is test data"
	var wg sync.WaitGroup
	wg.Add(2)
	// 写数据
	go func() {
		defer wg.Done()
		listener, err := net.Listen("tcp", addr)
		if err != nil {
			t.Fatal(err)
			return
		}
		conn, _ := listener.Accept()
		s := NewSession(conn)
		data, err := s.Read()
		if err != nil {
			t.Error(err)
			return
		}
		t.Log(string(data))
	}()

	// 读数据
	go func() {
		defer wg.Done()
		conn, err := net.Dial("tcp", addr)
		if err != nil {
			t.Fatal(err)
			return
		}
		s := NewSession(conn)
		err = s.Write([]byte(test_data))
		if err != nil {
			return
		}
		t.Log("写入数据成功")
		return
	}()

	wg.Wait()
}

rpc_test.go

package rpc

import (
	"encoding/gob"
	"fmt"
	"net"
	"testing"
)

// rpc 客户端和服务端测试

// 定义一个服务端结构体
// 定义一个方法
// 通过调用rpc方法查询用户的信息

type User struct {
	Name string
	Age  int
}

// 定义查询用户的方法
// 通过用户id查询用户数据
func queryUser(id int) (User, error) {
	// 造一些查询user的假数据
	users := make(map[int]User)
	users[0] = User{"user01", 22}
	users[1] = User{"user02", 23}
	users[2] = User{"user03", 24}
	if u, ok := users[id]; ok {
		return u, nil
	}
	return User{}, fmt.Errorf("%d id not found", id)

}

func TestRpc(t *testing.T) {
	// 给gob注册类型
	gob.Register(User{})

	addr := ":8080"

	// 创建服务端
	server := NewServer(addr)
	// 注册服务
	server.Register("queryUser", queryUser)
	// 启动服务端
	go server.Run()

	// 创建客户端连接
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return
	}
	// 创客户端
	client := NewClient(conn)
	// 定义函数调用原型
	var query func(int) (User, error)
	// 客户端调用rpc
	client.RpcCall("queryUser", &query)
	// 得到返回结果
	user, err := query(1)
	if err != nil {
		t.Error(err)
		return
	}
	fmt.Printf("%#v\n", user)
}

原文地址:https://www.cnblogs.com/zhichaoma/p/12638184.html

时间: 2024-10-08 12:44:42

用Golang手写一个RPC,理解RPC原理的相关文章

手写一个模块化的 TCP 服务端客户端

前面的博客 基于 socket 手写一个 TCP 服务端及客户端 写过一个简单的 TCP 服务端客户端,没有对代码结构进行任何设计,仅仅是实现了相关功能,用于加深对 socket 编程的认识. 这次我们对整个代码结构进行一下优化,使其模块化,易扩展,成为一个简单意义上的“框架”. 对于 Socket 编程这类所需知识偏底层的情况(OS 协议栈的运作机制,TCP 协议的理解,多线程的理解,BIO/NIO 的理解,阻塞函数的运作原理甚至是更底层处理器的中断.网卡等外设与内核的交互.核心态与内核态的切

放弃antd table,基于React手写一个虚拟滚动的表格

缘起 标题有点夸张,并不是完全放弃antd-table,毕竟在react的生态圈里,对国人来说,比较好用的PC端组件库,也就antd了.即便经历了2018年圣诞彩蛋事件,antd的使用者也不仅不减,反而有所上升. 客观地说,antd是开源的,UI设计得比较美观(甩出其他组件库一条街),而且是蚂蚁金服的体验技术部(一堆p7,p8,p9,基本都是大牛级的)在持续地开发维护,质量可以信任. 不过,antd虽好,但一些组件在某一些场景下,是很不适用的.例如,以表格形式无限滚动地展示大量数据(1w+)时,

利用SpringBoot+Logback手写一个简单的链路追踪

目录 一.实现原理 二.代码实战 三.测试 最近线上排查问题时候,发现请求太多导致日志错综复杂,没办法把用户在一次或多次请求的日志关联在一起,所以就利用SpringBoot+Logback手写了一个简单的链路追踪,下面详细介绍下. 一.实现原理 Spring Boot默认使用LogBack日志系统,并且已经引入了相关的jar包,所以我们无需任何配置便可以使用LogBack打印日志. MDC(Mapped Diagnostic Context,映射调试上下文)是log4j和logback提供的一种

如何写一个解释器(1):编译原理

最近在看DSL的东西,对于外部DSL,写一个解释器是必不可少的.我试图归纳一下我学到的,以写一个解释器为目标,讲一下如果来实现一个可用的解释器.一个解释器通常可以分为一下几个阶段: 词法分析(Lexer) 语法分析(Parser, BNF, CFG, AST) 语义分析(AST的处理, annotated AST) 目标语言生成(stack-based) 这里的解释器不包括目标语言的执行和运行时环境,如果需要类似于python/ruby的解析执行器的话,还需要bytecode-compiler,

手写一个词法分析器

前言 最近大部分时间都在撸 Python,其中也会涉及到将数据库表转换为 Python 中 ORM 框架的 Model,但我们并没有找到一个合适的工具来做这个意义不大的"体力活",所以每次新建表后大家都是根据自己的表结构手写一遍 Model. 一两张表还好,一旦 10 几张表都要写一遍时那痛苦只有自己知道:这时程序员的 slogan 再次印证:一切毫无意义的体力劳动终将被计算机取代. intellij plugin 既然没有现成的工具那就自己写一个吧,演示效果如下: 考虑到我们主要是用

Spring系列之手写一个SpringMVC

目录 Spring系列之IOC的原理及手动实现 Spring系列之DI的原理及手动实现 Spring系列之AOP的原理及手动实现 Spring系列之手写注解与配置文件的解析 引言 在前面的几个章节中我们已经简单的完成了一个简易版的spring,已经包括容器,依赖注入,AOP和配置文件解析等功能.这一节我们来实现一个自己的springMvc. 关于MVC/SpringMVC springMvc是一个基于mvc模式的web框架,SpringMVC框架是一种提供了MVC(模型 - 视图 - 控制器)架

css手写一个表头固定

Bootstrap,layui等前端框架里面都对表头固定,表格滚动有实现,偏偏刚入职的公司选择了手动渲染表格,后期又觉得表格数据拉太长想要做表头固定.为了避免对代码改动太大,所以决定手写表头固定 主要遇到的个问题就是固定以后数据表格与表头的对齐问题,也看了很多我文章试下来都不怎么成功,只好自己一点点试 表头固定的一般思路是布两个table,一个放表头,一个放表格体,然后将表格体加上高度height以及overflow-y <div class="content"> <

手写一个IOC容器

链接:https://pan.baidu.com/s/1MhKJYamBY1ejjjhz3BKoWQ 提取码:e8on 明白什么是IOC容器: IOC(Inversion of Control,控制反转).这是spring的核心,贯穿始终.所谓IOC,对于spring框架来说,就是由spring来负责控制对象的生命周期和对象间的关系. 传统的java代码中,我们需要使用哪个对象,就new一个对象,很正常对吧? 然而,这时出现了一个新思想:IOC(控制反转) 由它创建和管理所有的对象,我们需要的时

爬虫入门 手写一个Java爬虫

本文内容 涞源于  罗刚 老师的 书籍 << 自己动手写网络爬虫一书 >> ; 本文将介绍 1: 网络爬虫的是做什么的?  2: 手动写一个简单的网络爬虫; 1: 网络爬虫是做什么的?  他的主要工作就是 跟据指定的url地址 去发送请求,获得响应, 然后解析响应 , 一方面从响应中查找出想要查找的数据,另一方面从响应中解析出新的URL路径, 然后继续访问,继续解析;继续查找需要的数据和继续解析出新的URL路径  . 这就是网络爬虫主要干的工作.  下面是流程图: 通过上面的流程图