前言
上一篇文章里我们实现了基本的 RPC 客户端和服务端, 这次我们开始着手实现更上层的功能. 篇幅所限, 具体的代码实现参见: 代码地址
基础支撑部分
升级版的 Client 和 Server
client 实现
server 实现
首先让我们来重新定义 Client 和 Server:SGClient 和 SGServer.SGClient 封装了上一节定义的 RPCClient 的操作, 提供服务治理的相关特性; SGServer 则由上一节定义的 RPCServer 升级而来, 支持服务治理的相关特性. 这里的 SG(service governance)表示服务治理. 这里直接贴上相关的定义:
- type SGClient interface {
- Go(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}, done chan *Call) (*Call, error)
- Call(ctx context.Context, ServiceMethod string, arg interface{}, reply interface{}) error
- }
- type sgClient struct {
- shutdown bool
- option SGOption
- clients sync.Map //map[string]RPCClient
- serversMu sync.RWMutex
- servers []registry.Provider
- }
- type RPCServer interface {
- Register(rcvr interface{}, metaData map[string]string) error
- Serve(network string, addr string) error
- Services() []ServiceInfo
- Close() error
- }
- type SGServer struct { // 原来的 RPCServer
- codec codec.Codec
- serviceMap sync.Map
- tr transport.ServerTransport
- mutex sync.Mutex
- shutdown bool
- Option Option
- }
拦截器
在之前的文章提到过, 我们需要提供过滤器一样的使用方式, 来达到对扩展开放对修改关闭的目标. 我们这里采用高阶函数的方式来定义方切面和法拦截器, 首先定义几个切面:
- // 客户端切面
- type CallFunc func(ctx context.Context, ServiceMethod string, arg interface{
- }, reply interface{
- }) error
- type GoFunc func(ctx context.Context, ServiceMethod string, arg interface{
- }, reply interface{
- }, done chan *Call) *Call
- // 服务端切面
- type ServeFunc func(network string, addr string) error
- type ServeTransportFunc func(tr transport.Transport)
- type HandleRequestFunc func(ctx context.Context, request *protocol.Message, response *protocol.Message, tr transport.Transport)
以上几个是 RPC 调用在客户端和服务端会经过的几个函数, 我们将其定义为切面, 然后再定义对应的拦截器:
- // 客户端拦截器
- packege client
- type Wrapper interface {
- WrapCall(option *SGOption, callFunc CallFunc) CallFunc
- WrapGo(option *SGOption, goFunc GoFunc) GoFunc
- }
- //f 服务端拦截器
- package server
- type Wrapper interface {
- WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc
- WrapServeTransport(s *SGServer, transportFunc ServeTransportFunc) ServeTransportFunc
- WrapHandleRequest(s *SGServer, requestFunc HandleRequestFunc) HandleRequestFunc
- }
这样一来, 用户可以通过实现 Wapper 接口来对客户端或者服务端的行为进行增强, 比如将请求参数和结果记录到日志里, 动态的修改参数或者响应等等. 我们的框架自身 的相关功能也可以通过 Wrapper 实现. 目前客户端实现了用于封装元数据的 MetaDataWrapper 和记录请求和响应的 LogWrapper; 服务端目前在 DefaultWrapper 实现了用于服务注册, 监听退出信号以及请求计数的逻辑.
因为 go 并不提供抽象类的方式, 所以对于某些实现类可能并不需要拦截所有切面(比如只拦截 Call 不想拦截 Go), 这种情况直接返回参数里的函数对象就可以了.
客户端拦截器实现
服务端拦截器实现
服务治理部分
服务注册与发现
在这之前, 我们的 RPC 服务调用都是通过在客户端指定服务端的 ip 和端口来调用的, 这种方式十分简单但也场景十分有限, 估计只能在测试或者 demo 中使用. 所以我们需要提供服务注册和发现相关的功能, 让客户端的配置不再与实际的 IP 绑定, 而是通过独立的注册中心获取服务端的列表, 并且能够在服务端节点变更时获得实时更新.
首先定义相关的接口(代码地址):
- //Registry 包含两部分功能: 服务注册 (用于服务端) 和服务发现(用于客户端)
- type Registry interface {
- Register(option RegisterOption, provider ...Provider) // 注册
- Unregister(option RegisterOption, provider ...Provider) // 注销
- GetServiceList() []Provider // 获取服务列表
- Watch() Watcher // 监听服务列表的变化
- Unwatch(watcher Watcher) // 取消监听
- }
- type RegisterOption struct {
- AppKey string //AppKey 用于唯一标识某个应用
- }
- type Watcher interface {
- Next() (*Event, error) // 获取下一次服务列表的更新
- Close()
- }
- type EventAction byte
- const (
- Create EventAction = iota
- Update
- Delete
- )
- type Event struct { //Event 表示一次更新
- Action EventAction
- AppKey string
- Providers []Provider // 具体变化的服务提供者(增量而不是全量)
- }
- type Provider struct { // 某个具体的服务提供者
- ProviderKey string // Network+"@"+Addr
- Network string
- Addr string
- Meta map[string]string
- }
AppKey
我们使用 AppKey 这样一个概念来标识某个服务, 比如 com.meituan.demo.rpc.server. 服务端在启动时将自身的相关信息 (包括 AppKey,ip,port, 方法列表等) 注册到注册中心; 客户端在需要调用时只需要根据服务端的 AppKey 到注册中心查找即可.
目前暂时只实现了直连 (peer2peer) 和基于内存 (InMemory) 的服务注册, 后续再接入其他独立的组件如 etcd 或者 zookeeper 等等.
InMemory 代码实现地址
负载均衡
有了服务注册与发现之后, 一个客户端所面对的可能就不只有一个服务端了, 客户端在发起调用前需要从多个服务端中选择一个出来进行实际的通信, 具体的选择策略有很多, 比如随机选择, 轮询, 基于权重选择, 基于服务端负载或者自定义规则等等.
这里先给出接口定义:
- //Filter 用于自定义规则过滤某个节点
- type Filter func(provider registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}) bool
- type SelectOption struct {
- Filters []Filter
- }
- type Selector interface {
- Next(providers []registry.Provider, ctx context.Context, ServiceMethod string, arg interface{}, opt SelectOption) (registry.Provider, error)
- }
目前暂时只实现了随机负载均衡, 后续再实现其他策略比如轮询或者一致性哈希等等, 用户也可以选择实现自己的负载均衡策略.
容错处理
长连接以及网络重连
为了减少频繁创建和断开网络连接的开销, 我们维持了客户端到服务端的长连接, 并把创建好的连接 (RPCClient 对象) 用 map 缓存起来, key 就是对应的服务端的标识. 客户端在调用前根据负载均衡的结果检索到缓存好的 RPCClient 然后发起调用. 当我们检索不到对应的客户端或者发现缓存的客户端已经失效时, 需要重新建立连接(重新创建 RPCClient 对象).
- func (c *sgClient) selectClient(ctx context.Context, ServiceMethod string, arg interface{}) (provider registry.Provider, client RPCClient, err error) {
- // 根据负载均衡决定要调用的服务端
- provider, err = c.option.Selector.Next(c.providers(), ctx, ServiceMethod, arg, c.option.SelectOption)
- if err != nil {
- return
- }
- client, err = c.getClient(provider)
- return
- }
- func (c *sgClient) getClient(provider registry.Provider) (client RPCClient, err error) {
- key := provider.ProviderKey
- rc, ok := c.clients.Load(key)
- if ok {
- client := rc.(RPCClient)
- if client.IsShutDown() {
- // 如果已经失效则清除掉
- c.clients.Delete(key)
- }
- }
- // 再次检索
- rc, ok = c.clients.Load(key)
- if ok {
- // 已经有缓存了, 返回缓存的 RPCClient
- client = rc.(RPCClient)
- } else {
- // 没有缓存, 新建一个然后更新到缓存后返回
- client, err = NewRPCClient(provider.Network, provider.Addr, c.option.Option)
- if err != nil {
- return
- }
- c.clients.Store(key, client)
- }
- return
- }
目前的实现当中, 每个服务提供者只有一个对应的 RPCClient, 后续可以考虑类似连接池的实现, 即每个服务提供者对应多个 RPCClient, 每次调用前从连接池中取出一个 RPCClient.
集群容错
在分布式系统中, 异常是不可避免的, 当发生调用失败时, 我们可以选择要采取的处理方式, 这里列举了常见的几种:
- type FailMode byte
- const (
- FailFast FailMode = iota // 快速失败
- FailOver // 重试其他服务器
- FailRetry // 重试同一个服务器
- FailSafe // 忽略失败, 直接返回
- )
具体实现比较简单, 就是根据配置的容错选项和重试次数决定是否重试; 其他包括 FailBack(延时一段时间后重发),Fork 以及 Broadcast 等等暂时没有实现.
优雅退出
在收到程序退出信号时, server 端会尝试优先处理完当前还未结束的请求, 等请求处理完毕之后再退出, 当超出了指定的时间 (默认 12s) 仍未处理完毕时, server 端会直接退出.
- func (s *SGServer) Close() error {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- s.shutdown = true
- // 等待当前请求处理完或者直到指定的时间
- ticker := time.NewTicker(s.Option.ShutDownWait)
- defer ticker.Stop()
- for {
- if s.requestInProcess <= 0 { //requestInProcess 表示当前正在处理的请求数, 在 wrapper 里计数
- break
- }
- select {
- case <-ticker.C:
- break
- }
- }
- return s.tr.Close()
- }
结语
到这里就是这次的全部内容了, 总的来说是在之前的基础上做了封装, 预留了后续的扩展点, 然后实现了简单的服务治理相关的功能. 总结一下, 这次我们在上一篇文章的基础上做了以下改动:
重新定义了 Client 和 Server 的接口
提供了拦截器(Wrapper 接口)
提供了服务注册与发现以及负载均衡的接口和简单实现
实现了简单的容错处理
实现了简单的优雅退出
增加了 gob 序列化方式支持(比较简单, 文章里并没有提到)
历史链接
从零开始实现一个 RPC 框架(零)
从零开始实现一个 RPC 框架(一)
来源: https://juejin.im/post/5c8de09fe51d45242c195117