SchedulerCache 是 kubernetes scheduler 中负责本地数据缓存的核心数据结构, 其实现了 Cache 接口, 负责存储从 apiserver 获取的数据, 提供给 Scheduler 调度器获取 Node 的信息, 然后由调度算法的决策 pod 的最终 node 节点, 其中 Snapshot 和节点打散算法非常值得借鉴
设计目标
数据感知
SchedulerCache 的数据从 apiserver 通过网络感知, 其数据的同步一致性主要是通过 kubernetes 中的 Reflector 组件来负责保证, SchedulerCache 本身就是一个单纯数据的存储
Snapshot 机制
当 scheduler 获取一个待调度的 pod, 则需要从 Cache 中获取当前集群中的快照数据(当前此时集群中 node 的统计信息), 用于后续调度流程中使用
节点打散
节点打散主要是指的调度器调度的时候, 在满足调度需求的情况下, 为了保证 pod 均匀分配到所有的 node 节点上, 通常会按照逐个 zone 逐个 node 节点进行分配, 从而让 pod 节点打散在整个集群中
过期删除
Scheduler 进行完成调度流程的决策之后, 为 pod 选择了一个 node 节点, 此时还未进行后续的 Bind 操作, 但实际上资源已经分配给该 pod, 此时会先更新到本地缓存(), 然后再等待 apiserver 进行数据的广播并且最终被 kubelet 来进行实际的调度
但如果因为某些原因导致 pod 后续的事件都没有被监听到, 则需要将对应的 pod 资源进行删除, 并删除对 node 资源的占用
cache 内部 pod 状态机
在 scheduler cache 中 pod 会一个内部的状态机: initial,Assumed,Expired,Added,Delete, 实际上所有的操作都是围绕着该状态机在进行, 状态如下:
Initial: 初始化完成从 apiserver 监听到(也可能是监听到一个已经完成分配的 pod)
Assumed: 在 scheduler 中完成分配最终完成 bind 操作的 pod(未实际分配)
Added: 首先监听到事件可能是一个已经完成实际调度的 pod(即从 initial 到 Added), 其次可能是经过调度决策后, 被实际调度(从 Assumed 到 Added), 最后则是后续 pod 的更新(Update), Added 语义上其实就是往 Cache 中添加一个 Pod 状态
Deleted: 某个 pod 被监听到删除事件, 只有被 Added 过的数据才可以被 Deleted
Expired: Assumed pod 经过一段时间后没有感知到真正的分配事件被删除
源码实现
数据结构
- type schedulerCache struct {
- stop <-chan struct{}
- ttl time.Duration
- period time.Duration
- // 保证数据的安全
- mu sync.RWMutex
- // 存储假定 pod 的信息集合, 经过 scheduler 调度后假定 pod 被调度到某些节点, 进行本地临时存储
- // 主要是为了进行 node 资源的占用, 可以通过 key 在 podStats 查找到假定的 pod 信息
- assumedPods map[string]bool
- // pod 的状态
- podStates map[string]*podState
- // 存储 node 的映射
- nodes map[string]*nodeInfoListItem
- csiNodes map[string]*storagev1beta1.CSINode
- // node 信息的链表, 按照最近更新时间来进行连接
- headNode *nodeInfoListItem
- // 存储 node,zone 的映射信息
- nodeTree *NodeTree
- // 镜像信息
- imageStates map[string]*imageState
- }
Snapshot 机制
数据结构
Snapshot 数据结构主要负责存储当前集群中的 node 信息, 并且通过 Generation 记录当前更新的最后一个周期
- type Snapshot struct {
- NodeInfoMap map[string]*NodeInfo
- Generation int64
- }
Snapshot 的创建与更新
创建主要位于 kubernetes/pkg/scheduler/core/generic_scheduler.go, 实际上就是创建一个空的 snapshot 对象
nodeInfoSnapshot: framework.NodeInfoSnapshot(),
数据的更新则是通过 snapshot 方法来调用 Cache 的更新接口来进行更新
- func (g *genericScheduler) snapshot() error {
- // Used for all fit and priority funcs.
- return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot)
- }
借助 headNode 实现增量标记
随着集群中 node 和 pod 的数量的增加, 如果每次都全量获取 snapshot 则会严重影响调度器的调度效率, 在 Cache 中通过一个双向链表和 node 的递增计数 (etcd 实现) 来实现增量更新
- func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error {
- cache.mu.Lock()
- defer cache.mu.Unlock()
- balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes)
- // 获取当前 snapshot 的 Genration
- snapshotGeneration := nodeSnapshot.Generation
- // 遍历双向链表, 更新 snapshot 信息
- for node := cache.headNode; node != nil; node = node.next {
- if node.info.GetGeneration() <= snapshotGeneration {
- // 所有 node 信息都更新完毕
- break
- }
- if balancedVolumesEnabled && node.info.TransientInfo != nil {
- // Transient scheduler info is reset here.
- node.info.TransientInfo.ResetTransientSchedulerInfo()
- }
- if np := node.info.Node(); np != nil {
- nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone()
- }
- }
- // 更新 snapshot 的 genration
- if cache.headNode != nil {
- nodeSnapshot.Generation = cache.headNode.info.GetGeneration()
- }
- // 如果 snapshot 里面包含过期的 pod 信息则进行清理工作
- if len(nodeSnapshot.NodeInfoMap)> len(cache.nodes) {
- for name := range nodeSnapshot.NodeInfoMap {
- if _, ok := cache.nodes[name]; !ok {
- delete(nodeSnapshot.NodeInfoMap, name)
- }
- }
- }
- return nil
- }
- nodeTree
nodeTree 主要负责节点的打散, 用于让 pod 均匀分配在多个 zone 中的 node 节点上
2.3.1 数据结构
- type NodeTree struct {
- tree map[string]*nodeArray // 存储 zone 和 zone 下面的 node 信息
- zones []string // 存储 zones
- zoneIndex int
- numNodes int
- mu sync.RWMutex
- }
其中 zones 和 zoneIndex 主要用于后面的节点打散算法使用, 实现按 zone 逐个分配
nodeArray
nodeArray 负责存储一个 zone 下面的所有 node 节点, 并且通过 lastIndex 记录当前 zone 分配的节点索引
- type nodeArray struct {
- nodes []string
- lastIndex int
- }
添加 node
添加 node 其实很简单, 只需要获取对应 node 的 zone 信息, 然后加入对应 zone 的 nodeArray 中
- func (nt *NodeTree) addNode(n *v1.Node) {
- // 获取 zone
- zone := utilnode.GetZoneKey(n)
- if na, ok := nt.tree[zone]; ok {
- for _, nodeName := range na.nodes {
- if nodeName == n.Name {
- klog.Warningf("node %q already exist in the NodeTree", n.Name)
- return
- }
- }
- // 吧节点加入到 zone 中
- na.nodes = append(na.nodes, n.Name)
- } else {
- // 新加入 zone
- nt.zones = append(nt.zones, zone)
- nt.tree[zone] = &nodeArray{nodes: []string{n.Name}, lastIndex: 0}
- }
- klog.V(2).Infof("Added node %q in group %q to NodeTree", n.Name, zone)
- nt.numNodes++
- }
数据打散算法
数据打散算法很简单, 首先我们存储了 zone 和 nodeArray 的信息, 然后我们只需要通过两个索引 zoneIndex 和 nodeIndex 就可以实现节点的打散操作, 只有当当前集群中所有 zone 里面的所有节点都进行一轮分配后, 然后重建分配索引
- func (nt *NodeTree) Next() string {
- nt.mu.Lock()
- defer nt.mu.Unlock()
- if len(nt.zones) == 0 {
- return ""
- }
- // 记录分配完所有 node 的 zone 的计数, 用于进行状态重置
- // 比如有 3 个 zone: 则当 numExhaustedZones=3 的时候, 就会重新从头开始进行分配
- numExhaustedZones := 0
- for {
- if nt.zoneIndex>= len(nt.zones) {
- nt.zoneIndex = 0
- }
- // 按照 zone 索引来进行逐个 zone 分配
- zone := nt.zones[nt.zoneIndex]
- nt.zoneIndex++
- // 返回当前 zone 下面的 next 节点, 如果 exhausted 为 True 则表明当前 zone 所有的节点, 在这一轮调度中都已经分配了一次
- // 就需要从下个 zone 继续获取节点
- nodeName, exhausted := nt.tree[zone].next()
- if exhausted {
- numExhaustedZones++
- // 所有的 zone 下面的 node 都被分配了一次, 这里进行重置, 从头开始继续分配
- if numExhaustedZones>= len(nt.zones) { // all zones are exhausted. we should reset.
- nt.resetExhausted()
- }
- } else {
- return nodeName
- }
- }
- }
重建索引
重建索引则是将所有 nodeArray 的索引和当前 zoneIndex 进行归零
- func (nt *NodeTree) resetExhausted() {// 重置索引
- for _, na := range nt.tree {
- na.lastIndex = 0
- }
- nt.zoneIndex = 0
- }
数据过期清理
数据存储
Cache 要定时将之前在经过本地 scheduler 分配完成后的假设的 pod 的信息进行清理, 如果这些 pod 在给定时间内仍然没有感知到对应的 pod 真正的添加事件则就这些 pod 删除
assumedPods map[string]bool
后台定时任务
默认每 30s 进行清理一次
- func (cache *schedulerCache) run() {
- go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop)
- }
清理逻辑
清理逻辑主要是针对那些已经完成绑定的 pod 来进行, 如果一个 pod 完成了在 scheduler 里面的所有操作后, 会有一个过期时间, 当前是 30s, 如果超过该时间即 deadline 小于当前的时间就删除该 pod
- // cleanupAssumedPods exists for making test deterministic by taking time as input argument.
- func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
- cache.mu.Lock()
- defer cache.mu.Unlock()
- // The size of assumedPods should be small
- for key := range cache.assumedPods {
- ps, ok := cache.podStates[key]
- if !ok {
- panic("Key found in assumed set but not in podStates. Potentially a logical error.")
- }
- // 未完成绑定的 pod 不会被进行清理
- if !ps.bindingFinished {
- klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
- ps.pod.Namespace, ps.pod.Name)
- continue
- }
- // 在完成 bind 之后会设定一个过期时间, 目前是 30s, 如果 deadline 即 bind 时间 + 30s 小于当前时间就过期删除
- if now.After(*ps.deadline) {
- klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
- if err := cache.expirePod(key, ps); err != nil {
- klog.Errorf("ExpirePod failed for %s: %v", key, err)
- }
- }
- }
- }
清理 pod
清理 pod 主要分为如下几个部分:
1. 对应 pod 假定分配 node 的信息
2. 清理映射的 podState 信息
- func (cache *schedulerCache) expirePod(key string, ps *podState) error {
- if err := cache.removePod(ps.pod); err != nil {
- return err
- }
- delete(cache.assumedPods, key)
- delete(cache.podStates, key)
- return nil
- }
设计总结
核心数据结构数据流如上所示, 其核心是通过 nodes,headNode 实现一个 Snapshot 为调度器提供当前系统资源的快照, 并通过 nodeTree 进行 node 节点的打散, 最后内部通过一个 pod 的状态机来进行系统内部的 pod 资源状态的转换, 并通过后台的定时任务来保证经过经过 Reflector 获取的数据的最终一致性(删除那些经过 bind 的但是却没被实际调度或者事件丢失的 pod), 借助这些其实一个最基础的工业级调度器的本地 cache 功能就实现了
微信号: baxiaoshi2020
关注公告号阅读更多源码分析文章
更多文章关注 http://www.sreguide.com/
来源: https://www.cnblogs.com/buyicoding/p/12190532.html