总结来说, 微服务化要实现的服务注册发现, 负载均衡是微服务框架要实现的基本功能根据负载均衡的位置可以分为集中式 LB 进程内 LB 和独立 LB 进程本节的图转自博客 gRPC 服务发现 & 负载均衡 这种集中式 lb 服务消费方 invoke 独立 LB, 通常是 HAproxy 之类的软件或专门的硬件它主要的问题是单点问题, 所有流量都经过 LB, 容易成为瓶颈和发生单点故障
记得挺早之前我们平台的服务发现就是一套基于 etcd 的注册发现, 通过 haproxy 做负载均衡的模式
与之对应的是进程内 lb, 它把 LB 和服务发现的能力分散到了每个服务消费者的进程内部, 同时服务之间是直接调用, 性能较好这就是现在很多微服务框架的客户端负载均衡模式, 它需要一个注册中心, 服务提供者向其注册并保持心跳, 客户端 sdk 从注册中心缓存目标服务列表它的问题是绑定了 sdk, 有一定开发成本和升级维护成本
针对进程内 lb 的问题, 这种 sideCar 模式的微服务框架把 sdk 作为一个独立的进程, 它不需要应用程序绑定技术栈, 或某种开发语言, 也不需要修改代码, 通过 proxy 的方式通过 lb invoke 服务提供方这种就是现在的 service mesh 模式
gRPC-Balancer
gPRC 没有提供 Balancer 的实现, 但给出了接口基本原理是 gRPC 的 client 端和 server 端为每个 target 维护了 clientConn , 在没有 Balancer 时这个 clientConn 就只有一个 address 和对应的一个 conn 当有 Balancer 时 Balancer 将给出 target 对应的地址列表 clientConn 将维护对应每个地址和其对应的 conn
Balancer 根据 Resolver 去 Watch 具体 target 的地址信息变化, 并通知 clientConn 更新它的缓存 这是通过 Balancer 的 Start&Notify 方法实现的, 它初始化 Resolver, 并开始 Watch 注册中心, Watch 的事件将用来更新 Balancer 自身的 addr 缓存, 并通过 Notify 通知 clientConn 更新整个连接池的缓存
每次调用时 Balancer 通过特定的策略给出具体地址, client 从 conns 中找到对应的 conn 去 transport 这时通过 Balancer 的 Get 方法实现的, Get 方法从已建立好的 Balancer 的 addrs 中根据不同的 lb 策略选择这次的 addr 返回
Balancer 在缓存中维护地址列表, 并提供接口给 clientConn 根据连接状态, 更新地址列表中的地址状态
- type ClientConn struct {
- target string
- conns map[Address]*addrConn
- }
- type Balancer interface {
- Start(target string, config BalancerConfig) error
- Up(addr Address) (down func(error))
- Get(ctx context.Context, opts BalancerGetOptions)(addr Address, putfunc(),errerror)
- Notify()<-chan[]Address
- Close()error
- }
总结下来就是 Balander 需要域名解析 Resolver 提供的 Watcher, 还需要负载均衡策略 policy(selector)如图所示, gRPC 没有提供 Resolver 的实现, 就是没有提供服务注册发现监听的实现, 需要开发者结合 etcd consul 等注册中心自己实现该接口 lb 策略 gRPC 有个 roundRobin 的实现可供参考
ClientConn & Balancer
简单总结 gRPC 客户端在 Dial 时负载均衡相关操作 Dial 的过程会返回 ClientConn , 其中重要的数据成员是维护了一组对应 target 地址的连接
conns map[Address]*addrConn
在 Dial 中开启 goroutine 完成了 balancer 的 Start, 即启动 resolver 并开始 watchUpdates 同时开始 lbWatcher , 开始维护 clientconn 的连接池
- go func() {
- defer close(waitC)
- if cc.dopts.balancer != nil {
- err := cc.dopts.balancer.Start(target, config)
- ch := cc.dopts.balancer.Notify()
- if ch != nil {
- // if not block
- go cc.lbWatcher(nil)
- return
- }
- }
- }()
Notify 返回的 ch 是 balacer 提供给外部的全量 addrs,lbWatcher 每次都对比 addrs 和目前自己维护的 clientconn, 新增的重新建立连接 resetAddrConn, 删掉的 tearDownConn
- func (cc *ClientConn)lbWatcher(doneChanchan struct{}) {
- for addrs := range cc.dopts.balancer.Notify() {
- var (
- add []Address // Addresses need to setup connections.
- del []*addrConn // Connections need to tear down.
- )
- cc.mu.Lock()
- for _, a := range addrs {
- if _, ok := cc.conns[a]; !ok {
- add = append(add, a)
- }
- }
- for k, c := range cc.conns {
- var keep bool
- for _, a := range addrs {
- if k == a {
- keep = true
- break
- }
- }
- if !keep {
- del = append(del, c)
- delete(cc.conns, c.addr)
- }
- }
- cc.mu.Unlock()
- for _, a := range add {
- cc.resetAddrConn(a, false, nil)
- }
- for _, c := range del {
- c.tearDown(errConnDrain)
- }
- }
- }
最后在 clientConn 获取 getTransport 时, 通过 balancer.Get 可以获取满足 lb 策略的 addr, 并最终在 ClientConn 维护的 conn 缓存中获取对应的 conn
Start
gRPC 给了一个 RoundRobin 的实现, 其中 addrs 是所有客户端要连接的地址 addrCh 是用来通知 gRPC 内部 client 地址列表的 chan, 也就是上面 Notify() 返回的 chanwaitCh 在无地址可连接时 blocknext 指向 Get 方法返回的 addrs id
- type roundRobin struct {
- r naming.Resolver
- w naming.Watcher
- addrs []*addrInfo
- mu sync.Mutex
- addrCh chan []Address
- next int // index of the next address to return for Get()
- waitCh chan struct{}
- done bool // The Balancer is closed.
- }
start 方法通过 Resolve 获取对应 target 的 watcher, 并开始 watchAddrUpdates watcher 的 Next 方法在没有事件时 block, 一旦有 updates 产生, balancer 将按事件类型更新本地的 addrs 缓存, 并把全部 addrs 发到 addrCh 上, 这个 chan 通过 Notify 这个 Balancer 的方法对外提供读取
slice copy 的用法是 slice 长度较小的决定, copy 是删除 slice 元素的一个简单方法
- func (rr *roundRobin)Start(targetstring, config BalancerConfig)error {
- rr.mu.Lock()
- defer rr.mu.Unlock()
- w, err := rr.r.Resolve(target)
- if err != nil {
- return err
- }
- rr.w = w
- rr.addrCh = make(chan []Address)
- go func() {
- for {
- if err := rr.watchAddrUpdates(); err != nil {
- return
- }
- }
- }()
- return nil
- }
- func (rr *roundRobin)watchAddrUpdates()error {
- updates, err := rr.w.Next()
- // lock...
- for _, update := range updates {
- addr := Address{
- Addr: update.Addr,
- Metadata: update.Metadata,
- }
- switch update.Op {
- case naming.Add:
- var exist bool
- for _, v := range rr.addrs {
- if addr == v.addr {
- exist = true
- break
- }
- }
- if exist {
- continue
- }
- rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
- case naming.Delete:
- for i, v := range rr.addrs {
- if addr == v.addr {
- copy(rr.addrs[i:], rr.addrs[i+1:])
- rr.addrs = rr.addrs[:len(rr.addrs)-1]
- break
- }
- }
- }
- }
- open := make([]Address, len(rr.addrs))
- for i, v := range rr.addrs {
- open[i] = v.addr
- }
- rr.addrCh <- open
- return nil
- }
gRPC 并没有提供 Resolver 的实现, 只是给出了它的接口可以外部通过对接 etcd consul 等注册中心来实现 resolver 接口, 它返回对特定 target 的 Watcher,watcher 的 Next 给的是一次增量事件, 需要 balancer 自己解析事件类型和维护本地的 addrs 列表
- type Resolver interface {
- Resolve(target string) (Watcher, error)
- }
- type Watcher interface {
- Next() ([]*Update, error)
- Close()
- }
- Up Notify Close
up 方法是用来 set 连接状态的下面节选了一段 resetTransport 的实现, balancer 的主要作用是在 clientconn 维护连接池时, 同步的更新 balancer 维护的 addrs 的连接状态, 即是否 connected, 并且返回将 addrs 状态设为非连接的方法 ac.down
- func (ac *addrConn)resetTransport(closeTransportbool)error {
- for retries := 0; ; retries++ {
- newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
- ac.transport = newTransport
- if ac.cc.dopts.balancer != nil {
- ac.down = ac.cc.dopts.balancer.Up(ac.addr)
- }
- ac.mu.Unlock()
- return nil
- }
- }
- func (rr *roundRobin)Up(addr Address)func(error) {
- rr.mu.Lock()
- defer rr.mu.Unlock()
- var cnt int
- for _, a := range rr.addrs {
- if a.addr == addr {
- if a.connected {
- return nil
- }
- a.connected = true
- }
- if a.connected {
- cnt++
- }
- }
- // addr is only one which is connected. Notify the Get() callers who are blocking.
- if cnt == 1 && rr.waitCh != nil {
- close(rr.waitCh)
- rr.waitCh = nil
- }
- return func(err error) {
- rr.down(addr, err)
- }
- }
Notify 就是把 watchAddrUpdates 维护的全量 adrrs 返回给 lbWatcher, 供上层维护 clientconnClose 会关掉 addrch 和 waitch, 同时要关掉 resolve 的 watcher
- func (rr *roundRobin)Notify()<-chan[]Address {
- return rr.addrCh
- }
- Get
Get 方法按负载均衡策略返回 balancer 某个状态为 connected 的 addr,gRPC 给了个 rouncrobin 的例子, 通过 next 这个 index 来确定每次连接的 addr 如果没有 addr, 可以不 block, 直接返回导致调用失败, 也可以利用 wiatCh 来做 for-select, 直到上层在连接恢复时调用 Up, 可以 close(rr.waitCh)唤醒这个 block, 代码省略
- func (rr *roundRobin)Get(ctx context.Context, opts BalancerGetOptions)(
- addr Address, putfunc(),errerror) {
- var ch chan struct{}
- rr.mu.Lock()
- if len(rr.addrs)> 0 {
- if rr.next>= len(rr.addrs) {
- rr.next = 0
- }
- next := rr.next
- for {
- a := rr.addrs[next]
- next = (next + 1) % len(rr.addrs)
- if a.connected {
- addr = a.addr
- rr.next = next
- rr.mu.Unlock()
- return
- }
- if next == rr.next {
- break
- }
- }
- }
- if !opts.BlockingWait {
- // Returns the next addr on rr.addrs for failfast RPCs.
- }
- // Wait on rr.waitCh for non-failfast RPCs.
- }
- Selector
总结完 Balancer 的使用和基本实现, 因为负载均衡策略和 Resolver 可以按接口自行扩展, 所有将这两部分单独说说, resolver 的实现下一节写服务注册中心再说吧, 这个库 grpc-lb 可以参考它的 rouncrobin 就是 gRPC 的实现, 只是把 balancer 内部跟策略相关的重新封装成了 Selector, 在此基础上重写了 Get 方法实现了几种负载均衡策略
- type Selector interface {
- Add(addr grpc.Address) error
- Delete(addr grpc.Address) error
- Up(addr grpc.Address) (cnt int, connected bool)
- Down(addr grpc.Address) error
- AddrList() []grpc.Address
- Get(ctx context.Context) (grpc.Address, error)
- Put(addr string) error
- }
其中 AddDelete 用来 watchAddrUpdates 时更新 selector 缓存 AddList 用于给 Notify 返回全量 addrsUpDown 用来更新 addr 的 connected 状态 Get 用来给不同 lb-selector 实现选择算法这里的 Put 是作者给每个 addr 的负载 load 信息自减的方法详见 grpc-lb
random
在 gRPC 的 Selector 中包含实现 Selector 接口的 baseSelector 要考虑当前 connected 的状态, 去除这些跟框架相关的代码后, 就是
rand.Int() % len(addrs)
这里通常都要初始化随机种子, 否则每次获取的随机值都一样, 通常用当前时间来初始化一下种子
rand.Seed(time.Now().UnixNano())
, 保证种子的随机
官网文档里有提示 Seed should not be called concurrently with any other Rand method. 如果并发的 seed rand.Seed 踩坑 提到会导致 goroutine 暴增
- func init() { rand.Seed(time.Now().UnixNano()) }
- func Random(addrs []Address)(Address, error) {
- if len(addrs) == 0 {
- return nil, ErrNoneAvailable
- }
- k := rand.Int() % len(addrs)
- return addrs[k], nil
- }
- roundrobin
轮询就是通过维护一个 next 作为 index, 每次 + 1 完成的代码例子经过调整, 省去了很多, 在实际的例子中要注意 mux 的使用, 尽量使加锁范围小还有问题是从 balancer 的 watchUpdates 而获取的 addrs 列表动态变化, 缓存维护了 connected 状态可以尽量避免访问到已经 teardown 的实例, 但仍然要提供容错和 backOff 机制
- var next int
- func RoundRobin(addrs []Address, metadatainterface{})(Address, error) {
- var mtx sync.Mutex
- if len(addrs) == 0 {
- return nil, ErrNoneAvailable
- }
- mtx.Lock()
- addr := addrs[next%len(addrs)]
- next++
- mtx.Unlock()
- return addr, nil
- }
- latency or leastaction
根据 client 端与 server 端实际连接动态调整 addr 的策略 leastaction 需要额外统计当前与每个服务实例的连接数, 选择服务实例连接数最少的访问 latency 要计算与每个服务实例的平均时延, 选择平均时延最小的实例访问
go-chassis 有类似实现, 主要是通过 transport 统计 latency, 由 latency_strategy 管理和保存平均时延
consistent hash
这个以后再写一篇单独的分析吧, 这里简单总结个大概一致性 hash 是要解决当节点数量变化时, 原有的数据来源 hash 到的节点 (cache) 基本不一致 (失效) 的问题, 通过一致性 hash 希望将节点变化时, 数据迁移量降到最低
基本原理是 hash ring, 即将节点 node 本身也 hash 到环上, 通过数据和节点的 hash 相对位置来决定数据归属, 因此当有新 node 加入时只有一部分的数据迁移但事实上, 这样的一致性 hash 导致数据分布不均匀, 因为 node 在 hash ring 上的分布不均匀分布不均匀的问题通过引入虚拟节点来解决, 虚拟节点是均匀分布在 ring 上的, 数据做两次 match, 最终到实际节点上这样来保证数据分布的均匀性
groupcache 有个 consistenthash 的实现, royhunter 有个带虚拟节点的实现以 groupcache 为例, 默认 HashFunc 是 crc32.ChecksumIEEE keys 相当于是 hashRing ,replicas 相当于虚拟节点
- type Hash func(data []byte)uint32
- typeMapstruct {
- hash Hash
- replicas int
- keys []int // Sorted
- hashMap map[int]string
- }
在添加节点时, 为每个节点创建 replica 个虚拟节点, 并计算虚拟节点的 hash 值存入 hashring , 也就是 keys 这个 slice 中, 同时把这些虚拟节点的 hash 值与 node 的对应关系保存在 hashMap 最后给 keys 排个序, 就像在环上分布, 顺时针递增一样
- func(m * Map) Add(keys...string) {
- for _,
- key: =range keys {
- for i: =0;
- i <m.replicas;
- i++{
- hash: =int(m.hash([] byte(strconv.Itoa(i) + key))) m.keys = append(m.keys, hash) m.hashMap[hash] = key
- }
- }
- sort.Ints(m.keys)
- }
Get 方法是获取数据对应的节点, 相当于负载均衡中源 ip 对应到哪个节点或哪个 cache 计算数据的 hash, 并在 hashRing 上二分查找第一个大于 hash 的虚拟节点, 也就通过 hashMap 找到了对应的真实节点
- func(m * Map) Get(keystring) string {
- if m.IsEmpty() {
- return ""
- }
- hash: =int(m.hash([] byte(key)))
- // Binary search for appropriate replica.
- idx: =sort.Search(len(m.keys), func(iint) bool {
- return m.keys[i]>= hash
- })
- // Means we have cycled back to the first replica.
- if idx == len(m.keys) {
- idx = 0
- }
- return m.hashMap[m.keys[idx]]
- }
来源: http://www.tuicool.com/articles/nI7nquv