Kubernetes调度器原理解析

2022-11-28 17:31:20 浏览数 (1)

对于接触过Kubernetes的同学来说,Kubernetes调度器应该不陌生。Kube-Scheduler 是Kubernetes的核心组件之一,它主要负责整个集群资源的调度,根据特定的调度算法和策略,将Pod调度到最优的工作节点上面去,从而更加合理、更加充分的利用集群的资源。

作者:张永曦, 中国移动云能力中心软件开发工程师,专注于云原生、Istio、微服务、Spring Cloud 等领域。

01

调度器的职责

Kube-Scheduler的主要作用就是根据特定的调度算法和调度策略将 Pod 调度到合适的 Node节点上去。Kube-Scheduler是一个独立的二进制程序,启动之后会监控Api-Server,获取到未被调度的Pod,经过一些列调度算法之后,最终给Pod调度到最合适的Node节点上运行。Kube-Scheduler的主要职责,可以理解为为Pod选择最合适的Node。

图1 Kube-Scheduler的职责

那么,Kube-Scheduler是如何实现Pod的Node节点选择,调度算法大概是怎样的呢?概括来讲,调度算法分为如下两大步骤:

① 首先是预选过程,过滤掉不满足条件的节点,这个过程称为 Predicates(过滤)

② 然后是优选过程,对通过的节点按照优先级排序,称之为 Priorities(打分)

③ 最后从中选择优先级最高的节点,如果中间任何一步骤有错误,就直接返回错误

Predicates阶段首先遍历全部节点,过滤掉不满足条件的节点,属于强制性规则,这一阶段输出的所有满足要求的节点将被记录并作为第二阶段的输入,如果所有的节点都不满足条件,那么 Pod 将会一直处于 Pending 状态,直到有节点满足条件,在这期间调度器会不断的重试。

Priorities阶段即再次对节点进行筛选,如果有多个节点都满足条件的话,那么系统会按照节点的优先级(priorites)大小对节点进行排序,最后选择优先级最高的节点来部署 Pod 应用。

02

调度器的架构设计

Kube-Scheduler实现了一套独有的调度框架,上文中提到的Predicates阶段和 Priorities 阶段分别对应了调度框架的若干个扩展点,调度框架的每一个扩展点,都对应了若干个Plugin,调度框架在执行调度工作流时,遇到对应的扩展点,会串行执行对应注册在扩展点上的所有Plugin,所有扩展点上的Plugin协同,共同完成Pod的调度任务。

图2 Kube-Scheduler的调度框架

Kube-Scheduler的调度框架如图2所示,调度工作流按照箭头顺序执行,其中绿色部分为调度过程,是同步执行的,黄色部分为绑定过程,是异步执行的。扩展点描述如下:

1)Sort 扩展用于对 Pod 的待调度队列进行排序,以决定先调度哪个 Pod,Sort 扩展本质上只需要实现一个方法 Less(Pod1, Pod2) 用于比较两个 Pod 谁更优先获得调度即可,同一时间点只能有一个 Sort 插件生效。

2)Pre-filter 扩展用于对 Pod 的信息进行预处理,或者检查一些集群或 Pod 必须满足的前提条件,然后将其存入缓存中待 Filter 扩展点执行的时候使用,如果 pre-filter 返回了 error,则调度过程终止。

3)Filter 扩展用于排除那些不能运行该 Pod 的节点,对于每一个节点,调度器将按顺序执行 filter 扩展;如果任何一个 filter 将节点标记为不可选,则余下的 filter 扩展将不会被执行,调度器可以同时对多个节点执行 filter 扩展。

4)Post-filter 如果在 Filter 扩展点全部节点都被过滤掉了,没有合适的节点进行调度,才会执行 PostFilter 扩展点,如果启用了 Pod 抢占特性,那么会在这个扩展点进行抢占操作,可以用于 logs/metircs。

5)PreScore 扩展会对 Score 扩展点的数据做一些预处理操作,然后将其存入缓存中待 Score 扩展点执行的时候使用。

6)Score 扩展用于为所有可选节点进行打分,调度器将针对每一个节点调用每个 Sore 扩展,评分结果是一个范围内的整数,代表最小和最大分数。在 normalize scoring 阶段,调度器将会把每个 score 扩展对具体某个节点的评分结果和该扩展的权重合并起来,作为最终评分结果。

7)Normalize score 扩展在调度器对节点进行最终排序之前修改每个节点的评分结果,注册到该扩展点的扩展在被调用时,将获得同一个插件中的 score 扩展的评分结果作为参数,调度框架每执行一次调度,都将调用所有插件中的一个 normalize score 扩展一次。

8)Reserve 是一个通知性质的扩展点,有状态的插件可以使用该扩展点来获得节点上为 Pod 预留的资源,该事件发生在调度器将 Pod 绑定到节点之前,目的是避免调度器在等待 Pod 与节点绑定的过程中调度新的 Pod 到节点上时,发生实际使用资源超出可用资源的情况(因为绑定 Pod 到节点上是异步发生的)。这是调度过程的最后一个步骤,Pod 进入 reserved 状态以后,要么在绑定失败时触发 Unreserve 扩展,要么在绑定成功时,由 Post-bind 扩展结束绑定过程。

9)Permit 扩展在每个 Pod 调度周期的最后调用,用于阻止或者延迟 Pod 与节点的绑定。Permit 扩展可以做下面三件事中的一项:

- approve(批准):当所有的 permit 扩展都 approve 了 Pod 与节点的绑定,调度器将继续执行绑定过程

- deny(拒绝):如果任何一个 permit 扩展 deny 了 Pod 与节点的绑定,Pod 将被放回到待调度队列,此时将触发Unreserve 扩展

- wait(等待):如果一个 permit 扩展返回了 wait,则 Pod 将保持在 permit 阶段,直到被其他扩展 approve,如果超时事件发生,wait 状态变成 deny,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展

10)WaitOnPermit 扩展与 Permit 扩展点配合使用实现延时调度功能(内部默认实现的)。

11)Pre-bind 扩展用于在 Pod 绑定之前执行某些逻辑。例如,pre-bind 扩展可以将一个基于网络的数据卷挂载到节点上,以便 Pod 可以使用。如果任何一个 pre-bind 扩展返回错误,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展。

12)Bind 扩展用于将 Pod 绑定到节点上:

- 只有所有的 pre-bind 扩展都成功执行了,bind 扩展才会执行

- 调度框架按照 bind 扩展注册的顺序逐个调用 bind 扩展

- 具体某个 bind 扩展可以选择处理或者不处理该 Pod

- 如果某个 bind 扩展处理了该 Pod 与节点的绑定,余下的 bind 扩展将被忽略

13)Post-bind 是一个通知性质的扩展:

- Post-bind 扩展在 Pod 成功绑定到节点上之后被动调用

- Post-bind 扩展是绑定过程的最后一个步骤,可以用来执行资源清理的动作

14)Unreserve 是一个通知性质的扩展,如果为 Pod 预留了资源,Pod 又在被绑定过程中被拒绝绑定,则 unreserve 扩展将被调用。Unreserve 扩展应该释放已经为 Pod 预留的节点上的计算资源。在一个插件中,reserve 扩展和 unreserve 扩展应该成对出现。

Kube-Scheduler的默认调度策略如下代码所示。其中,Predicates阶段对应PreFilter扩展点和Filter扩展点,Priorities阶段对应PreScor扩展点和Score扩展点。除此之外,还有QueueSort扩展点和Bingding扩展点。其中每个扩展点,都对应注册多个调度插件。

代码语言:javascript复制
return &schedulerapi.Plugins{
    QueueSort: &schedulerapi.PluginSet{
      Enabled: []schedulerapi.Plugin{
        {Name: queuesort.Name},
      },
    },
    PreFilter: &schedulerapi.PluginSet{
      Enabled: []schedulerapi.Plugin{
        {Name: noderesources.FitName},
        {Name: nodeports.Name},
        {Name: interpodaffinity.Name},
      },
    },
    Filter: &schedulerapi.PluginSet{
      Enabled: []schedulerapi.Plugin{
        {Name: nodeunschedulable.Name},
        {Name: noderesources.FitName},
        {Name: nodename.Name},
        {Name: nodeports.Name},
        {Name: nodeaffinity.Name},
        {Name: volumerestrictions.Name},
        {Name: tainttoleration.Name},
        {Name: nodevolumelimits.EBSName},
        {Name: nodevolumelimits.GCEPDName},
        {Name: nodevolumelimits.CSIName},
        {Name: nodevolumelimits.AzureDiskName},
        {Name: volumebinding.Name},
        {Name: volumezone.Name},
        {Name: interpodaffinity.Name},
      },
    },
    PreScore: &schedulerapi.PluginSet{
      Enabled: []schedulerapi.Plugin{
        {Name: interpodaffinity.Name},
        {Name: defaultpodtopologyspread.Name},
        {Name: tainttoleration.Name},
      },
    },
    Score: &schedulerapi.PluginSet{
      Enabled: []schedulerapi.Plugin{
        {Name: noderesources.BalancedAllocationName, Weight: 1},
        {Name: imagelocality.Name, Weight: 1},
        {Name: interpodaffinity.Name, Weight: 1},
        {Name: noderesources.LeastAllocatedName, Weight: 1},
        {Name: nodeaffinity.Name, Weight: 1},
        {Name: nodepreferavoidpods.Name, Weight: 10000},
        {Name: defaultpodtopologyspread.Name, Weight: 1},
        {Name: tainttoleration.Name, Weight: 1},
      },
    },
    Bind: &schedulerapi.PluginSet{
      Enabled: []schedulerapi.Plugin{
        {Name: defaultbinder.Name},
      },
    },
  }
}

03

一个Pod的调度过程

上文提到的Kube-Scheduler的调度工作流(如图2所示),主体是在Scheduler.scheduleOne()函数中实现的。scheduleOne函数的部分代码如下:

代码语言:javascript复制
func (sched *Scheduler) scheduleOne(ctx context.Context) {
  podInfo := sched.NextPod()
......
  scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
  if err != nil {
    if fitError, ok := err.(*core.FitError); ok {
      if sched.DisablePreemption {
        klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration."  
          " No preemption is performed.")
      } else {
        preemptionStartTime := time.Now()
        sched.preempt(schedulingCycleCtx, prof, state, pod, fitError)
        metrics.PreemptionAttempts.Inc()
      }
      metrics.PodScheduleFailures.Inc()
    } else if err == core.ErrNoNodesAvailable {
      // No nodes available is counted as unschedulable rather than an error.
      metrics.PodScheduleFailures.Inc()
    } else {
      klog.Errorf("error selecting node for pod: %v", err)
      metrics.PodScheduleErrors.Inc()
    }
    sched.recordSchedulingFailure(prof, podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
    return
  }
......
}

scheduleOne每执行一次,代表一个调度周期,对应一个Pod的调度工作流。从代码中可以看出,调度器首先调用sched.NextPod()方法,从调度器优先级队列PriorityQueue取出下一个待调度的Pod。然后执行sched.Algorithm.Schedule方法(方法中包括Pre-filter、Filter、PreScore、Score四个扩展点),选出最优的Node;若是调度算法执行失败,则执行sched.preempt(Post-filter扩展点)抢占操作,并将Pod重新写入到PriorityQueue当中,并返回。scheduleOne接下来的工作,即按照图2调度框架描述的工作流继续执行剩余的扩展点插件。

sched.Algorithm.Schedule方法,是Kube-Scheduler的调度算法核心。Predicates阶段和 Priorities 阶段就是在这个方法里实现的。代码如下:

代码语言:javascript复制
func (g*genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
  defer trace.LogIfLong(100 * time.Millisecond)
  if err := g.snapshot(); err != nil {
    return result, err
  }
  trace.Step("Snapshotting scheduler cache and node infos done")
  if g.nodeInfoSnapshot.NumNodes() == 0 {
    return result, ErrNoNodesAvailable
  }
  feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
  if err != nil {
    return result, err
  }
  trace.Step("Computing predicates done")
  if len(feasibleNodes) == 0 {
    return result, &framework.FitError{
      Pod:         pod,
      NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
      Diagnosis:   diagnosis,
    }
  }
  // When only one node after predicate, just use it.
  if len(feasibleNodes) == 1 {
    return ScheduleResult{
      SuggestedHost:  feasibleNodes[0].Name,
      EvaluatedNodes: 1   len(diagnosis.NodeToStatusMap),
      FeasibleNodes:  1,
    }, nil
  }
  priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
  if err != nil {
    return result, err
  }
  host, err := g.selectHost(priorityList)
  trace.Step("Prioritizing done")
  return ScheduleResult{
    SuggestedHost:  host,
    EvaluatedNodes: len(feasibleNodes)   len(diagnosis.NodeToStatusMap),
    FeasibleNodes:  len(feasibleNodes),
  }, err
}

整个核心调度的实现流程很简单:

① 调度前检查

② 获取调度器缓存和节点信息快照

③ 执行预选操作,找到一批合适的待调度的节点

④ 找不到节点的话,返回 FitError 错误,只找到一个节点,直接返回这个节点的信息

⑤ 执行优选操作,为每个节点进行打分

⑥ 选择一个分数最高的作为待调度的节点进行返回

04

调度器插件简介

上文中提到,对于每个扩展点,可以注册多个调度插件。那么,调度插件是什么,是怎么实现的,本章以NodeResourcesFit插件来说明辅以说明。NodeResourcesFit插件在默认的调度策略中,分别注册在了PreFilter和Filter两个扩展点,以下是 NodeResourcesFit插件的关键代码。

代码语言:javascript复制
const (
  // FitName is the name of the plugin used in the plugin registry and configurations.
  FitName = "NodeResourcesFit"
  // preFilterStateKey is the key in CycleState to NodeResourcesFit pre-computed data.
  // Using the name of the plugin will likely help us avoid collisions with other plugins.
  preFilterStateKey = "PreFilter"   FitName
)
// Fit is a plugin that checks if a node has sufficient resources.
type Fit struct {
  ignoredResources sets.String
}
// Name returns name of the plugin. It is used in logs, etc.
func (f *Fit) Name() string {
  return FitName
}
func computePodResourceRequest(pod *v1.Pod) *preFilterState {
  result := &preFilterState{}
  for _, container := range pod.Spec.Containers {
    result.Add(container.Resources.Requests)
  }
  // take max_resource(sum_pod, any_init_container)
  for _, container := range pod.Spec.InitContainers {
    result.SetMaxResource(container.Resources.Requests)
  }
  // If Overhead is being utilized, add to the total requests for the pod
  if pod.Spec.Overhead != nil && utilfeature.DefaultFeatureGate.Enabled(features.PodOverhead) {
    result.Add(pod.Spec.Overhead)
  }
  return result
}
// PreFilter invoked at the prefilter extension point.
func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
  cycleState.Write(preFilterStateKey, computePodResourceRequest(pod))
  return nil
}
func (f *Fit) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo) *framework.Status {
  s, err := getPreFilterState(cycleState)
  if err != nil {
    return framework.NewStatus(framework.Error, err.Error())
  }
  insufficientResources := fitsRequest(s, nodeInfo, f.ignoredResources)
  if len(insufficientResources) != 0 {
    // We will keep all failure reasons.
    failureReasons := make([]string, 0, len(insufficientResources))
    for _, r := range insufficientResources {
      failureReasons = append(failureReasons, r.Reason)
    }
    return framework.NewStatus(framework.Unschedulable, failureReasons...)
  }
  return nil
}

从代码中可以看出,NodeResourcesFit插件分别实现了PreFilter方法和Filter方法,正因为如此,NodeResourcesFit插件可以分别注册在PreFilter扩展点和Filter扩展点。

PreFilter方法中,插件通过computePodResourceRequest函数,计算出了待调度Pod所需要的运行资源需求,并将该信息写到调度上下文中(framework.CycleState)。

在Filter方法中,插件首先从调度上下文获取到了待调度Pod的运行资源需求信息,然后依据该信息对Node进行过滤,不满足资源需求的Node会被过滤掉。

04

调度器的扩展方式

Kube-Scheduler拥有一个灵活的调度器架构,能够方便的,灵活的实现对于调度器本身的各种扩展。目前,针对Kube-Scheduler的扩展主要有两大类:

① 基于内部扩展点的扩展,可以通过自定义调度插件,并选择扩展点的注册插件的方式的实现;

② 基于外部Extender的扩展,大部分扩展点,都支持Extender方式调用外部服务的方式,丰富扩展点的功能。

1.基于内部扩展点的扩展

这种扩展方式需要修改Kube-Scheduler的代码。在Kube-Scheduler的main函数中,有一个NewSchedulerCommand方法,默认该方法的传参为空。为了实现调度器的扩展,可以传入自己定义的调度插件。

代码语言:javascript复制
// Option configures a framework.Registry.
type Option func(framework.Registry) error
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
...
}

WithPlugin函数,可以用来注册插件,并返回一个Option实例,作为NewSchedulerCommand函数的参数,传递到调度器中。

代码语言:javascript复制
// WithPlugin creates an Option based on plugin name and factory. Please don't remove this function: it is used to register out-of-tree plugins,
// hence there are no references to it from the kubernetes scheduler code base.
func WithPlugin(name string, factory framework.PluginFactory) Option {
  return func(registry framework.Registry) error {
    return registry.Register(name, factory)
  }

调度器最终的main函数代码如下,插件的实现方式,可以参考上一章NodeResourcesFit插件代码。

代码语言:javascript复制
func main() {
  rand.Seed(time.Now().UnixNano())
  command := app.NewSchedulerCommand(app.WithPlugin(plugins.Name, plugins.New))
  logs.InitLogs()
  defer logs.FlushLogs()
  if err := command.Execute(); err != nil {
    os.Exit(1)
  }
}

假设新加入的插件,名为“sample-plugin”,新的调度策略需要在默认调度策略的基础上,将sample-plugin插件新增注册到PreFilter和Filter两个扩展点上,则KubeSchedulerConfiguration的Profile部分,可作如下修改。

代码语言:javascript复制
profiles:
- pluginConfig:
- args:
args1: "example1"
name: sample-plugin
schedulerName: sample-scheduler
plugins:
preFilter:
enabled:
- name: "sample-plugin"
filter:
enabled:
- name: "sample-plugin"

2.基于外部Extender的扩展

外部Extender扩展的原理在于,调度的很多扩展点,在调用完内部注册的插件之后,都会以调用外部Http服务的形式,来丰富调度器的调度策略。

代码语言:javascript复制
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
  filteredNodesStatuses := make(framework.NodeToStatusMap)
  filtered, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
  if err != nil {
    return nil, nil, err
  }
  filtered, err = g.findNodesThatPassExtenders(pod, filtered, filteredNodesStatuses)
  if err != nil {
    return nil, nil, err
  }
  return filtered, filteredNodesStatuses, nil
}

如代码中所示,在Filter扩展点,调度器会调用findNodesThatPassExtenders方法,继续调用Extender外部扩展服务,而Filter扩展点调用Extender外部扩展服务的代码,对应scheduler.core.Filter()方法。KubeSchedulerConfiguration配置的如下:

代码语言:javascript复制
apiVersion: kubescheduler.config.k8s.io/v1alpha1
kind: KubeSchedulerConfiguration
clientConnection:
kubeconfig: /home/xyz2277/.kube/config
leaderElection:
leaderElect: false
leaseDuration: 25s
renewDeadline: 24s
algorithmSource:
policy:
file:
path: policy-config.json
policy-config.json:
{
"kind": "Policy",
"apiVersion": "v1",
"predicates": [
{
"name": "PodFitsHostPorts"
},
{
"name": "PodFitsResources"
},
{
"name": "NoDiskConflict"
},
{
"name": "MatchNodeSelector"
},
{
"name": "HostName"
}
],
"extenders": [
{
"urlPrefix": "http://<ip>:<port>/scheduler",
"apiVersion": "v1beta1",
"filterVerb": "predicates",
"prioritizeVerb": "prioritizes",
"bindVerb":   "bind",
"weight": 1,
"enableHttps": false,
"nodeCacheCapable": false,
"managedResources": [
{
"name": "ecloud.10086.cn/vcuda-core",
"ignoredByScheduler": false
}
]
}
],
"hardPodAffinitySymmetricWeight": 10,
"alwaysCheckAllPredicates": false
}

其中Filter扩展点的Extender外部扩展服务地址为http://<ip>:<port>/scheduler/predicates。

参考:

1.Kubernetes 官方文档

https://kubernetes.io/docs/concepts/scheduling-eviction/Kube-Scheduler

2.kuboard.cn文档

https://kuboard.cn/learning/k8s-advanced/schedule/

0 人点赞