前言
RPC 是远程过程调用 (Remote Procedure Call) 的简称, 通过 RPC 我们可以像调用本地方法一样调用位于其他位置的函数. 大家更常见的可能是 HTTP API 调用, 简单来对比的话, RPC 比起 HTTP 调用封装更完善, 调用者不必手动处理序列化和反序列化, 使用成本更低一些(虽然学习成本可能会更高).
出于学习目的, 这次的目标是使用 go 语言来实现一个自己的 RPC. 在现实世界里, 对于一个 RPC 工具, 除了方法调用以外, 人们更看重的是其他功能比如服务发现, 负载均衡, 熔断降级之类的功能, 这里暂时不会涉及, 而是仅关注实现一个可以工作的方法调用.
在之前的文章里大致了解了 go 语言自带的 rpc 框架, 其中就提到 go rpc 预留了 codec 接口, 可以让用户在 go rpc 使用自己的序列化协议, 这次就尝试实现一个自己的 codec 来实现自己的 RPC.
准备工作
序列化协议
要实现一个 RPC, 基本的元素大概有这几个: 序列化协议, 网络模型和线程模型. 而 go rpc 里的 codec 基本上实现的就是序列化协议.
本来想着用比较熟悉的 thrift 协议, 但是使用 thrift 本身实现了 RPC 流程, 所以它并不是一个单纯的序列化协议, 它的序列化逻辑可能无法和 go rpc 很好的契合, 再加上还需要书写 IDL 定义, 增加复杂度. 本来就是为了熟悉 go, 所以这里先从简单的开始, 于是选择 MessagePack 作为序列化协议.
MessagePack https://msgpack.org/ 是一个比较轻量级的序列化协议, 它的逻辑和 JSON 类似, 但是使用的是二进制形式, 所以比 JSON 序列化更快, 序列化后产生的数据也更小, 基本上可以认为是一个二进制版本的 JSON.
创建类定义
要实现自己的 codec, 需要分别实现 go rpc 中提供个两个接口: ServerCodec 和 ClientCodec, 很明显他们分别表示服务端和客户端的逻辑, 两个接口的定义具体如下:
- type ServerCodec interface {
- ReadRequestHeader(*Request) error
- ReadRequestBody(interface{}) error
- WriteResponse(*Response, interface{}) error
- Close() error
- }
- type ClientCodec interface {
- WriteRequest(*Request, interface{}) error
- ReadResponseHeader(*Response) error
- ReadResponseBody(interface{}) error
- Close() error
- }
可以看到, go rpc 将一次请求 / 响应抽象成了 header+body 的形式, 读取数据时分为读取 head 和读取 body, 写入数据时只需写入 body 部分, go rpc 会替我们加上 head 部分. 接下来我们定义两个结构, 用来表示一次请求 / 响应的完整数据:
- type MsgpackReq struct {
- rpc.Request //head
- Arg interface{} //body
- }
- type MsgpackResp struct {
- rpc.Response //head
- Reply interface{} //body
- }
这里的 msgpackReq 和 msgpackResp 直接内嵌了 go rpc 里自带的 Request 和 Response, 自带的 Request 和 Response 定义了序号, 方法名等信息.
接下来就是自定义 Codec 的声明:
- type MessagePackServerCodec struct {
- rwc io.ReadWriteCloser // 用于读写数据, 实际是一个网络连接
- req MsgpackReq // 用于缓存解析到的请求
- closed bool // 标识 codec 是否关闭
- }
- type MessagePackClientCodec struct {
- rwc io.ReadWriteCloser
- resp MsgpackResp // 用于缓存解析到的请求
- closed bool
- }
- func NewServerCodec(conn.NET.Conn) *MessagePackServerCodec {
- return &MessagePackServerCodec{conn, MsgpackReq{}, false}
- }
- func NewClientCodec(conn.NET.Conn) *MessagePackClientCodec {
- return &MessagePackClientCodec{conn, MsgpackResp{}, false}
- }
在之前的文章里提到了, codec 需要包含一个数据源用于读写数据, 这里直接将网路连接传递进去.
实现 Codec 方法
实现思路
接下来是具体的方法实现, 处于简单起见, 这里将反序列化部分的两步合并为一步, 在读取 head 部分时就将所有的数据解析好并缓存起来, 读取 body 时直接返回缓存的结果. 具体的思路就是:
客户端在发送请求时, 将数据包装成一个 MsgpackReq, 然后用 MessagePack 序列化并发送出去
服务端在读取请求 head 部分时, 将收到的数据用 MessagePack 反序列化成一个 MsgpackReq, 并将得到的结果缓存起来
服务端在读取请求 body 部分时, 从缓存的 MsgpackReq 中获取到 Arg 字段并返回
服务端在发送响应时, 将数据包装成一个 MsgpackResp, 然后用 MessagePack 序列化并发送出去
客户端在读取响应 head 部分时, 将收到的数据用 MessagePack 反序列化成一个 MsgpackResp, 并将得到的结果缓存起来
客户端在读取响应 body 部分时, 从缓存的 MsgpackResp 中获取到 Reply 或者 Error 字段并返回
Client 实现
这里直接上代码:
- func (c *MessagePackClientCodec) WriteRequest(r *rpc.Request, arg interface{}) error {
- // 先判断 codec 是否已经关闭, 如果是则直接返回
- if c.closed {
- return nil
- }
- // 将 r 和 arg 组装成一个 MsgpackReq 并序列化
- request := &MsgpackReq{*r, arg}
- reqData, err := msgpack.Marshal(request)
- if err != nil {
- panic(err)
- return err
- }
- // 先发送数据长度
- head := make([]byte, 4)
- binary.BigEndian.PutUint32(head, uint32(len(reqData)))
- _, err = c.rwc.Write(head)
- // 再将序列化产生的数据发送出去
- _, err = c.rwc.Write(reqData)
- return err
- }
- func (c *MessagePackClientCodec) ReadResponseHeader(r *rpc.Response) error {
- // 先判断 codec 是否已经关闭, 如果是则直接返回
- if c.closed {
- return nil
- }
- // 读取数据
- data, err := readData(c.rwc)
- if err != nil {
- //client 一旦初始化就会开始轮询数据, 所以要处理连接 close 的情况
- if strings.Contains(err.Error(), "use of closed network connection") {
- return nil
- }
- panic(err) // 简单起见, 出现异常直接 panic
- }
- // 将读取到的数据反序列化成一个 MsgpackResp
- var response MsgpackResp
- err = msgpack.Unmarshal(data, &response)
- if err != nil {
- panic(err) // 简单起见, 出现异常直接 panic
- }
- // 根据读取到的数据设置 request 的各个属性
- r.ServiceMethod = response.ServiceMethod
- r.Seq = response.Seq
- // 同时将读取到的数据缓存起来
- c.resp = response
- return nil
- }
- func (c *MessagePackClientCodec) ReadResponseBody(reply interface{}) error {
- // 这里直接用缓存的数据返回即可
- if "" != c.resp.Error {// 如果返回的是异常
- return errors.New(c.resp.Error)
- }
- if reply != nil {
- // 正常返回, 通过反射将结果设置到 reply 变量, 因为 reply 一定是指针类型, 所以不必检查 CanSet
- reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(c.resp.Reply))
- }
- return nil
- }
- func (c *MessagePackClientCodec) Close() error {
- c.closed = true // 关闭时将 closed 设置为 true
- if c.rwc != nil {
- return c.rwc.Close()
- }
- return nil
- }
以上就是 client 部分的实现, 值得注意的有几点:
读写数据前, 需要检查 codec 是否已经关闭了
读写数据时需要处理拆包粘包(通过 readData 函数处理)
Server 实现
同样直接上代码:
- func (c *MessagePackServerCodec) WriteResponse(r *rpc.Response, reply interface{}) error {
- // 先判断 codec 是否已经关闭, 如果是则直接返回
- if c.closed {
- return nil
- }
- // 将 r 和 reply 组装成一个 MsgpackResp 并序列化
- response := &MsgpackResp{*r, reply}
- respData, err := msgpack.Marshal(response)
- if err != nil {
- panic(err)
- return err
- }
- head := make([]byte, 4)
- binary.BigEndian.PutUint32(head, uint32(len(respData)))
- _, err = c.rwc.Write(head)
- // 将序列化产生的数据发送出去
- _, err = c.rwc.Write(respData)
- return err
- }
- func (c *MessagePackServerCodec) ReadRequestHeader(r *rpc.Request) error {
- // 先判断 codec 是否已经关闭, 如果是则直接返回
- if c.closed {
- return nil
- }
- // 读取数据
- data, err := readData(c.rwc)
- if err != nil {
- // 这里不能直接 panic, 需要处理 EOF 和 reset 的情况
- if err == io.EOF {
- return err
- }
- if strings.Contains(err.Error(), "connection reset by peer") {
- return err
- }
- panic(err) // 其他异常直接 panic
- }
- // 将读取到的数据反序列化成一个 MsgpackReq
- var request MsgpackReq
- err = msgpack.Unmarshal(data, &request)
- if err != nil {
- panic(err) // 简单起见, 出现异常直接 panic
- }
- // 根据读取到的数据设置 request 的各个属性
- r.ServiceMethod = request.ServiceMethod
- r.Seq = request.Seq
- // 同时将解析到的数据缓存起来
- c.req = request
- return nil
- }
- func (c *MessagePackServerCodec) ReadRequestBody(arg interface{}) error {
- if arg != nil {
- // 参数不为 nil, 通过反射将结果设置到 arg 变量
- reflect.ValueOf(arg).Elem().Set(reflect.ValueOf(c.req.Arg))
- }
- return nil
- }
- func (c *MessagePackServerCodec) Close() error {
- c.closed = true
- if c.rwc != nil {
- return c.rwc.Close()
- }
- return nil
- }
实际上 server 端的实现几乎和 client 端逻辑的一样, 只是 request 和 response 的角色不同而已. 其中有几点需要注意:
server 端读取数据时需要处理 EOF 和连接 reset 的情况
server 在返回数据时没有显式处理接口产生的 error, 只是将 reply 传递了回去, 这是因为 error 在 rpc.Request 里存着, 不用 codec 处理
处理拆包粘包
具体思路参考 go 语言处理 TCP 拆包 / 粘包 , 这里附上 readData 的实现:
- func readData(conn io.ReadWriteCloser) (data []byte, returnError error) {
- const HeadSize = 4 // 设定长度部分占 4 个字节
- headBuf := bytes.NewBuffer(make([]byte, 0, HeadSize))
- headData := make([]byte, HeadSize)
- for {
- readSize, err := conn.Read(headData)
- if err != nil {
- returnError = err
- return
- }
- headBuf.Write(headData[0:readSize])
- if headBuf.Len() == HeadSize {
- break
- } else {
- headData = make([]byte, HeadSize-readSize)
- }
- }
- bodyLen := int(binary.BigEndian.Uint32(headBuf.Bytes()))
- bodyBuf := bytes.NewBuffer(make([]byte, 0, bodyLen))
- bodyData := make([]byte, bodyLen)
- for {
- readSize, err := conn.Read(bodyData)
- if err != nil {
- returnError = err
- return
- }
- bodyBuf.Write(bodyData[0:readSize])
- if bodyBuf.Len() == bodyLen {
- break
- } else {
- bodyData = make([]byte, bodyLen-readSize)
- }
- }
- data = bodyBuf.Bytes()
- returnError = nil
- return
- }
测试代码
接下来我们通过简单的 Echo 调用测试一下我们的 codec:
- // 声明接口类
- type EchoService struct {}
- // 定义方法 Echo
- func (service *EchoService) Echo(arg string, result *string) error {
- *result = arg
- return nil
- }
- // 服务端启动逻辑
- func RegisterAndServeOnTcp() {
- err := rpc.Register(&EchoService{})// 注册并不是注册方法, 而是注册 EchoService 的一个实例
- if err != nil {
- log.Fatal("error registering", err)
- return
- }
- tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")
- if err != nil {
- log.Fatal("error resolving tcp", err)
- }
- listener, err := net.ListenTCP("tcp", tcpAddr)
- for {
- conn, err := listener.Accept()
- if err != nil {
- log.Fatal("error accepting", err)
- } else {
- // 这里先通过 NewServerCodec 获得一个实例, 然后调用 rpc.ServeCodec 来启动服务
- rpc.ServeCodec(msgpk.NewServerCodec(conn))
- }
- }
- }
- // 客户端调用逻辑
- func CallEcho(method string, arg interface{}) (result interface{}, err error) {
- var client *rpc.Client
- conn, err := net.Dial("tcp", ":1234")
- client = rpc.NewClientWithCodec(msgpk.NewClientCodec(conn))
- defer func() {
- conn.Close()
- client.Close()
- }()
- if err != nil {
- return "", err
- }
- err = client.Call(method, arg, &result) // 通过类型加方法名指定要调用的方法
- if err != nil {
- return "", err
- }
- return result, err
- }
- //main 函数
- func main() {
- go server.RegisterAndServeOnTcp() // 先启动服务端
- time.Sleep(1e9)
- wg := new(sync.WaitGroup) //waitGroup 用于阻塞主线程防止提前退出
- callTimes := 10
- wg.Add(callTimes)
- for i := 0; i < callTimes; i++ {
- go func() {
- // 使用 hello world 加一个随机数作为参数
- argString := "hello world"+strconv.Itoa(rand.Int())
- resultString, err := client.Echo(argString)
- if err != nil {
- log.Fatal("error calling:", err)
- }
- if resultString != argString {
- fmt.Println("error")
- } else {
- fmt.Printf("echo:%s\n", resultString)
- }
- wg.Done()
- }()
- }
- wg.Wait()
- }
上面的例子里首先通过 go server.RegisterAndServeOnTcp()启动了服务端, 然后同时启动了 10 个 go routine 来发起请求, 客户端在收到响应之后会打印对应的结果. 最后执行 main 函数, 控制台会输出结果(后面的随机数可能会不同):
- echo:hello world 8674665223082153551
- echo:hello world 6129484611666145821
- echo:hello world 5577006791947779410
- echo:hello world 605394647632969758
- echo:hello world 4037200794235010051
- echo:hello world 3916589616287113937
- echo:hello world 894385949183117216
- echo:hello world 1443635317331776148
- echo:hello world 2775422040480279449
- echo:hello world 6334824724549167320
结语
到这里, 一个简单的自定义的 go 语言 rpc 就已经完成了, 虽然自定义部分只有序列化协议部分而已, 比如线程模型仍是 go rpc 自带的逻辑, 除此之外也没有前言里提到的各种高级功能. 后续再考虑尝试用 go 语言从零开始实现一个 RPC 吧.
其他
并发场景
有细心的同学可能已经发现了, 这里实现的逻辑当中完全没有考虑并发的问题, 缓存数据也是直接放到 codec 对象. 而这样简单的实现也不会导致并发调用失败, 其中具体的原因就是 go rpc 在处理每个 codec 对象时, 读取请求都是循序的, 然后再并发的处理请求并返回结果.
来源: https://juejin.im/post/5c4d7005f265da61223ab198