K8S-Node自动扩容项目CA源码分析(下)

2022-07-02 22:01:53 浏览数 (1)

三、主流程

关键代码逻辑,梳理成流程图,可以对照查看。高清图

CA源码分析.jpgCA源码分析.jpg

3.1 main 启动入口

代码语言:go复制
func main() {
  // 选取leader
 	leaderElection := defaultLeaderElectionConfiguration()
	leaderElection.LeaderElect = true

  go func() {
    // 注册指标、快照、监控检查接口
		pathRecorderMux := mux.NewPathRecorderMux("cluster-autoscaler")
		defaultMetricsHandler := legacyregistry.Handler().ServeHTTP
		pathRecorderMux.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
			defaultMetricsHandler(w, req)
		})
		if *debuggingSnapshotEnabled {
			pathRecorderMux.HandleFunc("/snapshotz", debuggingSnapshotter.ResponseHandler)
		}
		pathRecorderMux.HandleFunc("/health-check", healthCheck.ServeHTTP)

		err := http.ListenAndServe(*address, pathRecorderMux)
	}()

	if !leaderElection.LeaderElect {
		run(healthCheck, debuggingSnapshotter)
	} else {
    ...
    // 入口函数
    run(healthCheck, debuggingSnapshotter)
    ...
  }
}

3.2 run

  • 初始化 autoscaler
  • 调用 autoscaler.Start,后台刷新缓存
  • 间隔执行(默认10s)autoscaler.RunOnce 方法,实现扩缩容逻辑
代码语言:go复制
func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
	metrics.RegisterAll(*emitPerNodeGroupMetrics)
	// 构造 autoscaler 对象
	autoscaler, err := buildAutoscaler(debuggingSnapshotter)
	...
  // 在后台启动 autoscaler
	if err := autoscaler.Start(); err != nil {
		klog.Fatalf("Failed to autoscaler background components: %v", err)
	}

	// Autoscale ad infinitum.
	for {
		select {
    // 默认 10s 执行一次
		case <-time.After(*scanInterval):
			{
				...
        // 开始执行
				err := autoscaler.RunOnce(loopStart)
				...
			}
		}
	}
}

3.3 autoscaler

3.3.1 buildAutoscaler
代码语言:go复制
func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
	...
  // Create autoscaler.
	return core.NewAutoscaler(opts)
}
3.3.2 NewAutoscaler
代码语言:go复制
func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError) {
	// 这个方法主要是做一下初始化工作,其中 provider 的初始化在介绍 aws provider 创建流程时介绍过
  // 内部会初始化 awsManager、asgCache,并从云厂商同步 asg信息到本地缓存
  err := initializeDefaultOptions(&opts)
	if err != nil {
		return nil, errors.ToAutoscalerError(errors.InternalError, err)
	}
  // 实例化 AutoScaler
	return NewStaticAutoscaler(
		opts.AutoscalingOptions,
		opts.PredicateChecker,
		opts.ClusterSnapshot,
		opts.AutoscalingKubeClients,
		opts.Processors,
		opts.CloudProvider,
		opts.ExpanderStrategy,
		opts.EstimatorBuilder,
		opts.Backoff,
		opts.DebuggingSnapshotter), nil
}
initializeDefaultOptions

初始化的内容有:

  • Processor:各种前置处理器
  • PredicateChecker:扩容前的预调度检查
  • CloudProvider:前面介绍过,主要用于操作 IaaS 云资源
  • Estimator:评估扩容节点
  • Expander:从多个符合扩容条件的 NodeGroup 中选择最终 Node 的策略
代码语言:go复制
func initializeDefaultOptions(opts *AutoscalerOptions) error {
	// 初始化 processor
  if opts.Processors == nil {
		opts.Processors = ca_processors.DefaultProcessors()
	}
	if opts.AutoscalingKubeClients == nil {
		opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.EventsKubeClient)
	}
  // 初始化前置校验
	if opts.PredicateChecker == nil {
		predicateCheckerStopChannel := make(chan struct{})
		predicateChecker, err := simulator.NewSchedulerBasedPredicateChecker(opts.KubeClient, predicateCheckerStopChannel)
		if err != nil {
			return err
		}
		opts.PredicateChecker = predicateChecker
	}
  // 初始化快照
	if opts.ClusterSnapshot == nil {
		opts.ClusterSnapshot = simulator.NewBasicClusterSnapshot()
	}
  // 初始化 provider
	if opts.CloudProvider == nil {
		opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions)
	}
  // 初始化 expander 策略
	if opts.ExpanderStrategy == nil {
		expanderStrategy, err := factory.ExpanderStrategyFromStrings(strings.Split(opts.ExpanderNames, ","), opts.CloudProvider,
			opts.AutoscalingKubeClients, opts.KubeClient, opts.ConfigNamespace, opts.GRPCExpanderCert, opts.GRPCExpanderURL)
		if err != nil {
			return err
		}
		opts.ExpanderStrategy = expanderStrategy
	}
  // 初始化 Estimate 策略
	if opts.EstimatorBuilder == nil {
		estimatorBuilder, err := estimator.NewEstimatorBuilder(opts.EstimatorName, estimator.NewThresholdBasedEstimationLimiter(opts.MaxNodesPerScaleUp, opts.MaxNodeGroupBinpackingDuration))
		if err != nil {
			return err
		}
		opts.EstimatorBuilder = estimatorBuilder
	}
  // 初始化 Backoff 策略
	if opts.Backoff == nil {
		opts.Backoff =
			backoff.NewIdBasedExponentialBackoff(opts.InitialNodeGroupBackoffDuration, opts.MaxNodeGroupBackoffDuration, opts.NodeGroupBackoffResetTimeout)
	}

	return nil
}
3.3.3 Autoscaler.Start

定时从 cloud provider 获取 node group 以及 node group 下的 instance 信息,并刷新本地缓存

代码语言:go复制
func (a *StaticAutoscaler) Start() error {
	a.clusterStateRegistry.Start()
	return nil
}

func (csr *ClusterStateRegistry) Start() {
	csr.cloudProviderNodeInstancesCache.Start(csr.interrupt)
}

// 后台定时刷新数据
func (cache *CloudProviderNodeInstancesCache) Start(interrupt chan struct{}) {
	go wait.Until(func() {
		cache.Refresh()
	}, CloudProviderNodeInstancesCacheRefreshInterval, interrupt)
}
Refresh
代码语言:go复制
// Refresh refreshes cache.
func (cache *CloudProviderNodeInstancesCache) Refresh() {
	// 从 cloud provider 获取 node group
  // 调用 cloud provider 的第一个扩展点
	nodeGroups := cache.cloudProvider.NodeGroups()
  // 移除不存在的 node group
	cache.removeEntriesForNonExistingNodeGroupsLocked(nodeGroups)
	for _, nodeGroup := range nodeGroups {
    // 调用 cloud provider 中 node group 接口扩展点
		nodeGroupInstances, err := nodeGroup.Nodes()
		// 更新缓存中的 node group
		cache.updateCacheEntryLocked(nodeGroup, &cloudProviderNodeInstancesCacheEntry{nodeGroupInstances, time.Now()})
	}
}
3.3.4 Autoscaler.RunOnce

关键逻辑:

  • 获取现有集群所有 Node,以及Node上运行的Pod信息
  • 经过几个 Processor 模块处理,将 Node 和 Pod 做分类规整到所属 asgCache 中的 各个asg 下,保存在 nodeInfosForGroups 中
  • 获取所有资源不足导致 pending 的 pod
  • 经过几个 Processor 模块处理,将未调度 pod 做处理,保存在 unschedulablePodsToHelp 中
  • 根据 unschedulablePodsToHelp 判断当前是否需要执行 ScaleUp 进行扩容
  • 根据是否配置了缩容参数ScaleDownEnabled,判断是否要进行缩容

核心逻辑伪代码

代码语言:go复制
func (a *StaticAutoscaler) RunOnce() {
  // 获取节点信息
  allNodes, readyNodes := a.obtainNodeLists(a.CloudProvider)
  // 将 Node 信息按照 asg 的id做分类规整
  nodeInfosForGroups := a.processors.TemplateNodeInfoProvider.Process(...)
  // 获取未调度的 pod
  unschedulablePods, err := unschedulablePodLister.List()
  // pod按调度类型分类
  unschedulablePodsToHelp := a.processors.PodListProcessor.Process(unschedulablePods)
  
  if len(unschedulablePodsToHelp) == 0 {
		// 不需要扩容
	} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
		// 扩容达到上限
	} else if allPodsAreNew(unschedulablePodsToHelp, currentTime) {
		// Node 扩容过程中,pod 新创建,等待一定冷却周期再尝试扩容
	} else {
    // 启动扩容
		ScaleUp()
  }
  
  // 如果开启缩容
  if a.ScaleDownEnabled {
    // 缩容逻辑
  }
}

RunOnce 实现细节如下:

代码语言:go复制
func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError {
 	...
  // 初始化获取未调度 pod的对象
  unschedulablePodLister := a.UnschedulablePodLister()

  // 获取所有的 node、ready 的node
  allNodes, readyNodes, typedErr := a.obtainNodeLists(a.CloudProvider)
  originalScheduledPods, err := scheduledPodLister.List()
  // 计算集群资源,获取 node.Status.Capacity[resource] 的值
  coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime)

  // 获取 ds 相关pod,后期加入调度器参与模拟调度
	daemonsets, err := a.ListerRegistry.DaemonSetLister().List(labels.Everything())
	// 手动刷新云资源,保持与本地缓存同步
  err = a.AutoscalingContext.CloudProvider.Refresh()
  
  // 找到 pod.Spec.Priority 值小于设定值 ExpendablePodsPriorityCutoff 的 pod
  // 这些 pod 优先级高,不可以被 expend
  nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)

  // TemplateNodeInfo
  // 将所有运行的 pod,按照不同的 node分类存储好,构造出 NodeInfo 对象,为后续调度准备数据
  // 取出 pod.Spec.NodeName, 依次存储好
  // 依次调用了 
  // 1. MixedTemplateNodeInfoProvider
  // 2. AnnotationNodeInfoProvider
  nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(autoscalingContext, readyNodes, daemonsets, a.ignoredTaints, currentTime)

  // NodeInfoProcessor
	nodeInfosForGroups, err = a.processors.NodeInfoProcessor.Process(autoscalingContext, nodeInfosForGroups)

  // 获取未注册的 node(在 CA node group 中,但是未注册到 k8s)
  unregisteredNodes := a.clusterStateRegistry.GetUnregisteredNodes()
  if len(unregisteredNodes) > 0 {
		// 删除这些 node
		removedAny, err := removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext,
			a.clusterStateRegistry, currentTime, autoscalingContext.LogRecorder)
	}
  danglingNodes, err := a.deleteCreatedNodesWithErrors()
  
  // 调整 node group size
  fixedSomething, err := fixNodeGroupSize(autoscalingContext, a.clusterStateRegistry, currentTime)

  // 获取未调度 pod
  // 未调度 pod 的排查规则:
  // selector := fields.ParseSelectorOrDie("spec.nodeName=="   ""   ",status.phase!="  
	//	string(apiv1.PodSucceeded)   ",status.phase!="   string(apiv1.PodFailed))
  unschedulablePods, err := unschedulablePodLister.List()

  unschedulablePodsToHelp, _ := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)
	unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
  
  // 根据未调度的 pod 数量,判断是否需要扩容
	if len(unschedulablePodsToHelp) == 0 {
    // 没有未调度的 pod,不需要扩容
		scaleUpStatus.Result = status.ScaleUpNotNeeded
		klog.V(1).Info("No unschedulable pods")
	} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
    /// 已经达到扩容上限
		scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable
		klog.V(1).Info("Max total nodes in cluster reached")
	} else if allPodsAreNew(unschedulablePodsToHelp, currentTime) {
    // 大量 pod 同时创建,一段时间内不再触发扩容,处于冷却期
		a.processorCallbacks.DisableScaleDownForLoop()
		scaleUpStatus.Result = status.ScaleUpInCooldown
		klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
	} else {
		scaleUpStart := time.Now()
		metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
		// 真正执行扩容动作
		scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups, a.ignoredTaints)
	}
}
MixedTemplateNodeInfoProvider

将所有的 Node,以及 Node 上运行的 pod,按照 asg 的 id归类保存

代码语言:go复制
func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, now time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
	...
  // 获取 node 上运行的所有的 pod,key是 node-name,value 是 pod 列表
	podsForNodes, err := getPodsForNodes(ctx.ListerRegistry)
  
  // 定义一个回调函数,处理 node 节点
	processNode := func(node *apiv1.Node) (bool, string, errors.AutoscalerError) {
    // 根据 node信息,调用 clouder provider 扩展点,获取 node group 信息
    // aws: 根据 node.Spec.ProviderID 调用 aws-sdk 获取
		nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
		// 得到 node group 的 id
		id := nodeGroup.Id()
		if _, found := result[id]; !found {
			// 根据给定的node,构造节点信息,确认是否是需要创建的 node
      // getRequiredPodsForNode:将 node 上的 dameonset pod 单独取出来(新节点也必须要运行这些 pod)
      // schedulerframework.NewNodeInfo: 构造新的 node 信息,都是调度框架的函数
      //    pInfo.Update(pod):计算 pod 的亲和性信息
      //    n.AddPodInfo(...):计算 cpu、memory、端口占用、pvc 引用等信息
			nodeInfo, err := simulator.BuildNodeInfoForNode(node, podsForNodes)
			if err != nil {
				return false, "", err
			}
      // 修改从 template 中生成的 node 信息,避免使用了重复的主机名
      //    sanitizeTemplateNode:根据 node group 规则自动生成主机名、新增 trait 信息
      //    schedulerframework.NewNodeInfo:更新完主机信息后,再次调用调度框架
			sanitizedNodeInfo, err := utils.SanitizeNodeInfo(nodeInfo, id, ignoredTaints)
			
			result[id] = sanitizedNodeInfo
			return true, id, nil
		}
		return false, "", nil
	}
  
  // 从 Node Group 中扩展新的节点
	for _, nodeGroup := range ctx.CloudProvider.NodeGroups() {
    // nodeGroup.TemplateNodeInfo() 获取 节点模板
    // daemonset.GetDaemonSetPodsForNode: 根据 ds 和 node 返回 pod
    //  schedulerframework.NewNodeInfo:构造完整的 pod
  	nodeInfo, err := utils.GetNodeInfoFromTemplate(nodeGroup, daemonsets, ctx.PredicateChecker, ignoredTaints)
		result[id] = nodeInfo
  }
}
AnnotationNodeInfoProvider

从 asg 中获取注解信息,并追加到 NodeInfo 中,便于后续参与调度

代码语言:go复制
func (p *AnnotationNodeInfoProvider) Process(ctx *context.AutoscalingContext, nodes []*apiv1.Node, daemonsets []*appsv1.DaemonSet, ignoredTaints taints.TaintKeySet, currentTime time.Time) (map[string]*schedulerframework.NodeInfo, errors.AutoscalerError) {
  // 先经过 mixedTemplateNodeInfoProvider 处理
	nodeInfos, err := p.mixedTemplateNodeInfoProvider.Process(ctx, nodes, daemonsets, ignoredTaints, currentTime)
	
	for _, nodeInfo := range nodeInfos {
    // 拿到所有的 node group
		nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(nodeInfo.Node())
		// 获取  node  group 模板
		template, err := nodeGroup.TemplateNodeInfo()
		// 获取模板 Annotation 信息
		for key, val := range template.Node().Annotations {
			if _, ok := nodeInfo.Node().Annotations[key]; !ok {
        // 将模板 annotation 添加到 node 上
				nodeInfo.Node().Annotations[key] = val
			}
		}
	}
	return nodeInfos, nil
}

3.4 ScaleUp

  • 找到 cloud provider 所有可用的 node group
  • 把不可调度的 pod 按照扩容需求进行分组
  • 调用
  • 将前两步得到的数据作为输入,传给 estimator 模块的装箱算法,得到候选的 pod、node 分配方案
  • 将上一步得到的结果,传给 expander 模块,得到最优的分配方案。默认提供好几种策略

伪代码实现关键步骤:

代码语言:go复制
// 前面的动作,将集群所有 Node 和 Pod 做规整,构造 NodeInfo 信息,按 nodeGroupId 建立索引
nodeInfosForGroups, autoscalerError := a.processors.TemplateNodeInfoProvider.Process(...)

func ScaleUp(...) {
  // 获取所有可用的 node group
 	nodeGroups := context.CloudProvider.NodeGroups()
  // 将所有待调度的 pod 按照调度属性分类
  podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods)
  expansionOptions := make(map[string]expander.Option, 0)
  // 遍历所有的 node group
  for _, nodeGroup := range nodeGroups {
    // 获取当前 node group 的 nodeInfo 信息
    nodeInfo, found := nodeInfos[nodeGroup.Id()]
    // 计算当前 asg 能够提供的cpu、memory 资源量,确认是否超过限额
    scaleUpResourcesDelta := computeScaleUpResourcesDelta(nodeInfo, nodeGroup, resourceLimiter)
    // 调用 Extimate 模块背包算法,计算出当前 node group 下需要扩展几台 Node,能满足哪些 pod 调度,保存在 option 变量中 
    option, err := computeExpansionOption(podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
    // 将该 NodeGroup 扩容的情况保存起来
    expansionOptions[nodeGroup.Id()] = option
  }
  // 计算结果重命名为 options
  // 此时有多个满足条件的结果
  options := expansionOptions
  // 调用 Expander 模块,根据启动传入的策略参数,从多个选项中选择最终一个结果
  bestOption := context.ExpanderStrategy.BestOption(options, nodeInfos)
  
  // 如果 NodeGroup 不存在,创建 NodeGroup
  processors.NodeGroupManager.CreateNodeGroup(context, bestOption.NodeGroup)
  
  // 负载均衡策略计算多个 NodeGroup中各自需要扩容的信息
  scaleUpInfos, typedErr := processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups()
  for _, info := range scaleUpInfos {
    // 调用 provider 执行扩容
    executeScaleUp(...)
    // 负载均衡
    clusterStateRegistry.Recalculate()
  }
}

ScaleUp实现细节如下:

代码语言:go复制
func ScaleUp(...) {
  ...
  // 第一步:找到 cloud provider 所有可用的 node group
  // 返回 node 列表中不在 node group 中的 node 子集
  nodesFromNotAutoscaledGroups, err := utils.FilterOutNodesFromNotAutoscaledGroups(nodes, context.CloudProvider)
  
  // 计算扩容可用的剩余资源
  //  calculateScaleUpCoresMemoryTotal:计算 node group 和非 Node group 所有的资源
  //  sum(nodeGroup.targetSize * nodeInfo.cpu(memory) )
  // computeBelowMax(totalCores, max):根据 CA 配置的资源限额 - 目前所有已使用资源 = 可扩容的资源余量
	scaleUpResourcesLeft, errLimits := computeScaleUpResourcesLeftLimits(context, processors, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups, resourceLimiter)

  // Node在NodeGroup中但是没有Registed在kubenetes集群
  // 数量为 newNodes := ar.CurrentTarget - (readiness.Ready   readiness.Unready   readiness.LongUnregistered)
  upcomingNodes := make([]*schedulerframework.NodeInfo, 0)
	for nodeGroup, numberOfNodes := range clusterStateRegistry.GetUpcomingNodes() {
    
  }
  
  processors.NodeGroupListProcessor.Process(...)
  
  // 第二步:将所有待调度 pod 按照扩容需求分类
  podEquivalenceGroups := buildPodEquivalenceGroups(unschedulablePods)
  
  // 评估所有的 node group,哪些不可用跳过,哪些可用
  skippedNodeGroups := map[string]status.Reasons{}
  
  // 外层循环,遍历所有的 NodeGroup
	for _, nodeGroup := range nodeGroups {
    if nodeGroup.Exist() && !clusterStateRegistry.IsNodeGroupSafeToScaleUp(nodeGroup, now) {
      ...
    }
    // 取出当前大小
    currentTargetSize, err := nodeGroup.TargetSize()
    
    // 计算扩容需要的增量资源
    // 取出 node group 下对应的 cpu、memory 信息
    scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(context, processors, nodeInfo, nodeGroup, resourceLimiter)
    
    // 校验是否超过限额,对比可扩容量和待扩容量
    checkResult := scaleUpResourcesLeft.checkScaleUpDeltaWithinLimits(scaleUpResourcesDelta)

    // 第三步:将前两步得到的数据作为输入,传给 estimator 模块的装箱算法,得到候选的 pod、node 分配方案
    option, err := computeExpansionOption(context, podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)
  }

  // 第四步:将上一步得到的结果,传给 expander 模块,得到最优的分配方案
  //  根据expansion (random ,mostpods, price,waste)配置获取最佳伸缩组
  // expander 是表示选择 node group 的策略
  bestOption := context.ExpanderStrategy.BestOption(options, nodeInfos)
  if bestOption != nil && bestOption.NodeCount > 0 {
    
    // 得到需要扩容的节点数
    newNodes := bestOption.NodeCount
    // 判断是否达到扩容上限
    if context.MaxNodesTotal > 0 && len(nodes) newNodes len(upcomingNodes) > context.MaxNodesTotal {
    }
    
    // 不存在 node group,创建新的
    if !bestOption.NodeGroup.Exist() {
      // 创建的 ng 包括主的、扩展的
      createNodeGroupResult, err := processors.NodeGroupManager.CreateNodeGroup(context, bestOption.NodeGroup)
      
      // 根据主 ng 创建 nodeinfo
      // 将 daemonset 追加到到 node group 模板创建出来的 node节点 pod 列表中
      // trait 信息追加到新创建 node 的 Spec.Trait 中
      // 填充 node name
      mainCreatedNodeInfo, err := utils.GetNodeInfoFromTemplate(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, context.PredicateChecker, ignoredTaints)
      // 依次创建多个扩展的 ng
			for _, nodeGroup := range createNodeGroupResult.ExtraCreatedNodeGroups {
        option, err2 := computeExpansionOption(context, podEquivalenceGroups, nodeGroup, nodeInfo, upcomingNodes)

      }
      
      // 重新计算缓存中节点信息
      clusterStateRegistry.Recalculate()
      
       // 计算出要扩容的资源
  		newNodes, err = applyScaleUpResourcesLimits(context, processors, newNodes, scaleUpResourcesLeft, nodeInfo, bestOption.NodeGroup, resourceLimiter)
    
      if context.BalanceSimilarNodeGroups {
        // 找到相似的 ng
				similarNodeGroups, typedErr := processors.NodeGroupSetProcessor.FindSimilarNodeGroups(context, bestOption.NodeGroup, nodeInfos)
      }
      
      // 平衡多个 ng
      scaleUpInfos, typedErr := processors.NodeGroupSetProcessor.BalanceScaleUpBetweenGroups(
			context, targetNodeGroups, newNodes)
      
      // 依次遍历所有待扩容的机器,执行扩容操作
      for _, info := range scaleUpInfos {
        // executeScaleUp 会执行 clouder provider 中的 IncreaseSize 方法
			typedErr := executeScaleUp(context, clusterStateRegistry, info, gpu.GetGpuTypeForMetrics(gpuLabel, availableGPUTypes, nodeInfo.Node(), nil), now)
			}
      
     	clusterStateRegistry.Recalculate()
      // 返回扩容成功
			return &status.ScaleUpStatus{
        Result:                  status.ScaleUpSuccessful,
        ScaleUpInfos:            scaleUpInfos,
        PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
        ConsideredNodeGroups:    nodeGroups,
        CreateNodeGroupResults:  createNodeGroupResults,
        PodsTriggeredScaleUp:    bestOption.Pods,
        PodsAwaitEvaluation:     getPodsAwaitingEvaluation(podEquivalenceGroups, bestOption.NodeGroup.Id()),
      }, nil
    }
    // 返回不需要扩容
    return &status.ScaleUpStatus{
      Result:                  status.ScaleUpNoOptionsAvailable,
      PodsRemainUnschedulable: getRemainingPods(podEquivalenceGroups, skippedNodeGroups),
      ConsideredNodeGroups:    nodeGroups,
    }, nil
}
3.4.1 ScaleUpInfo

计算出来的 ScaleUpInfo 会传给 Clouder Provider,用于操作云资源

需要开通的资源数量 delta = ScaleUpInfo.NewSize - ScaleUpInfo.CurrentSize

代码语言:go复制
type ScaleUpInfo struct {
	// Group is the group to be scaled-up
	Group cloudprovider.NodeGroup
	// CurrentSize is the current size of the Group
	CurrentSize int
	// NewSize is the size the Group will be scaled-up to
	NewSize int
	// MaxSize is the maximum allowed size of the Group
	MaxSize int
}
3.4.2 computeExpansionOption

这里分为两大块逻辑:预检查 背包计算

预检查:

  • 遍历所有待调度pod
  • 每个 pod 依次去和当前 Node 做预调度,确认一个 Node扩容后可以让 该pod 调度成功
  • 将初步筛选出来满足条件的 pod 列表,

背包计算:

通过前面的计算:所有的 pod中,如果扩容一个Node,一定能满足调度条件;但是到底要创建最少多少个 Node,能满足所有的 pod 调度需求,就要经过 Estimate 模块的背包算法了

  • 通过给定多个 Pod 和多个 Node,计算出最优的 Node 和 Pod 数量
代码语言:go复制
	func computeExpansionOption(...) {
    // 遍历每个待调度的 pod
    // 用每个 pod 去匹配 node,做模拟调度
		for _, eg := range podEquivalenceGroups {
      // 校验调度
      // 内部调用调度框架
      // p.framework.RunPreFilterPlugins(context.TODO(), state, pod)
      // 	filterStatuses := p.framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo)
      // 确认调度状态是否正确
    	if err := context.PredicateChecker.CheckPredicates(context.ClusterSnapshot, samplePod, nodeInfo.Node().Name); err == nil {
			// 返回可以调度的 pod
			option.Pods = append(option.Pods, eg.pods...)
      // 标记 pod 理论上可以调度成功
			eg.schedulable = true
    }
      
    // 可以成功调度 pod > 0,开始仿真调度
    // 计算需要的 node 数,和可以成功调度的 pod 数
    if len(option.Pods) > 0 {
			estimator := context.EstimatorBuilder(context.PredicateChecker, context.ClusterSnapshot)
      // 调用 Estimate 模块,后面单独介绍
			option.NodeCount, option.Pods = estimator.Estimate(option.Pods, nodeInfo, option.NodeGroup)
		}

		return option, nil
  }

3.5 Estimate

前面介绍过,通过上述计算后,所有的 pod中,如果扩容一个Node,一定能满足调度条件;但是到底要创建最少多少个 Node,能满足所有的 pod 调度需求,就要经过 Estimate 模块的背包算法了

3.5.1 优化目标

通过给定多个 Pod 和多个 Node,计算出最优的 Node 和 Pod 数量

输入
  • 待调度 pod 列表
  • nodeinfo
  • NodeGroup
接口
代码语言:go复制
type Estimator interface {
	Estimate([]*apiv1.Pod, *schedulerframework.NodeInfo, cloudprovider.NodeGroup) (int, []*apiv1.Pod)
}
输出
  • 节点数量
  • pod列表
3.5.2 实现分析
  • 将这组 pod 所需的cpu、memory 资源 / Node节点能提供的资源,计算出每个 pod 的得分
  • 按照得分从高到底排序
  • 按照由高到低得分顺序,依次遍历每个 pod,去匹配 Node
  • 新创建的 Node 保存到 newNodeNames 数组中
  • 如果没有找到,就去创建一个新的 Node。直到所有的 pod 都处理完。
代码语言:go复制
func (e *BinpackingNodeEstimator) Estimate(
  // 计算 pod 的得分
	podInfos := calculatePodScore(pods, nodeTemplate)
	// 按照得分排序
	sort.Slice(podInfos, func(i, j int) bool { return podInfos[i].score > podInfos[j].score })
	// 遍历所有的 pod
  for _, podInfo := range podInfos {
		found := false
		// 确认给定的 pod 是否能调度到 给定的 node 上
    // 内部调用调度框架的 preFilter 依次跟每个 node 过滤一遍, p.framework.RunPreFilterPlugins
		nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, podInfo.pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
			return newNodeNames[nodeInfo.Node().Name]
		})
		if err == nil {
      // 为 pod 找到合适的 node
			found = true
			scheduledPods = append(scheduledPods, podInfo.pod)
			newNodesWithPods[nodeName] = true
		}

		if !found {
			if lastNodeName != "" && !newNodesWithPods[lastNodeName] {
				continue
			}

      // 添加一个新的 node 进来
			newNodeName, err := e.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex)
			
			newNodeNameIndex  
			newNodeNames[newNodeName] = true
			lastNodeName = newNodeName

      // 再次尝试调度
      // 如果还是失败:比如设置了 pod 拓扑分布,这种情况无法解决 pending 问题,尝试移除这类 pod
			if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, podInfo.pod, newNodeName); err != nil {
				continue
			}
			if err := e.clusterSnapshot.AddPod(podInfo.pod, newNodeName); err != nil {
				klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", podInfo.pod.Namespace, podInfo.pod.Name, newNodeName, err)
				return 0, nil
			}
			newNodesWithPods[newNodeName] = true
			scheduledPods = append(scheduledPods, podInfo.pod)
		}
	}
	return len(newNodesWithPods), scheduledPods
}

3.6 Expander 策略

3.6.1 概述

通过前面的处理,会返回多个 Option 对象,即有多个可选的组合可以满足此次调度需求(可能只是部分 pod,不是全部pod),

Expander 提供多种策略,在这一组答案中最终选择一个作为最终答案。

选择要扩展的节点组提供的不同策略,通过 --expander=least-waste 参数指定,可以多个策略组合

输入

给定多个 option,选择一个最合适的 option。每个 option 对应一个 NodeGroup、需要调度的 pod、Node数量

代码语言:go复制
// Option describes an option to expand the cluster.
type Option struct {
	NodeGroup cloudprovider.NodeGroup
	NodeCount int
	Debug     string
	Pods      []*apiv1.Pod
}
接口
代码语言:go复制
// Strategy describes an interface for selecting the best option when scaling up
type Strategy interface {
	BestOption(options []Option, nodeInfo map[string]*schedulerframework.NodeInfo) *Option
}
输出

最终选定的 Option,即:扩容哪个 NodeGroup、扩容该 NodeGroup 的几台机器

3.6.2 实现分析
策略初始化
代码语言:go复制
func ExpanderStrategyFromStrings(...) {
		...
    // 根据不同的策略,使用不同的 Filter
		switch expanderFlag {
		case expander.RandomExpanderName:
			filters = append(filters, random.NewFilter())
		case expander.MostPodsExpanderName:
			filters = append(filters, mostpods.NewFilter())
		case expander.LeastWasteExpanderName:
			filters = append(filters, waste.NewFilter())
		case expander.PriceBasedExpanderName:
			if _, err := cloudProvider.Pricing(); err != nil {
				return nil, err
			}
			filters = append(filters, price.NewFilter(cloudProvider,
				price.NewSimplePreferredNodeProvider(autoscalingKubeClients.AllNodeLister()),
				price.SimpleNodeUnfitness))
		case expander.PriorityBasedExpanderName:
			// It seems other listers do the same here - they never receive the termination msg on the ch.
			// This should be currently OK.
			stopChannel := make(chan struct{})
			lister := kubernetes.NewConfigMapListerForNamespace(kubeClient, stopChannel, configNamespace)
			filters = append(filters, priority.NewFilter(lister.ConfigMaps(configNamespace), autoscalingKubeClients.Recorder))
		case expander.GRPCExpanderName:
			filters = append(filters, grpcplugin.NewFilter(GRPCExpanderCert, GRPCExpanderURL))
		default:
			return nil, errors.NewAutoscalerError(errors.InternalError, "Expander %s not supported", expanderFlag)
		}
		if _, ok := filters[len(filters)-1].(expander.Strategy); ok {
			strategySeen = true
		}
	}
	// 最后追加一个 random 的 fallback
	return newChainStrategy(filters, random.NewStrategy()), nil
}
调用策略
代码语言:go复制
func ScaleUp() {
  ...
  bestOption := context.ExpanderStrategy.BestOption(options, nodeInfos)
  ...
}

// 执行策略
func (c *chainStrategy) BestOption(options []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
	filteredOptions := options
  // 依次执行所有的 Filter
	for _, filter := range c.filters {
		filteredOptions = filter.BestOptions(filteredOptions, nodeInfo)
		if len(filteredOptions) == 1 {
			return &filteredOptions[0]
		}
	}
	return c.fallback.BestOption(filteredOptions, nodeInfo)
}
3.6.3 Filter 接口

Expander 策略是通过多个 Filter 实现的,Filter 定义了统一的接口,和多种实现

接口定义

代码语言:go复制
type Filter interface {
	BestOptions(options []Option, nodeInfo map[string]*schedulerframework.NodeInfo) []Option
}
leastwaste 实现
  • 将所需资源与可用资源计算差值,得到分数
  • 取出分数最小的值
代码语言:go复制
func (l *leastwaste) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
	var leastWastedScore float64
	var leastWastedOptions []expander.Option

	for _, option := range expansionOptions {
    // 计算所有 pod 总共需要的 cpu、memory 资源
		requestedCPU, requestedMemory := resourcesForPods(option.Pods)
    // 确认当前的 node group 是否存在
		node, found := nodeInfo[option.NodeGroup.Id()]
		if !found {
      // 不存在就匹配下一个 node group
			klog.Errorf("No node info for: %s", option.NodeGroup.Id())
			continue
		}
		// 找到 Node 能够提供的 cpu、memory 资源
    // cpu = node.Status.Capacity[cpu]
    // memory = node.Status.Capacity[memory]
		nodeCPU, nodeMemory := resourcesForNode(node.Node())
    // 可用资源 = 单节点资源 * nodeGroup数量
		availCPU := nodeCPU.MilliValue() * int64(option.NodeCount)
		availMemory := nodeMemory.Value() * int64(option.NodeCount)
    // 浪费资源 = (可用资源 - 所需资源)/ 可用资源
		wastedCPU := float64(availCPU-requestedCPU.MilliValue()) / float64(availCPU)
		wastedMemory := float64(availMemory-requestedMemory.Value()) / float64(availMemory)
    // 浪费资源数 = cpu浪费   memory 浪费
		wastedScore := wastedCPU   wastedMemory

		klog.V(1).Infof("Expanding Node Group %s would waste %0.2f%% CPU, %0.2f%% Memory, %0.2f%% Blendedn", option.NodeGroup.Id(), wastedCPU*100.0, wastedMemory*100.0, wastedScore*50.0)

		if wastedScore == leastWastedScore {
			leastWastedOptions = append(leastWastedOptions, option)
		}
		// 取浪费分数最小的选项
		if leastWastedOptions == nil || wastedScore < leastWastedScore {
			leastWastedScore = wastedScore
			leastWastedOptions = []expander.Option{option}
		}
	}

	if len(leastWastedOptions) == 0 {
		return nil
	}

	return leastWastedOptions
}
mostpods
代码语言:go复制
func (m *mostpods) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
	var maxPods int
	var maxOptions []expander.Option

  // 遍历所有的 option
	for _, option := range expansionOptions {
		if len(option.Pods) == maxPods {
			maxOptions = append(maxOptions, option)
		}

    // 取 pod 数量最大的那个选项
		if len(option.Pods) > maxPods {
			maxPods = len(option.Pods)
			maxOptions = []expander.Option{option}
		}
	}

	if len(maxOptions) == 0 {
		return nil
	}

	return maxOptions
}
random
代码语言:go复制
func (r *random) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
	// 调用 BestOption
  best := r.BestOption(expansionOptions, nodeInfo)
	if best == nil {
		return nil
	}
	return []expander.Option{*best}
}

func (r *random) BestOption(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) *expander.Option {
	if len(expansionOptions) <= 0 {
		return nil
	}
	// 从所有备选 option 中随机选择一个
	pos := rand.Int31n(int32(len(expansionOptions)))
	return &expansionOptions[pos]
}
priority
代码语言:go复制
func (p *priority) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
	
	// 读取名为 cluster-autoscaler-priority-expander,key 为 priorities 的 configmap
  // 将yaml数据转换为 type priorities map[int][]*regexp.Regexp 对象
	priorities, cm, err := p.reloadConfigMap()
	
  // 遍历所有 option
	for _, option := range expansionOptions {
		// 获取 node group 的 id
    id := option.NodeGroup.Id()
		found := false
    // 遍历所有的优先级
		for prio, nameRegexpList := range priorities {
      // 优先级列表中匹配当前的 node group id
      // 匹配不到就跳过
			if !p.groupIDMatchesList(id, nameRegexpList) {
				continue
			}
			found = true
      // 当前优先级低就跳过
			if prio < maxPrio {
				continue
			}
      // 找到优先级最高那个
			if prio > maxPrio {
				maxPrio = prio
				best = nil
			}
			best = append(best, option)

		}
		if !found {
			msg := fmt.Sprintf("Priority expander: node group %s not found in priority expander configuration. " 
				"The group won't be used.", id)
			p.logConfigWarning(cm, "PriorityConfigMapNotMatchedGroup", msg)
		}
	}
	// 优先级失效
	if len(best) == 0 {
		msg := "Priority expander: no priorities info found for any of the expansion options. No options filtered."
		p.logConfigWarning(cm, "PriorityConfigMapNoGroupMatched", msg)
		return expansionOptions
	}

	for _, opt := range best {
		klog.V(2).Infof("priority expander: %s chosen as the highest available", opt.NodeGroup.Id())
	}
	return best
}
price

选择成本最小的,依赖 cloud provider 的价格模型,aws cloud provider 没有实现,可以不用考虑

代码语言:go复制
// BestOption selects option based on cost and preferred node type.
func (p *priceBased) BestOptions(expansionOptions []expander.Option, nodeInfos map[string]*schedulerframework.NodeInfo) []expander.Option {
	var bestOptions []expander.Option
	....
}
grpc
代码语言:go复制
func (g *grpcclientstrategy) BestOptions(expansionOptions []expander.Option, nodeInfo map[string]*schedulerframework.NodeInfo) []expander.Option {
	// 判断 grpcClient 参数是否传入
  if g.grpcClient == nil {
		klog.Errorf("Incorrect gRPC client config, filtering no options")
		return expansionOptions
	}
	
  // 调用 grpc 请求
	bestOptionsResponse, err := g.grpcClient.BestOptions(ctx, &protos.BestOptionsRequest{Options: grpcOptionsSlice, NodeMap: grpcNodeMap})
	...
	return options
}

func (c *expanderClient) BestOptions(ctx context.Context, in *BestOptionsRequest, opts ...grpc.CallOption) (*BestOptionsResponse, error) {
	out := new(BestOptionsResponse)
	err := c.cc.Invoke(ctx, "/grpcplugin.Expander/BestOptions", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

grpc对应 proto 文件

代码语言:text复制
// Interface for Expander
service Expander {

  rpc BestOptions (BestOptionsRequest)
    returns (BestOptionsResponse) {}
}

3.7 缩容实现

3.7.1 缩容概述

缩容和扩容都在同一个定时器中,即默认10s一个检查循环。满足以下所有条件会触发缩容:

  • 在改节点上运行的所有 pod 的 cpu、memory的总和 < 节点可分配总额的 50%。(所有参数都可定制)
  • 节点上运行的所有 pod(除Daemonset),都可以移动到其他节点(特殊pod可以添加注解禁止CA调度到其他Node)
  • Node 没有添加禁用缩减的 Annotation

缩容其他注意事项:

  • 一个Node从检查出空闲,持续10min时间内依然空闲,才会被真正移除
  • 缩容操作一次之后缩一个,避免不可预期的错误

关键源码实现:

代码语言:go复制
func (a *StaticAutoscaler) RunOnce(...){
  // 缩容逻辑
  if a.ScaleDownEnabled {
    // 获取可能将被删除的 Node 列表,只是初步判断 Node 所在 ASG 实例数是否缩容到最小了
    scaleDownCandidates := GetScaleDownCandidates()
    // 返回某个 Node 被删除后,可能容纳node上 pod 的Node,默认返回所有 nodes
    podDestinations := GetPodDestinationCandidates()
    
    // 关键方法,通过各个维度统计出不再需要的 Node
    // 更新不再需要的 Node 信息,保存在 scaleDown.unneededNodes 中
    scaleDown.UpdateUnneededNodes(podDestinations, scaleDownCandidates)
    
    if scaleDownInCooldown {
      // 缩容冷却中
			scaleDownStatus.Result = status.ScaleDownInCooldown
		} else if scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() {
      // 正在进行缩容过程中
			scaleDownStatus.Result = status.ScaleDownInProgress
		} else {
    	// 可以开始缩容
      scaleDownStatus := scaleDown.TryToScaleDown(currentTime, pdbs)
    } 
  }
}
3.7.2 源码分析
代码语言:go复制
func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError {
  // 扩容逻辑,前面已分析
  ...
  // 缩容逻辑
  if a.ScaleDownEnabled {
    // 特殊处理的 pod
		pdbs, err := pdbLister.List()
		
    // 计算不再需要的 node
    // 保存待缩容的候选 Node
		var scaleDownCandidates []*apiv1.Node
    // 保存可以存放被删除 Node上pod的节点
		var podDestinations []*apiv1.Node

		if a.processors == nil || a.processors.ScaleDownNodeProcessor == nil {
			scaleDownCandidates = allNodes
			podDestinations = allNodes
		} else {
			var err errors.AutoscalerError
      // 初步筛选处理
			scaleDownCandidates, err = a.processors.ScaleDownNodeProcessor.GetScaleDownCandidates(
				autoscalingContext, allNodes)
			// pod选择新的 node
			podDestinations, err = a.processors.ScaleDownNodeProcessor.GetPodDestinationCandidates(autoscalingContext, allNodes)
		}

		// We use scheduledPods (not originalScheduledPods) here, so artificial scheduled pods introduced by processors
		// (e.g unscheduled pods with nominated node name) can block scaledown of given node.
		if typedErr := scaleDown.UpdateUnneededNodes(podDestinations, scaleDownCandidates, currentTime, pdbs); typedErr != nil {
			scaleDownStatus.Result = status.ScaleDownError
			klog.Errorf("Failed to scale down: %v", typedErr)
			return typedErr
		}

		metrics.UpdateDurationFromStart(metrics.FindUnneeded, unneededStart)

		if klog.V(4).Enabled() {
			for key, val := range scaleDown.unneededNodes {
				klog.Infof("%s is unneeded since %s duration %s", key, val.String(), currentTime.Sub(val).String())
			}
		}

		scaleDownInCooldown := a.processorCallbacks.disableScaleDownForLoop ||
			a.lastScaleUpTime.Add(a.ScaleDownDelayAfterAdd).After(currentTime) ||
			a.lastScaleDownFailTime.Add(a.ScaleDownDelayAfterFailure).After(currentTime) ||
			a.lastScaleDownDeleteTime.Add(a.ScaleDownDelayAfterDelete).After(currentTime)
		// In dry run only utilization is updated
		calculateUnneededOnly := scaleDownInCooldown || scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress()

		klog.V(4).Infof("Scale down status: unneededOnly=%v lastScaleUpTime=%s " 
			"lastScaleDownDeleteTime=%v lastScaleDownFailTime=%s scaleDownForbidden=%v " 
			"isDeleteInProgress=%v scaleDownInCooldown=%v",
			calculateUnneededOnly, a.lastScaleUpTime,
			a.lastScaleDownDeleteTime, a.lastScaleDownFailTime, a.processorCallbacks.disableScaleDownForLoop,
			scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress(), scaleDownInCooldown)
		metrics.UpdateScaleDownInCooldown(scaleDownInCooldown)

		if scaleDownInCooldown {
			scaleDownStatus.Result = status.ScaleDownInCooldown
		} else if scaleDown.nodeDeletionTracker.IsNonEmptyNodeDeleteInProgress() {
			scaleDownStatus.Result = status.ScaleDownInProgress
		} else {
			klog.V(4).Infof("Starting scale down")

			// We want to delete unneeded Node Groups only if there was no recent scale up,
			// and there is no current delete in progress and there was no recent errors.
			removedNodeGroups, err := a.processors.NodeGroupManager.RemoveUnneededNodeGroups(autoscalingContext)
			if err != nil {
				klog.Errorf("Error while removing unneeded node groups: %v", err)
			}

			scaleDownStart := time.Now()
			metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart)
      // 开始尝试缩容
			scaleDownStatus, typedErr := scaleDown.TryToScaleDown(currentTime, pdbs)
			metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart)
			metrics.UpdateUnremovableNodesCount(scaleDown.getUnremovableNodesCount())

			scaleDownStatus.RemovedNodeGroups = removedNodeGroups

			if scaleDownStatus.Result == status.ScaleDownNodeDeleteStarted {
				a.lastScaleDownDeleteTime = currentTime
				a.clusterStateRegistry.Recalculate()
			}

			if (scaleDownStatus.Result == status.ScaleDownNoNodeDeleted ||
				scaleDownStatus.Result == status.ScaleDownNoUnneeded) &&
				a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount != 0 {
        // 
				scaleDown.SoftTaintUnneededNodes(allNodes)
			}

			if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil {
				scaleDownStatus.SetUnremovableNodesInfo(scaleDown.unremovableNodeReasons, scaleDown.nodeUtilizationMap, scaleDown.context.CloudProvider)
				a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus)
				scaleDownStatusProcessorAlreadyCalled = true
			}

			if typedErr != nil {
				klog.Errorf("Failed to scale down: %v", typedErr)
				a.lastScaleDownFailTime = currentTime
				return typedErr
			}
		}
	}
	return nil
}
GetScaleDownCandidates

这一步只是判断哪些 Node 节点所在的 ASG 符合要求

代码语言:go复制
func (n *PreFilteringScaleDownNodeProcessor) GetScaleDownCandidates(ctx *context.AutoscalingContext,
	nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
	result := make([]*apiv1.Node, 0, len(nodes))

  // 获取每个 asg 当前的实例个数,保存为 map
	nodeGroupSize := utils.GetNodeGroupSizeMap(ctx.CloudProvider)

  // 遍历所有 node
	for _, node := range nodes {
    // 获取当前 node 所属的 asg
		nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node)
		// 获取当前 asg 的实例数
		size, found := nodeGroupSize[nodeGroup.Id()]
		// 获取 asg 的最小实例数。当前实例数已经最小了,就跳过不再缩容
		if size <= nodeGroup.MinSize() {
			klog.V(1).Infof("Skipping %s - node group min size reached", node.Name)
			continue
		}
    // 追加到结果中
		result = append(result, node)
	}
	return result, nil
}
GetPodDestinationCandidates

默认返回所有的 Node

代码语言:go复制
func (n *PreFilteringScaleDownNodeProcessor) GetPodDestinationCandidates(ctx *context.AutoscalingContext,
	nodes []*apiv1.Node) ([]*apiv1.Node, errors.AutoscalerError) {
	return nodes, nil
}
UpdateUnneededNodes

计算不再需要的node,从以下维度逐一排查:

  • 所有的 pod 可以被调度到其他节点
  • 资源使用率低于某个阈值
  • 其他判断

找到可以移除的节点,放到 unneededNodes 数组中,便于后面移除

代码语言:go复制
// destinationNodes:可以用来安置由于缩容导致被驱逐的pod的节点
// scaleDownCandidates:可以考虑缩容的节点
func (sd *ScaleDown) UpdateUnneededNodes(
	destinationNodes []*apiv1.Node,
	scaleDownCandidates []*apiv1.Node,
	timestamp time.Time,
	pdbs []*policyv1.PodDisruptionBudget,
) errors.AutoscalerError {

  // 第一步:计算节点资源利用率(只计算被管理的节点)
	for _, node := range scaleDownCandidates {
		// 获取节点信息
    nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(node.Name)
		// 检查节点情况,是否满足缩容
    // 1. 节点是否最近已经被标记为删除,这种节点打了 ToBeDeletedByClusterAutoscaler 的 taint
    // 2. 节点是否有 cluster-autoscaler.kubernetes.io/scale-down-disabled 这个禁止缩容的标签
    // 3. CalculateUtilization 计算资源使用率:累加所有 pod 上容器设置的 cpu、memroy request值
    // 4. isNodeBelowUtilizationThreshold 判断资源使用是否达到阈值(可启动时配置)
		reason, utilInfo := sd.checkNodeUtilization(timestamp, node, nodeInfo)
		
		// 保存可以被删除的节点
		currentlyUnneededNodeNames = append(currentlyUnneededNodeNames, node.Name)
	}

	// 第二步:将候选缩容节点和其他节点分开
	currentCandidates, currentNonCandidates := sd.chooseCandidates(currentlyUnneededNonEmptyNodes)

  // 找到新节点,用于移除候选节点
	nodesToRemove, unremovable, newHints, simulatorErr := simulator.FindNodesToRemove(
		currentCandidates,
		destinations,
		nil,
		sd.context.ClusterSnapshot,
		sd.context.PredicateChecker,
		len(currentCandidates),
		true,
		sd.podLocationHints,
		sd.usageTracker,
		timestamp,
		pdbs)

  //  additionalCandidatesCount 表示用于缩容额外的备选节点数量
	additionalCandidatesCount := sd.context.ScaleDownNonEmptyCandidatesCount - len(nodesToRemove)
	if additionalCandidatesCount > len(currentNonCandidates) {
		additionalCandidatesCount = len(currentNonCandidates)
	}

  // 限制并发缩容数量
	additionalCandidatesPoolSize := int(math.Ceil(float64(len(allNodeInfos)) * sd.context.ScaleDownCandidatesPoolRatio))
	if additionalCandidatesPoolSize < sd.context.ScaleDownCandidatesPoolMinCount {
		additionalCandidatesPoolSize = sd.context.ScaleDownCandidatesPoolMinCount
	}
	if additionalCandidatesPoolSize > len(currentNonCandidates) {
		additionalCandidatesPoolSize = len(currentNonCandidates)
	}
	if additionalCandidatesCount > 0 {
	
    // 找到新节点,用于移除候选节点
		additionalNodesToRemove, additionalUnremovable, additionalNewHints, simulatorErr :=
			simulator.FindNodesToRemove(
				currentNonCandidates[:additionalCandidatesPoolSize],
				destinations,
				nil,
				sd.context.ClusterSnapshot,
				sd.context.PredicateChecker,
				additionalCandidatesCount,
				true,
				sd.podLocationHints,
				sd.usageTracker,
				timestamp,
				pdbs)
	}
  // 将待移除节点保存到 unneededNodes 数组中
  for _, node := range nodesToRemove {
		name := node.Node.Name
		unneededNodesList = append(unneededNodesList, node.Node)
		if val, found := sd.unneededNodes[name]; !found {
			result[name] = timestamp
		} else {
			result[name] = val
		}
	}
	...
}
TryToScaleDown
代码语言:go复制
func (sd *ScaleDown) TryToScaleDown(
	currentTime time.Time,
	pdbs []*policyv1.PodDisruptionBudget,
) (*status.ScaleDownStatus, errors.AutoscalerError) {

	...
  // 遍历待删除 node 列表
	for nodeName, unneededSince := range sd.unneededNodes {
		
		// 获取 nodeinfo、node 信息
    nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(nodeName)
		node := nodeInfo.Node()
		// 检查 node 是否打上了禁止删除的 annotation
		if hasNoScaleDownAnnotation(node) {
			klog.V(4).Infof("Skipping %s - scale down disabled annotation found", node.Name)
			sd.addUnremovableNodeReason(node, simulator.ScaleDownDisabledAnnotation)
			continue
		}
		// 获取 node 状态,根据状态做一些处理
		ready, _, _ := kube_util.GetReadinessState(node)
		
    // 计算缩容资源
		scaleDownResourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesWithLimits)
		// 检查资源限制
		checkResult := scaleDownResourcesLeft.checkScaleDownDeltaWithinLimits(scaleDownResourcesDelta)
		...
		candidateNames = append(candidateNames, node.Name)
		candidateNodeGroups[node.Name] = nodeGroup
	}

  // 寻找一个待移除节点
	nodesToRemove, unremovable, _, err := simulator.FindNodesToRemove(
		candidateNames,
		nodesWithoutMasterNames,
		sd.context.ListerRegistry,
		sd.context.ClusterSnapshot,
		sd.context.PredicateChecker,
		1,
		false,
		sd.podLocationHints,
		sd.usageTracker,
		time.Now(),
		pdbs)

  // 计算时差
	nodeDeletionDuration = time.Now().Sub(nodeDeletionStart)
	sd.nodeDeletionTracker.SetNonEmptyNodeDeleteInProgress(true)

	go func() {
		...
    // 删除节点
		result = sd.deleteNode(toRemove.Node, toRemove.PodsToReschedule, toRemove.DaemonSetPods, nodeGroup)
  }
}

四、CA 使用注意

aws官方说明

4.1 asg 自动发现参数配置

  • 为 AutoScaling 设置两个标签,便于 CA 自动发现
  • 关于跨可用区:
  • 可以设置多个 AutoScaling 组,每个组一个可用区,通过开启--balance-similar-node-groups` 功能。注意:需要为不同的组设置相同的一批标签
  • 也可以设置同一个 AutoScaling 组,但是必须将组设置可跨多个可用区
  • 更推荐使用多个 AutoScaling 组

4.2 优化节点组:

  • 节点组中的每个节点必须具有相同的调度属性,包括标签、污点和资源
    • 策略中指定的第一个实例类型模拟调度。
    • 如果您的策略具有拥有更多资源的其他实例类型,则在横向扩展后可能会浪费资源。
    • 如果您的策略具有其他实例类型,其资源比原始实例类型少,则 Pod 在实例上调度可能失败。
  • 请使用较多节点配置较少数量的节点组,因为相反的配置可能会对可扩展性产生不利影响。

4.3 AutoScaling

  • 混合实例策略:支持多个实例类型,配置时推荐使用相似的资源类型:比如:M4M5M5a,M5n
  • 可以通过 configmap 设置不同 AutoScaling 的优先级
  • AutoScaling 的机型也支持权重
  • 支持启动配置、启动模板两种模式
  • 启动模板里面指定机型
  • 启动模板覆盖项支持配置多个机型

4.4 Expander 策略

选择要扩展的节点组提供的不同策略,通过 --expander=least-waste 参数指定

可选参数包括:

  • random:随机选择
  • most-pods:能满足最多 pod 调度的
  • Least-waste:最少 cpu 和 memroy
  • Price:成本最小
  • priority:按用户指定的优先级
  • grpc:调用外部 grpc 服务选择扩容节点

4.5 超额配置

  • 通过配置一个空的Deployment,占用资源,如果资源不足优先驱逐,达到尽快扩容的目的

4.6 防止pod被驱逐

配置 `cluster-autoscaler.kubernetes.io/safe-to-evict=false 注解,可以确保 pod不被驱逐,pod所在 node 不被缩减

0 人点赞