1. 概述
go 源码中带了 rpc 框架, 以相对精简的当时方式实现了 rpc 功能, 目前源码中的 rpc 官方已经宣布不再添加新功能, 并推荐使用 grpc. 作为 go 标准库中 rpc 框架, 还是有很多地方值得借鉴及学习, 这里将从源码角度分析 go 原生 rpc 框架.
2.server 端
server 端主要分为两个步骤, 首先进行方法注册, 通过反射处理将方法取出, 并存到 map 中. 然后是网络调用, 主要是监听端口, 读取数据包, 解码请求 调用反射处理后的方法, 将返回值编码, 返回给客户端.
2.1 方法注册
- 2.1.1 Register
- // Register publishes the receiver's methods in the DefaultServer.
- func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
- // RegisterName is like Register but uses the provided name for the type
- // instead of the receiver's concrete type.
- func RegisterName(name string, rcvr interface{}) error {
- return DefaultServer.RegisterName(name, rcvr)
- }
如上, 方法注册的入口函数有两个, 分别为 Register 以及 RegisterName, 这里 interface{}通常是带方法的对象. 如果想要自定义方法的接收对象, 则可以使用 RegisterName.
2.1.2 反射处理过程
type methodType struct {
sync.Mutex // protects counters
method reflect.Method // 反射后的函数
ArgType reflect.Type // 请求参数的反射值
ReplyType reflect.Type // 返回参数的反射值
numCalls uint // 调用次数
- }
- type service struct {
- name string // 服务名, 这里通常为 register 时的对象名或自定义对象名
rcvr reflect.Value // 服务的接收者的反射值
typ reflect.Type // 接收者的类型
method map[string]*methodType // 对象的所有方法的反射结果.
}
反射处理过程, 其实就是将对象以及对象的方法, 通过反射生成上面的结构, 如注册 Arith.Multiply(xx,xx) error 这样的对象时, 生成的结构为 map["Arith"]*service, service 中 ethod 为 map["Multiply"]*methodType.
几个关键代码如下:
生成 service 对象
- func (server *Server) register(rcvr interface{}, name string, useName bool) error {
- // 生成 service
- s := new(service)
- s.typ = reflect.TypeOf(rcvr)
- s.rcvr = reflect.ValueOf(rcvr)
- sname := reflect.Indirect(s.rcvr).Type().Name()
- ....
- s.name = sname
- // 通过 suitableMethods 将对象的方法转换成 map[string]*methodType 结构
- s.method = suitableMethods(s.typ, true)
- ....
- //service 存储为键值对
- if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
- return errors.New("rpc: service already defined:" + sname)
- }
- return nil
- }
生成 map[string] *methodType
- func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
- methods := make(map[string]*methodType)
- // 通过反射, 遍历所有的方法
- for m := 0; m < typ.NumMethod(); m++ {
- method := typ.Method(m)
- mtype := method.Type
- mname := method.Name
- // Method must be exported.
- if method.PkgPath != "" {
- continue
- }
- // Method needs three ins: receiver, *args, *reply.
- if mtype.NumIn() != 3 {
- if reportErr {
- log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
- }
- continue
- }
- // 取出请求参数类型
- argType := mtype.In(1)
- ...
- // 取出响应参数类型, 响应参数必须为指针
- replyType := mtype.In(2)
- if replyType.Kind() != reflect.Ptr {
- if reportErr {
- log.Println("method", mname, "reply type not a pointer:", replyType)
- }
- continue
- }
- ...
- // 去除函数的返回值, 函数的返回值必须为 error.
- if returnType := mtype.Out(0); returnType != typeOfError {
- if reportErr {
- log.Println("method", mname, "returns", returnType.String(), "not error")
- }
- continue
- }
- // 将方法存储成 key-value
- methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
- }
- return methods
- }
2.2 网络调用
- // Request 每次 rpc 调用的请求的头部分
- type Request struct {
ServiceMethod string // 格式为: "Service.Method"
Seq uint64 // 客户端生成的序列号
- next *Request // server 端保持的链表
- }
- // Response 每次 rpc 调用的响应的头部分
- type Response struct {
ServiceMethod string // 对应请求部分的 ServiceMethod
Seq uint64 // 对应请求部分的 Seq
- Error string // 错误
- next *Response // server 端保持的链表
- }
如上, 网络调用主要用到上面的两个结构体, 分别是请求参数以及返回参数, 通过编解码器 (gob/json) 实现二进制到结构体的相互转换. 主要涉及到下面几个步骤:
关键代码如下: 取出请求, 并得到相应函数的调用参数
- func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
- // Grab the request header.
- req = server.getRequest()
- // 编码器读取生成请求
- err = codec.ReadRequestHeader(req)
- if err != nil {
- // 错误处理
- ...
- return
- }
- keepReading = true
- // 取出服务名以及方法名
- dot := strings.LastIndex(req.ServiceMethod, ".")
- if dot < 0 {
- err = errors.New("rpc: service/method request ill-formed:" + req.ServiceMethod)
- return
- }
- serviceName := req.ServiceMethod[:dot]
- methodName := req.ServiceMethod[dot+1:]
- // 从注册时生成的 map 中查询出相应的方法的结构
- svci, ok := server.serviceMap.Load(serviceName)
- if !ok {
- err = errors.New("rpc: can't find service " + req.ServiceMethod)
- return
- }
- svc = svci.(*service)
- // 获取出方法的类型
- mtype = svc.method[methodName]
- if mtype == nil {
- err = errors.New("rpc: can't find method " + req.ServiceMethod)
- }
- // 循环处理, 不断读取链接上的字节流, 解密出请求, 调用方法, 编码响应, 回写到客户端.
- func (server *Server) ServeCodec(codec ServerCodec) {
- sending := new(sync.Mutex)
- for {
- // 读取请求
- service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
- if err != nil {
- ...
- }
- // 调用
- go service.call(server, sending, mtype, req, argv, replyv, codec)
- }
- codec.Close()
- }
通过参数进行函数调用
- func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
- mtype.Lock()
- mtype.numCalls++
- mtype.Unlock()
- function := mtype.method.Func
- // 通过反射进行函数调用
- returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
- // 返回值是不为空时, 则取出错误的 string
- errInter := returnValues[0].Interface()
- errmsg := ""
- if errInter != nil {
- errmsg = errInter.(error).Error()
- }
- // 发送相应, 并释放请求结构
- server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
- server.freeRequest(req)
- }
3.client 端
- // 异步调用
- func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
- }
- // 同步调用
- func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
- }
- // Call represents an active RPC.
- type Call struct {
ServiceMethod string // 服务名及方法名 格式: 服务. 方法
Args interface{} // 函数的请求参数 (*struct).
Reply interface{} // 函数的响应参数 (*struct).
Error error // 方法完成后 error 的状态.
Done chan *Call // 方法调用结束后的 channel.
}
client 端部分则相对要简单很多, 主要提供 Call 以及 Go 两个方法, 分别表示同步调用以及异步调用, 但其实同步调用底层实现其实也是异步调用, 调用时主要用到了 Call 结构, 相关解释如上.
3.1 主要流程
3.2 关键代码
发送请求部分代码, 每次 send 一次请求, 均生成一个 call 对象, 并使用 seq 作为 key 保存在 map 中, 服务端返回时从 map 取出 call, 进行相应处理.
- func (client *Client) send(call *Call) {
- // 请求级别的锁
- client.reqMutex.Lock()
- defer client.reqMutex.Unlock()
- // Register this call.
- client.mutex.Lock()
- if client.shutdown || client.closing {
- call.Error = ErrShutdown
- client.mutex.Unlock()
- call.done()
- return
- }
- // 生成 seq, 每次调用均生成唯一的 seq, 在服务端相应后会通过该值进行匹配
- seq := client.seq
- client.seq++
- client.pending[seq] = call
- client.mutex.Unlock()
- // 请求并发送请求
- client.request.Seq = seq
- client.request.ServiceMethod = call.ServiceMethod
- err := client.codec.WriteRequest(&client.request, call.Args)
- if err != nil {
- // 发送请求错误时, 将 map 中 call 对象删除.
- client.mutex.Lock()
- call = client.pending[seq]
- delete(client.pending, seq)
- client.mutex.Unlock()
- if call != nil {
- call.Error = err
- call.done()
- }
- }
- }
接收响应部分的代码, 这里是一个 for 循环, 不断读取 tcp 上的流, 并解码成 Response 对象以及方法的 Reply 对象.
- func (client *Client) input() {
- var err error
- var response Response
- for err == nil {
- response = Response{}
- err = client.codec.ReadResponseHeader(&response)
- if err != nil {
- break
- }
- // 通过 response 中的 Seq 获取 call 对象
- seq := response.Seq
- client.mutex.Lock()
- call := client.pending[seq]
- delete(client.pending, seq)
- client.mutex.Unlock()
- switch {
- case call == nil:
- err = client.codec.ReadResponseBody(nil)
- if err != nil {
- err = errors.New("reading error body:" + err.Error())
- }
- case response.Error != "":
- // 服务端返回错误, 直接将错误返回
- call.Error = ServerError(response.Error)
- err = client.codec.ReadResponseBody(nil)
- if err != nil {
- err = errors.New("reading error body:" + err.Error())
- }
- call.done()
- default:
- // 通过编码器, 将 Resonse 的 body 部分解码成 reply 对象.
- err = client.codec.ReadResponseBody(call.Reply)
- if err != nil {
- call.Error = errors.New("reading body" + err.Error())
- }
- call.done()
- }
- }
- // 客户端退出处理
- client.reqMutex.Lock()
- client.mutex.Lock()
- client.shutdown = true
- closing := client.closing
- if err == io.EOF {
- if closing {
- err = ErrShutdown
- } else {
- err = io.ErrUnexpectedEOF
- }
- }
- for _, call := range client.pending {
- call.Error = err
- call.done()
- }
- client.mutex.Unlock()
- client.reqMutex.Unlock()
- if debugLog && err != io.EOF && !closing {
- log.Println("rpc: client protocol error:", err)
- }
- }
4. 一些缺点
同步调用无法超时
由于原生 rpc 只提供两个方法, 同步的 Call 以及异步的 Go, 同步的 Call 服务端不返回则会一直阻塞, 这里如果存在大量的不返回, 会导致协程一直无法释放.
异步调用超时后会内存泄漏
基于异步调用加 channel 实现超时功能也会存在泄漏问题, 原因是 client 的请求会存在 map 结构中, Go 函数退出并不会清理 map 的内容, 因此如果 server 端不返回的话, map 中的请求会一直存储, 从而导致内存泄漏.
底层链接状态无法维持
由于没有 keepalive 机制, 当对底层链接进行复用时会出现链接实际已经不可用, 但上层无法感知到的情况, 从而导致发出请求, 一直无法收到回应.
5. 总结
总的来说, go 原生 rpc 算是个基础版本的 rpc, 代码精简, 可扩展性高, 但是只是实现了 rpc 最基本的网络通讯, 像超时熔断, 链接管理(保活与重连), 服务注册发现, 还是欠缺的, 因此还是达不到生产环境开箱即用, 不过 git 就有一个基于 rpc 的功能增强版本, 叫 rpcx, 支持了大部分主流 rpc 的特性.
6. 参考
rpc https://golang.org/pkg/net/rpc/
来源: https://juejin.im/post/5ad9bc80f265da0b9265245d