预选流程
predicate 的并发
一个 node 的 predicate
predicates 的顺序
单个 predicate 执行过程
具体的 predicate 函数
本系列文章已经开源到 GitHub:
1. 预选流程
predicate 过程从 pkg/scheduler/core/generic_scheduler.go:389 findNodesThatFit()方法就算正式开始了, 这个方法根据给定的 predicate functions 过滤所有的 nodes 来寻找一堆可以跑 pod 的 node 集. 老规矩, 我们来看主干代码:
- pkg/scheduler/core/generic_scheduler.go:389
- func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
- checkNode := func(i int) {
- fits, failedPredicates, err := podFitsOnNode(
- //......
- )
- if fits {
- length := atomic.AddInt32(&filteredLen, 1)
- filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
- }
- }
- workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
- if len(filtered)> 0 && len(g.extenders) != 0 {
- for _, extender := range g.extenders {
- // Logic of extenders
- }
- }
- return filtered, failedPredicateMap, nil
- }
如上, 删的有点多, 大家也可以看一下原函数然后对比一下, 看看我为什么只保留这一点. 从上面代码中我们可以发现, 最重要的是一个子函数调用过程 fits, failedPredicates, err := podFitsOnNode(), 这个函数的参数我没有贴出来, 下面会详细讲; 下半部分是一个 extender 过程, extender 不影响对 predicate 过程的理解, 我们后面专门当作一个主题讲. 所以这里的关注点是 podFitsOnNode()函数.
2. predicate 的并发
进入 podFitsOnNode()函数逻辑之前, 我们先看一下调用到 podFitsOnNode()函数的匿名函数变量 checkNode 是怎么被调用的:
- pkg/scheduler/core/generic_scheduler.go:458
- workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
ParallelizeUntil()函数是用于并行执行 N 个独立的工作过程的, 这个逻辑写的挺有意思, 我们看一下完整的代码(这段的分析思路写到注释里哦):
- vendor/k8s.io/client-go/util/workqueue/parallelizer.go:38
- func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
- // 从形参列表看, 需要关注的有 workers 和 pieces 两个数字类型的参数, doworkPiece 这个函数类型的参数
- // DoWorkPieceFunc 类型也就是 func(piece int)类型
- // 注意到上面调用的时候 workers 的实参是 16,pieces 是 allNodes, 也就是 node 数量
- var stop <-chan struct{}
- if ctx != nil {
- stop = ctx.Done()
- }
- // 这里定义 toProcess 的容量和 pieces 相等, 也就是和 node 数量相等
- toProcess := make(chan int, pieces)
- for i := 0; i <pieces; i++ {
- // 假设有 100 个 node, 那么这里就写了 100 个数到 toProcess 里
- toProcess <- i
- }
- // 关闭了一个有缓存的 channel
- close(toProcess)
- // 如果 pieces 数量比较少, 也就是说假设 node 只有 10 个, 那么 workers 就赋值为 10 个
- // 到这里差不多可以猜到 worker 是并发工作数, 当 node 大于 16 时并发是 16, 当 node 小于 16 时并法数就是 node 数
- if pieces < workers {
- workers = pieces
- }
- wg := sync.WaitGroup{}
- wg.Add(workers)
- // 要批量开 goroutine 了
- for i := 0; i < workers; i++ {
- // 如果 100 个 node, 这里时 16; 如果是 10 个 node, 这里是 10
- go func() {
- defer utilruntime.HandleCrash()
- defer wg.Done()
- for piece := range toProcess {
- // 从 toProcess 中拿一个数, 举个例子, 假如现在并发是 10, 那么 toProcess 里面存的数据其实
- // 也是 10 个, 也就是 1 个 goroutine 拿到 1 个数, 开始了一个下面的 default 逻辑;
- // 假设并发数是 16,node 数是 100, 这时候 toProcess 里面也就是 100 个数,
- // 这时候就是 16 个 "消费者" 在消耗 100 个数. 当然每拿到一个数需要执行到一次下面的 default
- select {
- case <-stop:
- return
- default:
- // 第 piece 个节点被 doWorkPiece 了;
- // 对应调用过程也就是 checkNode 函数传入了一个整型参数 piece
- doWorkPiece(piece)
- }
- }
- }()
- }
- wg.Wait()
- }
回想一下前面的 checkNode := func(i int){......}, 上面的 doWorkPiece(piece)也就是调用到了这里的这个匿名函数 func(i int){......}; 到这里就清楚如何实现并发执行多个 node 的 predicate 过程了.
3. 一个 node 的 predicate
checkNode 的主要逻辑就是上面介绍的并发加上下面这个 podFitsOnNode()函数逻辑:
- pkg/scheduler/core/generic_scheduler.go:425
- fits, failedPredicates, err := podFitsOnNode(
- pod,
- meta,
- g.cachedNodeInfoMap[nodeName],
- g.predicates,
- nodeCache,
- g.schedulingQueue,
- g.alwaysCheckAllPredicates,
- equivClass,
- )
我们从 podFitsOnNode()的函数定义入手:
- pkg/scheduler/core/generic_scheduler.go:537
- func podFitsOnNode(
- pod *v1.Pod,
- meta algorithm.PredicateMetadata,
- info *schedulercache.NodeInfo,
- predicateFuncs map[string]algorithm.FitPredicate,
- nodeCache *equivalence.NodeCache,
- queue internalqueue.SchedulingQueue,
- alwaysCheckAllPredicates bool,
- equivClass *equivalence.Class,
- ) (bool, []algorithm.PredicateFailureReason, error)
关于这个函数的逻辑, 注释里的描述翻译过来大概是这个意思:
podFitsOnNode()函数检查一个通过 NodeInfo 形式给定的 node 是否满足指定的 predicate functions. 对于给定的一个 Pod,podFitsOnNode()函数会检查是否有某个 "等价的 pod" 存在, 然后重用那个等价 pod 缓存的 predicate 结果. 这个函数的调用入口有 2 处: Schedule and Preempt.
当从 Schedule 进入时: 这个函数想要测试 node 上所有已经存在的 pod 外加被指定将要调度到这个 node 上的其他所有高优先级 (优先级不比自己低, 也就是>=) 的 pod 后, 当前 pod 是否可以被调度到这个 node 上.
当从 Preempt 进入时: 后面讲 preempt 时再详细分析.
podFitsOnNode()函数的参数有点多, 每个跟进去就是一堆知识点. 这里建议大家从字面先过一边, 然后跟进去看一下类型定义, 类型的注释等, 了解一下功能, 先不深究. 整体看完一边调度器代码后回过头深入细节.
我们一起看一下其中这个参数: predicateFuncs map[string]algorithm.FitPredicate; 这里的 predicateFuncs 是一个 map, 表示所有的 predicate 函数. 这个 map 的 key 是个字符串, 也就是某种形式的 name 了; value 类型跟进去看一下:
- pkg/scheduler/algorithm/types.go:36
- // FitPredicate is a function that indicates if a pod fits into an existing node.
- // The failure information is given by the error.
- type FitPredicate func(pod *v1.Pod, meta PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []PredicateFailureReason, error)
FitPredicate 是一个函数类型, 3 个参数, pod 和 node 都很好理解, meta 跟进去简单看一下可以发现定义的是一些和 predicate 相关的一些元数据, 这些数据是根据 pod 和 node 信息获取到的, 类似 pod 的端口有哪些, pod 亲和的 pod 列表等. 返回值是一个表示是否 fit 的 bool 值, predicate 失败的原因列表, 一个错误类型.
也就是说, FitPredicate 这个函数类型也就是前面一直说的 predicate functions 的真面目了. 下面看 podFitsOnNode()函数的具体逻辑吧:
- pkg/scheduler/core/generic_scheduler.go:537
- func podFitsOnNode(
- pod *v1.Pod,
- meta algorithm.PredicateMetadata,
- info *schedulercache.NodeInfo,
- predicateFuncs map[string]algorithm.FitPredicate,
- nodeCache *equivalence.NodeCache,
- queue internalqueue.SchedulingQueue,
- alwaysCheckAllPredicates bool,
- equivClass *equivalence.Class,
- ) (bool, []algorithm.PredicateFailureReason, error) {
- podsAdded := false
- for i := 0; i < 2; i++ {
- metaToUse := meta
- nodeInfoToUse := info
- if i == 0 {
- podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
- } else if !podsAdded || len(failedPredicates) != 0 {
- break
- }
- eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
- // 这里省略一个 for 循环, 下面会单独讲
- }
- return len(failedPredicates) == 0, failedPredicates, nil
- }
这里的逻辑是从一个 for 循环开始的, 关于这个 2 次循环的含义代码里有很长的一段注释, 我们先看一下注释里怎么说的(这里可以多看几遍体会一下):
出于某些原因考虑我们需要运行两次 predicate. 如果 node 上有更高或者相同优先级的 "指定 pods"(这里的 "指定 pods" 指的是通过 schedule 计算后指定要跑在一个 node 上但是还未真正运行到那个 node 上的 pods), 我们将这些 pods 加入到 meta 和 nodeInfo 后执行一次计算过程.
如果这个过程所有的 predicates 都成功了, 我们再假设这些 "指定 pods" 不会跑到 node 上再运行一次. 第二次计算是必须的, 因为有一些 predicates 比如 pod 亲和性, 也许在 "指定 pods" 没有成功跑到 node 的情况下会不满足.
如果没有 "指定 pods" 或者第一次计算过程失败了, 那么第二次计算不会进行.
我们在第一次调度的时候只考虑相等或者更高优先级的 pods, 因为这些 pod 是当前 pod 必须 "臣服" 的, 也就是说不能够从这些 pod 中抢到资源, 这些 pod 不会被当前 pod"抢占"; 这样当前 pod 也就能够安心从低优先级的 pod 手里抢资源了.
新 pod 在上述 2 种情况下都可调度基于一个保守的假设: 资源和 pod 反亲和性等的 predicate 在 "指定 pods" 被处理为 Running 时更容易失败; pod 亲和性在 "指定 pods" 被处理为 Not Running 时更加容易失败.
我们不能假设 "指定 pods" 是 Running 的因为它们当前还没有运行, 而且事实上, 它们确实有可能最终又被调度到其他 node 上了.
看了这个注释后, 上面代码里的前几行就很好理解了, 在第一次进入循环体和第二次进入时做了不同的处理, 具体怎么做的处理我们暂时不关注. 下面看省略的这个 for 循环做了啥:
- pkg/scheduler/core/generic_scheduler.go:583
- // predicates.Ordering()得到的是一个[]string,predicate 名字集合
- for predicateID, predicateKey := range predicates.Ordering() {
- var (
- fit bool
- reasons []algorithm.PredicateFailureReason
- err error
- )
- // 如果 predicateFuncs 有这个 key, 则调用这个 predicate; 也就是说 predicateFuncs 如果定义了一堆乱七八遭的名字, 会被忽略调, 因为 predicateKey 是内置的.
- if predicate, exist := predicateFuncs[predicateKey]; exist {
- // 降低难度, 先不看缓存情况.
- if eCacheAvailable {
- fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
- } else {
- // 真正调用 predicate 函数了!!!!!!!!!
- fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
- }
- if err != nil {
- return false, []algorithm.PredicateFailureReason{}, err
- }
- if !fit {
- // ......
- }
- }
- }
如上, 我们看一下 2 个地方:
- predicates.Ordering()
- fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
分两个小节吧~
3.1. predicates 的顺序
- pkg/scheduler/algorithm/predicates/predicates.go:130
- var (
- predicatesOrdering = []string{
- CheckNodeConditionPred,
- CheckNodeUnschedulablePred,
- GeneralPred,
- HostNamePred,
- PodFitsHostPortsPred,
- MatchNodeSelectorPred,
- PodFitsResourcesPred,
- NoDiskConflictPred,
- PodToleratesNodeTaintsPred,
- PodToleratesNodeNoExecuteTaintsPred,
- CheckNodeLabelPresencePred,
- CheckServiceAffinityPred,
- MaxEBSVolumeCountPred,
- MaxGCEPDVolumeCountPred,
- MaxCSIVolumeCountPred,
- MaxAzureDiskVolumeCountPred,
- CheckVolumeBindingPred,
- NoVolumeZoneConflictPred,
- CheckNodeMemoryPressurePred,
- CheckNodePIDPressurePred,
- CheckNodeDiskPressurePred,
- MatchInterPodAffinityPred}
- )
如上, 这里定义了一个次序, 前面的 for 循环遍历的是这个[]string, 这样也就实现了不管 predicateFuncs 里定义了怎样的顺序, 影响不了 predicate 的实际调用顺序. 官网对于这个顺序有这样一个表格解释:
Position | Predicate | comments (note, justification...) |
---|---|---|
1 | CheckNodeConditionPredicate | we really don't want to check predicates against unschedulable nodes. |
2 | PodFitsHost | we check the pod.spec.nodeName. |
3 | PodFitsHostPorts | we check ports asked on the spec. |
4 | PodMatchNodeSelector | check node label after narrowing search. |
5 | PodFitsResources | this one comes here since it's not restrictive enough as we do not try to match values but ranges. |
6 | NoDiskConflict | Following the resource predicate, we check disk |
7 | PodToleratesNodeTaints | check toleration here, as node might have toleration |
8 | PodToleratesNodeNoExecuteTaints | check toleration here, as node might have toleration |
9 | CheckNodeLabelPresence | labels are easy to check, so this one goes before |
10 | checkServiceAffinity | - |
11 | MaxPDVolumeCountPredicate | - |
12 | VolumeNodePredicate | - |
13 | VolumeZonePredicate | - |
14 | CheckNodeMemoryPressurePredicate | doesn't happen often |
15 | CheckNodeDiskPressurePredicate | doesn't happen often |
16 | InterPodAffinityMatches | Most expensive predicate to compute |
这个表格大家对着字面意思体会一下吧, 基本还是可以联想到意义的.
当然这个顺序是可以被配置文件覆盖的, 用户可以使用类似这样的配置:
- {
- "kind" : "Policy",
- "apiVersion" : "v1",
- "predicates" : [
- {"name" : "PodFitsHostPorts", "order": 2},
- {"name" : "PodFitsResources", "order": 3},
- {"name" : "NoDiskConflict", "order": 5},
- {"name" : "PodToleratesNodeTaints", "order": 4},
- {"name" : "MatchNodeSelector", "order": 6},
- {"name" : "PodFitsHost", "order": 1}
- ],
- "priorities" : [
- {"name" : "LeastRequestedPriority", "weight" : 1},
- {"name" : "BalancedResourceAllocation", "weight" : 1},
- {"name" : "ServiceSpreadingPriority", "weight" : 1},
- {"name" : "EqualPriority", "weight" : 1}
- ],
- "hardPodAffinitySymmetricWeight" : 10
- }
整体过完源码后我们再实际尝试一下这些特性, 这一边先知道有这回事吧, ok, 继续~
3.2. 单个 predicate 执行过程
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
这行代码其实没有啥复杂逻辑, 不过我们还是重复讲一下, 清晰理解这一行很有必要. 这里的 predicate()来自前几行的 if 语句 predicate, exist := predicateFuncs[predicateKey], 往前跟也就是 FitPredicate 类型, 我们前面提过, 类型定义在 pkg/scheduler/algorithm/types.go:36, 这个 ߇#x7C7B; 型表示的是一个具体的 predicate 函数, 这里使用 predicate()也就是一个函数调用的语法, 很和谐了.
3.3. 具体的 predicate 函数
一直在讲 predicate, 那么 predicate 函数到底长什么样子呢, 我们从具体的实现函数找一个看一下. 开始讲 design 的时候提到过 predicate 的实现在 pkg/scheduler/algorithm/predicates/predicates.go 文件中, 先看一眼 Structure 吧:
这个文件中 predicate 函数有点多, 这样看眼花, 我们具体点开一个观察一下:
- pkg/scheduler/algorithm/predicates/predicates.go:277
- func NoDiskConflict(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
- for _, v := range pod.Spec.Volumes {
- for _, ev := range nodeInfo.Pods() {
- if isVolumeConflict(v, ev) {
- return false, []algorithm.PredicateFailureReason{ErrDiskConflict}, nil
- }
- }
- }
- return true, nil, nil
- }
我们知道 predicate 函数的特点, 这样就很好在这个一千六百多行 go 文件中寻找 predicate 函数了. 像上面这个 NoDiskConflict()函数, 参数是 pod,meta 和 nodeinfo, 很明显是 FitPredicate 类型的, 标准的 predicate 函数.
这个函数的实现也特别简单, 遍历 pod 的 Volumes, 然后对于 pod 的每一个 Volume, 遍历 node 上的每个 pod, 看是否和当前 podVolume 冲突. 如果不 fit 就返回 false 加原因; 如果 fit 就返回 true, 很清晰.
来源: https://www.cnblogs.com/cloudgeek/p/10495179.html