导语
gRPC 是什么, 不用多说了.
gRPC 如何用, 也不用多说了 .
但是, gRPC 是如何 work 的, 清楚的理解其调用逻辑, 对于我们更好, 更深入的使用 gRPC 很有必要. 因此我们必须深度解析下 gRPC 的实现逻辑, 在本文中, 将分别从客户端和服务端来说明 gRPC 的实现原理.
准备条件
本文将以 gRPC Github 上 helloword 代码 https://github.com/grpc/grpc-go/tree/master/examples/helloworld 作为一个完整的项目示例作为介绍的基础, 在展开分析之前, 简单介绍下作为 gRPC 的文件结构:
- greeter_client
- greeter_server
- helloworld
- mock_helloworld
在这里, 我们只需要关注前三个文件夹的内容.
其中, greet_client 和 greet_server 文件中分别是 grpc 客户端和服务端的业务调用代码, 包含了一个标准的 gRPC 调用过程. helloworld 中包含了是 protobuf 的协议文件和生成的 helloworld.pb.go 文件(至于 pb 的协议和 *.pb.go 文件的生成等内容, 不作为本文的介绍范围, 不再赘述).
客户端
首先, 我们以 Github 官网上的 example 为示例来一览 gRPC client 端的使用, 从而跟踪其调用的逻辑个原理. 总的来看, 调用的过程基本就是分为三步:
创建 connection
创建业务客户端实例
调用 RPC 接口
- {
- ...
- // 创建 connection
- conn, err := grpc.Dial(address, grpc.WithInsecure())
- if err != nil {
- log.Fatalf("did not connect: %v", err)
- }
- defer conn.Close()
- // 创建 client
- c := pb.NewGreeterClient(conn)
- // 调用 RPC 接口
- name := defaultName
- r, err := c.SayHello(context.TODO(), &pb.HelloRequest{Name: name})
- if err != nil {
- log.Fatalf("could not greet: %v", err)
- }
- ...
- }
创建 connection
通过 grpc.Dial()接口创建了一个 ClientConn 类型实例.
Dial()函数的第一个参数作为 endpoint 我们就不多说了, 同时 Dial()还接受变长参数 DialOption.DialOption 是一个接口类型, 在 grpc 中存在着多种返回了 DialOption 类型的函数, 这些返回了 DialOption 类型的函数, 例如编解码, 负载均衡策略等, 一些函数声明示例如下:
- func WithBalancer() DialOption
- func WithInsecure() DialOption
- func WithCodec() DialOption
根据 client 的需求, 调用方在调用 Dial()的时候可以将这些函数作为参数传入 Dial()中.
在 Dial()中, 首先是会根据参数进行一系列的初始化和赋值操作, 就不在这里列出说明, 而对于这些 DailOption 参数, 在 Dial()中最终实现对 grpc.ClientConn 的成员变量 dopts 中的 CallOption 进行了赋值.
通过 Dial()的调用, grpc 已经建立了到服务端的链接, 同时也会附带一些诸如负载均衡, 证书检查, Backoff 等策略的执行(如果有进行配置的话).
创建客户端实例
创建业务 client 实例, 在使用 gRPC 的时候, 我们都知道其传递协议是 protobuf.
而 NewGreeterClient()则是通过对 pb 协议生成的代码接口, 存在于 helloworld.pb.go 中, 该函数主要是返回了一个 greeterClient 类型的实例.
调用 RPC 请求
SayHello()中的 RPC 接口也是存在于根据 pb 协议生成的 helloworld.pb.go 文件中.
SayHello()除了接受一个 context 存储上下文信息和一个 request 类型参数, 同时也支持一个 CallOption 类型的变量. 关于 CallOption 在上文中有提到, 其本身也是一个接口, 其中 before()用于在请求发送之前设置参数, 而 after()则是在请求调用完毕之后提取信息. 通过对这两个函数的调用, 方便的实现了在请求前后的一些参数设置的功能:
- type CallOption interface {
- before(*callInfo) error
- after(*callInfo)
- }
任何一个我们我们上文说到了返回值为 DialOption 的函数, 大部分都有一个对应的结构实现了 CallOption, 诸如上面的 WithCodec(), 其对应的结构为:
- type CustomCodecCallOption struct {
- Codec Codec
- }
- func (o CustomCodecCallOption) before(c *callInfo) error {
- c.codec = o.Codec
- return nil
- }
- func (o CustomCodecCallOption) after(c *callInfo) {}
回到 SayHello()函数的逻辑中来, 该函数最终会调用 grpc 中的 call.go 中的 invoke 函数来执行具体的操作.
在 invoke()函数中, newClientStream()会首先获取传输层 Trasport 结构的实例并包装到一个 ClientStream 实例中返回, 随后将 RPC 请求通过 SendMsg()接口发送出去, 注意, 由于 SendMsg()并不会等待服务端收到数据, 因此还需要通过 RecvMsg()同步接收收到的回复消息 (关于 SendMsg() 和 RecvMsg()中的具体发送和接收数据逻辑, 不在赘述, 可以去源码再详细了解).
- // pb.go 文件
- func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
- out := new(HelloReply)
- err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
- if err != nil {
- return nil, err
- }
- return out, nil
- }
- ...
- // grpc/grpc.go/call.go 文件
- func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
- cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
- if err != nil {
- return err
- }
- if err := cs.SendMsg(req); err != nil {
- return err
- }
- return cs.RecvMsg(reply)
- }
服务端
对于 Server 端, 我们同样地根据 Github 上的官网示例来展开说明. 总的来看, grpc 在 server 端的调用逻辑如下, 基本就是分为四步:
创建端口监听 listener
创建 server 实例
注册服务(并未真正开始服务)
启动服务端
- {
- ...
- // 创建 listener
- lis, err := net.Listen("tcp", port)
- if err != nil {
- log.Fatalf("failed to listen: %v", err)
- }
- // 创建 server 示例
- s := grpc.NewServer()
- // 注册服务
- pb.RegisterGreeterServer(s, &server{})
- reflection.Register(s)
- // 启动服务端监听
- if err := s.Serve(lis); err != nil {
- log.Fatalf("failed to serve: %v", err)
- }
- ...
- }
创建监听端口
创建 listener, 不用多介绍了, 就是创建了一个监听 tcp 端口的 Listener 实例.
创建服务端实例
NewServer()方法创建了一个 grpc.Server 实例, 其函数内部会对该实例进行一系列初始化赋值操作. 该接口与客户端中的 Dial()接口类似, 可以接受多个 ServerOption 入参, 在 helloworld 的示例中并未传入任务参数, 一个简单那的示例如下:
svr := grpc.NewServer(grpc.CustomCodec(proxy.Codec()))
在 grpc 中, 也存在了多种类似于 CustomCodec()这样返回值类型为 ServerOption 的函数, 从而满足调用方在需要求进行传参赋值:
- func CustomCodec() ServerOption
- func MaxConcurrentStreams() ServerOption
- func UnknownServiceHandler() ServerOption
服务注册
RegisterGreeterServer()是由 helloworld.pb.go 生成的接口, 其主要调用了 grpc 的 RegisterService() 来注册当前 service 及其实现.
grpc.RegisterService()接收一个参数类型为 ServiceDesc 的实例_Greeter_serviceDesc 用以岁 service 的描述说明, 同时接收一个 service 实例作注册进来. 其中_Greeter_serviceDesc 是由 pb 生成的对业务 RPC 接口的描述, 如下所示:
- // helloworld.pb.go
- func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
- s.RegisterService(&_Greeter_serviceDesc, srv)
- }
- var _Greeter_serviceDesc = grpc.ServiceDesc{
- ServiceName: "helloworld.Greeter",
- HandlerType: (*GreeterServer)(nil),
- Methods: []grpc.MethodDesc{
- {
- MethodName: "SayHello",
- Handler: _Greeter_SayHello_Handler,
- },
- },
- Streams: []grpc.StreamDesc{},
- Metadata: "helloworld.proto",
- }
我们可以看到, 在 grpc.ServiceDesc 中对 Methods 变量进行了赋值. 其中 Methods 包含了一个 RPC 接口名到 handler 的映射数组, 描述了当前 service 支持的所有的方法, MethodName 即为调用的 RPC 接口名, 而 handler 的值_Greeter_SayHello_Handler()也是由 pb 生成的方法, 在其内部通过注册进来的 service 实例, 实现了对我们的业务函数 SayHello()进行了调用:
- func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
- in := new(HelloRequest)
- if err := dec(in); err != nil {
- return nil, err
- }
- if interceptor == nil {
- return srv.(GreeterServer).SayHello(ctx, in)
- }
- info := &grpc.UnaryServerInfo{
- Server: srv,
- FullMethod: "/helloworld.Greeter/SayHello",
- }
- handler := func(ctx context.Context, req interface{}) (interface{}, error) {
- return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))
- }
- return interceptor(ctx, in, info, handler)
- }
启动服务
Serve 函数中开始接收来到 listener 的请求(实际上也就是对 listener 进行了 Accept()), 并为每一个请求创建一个 go 程来服务.
Serve 函数的逻辑判断比较复杂, 但其实真正的调用逻辑过程十分简单, 在下面列出, 从而有助于我们的理解.
- func (s *Server) Serve(lis net.Listener) error {
- ...
- for {
- // 开始接受服务
- rawConn, err := lis.Accept()
- ...
- // 为每一个请求启动一个 go 程来处理链接
- s.serveWG.Add(1)
- go func() {
- s.handleRawConn(rawConn)
- s.serveWG.Done()
- }()
- }
- }
- func (s *Server) handleRawConn(rawConn net.Conn) {
- // 鉴权操作
- conn, authInfo, err := s.useTransportAuthenticator(rawConn)
- ...
- // 基于 HTTP2, 创建一个 ServerTransport
- st := s.newHTTP2Transport(conn, authInfo)
- ...
- go func() {
- s.serveStreams(st)
- s.removeConn(st)
- }()
- }
其中, newHTTP2Transport()的代码主要部分有一些关于 HTTP2 的赋值和初始化操作, 存在于 internal/transport/http2_server.go 中, 这儿就不再进入介绍 http2 的具体实现方式了. 而 serveStreams()中则主要是调用了 HandleStreams()接口去真正的接受请求流.
- func (s *Server) serveStreams(st transport.ServerTransport) {
- defer st.Close()
- var wg sync.WaitGroup
- st.HandleStreams(func(stream *transport.Stream) {
- wg.Add(1)
- go func() {
- defer wg.Done()
- s.handleStream(st, stream, s.traceInfo(st, stream))
- }()
- }, func(ctx context.Context, method string) context.Context {
- if !EnableTracing {
- return ctx
- }
- tr := trace.New("grpc.Recv."+methodFamily(method), method)
- return trace.NewContext(ctx, tr)
- })
- wg.Wait()
- }
HandleStreams()中的实现在 grpc-go/internal/transport/handler_server.go 文件中.
在 HandleStreams()实现中前面一大部分是对数据流 Stream 的初始化, 数据接收以及赋值, 详细的处理过程大家可以去文件中详细的看代码, 这里我们只做逻辑流程的分析. 在数据流 stream 接收完毕后, 通过注册进来的 server 的 startStream()来处理数据流.
注册进来的 startStream()最终调用了 Server 中的 startStream()函数, 区分出是 unary 请求还是 stream 请求, 并分别通过 processUnaryRPC()和 processStreamingRPC()进行区分处理. 对于两个主要的处理函数 processUnaryRPC()和 processStreamingRPC(), 基本上是一些具体的数据接收, 编解码等操作, 就不在浪费篇幅贴出代码了.
- func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
- ...
- // 数据流 Stream 的接受和赋值
- startStream(s)
- ht.runStream()
- close(requestOver)
- // 等待数据读取完毕
- req.Body.Close()
- <-readerDone
- }
- func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
- ...
- // Unary RPC or Streaming RPC?
- if md, ok := srv.md[method]; ok {
- s.processUnaryRPC(t, stream, srv, md, trInfo)
- return
- }
- if sd, ok := srv.sd[method]; ok {
- s.processStreamingRPC(t, stream, srv, sd, trInfo)
- return
- }
- ...
- if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
- s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
- return
- }
- ...
- }
最后, 简单以一个图示来展示 grpc 服务端的调用流程:
gRPC Server 简化调用流程
总结
上面的就是关于 gRPC 调用逻辑的分析, gRPC 中的代码十分复杂, 本文只涉及了其调用逻辑的分析, 在分析展示源码时, 省略的一些错误处理或者数据处理的代码, 而侧重于逻辑调用的过程, 从而在使用 gRPC 的时候可以更好的理解其原理.
来源: https://www.qcloud.com/developer/article/1189548