grpc proxy 是一个基于 L7 层的无状态的 gRPC 的 etcd 反向代理服务. 这个 L7 只的是 OSI 模型中的第七层, 会话层. 它除了提供 etcd client 的基本功能之外, 同样提供且优化了以下功能:
Watch API
grpc Pproxy 提供监听机制, 客户端可以监听某个 key 或者某些 key 的变更(v2 和 v3 的机制不同, 参看后面文章). 用于监听和推送变更.
它可以将多个客户端 (c-watchers) 对同一个 key 的监控合并到一个链接 (s-watcher) 到 etcd server 的请求. 同时它会广播从 s-watcher 收到的时间到所有的 c-watchers.
- +-------------+
- | etcd server |
- +------+------+
- ^ watch key A (s-watcher)
- |
- +-------+-----+
- | gRPC proxy | <-------+
- | | |
- ++-----+------+ |watch key A (c-watcher)
- watch key A ^ ^ watch key A |
- (c-watcher) | | (c-watcher) |
- +-------+-+ ++--------+ +----+----+
- | client | | client | | client |
- | | | | | |
- +---------+ +---------+ +---------+
如上图所示, 3 个 client 对同一个 key A 的 watcher, 注册到 gRPC proxy 中, gRPC proxy 会合并生成一个 s-watcher 注册到 etcd server.
lease API
lease API 支持续约机制, 客户端通过定时刷新 (heartbean) 来实现续约(v2 和 v3 的实现机制也不一样). 用于集群监控以及服务注册发现.
跟上图类似, 为了减少 etcd server 的交互次数, gRPC proxy 同样提供了合并功能:
- +-------------+
- | etcd server |
- +------+------+
- ^
| heartbeat L1, L2, L3
- | (s-stream)
- v
- +-------+-----+
- | gRPC proxy +<-----------+
- +---+------+--+ | heartbeat L3
- ^ ^ | (c-stream)
heartbeat L1 | | heartbeat L2 |
- (c-stream) v v (c-stream) v
- +------+-+ +-+------+ +-----+--+
- | client | | client | | client |
- +--------+ +--------+ +--------+
如上图所示, 3 个 client 注册到 gRPC proxy 中 (c-stream), 通过心跳(heartbeat) 来定时续约, gRPC proxy 会合并生成一个 s-stream 注册到 etcd server.
缓存请求
gRPC proxy 会缓存来自客户端的请求, 保证 etcd server 频繁的被客户端请求滥用.
1. 源码实现
1.1 startGRPCProxy 解读
- func startGRPCProxy(cmd *cobra.Command, args []string) {
- //1. 校验参数的合法性
- checkArgs()
- //2. 判断是否校验 https
- tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
- if tlsinfo == nil && grpcProxyListenAutoTLS {
- host := []string{"https://" + grpcProxyListenAddr}
- dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy")
- autoTLS, err := transport.SelfCert(dir, host)
- if err != nil {
- plog.Fatal(err)
- }
- tlsinfo = &autoTLS
- }
- if tlsinfo != nil {
- plog.Infof("ServerTLS: %s", tlsinfo)
- }
- //3. 生成 cmux 路由
- m := mustListenCMux(tlsinfo)
- //4. grpc cmux
- grpcl := m.Match(cmux.HTTP2())
- defer func() {
- grpcl.Close()
- plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
- }()
- //5. 生成一个向 etcd 服务 注册的 stream 链接, 前面提到的合并链接也就是它产生的.
- client := mustNewClient()
- //6. 一些性能和资源监控的封装, 如 Prometheus,PProf 等
- srvhttp, httpl := mustHTTPListener(m, tlsinfo, client)
- errc := make(chan error)
- //7. grpc 服务
- go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }()
- //8. http 服务
- go func() { errc <- srvhttp.Serve(httpl) }()
- //9. cmux serve
- go func() { errc <- m.Serve() }()
- //10. 下面这个代码, 跟上面 srvhttp 有些重复, 只是它把监控信息给通过 grpcProxyMetricsListenAddr 给独立了出来
- if len(grpcProxyMetricsListenAddr)> 0 {
- mhttpl := mustMetricsListener(tlsinfo)
- go func() {
- mux := http.NewServeMux()
- etcdhttp.HandlePrometheus(mux)
- grpcproxy.HandleHealth(mux, client)
- plog.Fatal(http.Serve(mhttpl, mux))
- }()
- }
- // grpc-proxy is initialized, ready to serve
- notifySystemd()
- fmt.Fprintln(os.Stderr, <-errc)
- os.Exit(1)
- }
1.2 服务监控
在上面的代码的第 6 步中和第 10 步都是在监控服务器的性能, 首先在 5 步中生成了一直指向 etcd 服务端口的 client, 然后启动了两个服务:
Prometheus: 监控服务的资源
health: 监控 etcd 服务的健康状态
以上两个服务在第 10 步同样能生成, 只要指定参数 metrics-addr .
大概的效果, 我们分别启动两个服务 $ etcd , 这个服务会默认暴露一个 2379 的服务端口, 然后启动:
./etcd grpc-proxy start --metrics-addr=http://127.0.0.1:6061 --enable-pprof=true
, 这个也会默认的链接 2379 这个端口.
metrics 资源监控
然后我们通过 web 浏览器分别打开
http://127.0.0.1:23790/metrics
和
http://127.0.0.1:6061/metrics
出现两个一样的网页, 大概的内容是:
- # HELP etcd_debugging_disk_backend_commit_rebalance_duration_seconds The latency distributions of commit.rebalance called by bboltdb backend.
- # TYPE etcd_debugging_disk_backend_commit_rebalance_duration_seconds histogram
- etcd_debugging_disk_backend_commit_rebalance_duration_seconds_bucket{le="0.001"} 0
- etcd_debugging_disk_backend_commit_rebalance_duration_seconds_bucket{le="0.002"} 0
- .........
- .....
health 监控(etcd 服务健康状况)
再打开地址
http://127.0.0.1:6061/health
和
http://127.0.0.1:23790/health
, 会显示以下内容:
{"health":"true"}
这个是显示在 --endpoints 或 --discovery-srv 中所指定的 etcd server 是否正常存活. 当终止 etcd 服务之后, 再次调用地址会返回 false.
pprof 调试
上面有个参数是
--enable-pprof=true
, 当指定该参数的时候, 可以打开地址
http://127.0.0.1:23790/debug/pprof/
, 来分析程序性能.
- /debug/pprof/
- profiles:
- 0 block
- 26 goroutine
- 3 heap
- 0 mutex
- 12 threadcreate
- full goroutine stack dump
总结一下, gRPC proxy 默认会在 --listen-addr 监控 etcd 服务的状态是否正常, 同时也可以指定一个 metrics-addr 端口来监控服务. 强调一下, --enable-pprof 这个参数只有在 listen-addr 这个地址打开才有效.
1.3 gRPC proxy 服务
到这个地方, 就到了这个章节最重要的地方, 开始介绍整个 gRPC proxy 所提供的服务 API.
通过第 7 步进入到具体的代码流程中, newGRPCProxyServer, 代码如下:
- func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
- if grpcProxyEnableOrdering {
- vf := ordering.NewOrderViolationSwitchEndpointClosure(*client)
- client.KV = ordering.NewKV(client.KV, vf)
- lg.Info("waiting for linearized read from cluster to recover ordering")
- for {
- _, err := client.KV.Get(context.TODO(), "_", clientv3.WithKeysOnly())
- if err == nil {
- break
- }
- lg.Warn("ordering recovery failed, retrying in 1s", zap.Error(err))
- time.Sleep(time.Second)
- }
- }
- if len(grpcProxyNamespace)> 0 {
- client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
- client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
- client.Lease = namespace.NewLease(client.Lease, grpcProxyNamespace)
- }
- if len(grpcProxyLeasing)> 0 {
- client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing)
- }
- kvp, _ := grpcproxy.NewKvProxy(client)
- watchp, _ := grpcproxy.NewWatchProxy(client)
- if grpcProxyResolverPrefix != "" {
- grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
- }
- clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
- leasep, _ := grpcproxy.NewLeaseProxy(client)
- mainp := grpcproxy.NewMaintenanceProxy(client)
- authp := grpcproxy.NewAuthProxy(client)
- electionp := grpcproxy.NewElectionProxy(client)
- lockp := grpcproxy.NewLockProxy(client)
- server := grpc.NewServer(
- grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
- grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
- grpc.MaxConcurrentStreams(math.MaxUint32),
- )
- pb.RegisterKVServer(server, kvp)
- pb.RegisterWatchServer(server, watchp)
- pb.RegisterClusterServer(server, clusterp)
- pb.RegisterLeaseServer(server, leasep)
- pb.RegisterMaintenanceServer(server, mainp)
- pb.RegisterAuthServer(server, authp)
- v3electionpb.RegisterElectionServer(server, electionp)
- v3lockpb.RegisterLockServer(server, lockp)
- // set zero values for metrics registered for this grpc server
- grpc_prometheus.Register(server)
- return server
- }
上面主要是通过封装 etcd 的 client 提供各种服务, 当接受到来自用户的请求时, 通过复用 client 连接到 etcd 服务, 后面我们来看一下 gRPC proxy 所封装的各种服务.
这里提一下 grpcProxyEnableOrdering 和 grpcProxyNamespace 两个参数的意义:
grpcProxyEnableOrdering: experimental-serializable-ordering
保证 grpc proxy 的 Revision(版本号)小于或等于 etcd 服务器之间的 Revision(版本号), 后面再解释这个版本号
grpcProxyNamespace:
为所有的 key 请求加上前缀空间
1.4 KvProxy
这个是对 etcd client 的 kv 的封装, 通过对其结构体创建方法
Newc(c *clientv3.Client)
的, 我们可以看出:
- func Newc(c *clientv3.Client) (pb.KVServer, <-chan struct{}) {
- kv := &kvProxy{
- kv: c.KV,
- cache: cache.NewCache(cache.DefaultMaxEntries),
- }
- donec := make(chan struct{})
- close(donec)
- return kv, donec
- }
方法上面通过一个 cache 的封装来缓存客户端请求, 这个正好印证前面所说的 gRPC proxy 可以缓存客户端请求. 我们来具体的看一下这个 cache 是如何处理的.
- type Cache interface {
- //// 添加查询请求到缓存中
- Add(req *pb.RangeRequest, resp *pb.RangeResponse)
- // 从缓存中获取请求结果
- Get(req *pb.RangeRequest) (*pb.RangeResponse, error)
- Compact(revision int64)
- // 判断缓存是否失效
- Invalidate(key []byte, endkey []byte)
- // 缓存长度
- Size() int
- Close()
- }
cache 接口主要提供以上的方法来缓存来子客户端的请求信息, 看一下具体的 cache 类:
- // cache implements Cache
- type cache struct {
- mu sync.RWMutex
- lru *lru.Cache
- // a reverse index for cache invalidation
- cachedRanges adt.IntervalTree
- compactedRev int64
- }
上面有个 lru 的缓存信息(算法为最近最少未使用), 同时实现了一个 IntervalTree(线段树), 用来缓存范围查询, 具体的算法可以查看对应的源码.
总的来说就是在 client 的 kv 上面封装了一层代码, 加上了一层 cache.
1.5 WatchProxy
同上, 这个也是封装了 client 的 watch API, 如上面所示, gRPC proxy 支持 watch 的链接复用,
- type watchProxy struct {
- cw clientv3.Watcher
- ctx context.Context
- leader *leader
- ranges *watchRanges
- // mu protects adding outstanding watch servers through wg.
- mu sync.Mutex
- // wg waits until all outstanding watch servers quit.
- wg sync.WaitGroup
- // kv is used for permission checking
- kv clientv3.KV
- }
上面这个结构体实际上很明显能够告诉我们, 如果 watchProxy 能够复用连接, 那一定是在 watchRanges 中实现的.
我们先看一下 watchProxy.Watch 方法 由于方法太长, 所以我简略的说下步骤:
检查 watchProxy 是否退出
wp.wg.Add(1): watch servers +1
生成一个 watchProxyStream 结构体
再次判断 leader 是否丢失链接
循环判断 watchProxyStream 的 recvLoop 以及 sendLoop 方法
首先我们要知道 leader 的作用是什么?
打开 leader.go 文件中的发现有一个叫 recvLoop() 的方法,** 这个方法的作用实际上就是通过对一个 key(__lostleader)的监视来定时的判断 client 是否失效.**
所以实际上我们的真正的业务逻辑在 watchProxyStream 这个结构体中中.
先来看一下 watchProxyStream.recvLoop()这个方法:
- func (wps *watchProxyStream) recvLoop() error {
- for {
- req, err := wps.stream.Recv()
- if err != nil {
- return err
- }
- switch uv := req.RequestUnion.(type) {
- case *pb.WatchRequest_CreateRequest:
- cr := uv.CreateRequest
- if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil && err == rpctypes.ErrPermissionDenied {
- // Return WatchResponse which is caused by permission checking if and only if
- // the error is permission denied. For other errors (e.g. timeout or connection closed),
- // the permission checking mechanism should do nothing for preserving error code.
- wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true}
- continue
- }
- w := &watcher{
- wr: watchRange{string(cr.Key), string(cr.RangeEnd)},
- id: wps.nextWatcherID,
- wps: wps,
- nextrev: cr.StartRevision,
- progress: cr.ProgressNotify,
- prevKV: cr.PrevKv,
- filters: v3rpc.FiltersFromRequest(cr),
- }
- if !w.wr.valid() {
- w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
- continue
- }
- wps.nextWatcherID++
- w.nextrev = cr.StartRevision
- wps.watchers[w.id] = w
- wps.ranges.add(w)
- case *pb.WatchRequest_CancelRequest:
- wps.delete(uv.CancelRequest.WatchId)
- default:
- panic("not implemented")
- }
- }
- }
首先我们来明确一下 wps.stream 这个流是属于与 gRPC proxy 连接的客户端. 所以当收到来自客户端
WatchRequest_CreateRequest
请求时, 会创建一个 watcher, 同时会在 wps.watchers 以及 wps.ranges 中添加该 watcher, 并且在收到这个请求的时候用
checkPermissionForWatch
会向 etcd server 同时发起一个服务, 判断是否允许接入链接. 在收到来自 etcd server 的正确回答之后, 会在 ranges 中 add 这个方法在加载这个 watcher 之后同样会向 etcd server 发起请求, 并且得到应答之后会广播, 这里面实现了一个 broadcast 的结构体用来做广播.
上面说的这个 ranges 实际上是来源于 watchProxy 这个结构体, 而 watchers 来源于同一个 stream 中, 这说明, 对于客户端来说, 它同样可以复用 stream 流来处理 watch.
我们总结一下, 对一来自于同一个客户端的的 watch 是它的 stream 可以复用, 不同客户端的链接都会被同一链接复用, gRPC 只有在收到来之客户端 stream 的
WatchRequest_CreateRequest
请求的时候才会向 etcd server 发起请求.
1.6 lease proxy
我门先来看一下 lease proxy 的源码:
- type leaseProxy struct {
- // leaseClient handles req from LeaseGrant() that requires a lease ID.
- leaseClient pb.LeaseClient
- lessor clientv3.Lease
- ctx context.Context
- leader *leader
- // mu protects adding outstanding leaseProxyStream through wg.
- mu sync.RWMutex
- // wg waits until all outstanding leaseProxyStream quit.
- wg sync.WaitGroup
- }
实际上大致的内容大同小异, leader 的作用跟上面类似, leaseClient 和 lessor 继承于 clientv3, 后面介绍. 在接收到 lease 请求的时候, 会生成一个 leaseProxyStream 结构体, 这个结构体有三个方法 recvLoop 和 sendLoop 和上面类似, 同样有一个 keepAliveLoop, 该方法是 核心方法, 它是通过的一个 TTL 的时间定时去跟 etcd server 续约.
1.6 其它 proxy
gRPC proxy 基本上实现 etcd client 的那一套 API, 通过对 clientv3.Client 的封装.
2 使用场景
gRPC proxy 通过对 etcd client 的封装, 实现了与 etcd 连接, 不同客户端请求复用同一个 client, 其目的是为了加少 etcd server 的负载. 这个目的也决定了 proxy 的使用场景, 即降低 etcd 负载.
来源: https://www.cnblogs.com/songjingsong/p/9232670.html