SchedulingQueue 是 kubernetes scheduler 中负责进行等待调度 pod 存储的对, Scheduler 通过 SchedulingQueue 来获取当前系统中等待调度的 Pod, 本文主要讨论 SchedulingQueue 的设计与实现的各种实现, 了解探究其内部实现与底层源码, 本系列代码基于 kubernets1.1.6 分析而来, 图解主要位于第二部分
SchedulingQueue 设计
队列与优先级
队列与场景
类型 | 描述 | 通常实现 |
---|---|---|
队列 | 普通队列是一个 FIFO 的数据结构,根据元素入队的次序依次出队 | 数组或者链表 |
优先级队列 | 优先级队列通常是指根据某些优先级策略,高优先级会优先被获取 | 数组或者树 |
其实在大多数的调度场景中, 大多都是采用优先级队列来实现, 优先满足优先级比较高的任务或者需求, 从而减少后续高优先级对低优先级的抢占, scheduler 中也是如此
优先级的选择
k8s 中调度的单元是 Pod,scheduler 中根据 pod 的优先级的高低来进行优先级队列的构建, 这个其实是在 kubernets 的 adminission 准入插件中, 会为用户创建的 pod 根据用户的设置, 进行优先级字段的计算
三级队列
活动队列
活动队列存储当前系统中所有正在等待调度的队列
不可调度队列
当 pod 的资源在当前集群中不能被满足时, 则会被加入到一个不可调度队列中, 然后等待稍后再进行尝试
backoff 队列
backoff 机制是并发编程中常见的一种机制, 即如果任务反复执行依旧失败, 则会按次增长等待调度时间, 降低重试效率, 从而避免反复失败浪费调度资源
针对调度失败的 pod 会优先存储在 backoff 队列中, 等待后续重试
阻塞与抢占
阻塞设计
当队列中不存在等待调度的 pod 的时候, 会阻塞 scheduler 等待有需要调度的 pod 的时候再唤醒调度器, 获取 pod 进行调度
抢占相关
nominatedPods 存储 pod 被提议运行的 node, 主要用于抢占调度流程中使用, 本节先不分析
源码分析
数据结构
kubernetes 中默认的 schedulingQueue 实现是 PriorityQueue, 本章就以该数据结构来分析
- type PriorityQueue struct {
- stop <-chan struct{}
- clock util.Clock
- // 存储 backoff 的 pod 计时器
- podBackoff *PodBackoffMap
- lock sync.RWMutex
- // 用于协调通知因为获取不到调度 pod 而阻塞的 cond
- cond sync.Cond
- // 活动队列
- activeQ *util.Heap
- // backoff 队列
- podBackoffQ *util.Heap
- // 不可调度队列
- unschedulableQ *UnschedulablePodsMap
- // 存储 pod 和被提名的 node, 实际上就是存储 pod 和建议的 node 节点
- nominatedPods *nominatedPodMap
- // schedulingCycle 是一个调度周期的递增序号, 当 pod pop 的时候会递增
- schedulingCycle int64
- // moveRequestCycle 缓存 schedulingCycle, 当未调度的 pod 重新被添加到 activeQueue 中
- // 会保存 schedulingCycle 到 moveRequestCycle 中
- moveRequestCycle int64
- closed bool
- }
PriorityQueue 作为实现 SchedulingQueue 的实现, 其核心数据结构主要包含三个队列: activeQ,podBackoffQ,unscheduleQ 内部通过 cond 来实现 Pop 操作的阻塞与通知, 接下来先分析核心的调度流程, 最后再分析 util.Heap 里面的具体实现
activeQ
存储所有等待调度的 Pod 的队列, 默认是基于堆来实现, 其中元素的优先级则通过对比 pod 的创建时间和 pod 的优先级来进行排序
- // activeQ is heap structure that scheduler actively looks at to find pods to
- // schedule. Head of heap is the highest priority pod.
- activeQ *util.Heap
优先级比较函数
- // activeQComp is the function used by the activeQ heap algorithm to sort pods.
- // It sorts pods based on their priority. When priorities are equal, it uses
- // PodInfo.timestamp.
- func activeQComp(podInfo1, podInfo2 interface{}) bool {
- pInfo1 := podInfo1.(*framework.PodInfo)
- pInfo2 := podInfo2.(*framework.PodInfo)
- prio1 := util.GetPodPriority(pInfo1.Pod)
- prio2 := util.GetPodPriority(pInfo2.Pod)
- // 首先根据优先级的高低进行比较, 然后根据 pod 的创建时间, 越高优先级的 Pod 越被优先调度
- // 越早创建的 pod 越优先
- return (prio1> prio2) || (prio1 == prio2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
- }
- podbackOffQ
podBackOffQ 主要存储那些在多个 schedulingCycle 中依旧调度失败的情况下, 则会通过之前说的 backOff 机制, 延迟等待调度的时间
- // podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
- // are popped from this heap before the scheduler looks at activeQ
- podBackoffQ *util.Heap
- podBackOff
上面提到 podBackOffQ 队列中并没有存储 pod 的 backOff 的具体信息, 比如 backoff 的计数器, 最后一次更新的时间等, podBackOff 则类似一个记分板, 记录这些信息, 供 podBackOffQ 使用
- // podBackoff tracks backoff for pods attempting to be rescheduled
- podBackoff *PodBackoffMap
- // PodBackoffMap is a structure that stores backoff related information for pods
- type PodBackoffMap struct {
- // lock for performing actions on this PodBackoffMap
- lock sync.RWMutex
- // initial backoff duration
- initialDuration time.Duration // 当前值是 1 秒
- // maximal backoff duration
- maxDuration time.Duration // 当前值是 1 分钟
- // map for pod -> number of attempts for this pod
- podAttempts map[ktypes.NamespacedName]int
- // map for pod -> lastUpdateTime pod of this pod
- podLastUpdateTime map[ktypes.NamespacedName]time.Time
- }
- unschedulableQ
存储已经尝试调度但是当前集群资源不满足的 pod 的队列
moveRequestCycle
当因为集群资源发生变化会尝试进行 unschedulableQ 中的 pod 转移到 activeQ,moveRequestCycle 就是存储资源变更时的 schedulingCycle
- func (p *PriorityQueue) MoveAllToActiveQueue() {
- // 省略其他代码
- p.moveRequestCycle = p.schedulingCycle
- }
- schedulingCycle
schedulingCycle 是一个递增的序列每次从 activeQ 中 pop 出一个 pod 都会递增
- func (p *PriorityQueue) Pop() (*v1.Pod, error) {
- // 省略其他
- p.schedulingCycle++
- }
并发活动队列
并发从活动队列中获取 pod
SchedulingQueue 提供了一个 Pop 接口用于从获取当前集群中等待调度的 pod, 其内部实现主要通过上面 cond 与 activeQ 来实现
当前队列中没有可调度的 pod 的时候, 则通过 cond.Wait 来进行阻塞, 然后在忘 activeQ 中添加 pod 的时候通过 cond.Broadcast 来实现通知
- func (p *PriorityQueue) Pop() (*v1.Pod, error) {
- p.lock.Lock()
- defer p.lock.Unlock()
- for p.activeQ.Len() == 0 {
- if p.closed {
- return nil, fmt.Errorf(queueClosed)
- }
- //
- p.cond.Wait()
- }
- obj, err := p.activeQ.Pop()
- if err != nil {
- return nil, err
- }
- pInfo := obj.(*framework.PodInfo)
- p.schedulingCycle++
- return pInfo.Pod, err
- }
加入调度 pod 到活动队列
当 pod 加入活动队列中, 除了加入 activeQ 的优先级队列中, 还需要从 podBackoffQ 和 unschedulableQ 中移除当前的 pod, 最后进行广播通知阻塞在 Pop 操作的 scheudler 进行最新 pod 的获取
- func (p *PriorityQueue) Add(pod *v1.Pod) error {
- p.lock.Lock()
- defer p.lock.Unlock()
- pInfo := p.newPodInfo(pod)
- // 加入 activeQ
- if err := p.activeQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v/%v to the scheduling queue: %v", pod.Namespace, pod.Name, err)
- return err
- }
- // 从 unschedulableQ 删除
- if p.unschedulableQ.get(pod) != nil {
- klog.Errorf("Error: pod %v/%v is already in the unschedulable queue.", pod.Namespace, pod.Name)
- p.unschedulableQ.delete(pod)
- }
- // Delete pod from backoffQ if it is backing off
- // 从 podBackoffQ 删除
- if err := p.podBackoffQ.Delete(pInfo); err == nil {
- klog.Errorf("Error: pod %v/%v is already in the podBackoff queue.", pod.Namespace, pod.Name)
- }
- // 存储 pod 和被提名的 node
- p.nominatedPods.add(pod, "")
- p.cond.Broadcast()
- return nil
- }
schedulingCycle 与 moveRequestCycle
未调度的队列的及时重试
导致调度周期 schedulingCyclye 变更主要因素如下:
1. 当集群资源发生变化的时候: 比如新添加 pv,node 等资源, 那之前在 unschedulableQ 中因为资源不满足需求的 pod 就可以进行放入 activeQ 中或者 podBackoffQ 中, 及时进行调度
2.pod 被成功调度: 之前由于亲和性不满足被放入到 unschedulableQ 中的 pod, 此时也可以进行尝试, 而不必等到超时之后, 再加入
这两种情况下会分别触发 MoveAllToActiveQueue 和 movePodsToActiveQueue 变更 moveRequestCycle 使其等于 schedulingCycle
对重试机制的影响
当前一个 pod 失败的时候, 有两种选择一是加入 podBackoffQ 中, 二是加入 unschedulableQ 中, 那么针对一个失败的 pod 如何选择该进入那个队列中呢
结合上面的 moveRequestCycle 变更时机, 什么时候 moveRequestCycle 会大于等于 podSchedulingCycle 呢? 答案就是当前集群中进行过集群资源的变更或者 pod 被成功分配, 那这个时候我们如果重试一个失败的调度则可能会成功, 因为集群资源变更了可能有新的资源加入
- if p.moveRequestCycle>= podSchedulingCycle {
- if err := p.podBackoffQ.Add(pInfo); err != nil {
- return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
- }
- } else {
- p.unschedulableQ.addOrUpdate(pInfo)
- }
失败处理逻辑的注入
注入调度失败逻辑处理
在创建 scheduler Config 的时候会通过 MakeDefaultErrorFunc 注入一个失败处理函数, 在 scheduler 调度的时候会进行调用
kubernetes/pkg/scheduler/factory/factory.go: MakeDefaultErrorFunc 会将没有调度到任何一个 node 的 pod 重新放回到优先级队列中
- podSchedulingCycle := podQueue.SchedulingCycle()
- // 省略非核心代码
- if len(pod.Spec.NodeName) == 0 {
- // 重新放回队列
- if err := podQueue.AddUnschedulableIfNotPresent(pod, podSchedulingCycle); err != nil {
- klog.Error(err)
- }
- }
失败处理的回调
当调度 pod 的失败的时候, scheduler 会同时调用 sched.Error 就是上面注入的失败处理逻辑, 来将调度失败未分配 node 的 pod 节点重新加入到队里钟
- kubernetes/pkg/scheduler/scheduler.go
- func (sched *Scheduler) recordSchedulingFailure(pod *v1.Pod, err error, reason string, message string) {
- // 错误回调
- sched.Error(pod, err)
- sched.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", message)
- if err := sched.PodConditionUpdater.Update(pod, &v1.PodCondition{
- Type: v1.PodScheduled,
- Status: v1.ConditionFalse,
- Reason: reason,
- Message: err.Error(),
- }); err != nil {
- klog.Errorf("Error updating the condition of the pod %s/%s: %v", pod.Namespace, pod.Name, err)
- }
- }
- PodBackoffMap
PodBackoffMap 主要用于存储 pod 的最后一次失败的更新时间与实现次数, 从而根据这些数据来进行 pod 的 backoffTime 的计算
数据结构设计
- type PodBackoffMap struct {
- // lock for performing actions on this PodBackoffMap
- lock sync.RWMutex
- // 初始化 backoff duration
- initialDuration time.Duration // 当前值是 1 秒
- // 最大 backoff duration
- maxDuration time.Duration // 当前值是 1 分钟
- // 记录 pod 重试的次数
- podAttempts map[ktypes.NamespacedName]int
- // 记录 pod 的最后一次的更新时间
- podLastUpdateTime map[ktypes.NamespacedName]time.Time
- }
backoffTime 计算算法
初始化的时候回设定 initialDuration 和 maxDuration, 在当前版本中分别是 1s 和 10s, 也就是 backoffQ 中的 pod 最长 10s 就会重新加入 activeQ 中 (需要等待定时任务进行辅助)
在每次失败回调的时候, 都会进行 BackoffPod 方法来进行计数更新, 在后续获取 pod 的 backoffTime 的时候, 只需要获取次数然后结合 initialDuration 进行算法计算, 结合 pod 最后一次的更新时间, 就会获取 pod 的 backoffTime 的终止时间
backoffDuration 计算
其实最终的计算很简单就是 2 的 N 次幂
- func (pbm *PodBackoffMap) calculateBackoffDuration(nsPod ktypes.NamespacedName) time.Duration {
- // initialDuration 是 1s
- backoffDuration := pbm.initialDuration
- if _, found := pbm.podAttempts[nsPod]; found {
- // podAttempts 里面包含 pod 的尝试失败的次数
- for i := 1; i <pbm.podAttempts[nsPod]; i++ {
- backoffDuration = backoffDuration * 2
- // 最大 10s
- if backoffDuration> pbm.maxDuration {
- return pbm.maxDuration
- }
- }
- }
- return backoffDuration
- }
- podBackoffQ
优先级函数
podBackoffQ 实际上会根据 pod 的 backoffTime 来进行优先级排序, 所以 podBackoffQ 的队列头部, 就是最近一个要过期的 pod
- func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
- pInfo1 := podInfo1.(*framework.PodInfo)
- pInfo2 := podInfo2.(*framework.PodInfo)
- bo1, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo1.Pod))
- bo2, _ := p.podBackoff.GetBackoffTime(nsNameForPod(pInfo2.Pod))
- return bo1.Before(bo2)
- }
调度失败加入到 podBackoffQ
如果调度失败, 并且 moveRequestCycle=podSchedulingCycle 的时候就加入 podBackfoffQ 中
- func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
- // 省略检查性代码
- // 更新 pod 的 backoff 信息
- p.backoffPod(pod)
- // moveRequestCycle 将 pod 从 unscheduledQ 大于 pod 的调度周期添加到 如果 pod 的调度周期小于当前的调度周期
- if p.moveRequestCycle>= podSchedulingCycle {
- if err := p.podBackoffQ.Add(pInfo); err != nil {
- return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
- }
- } else {
- p.unschedulableQ.addOrUpdate(pInfo)
- }
- p.nominatedPods.add(pod, "")
- return nil
- }
从 unschedulableQ 迁移
在前面介绍的当集群资源发生变更的时候, 会触发尝试 unschedulabelQ 中的 pod 进行转移, 如果发现当前 pod 还未到达 backoffTime, 就加入到 podBackoffQ 中
- if p.isPodBackingOff(pod) {
- if err := p.podBackoffQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
- addErrorPods = append(addErrorPods, pInfo)
- }
- } else {
- if err := p.activeQ.Add(pInfo); err != nil {
- klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
- addErrorPods = append(addErrorPods, pInfo)
- }
- }
podBackoffQ 定时转移
在创建 PriorityQueue 的时候, 会创建两个定时任务其中一个就是讲 backoffQ 中的 pod 到期后的转移, 每秒钟尝试一次
- func (p *PriorityQueue) run() {
- go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
- go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
- }
因为是一个堆结果, 所以只需要获取堆顶的元素, 然后确定是否到期, 如果到期后则进行 pop 处来, 加入到 activeQ 中
- func (p *PriorityQueue) flushBackoffQCompleted() {
- p.lock.Lock()
- defer p.lock.Unlock()
- for {
- // 获取堆顶元素
- rawPodInfo := p.podBackoffQ.Peek()
- if rawPodInfo == nil {
- return
- }
- pod := rawPodInfo.(*framework.PodInfo).Pod
- // 获取到期时间
- boTime, found := p.podBackoff.GetBackoffTime(nsNameForPod(pod))
- if !found {
- // 如果当前已经不在 podBackoff 中, 则就 pop 出来然后放入到 activeQ
- klog.Errorf("Unable to find backoff value for pod %v in backoffQ", nsNameForPod(pod))
- p.podBackoffQ.Pop()
- p.activeQ.Add(rawPodInfo)
- defer p.cond.Broadcast()
- continue
- }
- // 未超时
- if boTime.After(p.clock.Now()) {
- return
- }
- // 超时就 pop 出来
- _, err := p.podBackoffQ.Pop()
- if err != nil {
- klog.Errorf("Unable to pop pod %v from backoffQ despite backoff completion.", nsNameForPod(pod))
- return
- }
- // 加入到 activeQ 中
- p.activeQ.Add(rawPodInfo)
- defer p.cond.Broadcast()
- }
- }
- unschedulableQ
调度失败
调度失败后, 如果当前集群资源没有发生变更, 就加入到 unschedulable, 原因上面说过
- func (p *PriorityQueue) AddUnschedulableIfNotPresent(pod *v1.Pod, podSchedulingCycle int64) error {
- // 省略检查性代码
- // 更新 pod 的 backoff 信息
- p.backoffPod(pod)
- // moveRequestCycle 将 pod 从 unscheduledQ 大于 pod 的调度周期添加到 如果 pod 的调度周期小于当前的调度周期
- if p.moveRequestCycle>= podSchedulingCycle {
- if err := p.podBackoffQ.Add(pInfo); err != nil {
- return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
- }
- } else {
- p.unschedulableQ.addOrUpdate(pInfo)
- }
- p.nominatedPods.add(pod, "")
- return nil
- }
定时转移任务
定时任务每 30 秒执行一次
- func (p *PriorityQueue) run() {
- go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
- }
逻辑其实就非常简单如果当前时间 - pod 的最后调度时间大于 60s, 就重新调度, 转移到 podBackoffQ 或者 activeQ 中
- func (p *PriorityQueue) flushUnschedulableQLeftover() {
- p.lock.Lock()
- defer p.lock.Unlock()
- var podsToMove []*framework.PodInfo
- currentTime := p.clock.Now()
- for _, pInfo := range p.unschedulableQ.podInfoMap {
- lastScheduleTime := pInfo.Timestamp
- // 如果该 pod1 分钟内没有被调度就加入到 podsToMove
- if currentTime.Sub(lastScheduleTime)> unschedulableQTimeInterval {
- podsToMove = append(podsToMove, pInfo)
- }
- }
- if len(podsToMove)> 0 {
- // podsToMove 将这些 pod 移动到 activeQ
- p.movePodsToActiveQueue(podsToMove)
- }
- }
调度队列总结
数据流设计总结
3.1.1 三队列与后台定时任务
从设计上三队列分别存储: 活动队列, bakcoff 队列, 不可调度队列, 其中 backoff 中会根据任务的失败来逐步递增重试时间 (最长 10s),unschedulableQ 队列则延迟 60s
通过后台定时任务分别将 backoffQ 队列, unschedulableQ 队列来进行重试, 加入到 activeQ 中, 从而加快完成 pod 的失败重试调度
cycle 与优先调度
schedulingCycle,moveRequestCycle 两个 cycle 其实本质上也是为了加快失败任务的重试调度, 当集群资源发生变化的时候, 进行立即重试, 那些失败的优先级比较高, 亲和性问题的 pod 都可能会被优先调度
锁与 cond 实现线程安全 pop
内部通过 lock 保证线程安全, 并通过 cond 来实现阻塞等待, 从而实现阻塞 scheduler worker 的通知
今天就分析到这里, 其实参考这个实现, 我们也可以从中抽象出一些设计思想, 实现自己的一个具有优先级, 快速重试, 高可用的任务队列, 先分析到这, 下一个分析的组件是 SchedulerCache, 感兴趣可以加我微信一起交流学习, 毕竟三个臭皮匠算计不过诸葛亮
微信号: baxiaoshi2020
关注公告号阅读更多源码分析文章
更多文章关注 http://www.sreguide.com/
来源: http://www.tuicool.com/articles/Fv6Fne3