三、主流程
关键代码逻辑,梳理成流程图,可以对照查看。高清图
3.1 main 启动入口
代码语言:go复制func main() {
// 选取leader
leaderElection := defaultLeaderElectionConfiguration()
leaderElection.LeaderElect = true
go func() {
// 注册指标、快照、监控检查接口
pathRecorderMux := mux.NewPathRecorderMux("cluster-autoscaler")
defaultMetricsHandler := legacyregistry.Handler().ServeHTTP
pathRecorderMux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
defaultMetricsHandler(w, req)
})
if *debuggingSnapshotEnabled {
pathRecorderMux.HandleFunc("/snapshotz", debuggingSnapshotter.ResponseHandler)
}
pathRecorderMux.HandleFunc("/health-check", healthCheck.ServeHTTP)
err := http.ListenAndServe(*address, pathRecorderMux)
}()
if !leaderElection.LeaderElect {
run(healthCheck, debuggingSnapshotter)
} else {
...
// 入口函数
run(healthCheck, debuggingSnapshotter)
...
}
}
3.2 run
- 初始化 autoscaler
- 调用 autoscaler.Start,后台刷新缓存
- 间隔执行(默认10s)autoscaler.RunOnce 方法,实现扩缩容逻辑
func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
metrics.RegisterAll(*emitPerNodeGroupMetrics)
// 构造 autoscaler 对象
autoscaler, err := buildAutoscaler(debuggingSnapshotter)
...
// 在后台启动 autoscaler
if err := autoscaler.Start(); err != nil {
klog.Fatalf("Failed to autoscaler background components: %v", err)
}
// Autoscale ad infinitum.
for {
select {
// 默认 10s 执行一次
case <-time.After(*scanInterval):
{
...
// 开始执行
err := autoscaler.RunOnce(loopStart)
...
}
}
}
}
3.3 autoscaler
3.3.1 buildAutoscaler
代码语言:go复制func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
...
// Create autoscaler.
return core.NewAutoscaler(opts)
}
3.3.2 NewAutoscaler
代码语言:go复制func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError) {
// 这个方法主要是做一下初始化工作,其中 provider 的初始化在介绍 aws provider 创建流程时介绍过
// 内部会初始化 awsManager、asgCache,并从云厂商同步 asg信息到本地缓存
err := initializeDefaultOptions(&opts)
if err != nil {
return nil, errors.ToAutoscalerError(errors.InternalError, err)
}
// 实例化 AutoScaler
return NewStaticAutoscaler(
opts.AutoscalingOptions,
opts.PredicateChecker,
opts.ClusterSnapshot,
opts.AutoscalingKubeClients,
opts.Processors,
opts.CloudProvider,
opts.ExpanderStrategy,
opts.EstimatorBuilder,
opts.Backoff,
opts.DebuggingSnapshotter), nil
}
initializeDefaultOptions
初始化的内容有:
- Processor:各种前置处理器
- PredicateChecker:扩容前的预调度检查
- CloudProvider:前面介绍过,主要用于操作 IaaS 云资源
- Estimator:评估扩容节点
- Expander:从多个符合扩容条件的 NodeGroup 中选择最终 Node 的策略
func initializeDefaultOptions(opts *AutoscalerOptions) error {
// 初始化 processor
if opts.Processors == nil {
opts.Processors = ca_processors.DefaultProcessors()
}
if opts.AutoscalingKubeClients == nil {
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.EventsKubeClient)
}
// 初始化前置校验
if opts.PredicateChecker == nil {
predicateCheckerStopChannel := make(chan struct{})
predicateChecker, err := simulator.NewSchedulerBasedPredicateChecker(opts.KubeClient, predicateCheckerStopChannel)
if err != nil {
return err
}
opts.PredicateChecker = predicateChecker
}
// 初始化快照
if opts.ClusterSnapshot == nil {
opts.ClusterSnapshot = simulator.NewBasicClusterSnapshot()
}
// 初始化 provider
if opts.CloudProvider == nil {
opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions)
}
// 初始化 expander 策略
if opts.ExpanderStrategy == nil {
expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","), opts.CloudProvider,
opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace, opts.GRPCExpanderCert, opts.GRPCExpanderURL)
if err != nil {
return err
}
opts.ExpanderStrategy = expanderStrategy
}
// 初始化 Estimate 策略
if opts.EstimatorBuilder == nil {
estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration))
if err != nil {
return err
}
opts.EstimatorBuilder = estimatorBuilder
}
// 初始化 Backoff 策略
if opts.Backoff == nil {
opts.Backoff =
backoff.NewIdBasedExponentialBackoff(opts.InitialNodeGroupBackoffDuration, opts.MaxNodeGroupBackoffDuration, opts.NodeGroupBackoffResetTimeout)
}
return nil
}
3.3.3 Autoscaler.Start
定时从 cloud provider 获取 node group 以及 node group 下的 instance 信息,并刷新本地缓存
代码语言:go复制func (a *StaticAutoscaler) Start() error {
a.clusterStateRegistry.Start()
return nil
}
func (csr *ClusterStateRegistry) Start() {
csr.cloudProviderNodeInstancesCache.Start(csr.interrupt)
}
// 后台定时刷新数据
func (cache *CloudProviderNodeInstancesCache) Start(interrupt chan struct{}) {
go wait.Until(func() {
cache.Refresh()
}, CloudProviderNodeInstancesCacheRefreshInterval, interrupt)
}
Refresh
代码语言:go复制// Refresh refreshes cache.
func (cache *CloudProviderNodeInstancesCache) Refresh() {
// 从 cloud provider 获取 node group
// 调用 cloud provider 的第一个扩展点
nodeGroups := cache.cloudProvider.NodeGroups()
// 移除不存在的 node group
cache.removeEntriesForNonExistingNodeGroupsLocked(nodeGroups)
for _, nodeGroup := range nodeGroups {
// 调用 cloud provider 中 node group 接口扩展点
nodeGroupInstances, err := nodeGroup.Nodes()
// 更新缓存中的 node group
cache.updateCacheEntryLocked(nodeGroup, &cloudProviderNodeInstancesCacheEntry{nodeGroupInstances, time.Now()})
}
}
3.3.4 Autoscaler.RunOnce
关键逻辑:
- 获取现有集群所有 Node,以及Node上运行的Pod信息
- 经过几个 Processor 模块处理,将 Node 和 Pod 做分类规整到所属 asgCache 中的 各个asg 下,保存在 nodeInfosForGroups 中
- 获取所有资源不足导致 pending 的 pod
- 经过几个 Processor 模块处理,将未调度 pod 做处理,保存在 unschedulablePodsToHelp 中
- 根据 unschedulablePodsToHelp 判断当前是否需要执行 ScaleUp 进行扩容
- 根据是否配置了缩容参数ScaleDownEnabled,判断是否要进行缩容
核心逻辑伪代码
代码语言:go复制func (a *StaticAutoscaler) RunOnce() {
// 获取节点信息
allNodes, readyNodes := a.obtainNodeLists(a.CloudProvider)
// 将 Node 信息按照 asg 的id做分类规整
nodeInfosForGroups := a.processors.TemplateNodeInfoProvider.Process(...)
// 获取未调度的 pod
unschedulablePods, err := unschedulablePodLister.List()
// pod按调度类型分类
unschedulablePodsToHelp := a.processors.PodListProcessor.Process(unschedulablePods)
if len(unschedulablePodsToHelp) == 0 {
// 不需要扩容
} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
// 扩容达到上限
} else if allPodsAreNew(unschedulablePodsToHelp, currentTime) {
// Node 扩容过程中,pod 新创建,等待一定冷却周期再尝试扩容
} else {
// 启动扩容
ScaleUp()
}
// 如果开启缩容
if a.ScaleDownEnabled {
// 缩容逻辑
}
}
RunOnce 实现细节如下:
代码语言:go复制func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError {
...
// 初始化获取未调度 pod的对象
unschedulablePodLister := a.UnschedulablePodLister()
// 获取所有的 node、ready 的node
allNodes, readyNodes, typedErr := a.obtainNodeLists(a.CloudProvider)
originalScheduledPods, err := scheduledPodLister.List()
// 计算集群资源,获取 node.Status.Capacity[resource] 的值
coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime)
// 获取 ds 相关pod,后期加入调度器参与模拟调度
daemonsets, err := a.ListerRegistry.DaemonSetLister().List(labels.Everything())
// 手动刷新云资源,保持与本地缓存同步
err = a.AutoscalingContext.CloudProvider.Refresh()
// 找到 pod.Spec.Priority 值小于设定值 ExpendablePodsPriorityCutoff 的 pod
// 这些 pod 优先级高,不可以被 expend
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
// TemplateNodeInfo
// 将所有运行的 pod,按照不同的 node分类存储好,构造出 NodeInfo 对象,为后续调度准备数据
// 取出 pod.Spec.NodeName, 依次存储好
// 依次调用了
// 1. MixedTemplateNodeInfoProvider
// 2. AnnotationNodeInfoProvider
nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints, currentTime)
// NodeInfoProcessor
nodeInfosForGroups, err = a.processors.NodeInfoProcessor.Process(autoscalingContext, nodeInfosForGroups)
// 获取未注册的 node(在 CA node group 中,但是未注册到 k8s)
unregisteredNodes := a.clusterStateRegistry.GetUnregisteredNodes()
if len(unregisteredNodes) > 0 {
// 删除这些 node
removedAny, err := removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext,
a.clusterStateRegistry, currentTime, autoscalingContext.LogRecorder)
}
danglingNodes, err := a.deleteCreatedNodesWithErrors()
// 调整 node group size
fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)
// 获取未调度 pod
// 未调度 pod 的排查规则:
// selector := fields.ParseSelectorOrDie("spec.nodeName==" "" ",status.phase!="
// string(apiv1.PodSucceeded) ",status.phase!=" string(apiv1.PodFailed))
unschedulablePods, err := unschedulablePodLister.List()
unschedulablePodsToHelp, _ := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)
unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
// 根据未调度的 pod 数量,判断是否需要扩容
if len(unschedulablePodsToHelp) == 0 {
// 没有未调度的 pod,不需要扩容
scaleUpStatus.Result = status.ScaleUpNotNeeded
klog.V(1).Info("No unschedulable pods")
} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
/// 已经达到扩容上限
scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable
klog.V(1).Info("Max total nodes in cluster reached")
} else if allPodsAreNew(unschedulablePodsToHelp, currentTime) {
// 大量 pod 同时创建,一段时间内不再触发扩容,处于冷却期
a.processorCallbacks.DisableScaleDownForLoop()
scaleUpStatus.Result = status.ScaleUpInCooldown
klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
} else {
scaleUpStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
// 真正执行扩容动作
scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, a.ignoredTaints)
}
}
MixedTemplateNodeInfoProvider
将所有的 Node,以及 Node 上运行的 pod,按照 asg 的 id归类保存
代码语言:go复制func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, now time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
...
// 获取 node 上运行的所有的 pod,key是 node-name,value 是 pod 列表
podsForNodes, err := getPodsForNodes(ctx.ListerRegistry)
// 定义一个回调函数,处理 node 节点
processNode := func(node *apiv1.Node) (bool, string, errors.AutoscalerError) {
// 根据 node信息,调用 clouder provider 扩展点,获取 node group 信息
// aws: 根据 node.Spec.ProviderID 调用 aws-sdk 获取
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
// 得到 node group 的 id
id := nodeGroup.Id()
if _, found := result[id]; !found {
// 根据给定的node,构造节点信息,确认是否是需要创建的 node
// getRequiredPodsForNode:将 node 上的 dameonset pod 单独取出来(新节点也必须要运行这些 pod)
// schedulerframework.NewNodeInfo: 构造新的 node 信息,都是调度框架的函数
// pInfo.Update(pod):计算 pod 的亲和性信息
// n.AddPodInfo(...):计算 cpu、memory、端口占用、pvc 引用等信息
nodeInfo, err := simulator.BuildNodeInfoForNode(node, podsForNodes)
if err != nil {
return false, "", err
}
// 修改从 template 中生成的 node 信息,避免使用了重复的主机名
// sanitizeTemplateNode:根据 node group 规则自动生成主机名、新增 trait 信息
// schedulerframework.NewNodeInfo:更新完主机信息后,再次调用调度框架
sanitizedNodeInfo, err := utils.SanitizeNodeInfo(nodeInfo, id, ignoredTaints)
result[id] = sanitizedNodeInfo
return true, id, nil
}
return false, "", nil
}
// 从 Node Group 中扩展新的节点
for _, nodeGroup := range ctx.CloudProvider.NodeGroups() {
// nodeGroup.TemplateNodeInfo() 获取 节点模板
// daemonset.GetDaemonSetPodsForNode: 根据 ds 和 node 返回 pod
// schedulerframework.NewNodeInfo:构造完整的 pod
nodeInfo, err := utils.GetNodeInfoFromTemplate(nodeGroup, daemonsets, ctx.PredicateChecker, ignoredTaints)
result[id] = nodeInfo
}
}
AnnotationNodeInfoProvider
从 asg 中获取注解信息,并追加到 NodeInfo 中,便于后续参与调度
代码语言:go复制func (p *AnnotationNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, currentTime time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
// 先经过 mixedTemplateNodeInfoProvider 处理
nodeInfos, err := p.mixedTemplateNodeInfoProvider.Process(ctx, nodes, daemonsets, ignoredTaints, currentTime)
for _, nodeInfo := range nodeInfos {
// 拿到所有的 node group
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(nodeInfo.Node())
// 获取 node group 模板
template, err := nodeGroup.TemplateNodeInfo()
// 获取模板 Annotation 信息
for key, val := range template.Node().Annotations {
if _, ok := nodeInfo.Node().Annotations[key]; !ok {
// 将模板 annotation 添加到 node 上
nodeInfo.Node().Annotations[key] = val
}
}
}
return nodeInfos, nil
}
3.4 ScaleUp
- 找到 cloud provider 所有可用的 node group
- 把不可调度的 pod 按照扩容需求进行分组
- 调用
- 将前两步得到的数据作为输入,传给 estimator 模块的装箱算法,得到候选的 pod、node 分配方案
- 将上一步得到的结果,传给 expander 模块,得到最优的分配方案。默认提供好几种策略
伪代码实现关键步骤:
代码语言:go复制// 前面的动作,将集群所有 Node 和 Pod 做规整,构造 NodeInfo 信息,按 nodeGroupId 建立索引
nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(...)
func ScaleUp(...) {
// 获取所有可用的 node group
nodeGroups := context.CloudProvider.NodeGroups()
// 将所有待调度的 pod 按照调度属性分类
podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods)
expansionOptions := make(map[string]expander.Option, 0)
// 遍历所有的 node group
for _, nodeGroup := range nodeGroups {
// 获取当前 node group 的 nodeInfo 信息
nodeInfo, found := nodeInfos[nodeGroup.Id()]
// 计算当前 asg 能够提供的cpu、memory 资源量,确认是否超过限额
scaleUpResourcesDelta := computeScaleUpResourcesDelta(nodeInfo, nodeGroup, resourceLimiter)
// 调用 Extimate 模块背包算法,计算出当前 node group 下需要扩展几台 Node,能满足哪些 pod 调度,保存在 option 变量中
option, err := computeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
// 将该 NodeGroup 扩容的情况保存起来
expansionOptions[nodeGroup.Id()] = option
}
// 计算结果重命名为 options
// 此时有多个满足条件的结果
options := expansionOptions
// 调用 Expander 模块,根据启动传入的策略参数,从多个选项中选择最终一个结果
bestOption := context.ExpanderStrategy.BestOption(options, nodeInfos)
// 如果 NodeGroup 不存在,创建 NodeGroup
processors.NodeGroupManager.CreateNodeGroup(context, bestOption.NodeGroup)
// 负载均衡策略计算多个 NodeGroup中各自需要扩容的信息
scaleUpInfos, typedErr := processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups()
for _, info := range scaleUpInfos {
// 调用 provider 执行扩容
executeScaleUp(...)
// 负载均衡
clusterStateRegistry.Recalculate()
}
}
ScaleUp实现细节如下:
代码语言:go复制func ScaleUp(...) {
...
// 第一步:找到 cloud provider 所有可用的 node group
// 返回 node 列表中不在 node group 中的 node 子集
nodesFromNotAutoscaledGroups, err := utils.FilterOutNodesFromNotAutoscaledGroups(nodes, context.CloudProvider)
// 计算扩容可用的剩余资源
// calculateScaleUpCoresMemoryTotal:计算 node group 和非 Node group 所有的资源
// sum(nodeGroup.targetSize * nodeInfo.cpu(memory) )
// computeBelowMax(totalCores, max):根据 CA 配置的资源限额 - 目前所有已使用资源 = 可扩容的资源余量
scaleUpResourcesLeft, errLimits := computeScaleUpResourcesLeftLimits(context, processors, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups, resourceLimiter)
// Node在NodeGroup中但是没有Registed在kubenetes集群
// 数量为 newNodes := ar.CurrentTarget - (readiness.Ready readiness.Unready readiness.LongUnregistered)
upcomingNodes := make([]*schedulerframework.NodeInfo, 0)
for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() {
}
processors.NodeGroupListProcessor.Process(...)
// 第二步:将所有待调度 pod 按照扩容需求分类
podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods)
// 评估所有的 node group,哪些不可用跳过,哪些可用
skippedNodeGroups := map[string]status.Reasons{}
// 外层循环,遍历所有的 NodeGroup
for _, nodeGroup := range nodeGroups {
if nodeGroup.Exist() && !clusterStateRegistry.IsNodeGroupSafeToScaleUp(nodeGroup, now) {
...
}
// 取出当前大小
currentTargetSize, err := nodeGroup.TargetSize()
// 计算扩容需要的增量资源
// 取出 node group 下对应的 cpu、memory 信息
scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(context, processors, nodeInfo, nodeGroup, resourceLimiter)
// 校验是否超过限额,对比可扩容量和待扩容量
checkResult := scaleUpResourcesLeft.checkScaleUpDeltaWithinLimits(scaleUpResourcesDelta)
// 第三步:将前两步得到的数据作为输入,传给 estimator 模块的装箱算法,得到候选的 pod、node 分配方案
option, err := computeExpansionOption(context, podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
}
// 第四步:将上一步得到的结果,传给 expander 模块,得到最优的分配方案
// 根据expansion (random ,mostpods, price,waste)配置获取最佳伸缩组
// expander 是表示选择 node group 的策略
bestOption := context.ExpanderStrategy.BestOption(options, nodeInfos)
if bestOption != nil && bestOption.NodeCount > 0 {
// 得到需要扩容的节点数
newNodes := bestOption.NodeCount
// 判断是否达到扩容上限
if context.MaxNodesTotal > 0 && len(nodes) newNodes len(upcomingNodes) > context.MaxNodesTotal {
}
// 不存在 node group,创建新的
if !bestOption.NodeGroup.Exist() {
// 创建的 ng 包括主的、扩展的
createNodeGroupResult, err := processors.NodeGroupManager.CreateNodeGroup(context, bestOption.NodeGroup)
// 根据主 ng 创建 nodeinfo
// 将 daemonset 追加到到 node group 模板创建出来的 node节点 pod 列表中
// trait 信息追加到新创建 node 的 Spec.Trait 中
// 填充 node name
mainCreatedNodeInfo, err := utils.GetNodeInfoFromTemplate(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, context.PredicateChecker, ignoredTaints)
// 依次创建多个扩展的 ng
for _, nodeGroup := range createNodeGroupResult.ExtraCreatedNodeGroups {
option, err2 := computeExpansionOption(context, podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
}
// 重新计算缓存中节点信息
clusterStateRegistry.Recalculate()
// 计算出要扩容的资源
newNodes, err = applyScaleUpResourcesLimits(context, processors, newNodes, scaleUpResourcesLeft, nodeInfo, bestOption.NodeGroup, resourceLimiter)
if context.BalanceSimilarNodeGroups {
// 找到相似的 ng
similarNodeGroups, typedErr := processors.NodeGroupSetProcessor.FindSimilarNodeGroups(context, bestOption.NodeGroup, nodeInfos)
}
// 平衡多个 ng
scaleUpInfos, typedErr := processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(
context, targetNodeGroups, newNodes)
// 依次遍历所有待扩容的机器,执行扩容操作
for _, info := range scaleUpInfos {
// executeScaleUp 会执行 clouder provider 中的 IncreaseSize 方法
typedErr := executeScaleUp(context, clusterStateRegistry, info, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, nodeInfo.Node(), nil), now)
}
clusterStateRegistry.Recalculate()
// 返回扩容成功
return &status.ScaleUpStatus{
Result: status.ScaleUpSuccessful,
ScaleUpInfos: scaleUpInfos,
PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
ConsideredNodeGroups: nodeGroups,
CreateNodeGroupResults: createNodeGroupResults,
PodsTriggeredScaleUp: bestOption.Pods,
PodsAwaitEvaluation: getPodsAwaitingEvaluation(podEquivalenceGroups, bestOption.NodeGroup.Id()),
}, nil
}
// 返回不需要扩容
return &status.ScaleUpStatus{
Result: status.ScaleUpNoOptionsAvailable,
PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
ConsideredNodeGroups: nodeGroups,
}, nil
}
3.4.1 ScaleUpInfo
计算出来的 ScaleUpInfo 会传给 Clouder Provider,用于操作云资源
需要开通的资源数量 delta = ScaleUpInfo.NewSize - ScaleUpInfo.CurrentSize
代码语言:go复制type ScaleUpInfo struct {
// Group is the group to be scaled-up
Group cloudprovider.NodeGroup
// CurrentSize is the current size of the Group
CurrentSize int
// NewSize is the size the Group will be scaled-up to
NewSize int
// MaxSize is the maximum allowed size of the Group
MaxSize int
}
3.4.2 computeExpansionOption
这里分为两大块逻辑:预检查 背包计算
预检查:
- 遍历所有待调度pod
- 每个 pod 依次去和当前 Node 做预调度,确认一个 Node扩容后可以让 该pod 调度成功
- 将初步筛选出来满足条件的 pod 列表,
背包计算:
通过前面的计算:所有的 pod中,如果扩容一个Node,一定能满足调度条件;但是到底要创建最少多少个 Node,能满足所有的 pod 调度需求,就要经过 Estimate 模块的背包算法了
- 通过给定多个 Pod 和多个 Node,计算出最优的 Node 和 Pod 数量
func computeExpansionOption(...) {
// 遍历每个待调度的 pod
// 用每个 pod 去匹配 node,做模拟调度
for _, eg := range podEquivalenceGroups {
// 校验调度
// 内部调用调度框架
// p.framework.RunPreFilterPlugins(context.TODO(), state, pod)
// filterStatuses := p.framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo)
// 确认调度状态是否正确
if err := context.PredicateChecker.CheckPredicates(context.ClusterSnapshot, samplePod, nodeInfo.Node().Name); err == nil {
// 返回可以调度的 pod
option.Pods = append(option.Pods, eg.pods...)
// 标记 pod 理论上可以调度成功
eg.schedulable = true
}
// 可以成功调度 pod > 0,开始仿真调度
// 计算需要的 node 数,和可以成功调度的 pod 数
if len(option.Pods) > 0 {
estimator := context.EstimatorBuilder(context.PredicateChecker, context.ClusterSnapshot)
// 调用 Estimate 模块,后面单独介绍
option.NodeCount, option.Pods = estimator.Estimate(option.Pods, nodeInfo, option.NodeGroup)
}
return option, nil
}
3.5 Estimate
前面介绍过,通过上述计算后,所有的 pod中,如果扩容一个Node,一定能满足调度条件;但是到底要创建最少多少个 Node,能满足所有的 pod 调度需求,就要经过 Estimate 模块的背包算法了
3.5.1 优化目标
通过给定多个 Pod 和多个 Node,计算出最优的 Node 和 Pod 数量
输入
- 待调度 pod 列表
- nodeinfo
- NodeGroup
接口
代码语言:go复制type Estimator interface {
Estimate([]*apiv1.Pod, *schedulerframework.NodeInfo, cloudprovider.NodeGroup) (int, []*apiv1.Pod)
}
输出
- 节点数量
- pod列表
3.5.2 实现分析
- 将这组 pod 所需的cpu、memory 资源 / Node节点能提供的资源,计算出每个 pod 的得分
- 按照得分从高到底排序
- 按照由高到低得分顺序,依次遍历每个 pod,去匹配 Node
- 新创建的 Node 保存到 newNodeNames 数组中
- 如果没有找到,就去创建一个新的 Node。直到所有的 pod 都处理完。
func (e *BinpackingNodeEstimator) Estimate(
// 计算 pod 的得分
podInfos := calculatePodScore(pods, nodeTemplate)
// 按照得分排序
sort.Slice(podInfos, func(i, j int) bool { return podInfos[i].score > podInfos[j].score })
// 遍历所有的 pod
for _, podInfo := range podInfos {
found := false
// 确认给定的 pod 是否能调度到 给定的 node 上
// 内部调用调度框架的 preFilter 依次跟每个 node 过滤一遍, p.framework.RunPreFilterPlugins
nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
return newNodeNames[nodeInfo.Node().Name]
})
if err == nil {
// 为 pod 找到合适的 node
found = true
scheduledPods = append(scheduledPods, podInfo.pod)
newNodesWithPods[nodeName] = true
}
if !found {
if lastNodeName != "" && !newNodesWithPods[lastNodeName] {
continue
}
// 添加一个新的 node 进来
newNodeName, err := e.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex)
newNodeNameIndex
newNodeNames[newNodeName] = true
lastNodeName = newNodeName
// 再次尝试调度
// 如果还是失败:比如设置了 pod 拓扑分布,这种情况无法解决 pending 问题,尝试移除这类 pod
if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, podInfo.pod, newNodeName); err != nil {
continue
}
if err := e.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil {
klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, newNodeName, err)
return 0, nil
}
newNodesWithPods[newNodeName] = true
scheduledPods = append(scheduledPods, podInfo.pod)
}
}
return len(newNodesWithPods), scheduledPods
}
3.6 Expander 策略
3.6.1 概述
通过前面的处理,会返回多个 Option 对象,即有多个可选的组合可以满足此次调度需求(可能只是部分 pod,不是全部pod),
Expander 提供多种策略,在这一组答案中最终选择一个作为最终答案。
选择要扩展的节点组提供的不同策略,通过 --expander=least-waste 参数指定,可以多个策略组合
输入
给定多个 option,选择一个最合适的 option。每个 option 对应一个 NodeGroup、需要调度的 pod、Node数量
代码语言:go复制// Option describes an option to expand the cluster.
type Option struct {
NodeGroup cloudprovider.NodeGroup
NodeCount int
Debug string
Pods []*apiv1.Pod
}
接口
代码语言:go复制// Strategy describes an interface for selecting the best option when scaling up
type Strategy interface {
BestOption(options []Option, nodeInfo map[string]*schedulerframework.NodeInfo) *Option
}
输出
最终选定的 Option,即:扩容哪个 NodeGroup、扩容该 NodeGroup 的几台机器
3.6.2 实现分析
策略初始化
代码语言:go复制func ExpanderStrategyFromStrings(...) {
...
// 根据不同的策略,使用不同的 Filter
switch expanderFlag {
case expander.RandomExpanderName:
filters = append(filters, random.NewFilter())
case expander.MostPodsExpanderName:
filters = append(filters, mostpods.NewFilter())
case expander.LeastWasteExpanderName:
filters = append(filters, waste.NewFilter())
case expander.PriceBasedExpanderName:
if _, err := cloudProvider.Pricing(); err != nil {
return nil, err
}
filters = append(filters, price.NewFilter(cloudProvider,
price.NewSimplePreferredNodeProvider(autoscalingKubeClients.AllNodeLister()),
price.SimpleNodeUnfitness))
case expander.PriorityBasedExpanderName:
// It seems other listers do the same here - they never receive the termination msg on the ch.
// This should be currently OK.
stopChannel := make(chan struct{})
lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace)
filters = append(filters, priority.NewFilter(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder))
case expander.GRPCExpanderName:
filters = append(filters, grpcplugin.NewFilter(GRPCExpanderCert, GRPCExpanderURL))
default:
return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
}
if _, ok := filters[len(filters)-1].(expander.Strategy); ok {
strategySeen = true
}
}
// 最后追加一个 random 的 fallback
return newChainStrategy(filters, random.NewStrategy()), nil
}
调用策略
代码语言:go复制func ScaleUp() {
...
bestOption := context.ExpanderStrategy.BestOption(options, nodeInfos)
...
}
// 执行策略
func (c *chainStrategy) BestOption(options []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
filteredOptions := options
// 依次执行所有的 Filter
for _, filter := range c.filters {
filteredOptions = filter.BestOptions(filteredOptions, nodeInfo)
if len(filteredOptions) == 1 {
return &filteredOptions[0]
}
}
return c.fallback.BestOption(filteredOptions, nodeInfo)
}
3.6.3 Filter 接口
Expander 策略是通过多个 Filter 实现的,Filter 定义了统一的接口,和多种实现
接口定义
代码语言:go复制type Filter interface {
BestOptions(options []Option, nodeInfo map[string]*schedulerframework.NodeInfo) []Option
}
leastwaste 实现
- 将所需资源与可用资源计算差值,得到分数
- 取出分数最小的值
func (l *leastwaste) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
var leastWastedScore float64
var leastWastedOptions []expander.Option
for _, option := range expansionOptions {
// 计算所有 pod 总共需要的 cpu、memory 资源
requestedCPU, requestedMemory := resourcesForPods(option.Pods)
// 确认当前的 node group 是否存在
node, found := nodeInfo[option.NodeGroup.Id()]
if !found {
// 不存在就匹配下一个 node group
klog.Errorf("No node info for: %s", option.NodeGroup.Id())
continue
}
// 找到 Node 能够提供的 cpu、memory 资源
// cpu = node.Status.Capacity[cpu]
// memory = node.Status.Capacity[memory]
nodeCPU, nodeMemory := resourcesForNode(node.Node())
// 可用资源 = 单节点资源 * nodeGroup数量
availCPU := nodeCPU.MilliValue() * int64(option.NodeCount)
availMemory := nodeMemory.Value() * int64(option.NodeCount)
// 浪费资源 = (可用资源 - 所需资源)/ 可用资源
wastedCPU := float64(availCPU-requestedCPU.MilliValue()) / float64(availCPU)
wastedMemory := float64(availMemory-requestedMemory.Value()) / float64(availMemory)
// 浪费资源数 = cpu浪费 memory 浪费
wastedScore := wastedCPU wastedMemory
klog.V(1).Infof("Expanding Node Group %s would waste %0.2f%% CPU, %0.2f%% Memory, %0.2f%% Blendedn", option.NodeGroup.Id(), wastedCPU*100.0, wastedMemory*100.0, wastedScore*50.0)
if wastedScore == leastWastedScore {
leastWastedOptions = append(leastWastedOptions, option)
}
// 取浪费分数最小的选项
if leastWastedOptions == nil || wastedScore < leastWastedScore {
leastWastedScore = wastedScore
leastWastedOptions = []expander.Option{option}
}
}
if len(leastWastedOptions) == 0 {
return nil
}
return leastWastedOptions
}
mostpods
代码语言:go复制func (m *mostpods) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
var maxPods int
var maxOptions []expander.Option
// 遍历所有的 option
for _, option := range expansionOptions {
if len(option.Pods) == maxPods {
maxOptions = append(maxOptions, option)
}
// 取 pod 数量最大的那个选项
if len(option.Pods) > maxPods {
maxPods = len(option.Pods)
maxOptions = []expander.Option{option}
}
}
if len(maxOptions) == 0 {
return nil
}
return maxOptions
}
random
代码语言:go复制func (r *random) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
// 调用 BestOption
best := r.BestOption(expansionOptions, nodeInfo)
if best == nil {
return nil
}
return []expander.Option{*best}
}
func (r *random) BestOption(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
if len(expansionOptions) <= 0 {
return nil
}
// 从所有备选 option 中随机选择一个
pos := rand.Int31n(int32(len(expansionOptions)))
return &expansionOptions[pos]
}
priority
代码语言:go复制func (p *priority) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
// 读取名为 cluster-autoscaler-priority-expander,key 为 priorities 的 configmap
// 将yaml数据转换为 type priorities map[int][]*regexp.Regexp 对象
priorities, cm, err := p.reloadConfigMap()
// 遍历所有 option
for _, option := range expansionOptions {
// 获取 node group 的 id
id := option.NodeGroup.Id()
found := false
// 遍历所有的优先级
for prio, nameRegexpList := range priorities {
// 优先级列表中匹配当前的 node group id
// 匹配不到就跳过
if !p.groupIDMatchesList(id, nameRegexpList) {
continue
}
found = true
// 当前优先级低就跳过
if prio < maxPrio {
continue
}
// 找到优先级最高那个
if prio > maxPrio {
maxPrio = prio
best = nil
}
best = append(best, option)
}
if !found {
msg := fmt.Sprintf("Priority expander: node group %s not found in priority expander configuration. "
"The group won't be used.", id)
p.logConfigWarning(cm, "PriorityConfigMapNotMatchedGroup", msg)
}
}
// 优先级失效
if len(best) == 0 {
msg := "Priority expander: no priorities info found for any of the expansion options. No options filtered."
p.logConfigWarning(cm, "PriorityConfigMapNoGroupMatched", msg)
return expansionOptions
}
for _, opt := range best {
klog.V(2).Infof("priority expander: %s chosen as the highest available", opt.NodeGroup.Id())
}
return best
}
price
选择成本最小的,依赖 cloud provider 的价格模型,aws cloud provider 没有实现,可以不用考虑
代码语言:go复制// BestOption selects option based on cost and preferred node type.
func (p *priceBased) BestOptions(expansionOptions []expander.Option, nodeInfos map[string]*schedulerframework.NodeInfo) []expander.Option {
var bestOptions []expander.Option
....
}
grpc
代码语言:go复制func (g *grpcclientstrategy) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
// 判断 grpcClient 参数是否传入
if g.grpcClient == nil {
klog.Errorf("Incorrect gRPC client config, filtering no options")
return expansionOptions
}
// 调用 grpc 请求
bestOptionsResponse, err := g.grpcClient.BestOptions(ctx, &protos.BestOptionsRequest{Options: grpcOptionsSlice, NodeMap: grpcNodeMap})
...
return options
}
func (c *expanderClient) BestOptions(ctx context.Context, in *BestOptionsRequest, opts ...grpc.CallOption) (*BestOptionsResponse, error) {
out := new(BestOptionsResponse)
err := c.cc.Invoke(ctx, "/grpcplugin.Expander/BestOptions", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
grpc对应 proto 文件
代码语言:text复制// Interface for Expander
service Expander {
rpc BestOptions (BestOptionsRequest)
returns (BestOptionsResponse) {}
}
3.7 缩容实现
3.7.1 缩容概述
缩容和扩容都在同一个定时器中,即默认10s一个检查循环。满足以下所有条件会触发缩容:
- 在改节点上运行的所有 pod 的 cpu、memory的总和 < 节点可分配总额的 50%。(所有参数都可定制)
- 节点上运行的所有 pod(除Daemonset),都可以移动到其他节点(特殊pod可以添加注解禁止CA调度到其他Node)
- Node 没有添加禁用缩减的 Annotation
缩容其他注意事项:
- 一个Node从检查出空闲,持续10min时间内依然空闲,才会被真正移除
- 缩容操作一次之后缩一个,避免不可预期的错误
关键源码实现:
代码语言:go复制func (a *StaticAutoscaler) RunOnce(...){
// 缩容逻辑
if a.ScaleDownEnabled {
// 获取可能将被删除的 Node 列表,只是初步判断 Node 所在 ASG 实例数是否缩容到最小了
scaleDownCandidates := GetScaleDownCandidates()
// 返回某个 Node 被删除后,可能容纳node上 pod 的Node,默认返回所有 nodes
podDestinations := GetPodDestinationCandidates()
// 关键方法,通过各个维度统计出不再需要的 Node
// 更新不再需要的 Node 信息,保存在 scaleDown.unneededNodes 中
scaleDown.UpdateUnneededNodes(podDestinations, scaleDownCandidates)
if scaleDownInCooldown {
// 缩容冷却中
scaleDownStatus.Result = status.ScaleDownInCooldown
} else if scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() {
// 正在进行缩容过程中
scaleDownStatus.Result = status.ScaleDownInProgress
} else {
// 可以开始缩容
scaleDownStatus := scaleDown.TryToScaleDown(currentTime, pdbs)
}
}
}
3.7.2 源码分析
代码语言:go复制func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError {
// 扩容逻辑,前面已分析
...
// 缩容逻辑
if a.ScaleDownEnabled {
// 特殊处理的 pod
pdbs, err := pdbLister.List()
// 计算不再需要的 node
// 保存待缩容的候选 Node
var scaleDownCandidates []*apiv1.Node
// 保存可以存放被删除 Node上pod的节点
var podDestinations []*apiv1.Node
if a.processors == nil || a.processors.ScaleDownNodeProcessor == nil {
scaleDownCandidates = allNodes
podDestinations = allNodes
} else {
var err errors.AutoscalerError
// 初步筛选处理
scaleDownCandidates, err = a.processors.ScaleDownNodeProcessor.GetScaleDownCandidates(
autoscalingContext, allNodes)
// pod选择新的 node
podDestinations, err = a.processors.ScaleDownNodeProcessor.GetPodDestinationCandidates(autoscalingContext, allNodes)
}
// We use scheduledPods (not originalScheduledPods) here, so artificial scheduled pods introduced by processors
// (e.g unscheduled pods with nominated node name) can block scaledown of given node.
if typedErr := scaleDown.UpdateUnneededNodes(podDestinations, scaleDownCandidates, currentTime, pdbs); typedErr != nil {
scaleDownStatus.Result = status.ScaleDownError
klog.Errorf("Failed to scale down: %v", typedErr)
return typedErr
}
metrics.UpdateDurationFromStart(metrics.FindUnneeded, unneededStart)
if klog.V(4).Enabled() {
for key, val := range scaleDown.unneededNodes {
klog.Infof("%s is unneeded since %s duration %s", key, val.String(), currentTime.Sub(val).String())
}
}
scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop ||
a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) ||
a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
// In dry run only utilization is updated
calculateUnneededOnly := scaleDownInCooldown || scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress()
klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s "
"lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v "
"isDeleteInProgress=%v scaleDownInCooldown=%v",
calculateUnneededOnly, a.lastScaleUpTime,
a.lastScaleDownDeleteTime, a.lastScaleDownFailTime, a.processorCallbacks.disableScaleDownForLoop,
scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress(), scaleDownInCooldown)
metrics.UpdateScaleDownInCooldown(scaleDownInCooldown)
if scaleDownInCooldown {
scaleDownStatus.Result = status.ScaleDownInCooldown
} else if scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() {
scaleDownStatus.Result = status.ScaleDownInProgress
} else {
klog.V(4).Infof("Starting scale down")
// We want to delete unneeded Node Groups only if there was no recent scale up,
// and there is no current delete in progress and there was no recent errors.
removedNodeGroups, err := a.processors.NodeGroupManager.RemoveUnneededNodeGroups(autoscalingContext)
if err != nil {
klog.Errorf("Error while removing unneeded node groups: %v", err)
}
scaleDownStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
// 开始尝试缩容
scaleDownStatus, typedErr := scaleDown.TryToScaleDown(currentTime, pdbs)
metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
metrics.UpdateUnremovableNodesCount(scaleDown.getUnremovableNodesCount())
scaleDownStatus.RemovedNodeGroups = removedNodeGroups
if scaleDownStatus.Result == status.ScaleDownNodeDeleteStarted {
a.lastScaleDownDeleteTime = currentTime
a.clusterStateRegistry.Recalculate()
}
if (scaleDownStatus.Result == status.ScaleDownNoNodeDeleted ||
scaleDownStatus.Result == status.ScaleDownNoUnneeded) &&
a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount != 0 {
//
scaleDown.SoftTaintUnneededNodes(allNodes)
}
if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
scaleDownStatus.SetUnremovableNodesInfo(scaleDown.unremovableNodeReasons, scaleDown.nodeUtilizationMap, scaleDown.context.CloudProvider)
a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus)
scaleDownStatusProcessorAlreadyCalled = true
}
if typedErr != nil {
klog.Errorf("Failed to scale down: %v", typedErr)
a.lastScaleDownFailTime = currentTime
return typedErr
}
}
}
return nil
}
GetScaleDownCandidates
这一步只是判断哪些 Node 节点所在的 ASG 符合要求
代码语言:go复制func (n *PreFilteringScaleDownNodeProcessor) GetScaleDownCandidates(ctx *context.AutoscalingContext,
nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
result := make([]*apiv1.Node, 0, len(nodes))
// 获取每个 asg 当前的实例个数,保存为 map
nodeGroupSize := utils.GetNodeGroupSizeMap(ctx.CloudProvider)
// 遍历所有 node
for _, node := range nodes {
// 获取当前 node 所属的 asg
nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
// 获取当前 asg 的实例数
size, found := nodeGroupSize[nodeGroup.Id()]
// 获取 asg 的最小实例数。当前实例数已经最小了,就跳过不再缩容
if size <= nodeGroup.MinSize() {
klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
continue
}
// 追加到结果中
result = append(result, node)
}
return result, nil
}
GetPodDestinationCandidates
默认返回所有的 Node
代码语言:go复制func (n *PreFilteringScaleDownNodeProcessor) GetPodDestinationCandidates(ctx *context.AutoscalingContext,
nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
return nodes, nil
}
UpdateUnneededNodes
计算不再需要的node,从以下维度逐一排查:
- 所有的 pod 可以被调度到其他节点
- 资源使用率低于某个阈值
- 其他判断
找到可以移除的节点,放到 unneededNodes 数组中,便于后面移除
代码语言:go复制// destinationNodes:可以用来安置由于缩容导致被驱逐的pod的节点
// scaleDownCandidates:可以考虑缩容的节点
func (sd *ScaleDown) UpdateUnneededNodes(
destinationNodes []*apiv1.Node,
scaleDownCandidates []*apiv1.Node,
timestamp time.Time,
pdbs []*policyv1.PodDisruptionBudget,
) errors.AutoscalerError {
// 第一步:计算节点资源利用率(只计算被管理的节点)
for _, node := range scaleDownCandidates {
// 获取节点信息
nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(node.Name)
// 检查节点情况,是否满足缩容
// 1. 节点是否最近已经被标记为删除,这种节点打了 ToBeDeletedByClusterAutoscaler 的 taint
// 2. 节点是否有 cluster-autoscaler.kubernetes.io/scale-down-disabled 这个禁止缩容的标签
// 3. CalculateUtilization 计算资源使用率:累加所有 pod 上容器设置的 cpu、memroy request值
// 4. isNodeBelowUtilizationThreshold 判断资源使用是否达到阈值(可启动时配置)
reason, utilInfo := sd.checkNodeUtilization(timestamp, node, nodeInfo)
// 保存可以被删除的节点
currentlyUnneededNodeNames = append(currentlyUnneededNodeNames, node.Name)
}
// 第二步:将候选缩容节点和其他节点分开
currentCandidates, currentNonCandidates := sd.chooseCandidates(currentlyUnneededNonEmptyNodes)
// 找到新节点,用于移除候选节点
nodesToRemove, unremovable, newHints, simulatorErr := simulator.FindNodesToRemove(
currentCandidates,
destinations,
nil,
sd.context.ClusterSnapshot,
sd.context.PredicateChecker,
len(currentCandidates),
true,
sd.podLocationHints,
sd.usageTracker,
timestamp,
pdbs)
// additionalCandidatesCount 表示用于缩容额外的备选节点数量
additionalCandidatesCount := sd.context.ScaleDownNonEmptyCandidatesCount - len(nodesToRemove)
if additionalCandidatesCount > len(currentNonCandidates) {
additionalCandidatesCount = len(currentNonCandidates)
}
// 限制并发缩容数量
additionalCandidatesPoolSize := int(math.Ceil(float64(len(allNodeInfos)) * sd.context.ScaleDownCandidatesPoolRatio))
if additionalCandidatesPoolSize < sd.context.ScaleDownCandidatesPoolMinCount {
additionalCandidatesPoolSize = sd.context.ScaleDownCandidatesPoolMinCount
}
if additionalCandidatesPoolSize > len(currentNonCandidates) {
additionalCandidatesPoolSize = len(currentNonCandidates)
}
if additionalCandidatesCount > 0 {
// 找到新节点,用于移除候选节点
additionalNodesToRemove, additionalUnremovable, additionalNewHints, simulatorErr :=
simulator.FindNodesToRemove(
currentNonCandidates[:additionalCandidatesPoolSize],
destinations,
nil,
sd.context.ClusterSnapshot,
sd.context.PredicateChecker,
additionalCandidatesCount,
true,
sd.podLocationHints,
sd.usageTracker,
timestamp,
pdbs)
}
// 将待移除节点保存到 unneededNodes 数组中
for _, node := range nodesToRemove {
name := node.Node.Name
unneededNodesList = append(unneededNodesList, node.Node)
if val, found := sd.unneededNodes[name]; !found {
result[name] = timestamp
} else {
result[name] = val
}
}
...
}
TryToScaleDown
代码语言:go复制func (sd *ScaleDown) TryToScaleDown(
currentTime time.Time,
pdbs []*policyv1.PodDisruptionBudget,
) (*status.ScaleDownStatus, errors.AutoscalerError) {
...
// 遍历待删除 node 列表
for nodeName, unneededSince := range sd.unneededNodes {
// 获取 nodeinfo、node 信息
nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(nodeName)
node := nodeInfo.Node()
// 检查 node 是否打上了禁止删除的 annotation
if hasNoScaleDownAnnotation(node) {
klog.V(4).Infof("Skipping %s - scale down disabled annotation found", node.Name)
sd.addUnremovableNodeReason(node, simulator.ScaleDownDisabledAnnotation)
continue
}
// 获取 node 状态,根据状态做一些处理
ready, _, _ := kube_util.GetReadinessState(node)
// 计算缩容资源
scaleDownResourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesWithLimits)
// 检查资源限制
checkResult := scaleDownResourcesLeft.checkScaleDownDeltaWithinLimits(scaleDownResourcesDelta)
...
candidateNames = append(candidateNames, node.Name)
candidateNodeGroups[node.Name] = nodeGroup
}
// 寻找一个待移除节点
nodesToRemove, unremovable, _, err := simulator.FindNodesToRemove(
candidateNames,
nodesWithoutMasterNames,
sd.context.ListerRegistry,
sd.context.ClusterSnapshot,
sd.context.PredicateChecker,
1,
false,
sd.podLocationHints,
sd.usageTracker,
time.Now(),
pdbs)
// 计算时差
nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(true)
go func() {
...
// 删除节点
result = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule, toRemove.DaemonSetPods, nodeGroup)
}
}
四、CA 使用注意
aws官方说明
4.1 asg 自动发现参数配置
- 为 AutoScaling 设置两个标签,便于 CA 自动发现
- 关于跨可用区:
- 可以设置多个 AutoScaling 组,每个组一个可用区,通过开启--balance-similar-node-groups` 功能。注意:需要为不同的组设置相同的一批标签
- 也可以设置同一个 AutoScaling 组,但是必须将组设置可跨多个可用区
- 更推荐使用多个 AutoScaling 组
4.2 优化节点组:
- 节点组中的每个节点必须具有相同的调度属性,包括标签、污点和资源
- 策略中指定的第一个实例类型模拟调度。
- 如果您的策略具有拥有更多资源的其他实例类型,则在横向扩展后可能会浪费资源。
- 如果您的策略具有其他实例类型,其资源比原始实例类型少,则 Pod 在实例上调度可能失败。
- 请使用较多节点配置较少数量的节点组,因为相反的配置可能会对可扩展性产生不利影响。
4.3 AutoScaling
- 混合实例策略:支持多个实例类型,配置时推荐使用相似的资源类型:比如:M4
、
M5、
M5a,和
M5n - 可以通过 configmap 设置不同 AutoScaling 的优先级
- AutoScaling 的机型也支持权重
- 支持启动配置、启动模板两种模式
- 启动模板里面指定机型
- 启动模板覆盖项支持配置多个机型
4.4 Expander 策略
选择要扩展的节点组提供的不同策略,通过 --expander=least-waste 参数指定
可选参数包括:
- random:随机选择
- most-pods:能满足最多 pod 调度的
- Least-waste:最少 cpu 和 memroy
- Price:成本最小
- priority:按用户指定的优先级
- grpc:调用外部 grpc 服务选择扩容节点
4.5 超额配置
- 通过配置一个空的Deployment,占用资源,如果资源不足优先驱逐,达到尽快扩容的目的
4.6 防止pod被驱逐
配置 `cluster-autoscaler.kubernetes.io/safe-to-evict=false 注解,可以确保 pod不被驱逐,pod所在 node 不被缩减