一文搞懂 Kubernetes HPA 实现原理-(上篇)

2023-08-11 15:25:28 浏览数 (1)

Hello folks,我是 Luga,今天我们来聊一下云原生生态核心技术 Kubernetes Autoscaling 之一的—— Horizontal Pod Autoscaler (HPA)。

01

Horizontal Pod Autoscaler (HPA) 基礎概念

在前一期的文章中,我们有对 Horizontal Pod Autoscaler (HPA) 基礎概念进行简要解析,具体可点击如下图片查看。

HPA 是 Kubernetes 的一个核心组件,能够自动更新部署和 StatefulSet 等工作负载资源,并根据集群中应用程序的需求进行水平扩展。水平扩展是指增加更多的 Pod 来应对负载的增加,与垂直扩展(通常被定义为已运行的 Pod 分配更多的 Kubernetes 节点资源,例如内存和 CPU)不同。

当负载减少并且 Pod 数量超过了最小配置值时,HPA 会通知相应的工作负载资源(例如 Deployment)进行缩小,以节省资源并提高效率。使用 HPA 可以确保应用程序始终具有所需的资源,并且可以自动适应负载变化,从而提高应用程序的可用性和性能。

02

Horizontal Pod Autoscaler (HPA) 实现原理

通常来讲,Horizontal Pod Autoscaler(HPA)是 Kubernetes 中的一种资源对象,用于自动调整 Pod 的副本数量,以确保 Pod 的资源利用率和可用性达到最优状态。

Horizontal Pod Autoscaler(HPA)跟据当前指标和期望指标来计算扩缩比例,具体公式为:

代码语言:javascript复制
DesiredReplicas = ceil[currentReplicas * ( currentMetricValue / desiredMetricValue )]

(1)currentReplicas:当前 Pod 实例的副本数量。

(2)currentMetricValue:当前 Pod 实例的指标值,例如 CPU 利用率或内存利用率等。

(3)desiredMetricValue:期望的指标值,即 HPA 对象中定义的目标指标值。

(4)DesiredReplicas:根据当前指标和期望指标计算出的目标副本数量,需要向上取整。

上述公式的含义在于,根据当前 Pod 实例的指标值和期望的指标值,计算出应该有多少个 Pod 实例在运行,以实现最优的资源利用率和可用性。

例如,如果当前 Pod 实例的副本数量为 10,当前 CPU 利用率为 50%,而期望的 CPU 利用率为 80%,则根据上述公式,应该有 ceil[10 * (0.5 / 0.8)] = ceil[6.25] = 7 个 Pod 实例在运行,以实现最优的资源利用率和可用性。因此,HPA 会自动将 Pod 实例的副本数量增加到 7 个,以实现自动缩放的效果。

需要注意的是,HPA 会同时根据多个指标进行自动调整,例如 CPU 利用率、内存利用率、网络吞吐量等,以确保 Pod 的资源利用率和可用性达到最优状态。同时,HPA 还会根据 HPA 对象中定义的扩缩比例限制和最大副本数量、最小副本数量等规则,对目标副本数量进行调整,以保证集群的稳定性和可靠性。

Horizontal Pod Autoscaler(HPA)实现原理可参考下图所示:

基于上述参考示意图,我们可以看到,HPA 的工作原理可以分为以下几个关键步骤,具体:

1、监听 Pod 资源使用情况:HPA Controller 会周期性地获取与 Pod 相关的指标数据,例如 CPU 利用率和内存利用率等。

2、计算目标副本数量:根据 HPA 对象中定义的规则和指标,HPA Controller 会计算出当前 Pod 所需要的目标副本数量。例如,如果 CPU 利用率超过了一定阈值,则需要增加 Pod 实例的副本数量。

3、更新 ReplicaSet 对象:一旦计算出目标副本数量,HPA Controller 会自动更新关联的 ReplicaSet 对象,以确保 Pod 的副本数量始终保持在目标范围内。例如,如果目标副本数量为 3,而当前只有 2 个 Pod 实例在运行,则 HPA Controller 会自动增加一个新的 Pod 实例,以实现副本数量的自动缩放。

4、监控和调整:一旦更新了 ReplicaSet 对象,HPA Controller 会持续监控 Pod 的资源使用情况,并根据需要自动调整 Pod 的副本数量,以确保 Pod 的资源利用率和可用性达到最优状态。如果资源利用率下降了,HPA Controller 会自动减少 Pod 实例的副本数量,以降低资源消耗;如果资源利用率上升,HPA Controller 会自动增加 Pod 实例的副本数量,以提高资源利用率和可用性。

03

Horizontal Pod Autoscaler (HPA) 源码剖析

「注:本次源码剖析版本为 Kubernetes v1.25.0」

1. 初始化操作

源码路径:cmd/kube-controller-manager/app/controllermanager.go。

代码语言:javascript复制
// paired to their InitFunc.  This allows for structured downstream composition and subdivision.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
  controllers := map[string]InitFunc{}
  controllers["endpoint"] = startEndpointController
  controllers["endpointslice"] = startEndpointSliceController
  controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
  controllers["replicationcontroller"] = startReplicationController
  controllers["podgc"] = startPodGCController
  controllers["resourcequota"] = startResourceQuotaController
  controllers["namespace"] = startNamespaceController
  controllers["serviceaccount"] = startServiceAccountController
  controllers["garbagecollector"] = startGarbageCollectorController
  controllers["daemonset"] = startDaemonSetController
  controllers["job"] = startJobController
  controllers["deployment"] = startDeploymentController
  controllers["replicaset"] = startReplicaSetController
  controllers["horizontalpodautoscaling"] = startHPAController
  ...

通常来讲,NewControllerInitializers 是命名控制器组的公共映射(我们可以在 init 函数中启动多个),用于实现 Controller 的初始化和注册。命名控制器组是 Kubernetes 提供的一种机制,用于将多个 Controller 组织起来,并为它们提供共享的初始化和配置方法。在命名控制器组中,每个 Controller 都有一个唯一的名称,用于标识和区分不同的 Controller。通过 NewControllerInitializers 方法,可以将多个 Controller 的初始化方法注册到命名控制器组中,并在控制器管理器启动时自动加载和启动这些 Controller。

在 init 函数中,可以通过 NewControllerInitializers 方法启动多个 Controller,并将它们注册到不同的命名控制器组中。这样,就可以实现更加灵活和高效的控制器管理和调度,提高系统的可用性和稳定性。

在上述源码中,HPA Controller 作为 Kubernetes 中的一种 Controller,主要负责根据当前的 Pod 资源使用情况,自动调整 ReplicaSet 的副本数量,以确保 Pod 的资源利用率和可用性达到最优状态。与其他 Controller 类似,HPA Controlle r也需要在 Kubernetes 的 NewControllerInitializers 方法中进行注册,并通过 startHPAController 方法进行启动。

2. 启动 HPAController

源码路径:cmd/kube-controller-manager/app/autoscaling.go。

代码语言:javascript复制
func startHPAController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
  if !controllerContext.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
    return nil, false, nil
  }

  return startHPAControllerWithRESTClient(ctx, controllerContext)
}

func startHPAControllerWithRESTClient(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
  clientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
  hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")

  apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery())
  // invalidate the discovery information roughly once per resync interval our API
  // information is *at most* two resync intervals old.
  go custom_metrics.PeriodicallyInvalidate(
    apiVersionsGetter,
    controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
    ctx.Done())

  metricsClient := metrics.NewRESTMetricsClient(
    resourceclient.NewForConfigOrDie(clientConfig),
    custom_metrics.NewForConfig(clientConfig, controllerContext.RESTMapper, apiVersionsGetter),
    external_metrics.NewForConfigOrDie(clientConfig),
  )
  return startHPAControllerWithMetricsClient(ctx, controllerContext, metricsClient)
}

func startHPAControllerWithMetricsClient(ctx context.Context, controllerContext ControllerContext, metricsClient metrics.MetricsClient) (controller.Interface, bool, error) {
  hpaClient := controllerContext.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
  hpaClientConfig := controllerContext.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")

  // we don't use cached discovery because DiscoveryScaleKindResolver does its own caching,
  // so we want to re-fetch every time when we actually ask for it
  scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
  scaleClient, err := scale.NewForConfig(hpaClientConfig, controllerContext.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
  if err != nil {
    return nil, false, err
  }

  go podautoscaler.NewHorizontalController(
    hpaClient.CoreV1(),
    scaleClient,
    hpaClient.AutoscalingV2(),
    controllerContext.RESTMapper,
    metricsClient,
    controllerContext.InformerFactory.Autoscaling().V2().HorizontalPodAutoscalers(),
    controllerContext.InformerFactory.Core().V1().Pods(),
    controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
    controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
    controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
    controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
    controllerContext.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
  ).Run(ctx)
  return nil, true, nil
}

在上述代码中,startHPAController 方法是 HPA Controller 的入口,其首先调用 startHPAControllerWithLegacyClient 方法,创建一个新的 HPAController 对象,并将它注册到 Kubernetes 的命名控制器组中。接着,startHPAControllerWithLegacyClient 方法会创建一个新的 HeapsterMetricsClient 对象,并传递给 startHPAControllerWithMetricsClient 方法。

startHPAControllerWithMetricsClient 方法则会使用传入的 HeapsterMetricsClient 对象,创建一个新的 scaleClient 对象,并调用 NewHorizontalController 方法,初始化 HPA Controller 的相关参数和配置。这个方法会创建一个新的 HorizontalController 对象,并使用传入的参数,启动 HPA Controller 的主循环。

3. Replicas (副本数)计算

接下来,正式进入 HPAController 的核心环节,具体如下所示:‍‍‍

代码语言:javascript复制
// 开始观测及同步
func (a *HorizontalController) Run(ctx context.Context) {
  defer utilruntime.HandleCrash()
  defer a.queue.ShutDown()

  klog.Infof("Starting HPA controller")
  defer klog.Infof("Shutting down HPA controller")

  if !cache.WaitForNamedCacheSync("HPA", ctx.Done(), a.hpaListerSynced, a.podListerSynced) {
    return
  }

  // 启动一个每 s执行一次的异步线程
  go wait.UntilWithContext(ctx, a.worker, time.Second)

  <-ctx.Done()
}

上述代码会启动 HPA Controller 并开始监听 Pod 资源使用情况的变化,根据 Pod 资源利用情况自动调整 ReplicaSet 的副本数量,以确保 Pod 的资源利用率和可用性达到最优状态。

在 Run 方法中,首先使用 cache.WaitForNamedCacheSync 方法等待 HPA Controller 的相关缓存数据同步完成。该方法会阻塞当前线程,直到所有缓存数据都已经同步完成,或者收到了上下文对象的取消信号(ctx.Done())。

随后,Run 方法会启动一个异步线程,每秒执行一次 a.worker 方法。a.worker 方法是 HPA Controller 的核心逻辑,它会根据当前 Pod 的资源利用情况,计算出应该有多少个 Pod 实例在运行,并更新关联的 ReplicaSet 对象。这样,就可以确保在各种负载下,Pod 的资源利用率和可用性都得到了最大化的优化。

最后,Run 方法会阻塞当前线程,直到收到上下文对象的取消信号(ctx.Done())。一旦收到取消信号,Run 方法会退出当前循环,并执行清理工作,包括关闭队列和输出日志信息。

接下来,我们来看一下 HPA 的核心部分:reconcileAutoscaler 方法。具体源码实现如下所示:

代码语言:javascript复制
func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) error {
  // make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
  hpa := hpaShared.DeepCopy()
  hpaStatusOriginal := hpa.Status.DeepCopy()
 ...
 if scale.Spec.Replicas == 0 && minReplicas != 0 {
    // 如果副本数为0,则该资源的自动缩放被禁用
    desiredReplicas = 0
    rescale = false
    setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
  } 
    // 如果当前副本数大于最大期望副本数,那么设置期望副本数为最大副本数
    else if currentReplicas > hpa.Spec.MaxReplicas {
    rescaleReason = "Current number of replicas above Spec.MaxReplicas"
    desiredReplicas = hpa.Spec.MaxReplicas
  } 
    // 与上述对应
    else if currentReplicas < minReplicas {
    rescaleReason = "Current number of replicas below Spec.MinReplicas"
    desiredReplicas = minReplicas
  } else {
    // 计算需要扩缩容的数量
    var metricTimestamp time.Time
    metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
    if err != nil {
      a.setCurrentReplicasInStatus(hpa, currentReplicas)
      if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
        utilruntime.HandleError(err)
      }
      a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
      return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
    }

    klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)

    rescaleMetric := ""
    if metricDesiredReplicas > desiredReplicas {
      desiredReplicas = metricDesiredReplicas
      rescaleMetric = metricName
    }
    if desiredReplicas > currentReplicas {
      rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
    }
    if desiredReplicas < currentReplicas {
      rescaleReason = "All metrics below target"
    }
    if hpa.Spec.Behavior == nil {
      desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
    } else {
      desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
    }
    rescale = desiredReplicas != currentReplicas
  }

  if rescale {
    scale.Spec.Replicas = desiredReplicas
    _, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(ctx, targetGR, scale, metav1.UpdateOptions{})
    if err != nil {
      a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
      setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
      a.setCurrentReplicasInStatus(hpa, currentReplicas)
      if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
        utilruntime.HandleError(err)
      }
      return fmt.Errorf("failed to rescale %s: %v", reference, err)
    }
    setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
    a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
    a.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
    klog.Infof("Successful rescale of %s, old size: %d, new size: %d, reason: %s",
      hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
  } else {
    klog.V(4).Infof("decided not to scale %s to %v (last scale time was %s)", reference, desiredReplicas, hpa.Status.LastScaleTime)
    desiredReplicas = currentReplicas
  }

  a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
  return a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
}

上述代码展示了 HPA Controller 的核心逻辑,通过根据 Pod 资源利用情况和 HPA 对象中定义的指标,计算出应该有多少个 Pod 实例在运行,并自动调整 ReplicaSet 的副本数量,以确保 Pod 的资源利用率和可用性达到最优状态。同时,此方法还负责追踪缩放原因、更新状态信息和处理错误信息等。

具体核心逻辑实现如下:‍‍

首先,代码判断 scale.Spec.Replicas 是否等于 0,如果等于 0 并且 minReplicas 不等于 0,则认为自动缩放已被禁用,目标副本数量为 0,不需要进行缩放。否则,继续进行下一步判断。

接着,代码判断当前 Pod 实例的副本数量是否大于 HPA 对象中定义的最大副本数量(hpa.Spec.MaxReplicas),如果是,则将目标副本数量设置为最大副本数量,同时记录下缩放原因为“当前副本数量已超过最大副本数量”。

如果当前 Pod 实例的副本数量小于 HPA 对象中定义的最小副本数量(hpa.Spec.MinReplicas),则将目标副本数量设置为最小副本数量,同时记录下缩放原因为“当前副本数量已低于最小副本数量”。

如果当前 Pod 实例的副本数量既不超过最大副本数量,也不低于最小副本数量,则通过 a.computeReplicasForMetrics 方法计算出目标副本数量。该方法会根据 HPA 对象中定义的指标(hpa.Spec.Metrics)和 Pod 的资源利用情况,计算出应该有多少个 Pod 实例在运行,并更新关联的 ReplicaSet 对象。

如果计算目标副本数量的过程中出现错误,将会进行错误处理,并返回错误信息。

如果计算目标副本数量成功,则根据计算出的目标副本数量和当前副本数量,判断是否需要进行缩放操作。如果需要进行缩放,将会更新 ReplicaSet 对象的副本数量,并记录下缩放原因。

最后,代码会更新 HPA 对象的状态,并将状态信息更新到 Kubernetes API 服务器。如果更新状态信息过程中出现错误,将会进行错误处理。

由于篇幅原因,本次源码解析到此为止。在接下来的文章中,我们将围绕核心代码进行一步步分析,敬请期待,谢谢!

Adiós !

··································

0 人点赞