Kubernetes 学习(九)Kubernetes 源码阅读之正式篇------核心组件之 Scheduler

2022-11-23 19:35:54 浏览数 (1)

0. 前言

  • 继续上一篇博客阅读 Kubernetes 源码,参照《k8s 源码阅读》首先学习 Kubernetes 的一些核心组件,首先是 kube-scheduler
  • 本文严重参考原文:《k8s 源码阅读》之 2.2 章节:scheduler,加入部分自己阅读的体会作为自己的阅读笔记
  • 感谢《k8s 源码阅读》的作者们辛苦编写教材,在此郑重表示感谢,望大家多多支持!~

1. 整体设计

1.1 概述

  • 官网描述:
    • The Kubernetes scheduler runs as a process alongside the other master components such as the API server.
    • Its interface to the API server is to watch for Pods with an empty PodSpec.NodeName, and for each Pod,
    • it posts a binding indicating where the Pod should be scheduled.
  • Scheduler 是相对独立的一个组件,主动访问 API server,寻找等待调度的 Pod(PodSpec.NodeName 为空)
  • 然后通过一系列调度算法寻找哪个 Node 适合跑这个 Pod
  • 然后将这个 Pod 和 Node 的绑定关系发给 API server,即整个调度流程

1.2 源码层次

  • cmd/kube-scheduler/scheduler.go:main() 函数入口位置,在 Scheduler 过程开始前的一系列初始化工作
  • pkg/scheduler/scheduler.go:调度框架整体逻辑,抽象调用各个算法的 Interface
  • pkg/scheduler/core/generic_scheduler.go:计算哪些 Node适合跑哪些 Pod 的具体算法

1.3 调度流程

  • 通过一系列的 Predicates(预选) 过滤掉不能运行 Pod 的 Node
    • 比如一个 Pod 需要 500M 的内存,有些节点剩余内存只有 100M 了,就会被剔除
  • 通过一系列的 Priorities(优选)给剩下的 Node 排一个分数,寻找能够运行 Pod 的 Node 中最合适的一个
  • 得分最高的一个 Node 胜出,获得了运行 Pod 的资格
  • 若是上述预选过程中没有找到任何一个合适的 Node,则进入抢占模式,移除部分低优先级的 Pod,再选择 Node

1.4 Predicates 和 Priorities 策略

  • Predicates 是用于过滤不合适 Node的策略
  • Priorities 是用于计算 Node 分数的策略(作用在通过 Predicates 过滤的 Node上)
  • Kubernetes 默认内建了一些 Predicates 和 Priorities 策略,代码分别在:
    • pkg/scheduler/algorithm/predicates/predicates.go
    • pkg/scheduler/algorithm/priorities/

1.5 调度策略的修改

  • 默认调度策略是通过 defaultPredicates() 和 defaultPriorities() 函数定义的
  • 代码在 pkg/scheduler/algorithmprovider/defaults/defaults.go
  • 可以通过命令行 flag --policy-config-file 来覆盖默认行为
    • 可以修改 pkg/scheduler/algorithm/predicates/predicates.go/pkg/scheduler/algorithm/priorities/,然后注册到 defaultPredicates() 和 defaultPriorities() 来实现
    • 或者通过配置文件:
代码语言:javascript复制
{
"kind" : "Policy",
"apiVersion" : "v1",
"predicates" : [
    {"name" : "PodFitsHostPorts"},
    {"name" : "PodFitsResources"},
    {"name" : "NoDiskConflict"},
    {"name" : "NoVolumeZoneConflict"},
    {"name" : "MatchNodeSelector"},
    {"name" : "HostName"}
    ],
"priorities" : [
    {"name" : "LeastRequestedPriority", "weight" : 1},
    {"name" : "BalancedResourceAllocation", "weight" : 1},
    {"name" : "ServiceSpreadingPriority", "weight" : 1},
    {"name" : "EqualPriority", "weight" : 1}
    ],
"hardPodAffinitySymmetricWeight" : 10,
"alwaysCheckAllPredicates" : false
}

2. 启动前逻辑

  • Scheduler 可以分为三层,第一层是调度器启动前的逻辑,包括:命令行参数解析、参数校验、调度器初始化等一系列逻辑

2.1 Cobra

2.1.1 简介

  • Cobra 既是一个创建强大的现代化命令行程序的库,又是一个用于生成应用和命令行文件的程序。有很多流行的 Golang 项目用了 Cobra(Kubernetes 和 Docker)

2.1.2 具体使用

  • Go 1.11 版本以上先暂时关闭 go modules,安装 Cobra:go get -u github.com/spf13/cobra/cobra
    • 若是网卡,可能导致安装不成功,可以先下载报错的依赖包,再重新 go get 安装
    • 安装成功后会看到 $GOPATH/
  • 进入 $GOPATH/src 目录,初始化项目:cobra init myapp --pkg-name myapp
代码语言:javascript复制
$ cobra init myapp --pkg-name myapp
Your Cobra applicaton is ready at
/home/ao/go/src/myapp
$ ls myapp 
cmd  LICENSE  main.go
$ pwd
/home/ao/go/src
  • 本地可以看到一个 main.go 和一个 cmd 目录
  • 查看 cmd/root.go:
代码语言:javascript复制
package cmd

import (
  "fmt"
  "os"
  "github.com/spf13/cobra"

  homedir "github.com/mitchellh/go-homedir"
  "github.com/spf13/viper"

)


var cfgFile string


// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
  Use:   "myapp",
  Short: "A brief description of your application",
  Long: `A longer description that spans multiple lines and likely contains
examples and usage of using your application. For example:

Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
  // Uncomment the following line if your bare application
  // has an action associated with it:
  //    Run: func(cmd *cobra.Command, args []string) { },
}
  • 查看 main.go:
代码语言:javascript复制
package main

import "myapp/cmd"

func main() {
  cmd.Execute()
}
  • main.go 里 import了 myapp/cmd(也就是 root.go 文件)
    • 在 Execute 里面调用了 rootCmd.Execute() 方法
    • rootCmd 是*cobra.Command 类型
  • 添加新的命令:cobra add version
代码语言:javascript复制
$ cobra add version                
version created at /home/ao/go/src/myapp% 
  • 查看 cmd/version.go:
    • init() 函数里面调用 rootCmd.AddCommand(versionCmd),子命令就是 version
代码语言:javascript复制
package cmd

import (
    "fmt"

    "github.com/spf13/cobra"
)

// versionCmd represents the version command
var versionCmd = &cobra.Command{
    Use:   "version",
    Short: "A brief description of your command",
    Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example:

Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
    Run: func(cmd *cobra.Command, args []string) {
        fmt.Println("version called")
    },
}

func init() {
    rootCmd.AddCommand(versionCmd)
    ......
}
  • 添加多级子命令:
代码语言:javascript复制
$ cobra add server 
server created at /home/ao/go/src/myapp%                                                                                         
$ cobra add create -p serverCmd
create created at /home/ao/go/src/myapp%    
  • 查看 cmd/create.go 文件:
    • init() 函数调用 serverCmd.AddCommand(createCmd),表示 create 为子命令
代码语言:javascript复制
package cmd

import (
    "fmt"

    "github.com/spf13/cobra"
)

// createCmd represents the create command
var createCmd = &cobra.Command{
    Use:   "create",
    Short: "A brief description of your command",
    Long: `A longer description that spans multiple lines and likely contains examples
and usage of using your command. For example:

Cobra is a CLI library for Go that empowers applications.
This application is a tool to generate the needed files
to quickly create a Cobra application.`,
    Run: func(cmd *cobra.Command, args []string) {
        fmt.Println("create called")
    },
}

func init() {
    serverCmd.AddCommand(createCmd)
    ......
}

2.2 Scheduler 的 main 函数

  • 下面正式进入 Kubernetes 的 Scheduler 源码阅读
  • 查看 cmd/kube-scheduler/scheduler.go,删除非主干代码:
代码语言:javascript复制
func main() {
    command := app.NewSchedulerCommand()
    if err := command.Execute(); err != nil {
        fmt.Fprintf(os.Stderr, "%vn", err)
        os.Exit(1)
    }
}
  • 查看定义 NewSchedulerCommand 的 cmd/kube-scheduler/app/server.go,删除非主干代码:
代码语言:javascript复制
/ NewSchedulerCommand creates a *cobra.Command object with default parameters
func NewSchedulerCommand() *cobra.Command {
    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary.`,
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, args, opts); err != nil {
                fmt.Fprintf(os.Stderr, "%vn", err)
                os.Exit(1)
            }
        },
    }
    return cmd
}
  • Schduler 启动时调用了 runCommand(cmd, args, opts),查看 cmd/kube-scheduler/app/server.go 关于 runCommand 定义,删除非主干代码:
代码语言:javascript复制
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
    c, err := opts.Config()
    stopCh := make(chan struct{})
    // Get the completed config
    cc := c.Complete()
    return Run(cc, stopCh)
}
  • 处理配置问题后调用了一个 Run() 函数,Run() 的作用是基于给定的配置启动 Scheduler,它只会在出错时或者 channel stopCh 被关闭时才退出,查看 cmd/kube-scheduler/app/server.go 关于 Run 定义,删除非主干代码:
代码语言:javascript复制
// Run executes the scheduler based on the given configuration. It only return on error or when stopCh is closed.
func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error {
    // Create the scheduler.
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory.Core().V1().Nodes(),
        stopCh,
        scheduler.WithName(cc.ComponentConfig.SchedulerName))

    // Prepare a reusable runCommand function.
    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }

    ctx, cancel := context.WithCancel(context.TODO()) 
    defer cancel()

    go func() {
        select {
        case <-stopCh:
            cancel()
        case <-ctx.Done():
        }
    }()

    // Leader election is disabled, so runCommand inline until done.
    run(ctx)
    return fmt.Errorf("finished without leader elect")
}
  • 最终是要跑 sched.Run() 这个方法来启动 Scheduler,sched.Run() 方法就是 Scheduler 框架真正运行的逻辑
  • 上述有一个 sched 变量,linux 里经常会看到一些软件叫 ***d,d 也就是 daemon,守护进程的意思,也就是一直跑在后台的一个程序
    • 这里的 sched 也就是指 “Scheduler Daemon“
    • sched 的其实是 *Scheduler 类型,定义在 pkg/scheduler/scheduler.go
      • Scheduler 监控新创建的未被调度的 Pods,然后尝试寻找合适的 Node,回写一个绑定关系到 API Server
      • 这里也可以体会到 Daemon 的感觉,我们平时搭建的 k8s 集群中运行着一个 Daemon 进程叫做 kube-scheduler,在程序里面也就对应这样一个对象:Scheduler
代码语言:javascript复制
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
    config *factory.Config
}
  • Scheduler 结构体中的 Config 属性对象定义在 pkg/scheduler/factory/factory.go,主要属性如下:
代码语言:javascript复制
// Config is an implementation of the Scheduler's configured input data.
type Config struct {
    // It is expected that changes made via SchedulerCache will be observed
    // by NodeLister and Algorithm.
    SchedulerCache schedulerinternalcache.Cache
    // Ecache is used for optimistically invalid affected cache items after
    // successfully binding a pod
    Ecache     *equivalence.Cache
    NodeLister algorithm.NodeLister
    Algorithm  algorithm.ScheduleAlgorithm
    GetBinder  func(pod *v1.Pod) Binder
    // PodConditionUpdater is used only in case of scheduling errors. If we succeed
    // with scheduling, PodScheduled condition will be updated in apiserver in /bind
    // handler so that binding and setting PodCondition it is atomic.
    PodConditionUpdater PodConditionUpdater
    // PodPreemptor is used to evict pods and update pod annotations.
    PodPreemptor PodPreemptor

    // NextPod should be a function that blocks until the next pod
    // is available. We don't use a channel for this, because scheduling
    // a pod may take some amount of time and we don't want pods to get
    // stale while they sit in a channel.
    NextPod func() *v1.Pod

    // SchedulingQueue holds pods to be scheduled
    SchedulingQueue internalqueue.SchedulingQueue
}

3. 整体调度框架

3.1 启动

  • 调度的第二层:负责 Scheduler 除了具体 Node 过滤算法外的工作逻辑
  • Scheduler 对象绑定了一个 Run() 方法,定义如下:
代码语言:javascript复制
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
  • Run 函数在 cmd/kube-scheduler/app/server.go 调用:
代码语言:javascript复制
    // Prepare a reusable runCommand function.
    run := func(ctx context.Context) {
        sched.Run()
        <-ctx.Done()
    }
  • 调用了 sched.Run() 之后就等待 ctx.Done()
  • wait.Until 实现:每隔 n 时间调用 f 一次,除非 channel c 被关闭
    • 这里的 n 就是 0,也就是一直调用,前一次调用返回下一次调用就开始了
    • f 指 sched.scheduleOne,c 指 sched.config.StopEverything

3.2 一个 Pod 的基本调度流程

  • scheduleOne 实现 1 个 Pod 的完整调度工作流
  • 这个过程是顺序执行的,也就是非并发的
    • 即前一个 Pod 的 scheduleOne 一完成,下一个 Pod 的 ScheduleOne 立马接着执行
  • scheduleOne 的主要逻辑:
代码语言:javascript复制
func (sched *Scheduler) scheduleOne() {
    pod := sched.config.NextPod()
    suggestedHost, err := sched.schedule(pod)
    if err != nil {
        if fitError, ok := err.(*core.FitError); ok {
            preemptionStartTime := time.Now()
            sched.preempt(pod, fitError)
        }
        return
    }
    assumedPod := pod.DeepCopy()
    allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
    err = sched.assume(assumedPod, suggestedHost)
    go func() {
        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: suggestedHost,
            },
        })
    }()
}
  • 参考流程如下:

3.3 Schedule 算法计算合适 Node

  • 主流程核心步骤是 suggestedHost, err := sched.schedule(pod)
  • 这里完成了非抢占模式下 Node 的计算,包括预选过程、优选过程等
  • sched.schedule(pod) 方法定义见 pkg/scheduler/scheduler.go
代码语言:javascript复制
// schedule implements the scheduling algorithm and returns the suggested host.
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
    host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
    if err != nil {
        pod = pod.DeepCopy()
        sched.config.Error(pod, err)
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
        sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
            Type:          v1.PodScheduled,
            Status:        v1.ConditionFalse,
            LastProbeTime: metav1.Now(),
            Reason:        v1.PodReasonUnschedulable,
            Message:       err.Error(),
        })
        return "", err
    }
    return host, err
}
  • 里面调用了 sched.config.Algorithm.Schedule() 方法(本身为接口),定义见 pkg/scheduler/algorithm/scheduler_interface.go
代码语言:javascript复制
type ScheduleAlgorithm interface {
    Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
    Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
    Predicates() map[string]FitPredicate
    Prioritizers() []PriorityConfig
}
  • 这个接口有 4 个方法,实现 ScheduleAlgorithm 接口的对象实现如何调度 Pods 到 Nodes 上
  • 默认的实现是 pkg/scheduler/core/generic_scheduler.gogenericScheduler 对象
    • Schedule():给定 Pod 和 Nodes,计算出一个适合跑 Pod 的 Node 并返回
    • Preempt():抢占模式
    • Predicates():预选过程
    • Prioritizers():优选过程

4. 一般调度过程

6. 参考文献

  • 《k8s 源码阅读》之 2.2 章节:scheduler

0 人点赞