K8s HPA Automatic Scaling Mechanism

2023-08-19 09:33:24

Introduction

The kubectl scale command can scale Pods out and in, but it is an entirely manual operation. To cope with the complex conditions of a production environment, we need a way to automatically sense the workload and scale accordingly. For this, Kubernetes provides a dedicated resource object: the Horizontal Pod Autoscaler, or HPA for short. The HPA monitors and analyzes the load of all Pods managed by a controller and decides whether the Pod replica count needs to be adjusted.

The HPA (Horizontal Pod Autoscaler) is a Kubernetes resource object that dynamically scales the number of pods in workloads such as StatefulSets, ReplicationControllers, and ReplicaSets based on certain metrics, giving the services running on them a degree of self-adaptation to changes in those metrics.

The HPA currently supports four types of metrics: Resource, Object, External, and Pods. The stable autoscaling/v1 API only supports scaling on CPU; the beta autoscaling/v2beta2 API adds support for memory and custom metrics, and these settings can also be applied to autoscaling/v1 objects via annotations. Note: horizontal pod autoscaling does not apply to objects that cannot be scaled (such as DaemonSets).

Setup

An HPA can be created with kubectl: either with the kubectl create command from a manifest, or imperatively with kubectl autoscale. For example, the command kubectl autoscale rs foo --min=2 --max=5 --cpu-percent=80 creates an HPA object for the ReplicaSet named foo, with a target CPU utilization of 80% and a replica count between 2 and 5. If metrics fluctuate too frequently, the kube-controller-manager flag --horizontal-pod-autoscaler-downscale-stabilization can be used to set a scale-down delay, i.e. how long after the last scale-down finishes before another scale-down may be performed; the default is 5m.
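
For illustration, here is roughly what that kubectl autoscale command amounts to in API terms, expressed as a minimal client-go sketch that creates the equivalent autoscaling/v1 HPA object. The kubeconfig path, namespace, and the ReplicaSet name foo are assumptions taken from the example above, not part of the original article.

Code language: go
package main

import (
    "context"
    "fmt"

    autoscalingv1 "k8s.io/api/autoscaling/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Assumed kubeconfig path; adjust for your environment.
    cfg, err := clientcmd.BuildConfigFromFlags("", "./kubeconfig")
    if err != nil {
        panic(err)
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    minReplicas, targetCPU := int32(2), int32(80)
    hpa := &autoscalingv1.HorizontalPodAutoscaler{
        ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"},
        Spec: autoscalingv1.HorizontalPodAutoscalerSpec{
            // The workload whose scale subresource the HPA will drive.
            ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{
                APIVersion: "apps/v1",
                Kind:       "ReplicaSet",
                Name:       "foo",
            },
            MinReplicas:                    &minReplicas,
            MaxReplicas:                    5,
            TargetCPUUtilizationPercentage: &targetCPU, // 80% CPU target
        },
    }

    created, err := client.AutoscalingV1().HorizontalPodAutoscalers("default").
        Create(context.TODO(), hpa, metav1.CreateOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Println("created HPA:", created.Name)
}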

Configuration

Code language: yaml
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: php-apache
  namespace: default
spec:
  # The scale target of the HPA; the HPA dynamically adjusts this object's pod count
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: php-apache
  # Minimum and maximum pod counts for the HPA
  minReplicas: 1
  maxReplicas: 10
  # Array of monitored metrics; multiple metric types may coexist
  metrics:
  # Object-type metric
  - type: Object
    object:
      metric:
        # Metric name
        name: requests-per-second
      # The object the metric describes; the metric data comes from this object
      describedObject:
        apiVersion: networking.k8s.io/v1beta1
        kind: Ingress
        name: main-route
      # Value-type target; Object metrics only support Value and AverageValue targets
      target:
        type: Value
        value: 10k
  # Resource-type metric
  - type: Resource
    resource:
      name: cpu
      # Utilization-type target; Resource metrics only support Utilization and AverageValue targets
      target:
        type: Utilization
        averageUtilization: 50
  # Pods-type metric
  - type: Pods
    pods:
      metric:
        name: packets-per-second
      # AverageValue-type target; Pods metrics only support AverageValue targets
      target:
        type: AverageValue
        averageValue: 1k
  # External-type metric
  - type: External
    external:
      metric:
        name: queue_messages_ready
        # This selector matches labels on the third-party metric
        selector:
          matchLabels:
            env: "stage"
            app: "myapp"
      # External metrics only support Value and AverageValue targets
      target:
        type: AverageValue
        averageValue: 30

Source Code Analysis

The conclusion first: the HPA is itself driven by a controller in Kubernetes. The controller loops over all HPAs at a fixed interval (15s by default) and checks whether the metrics each HPA watches have triggered its scaling conditions. Once a condition is triggered, the controller sends a request to Kubernetes to modify the replica-count field in the scale subobject of the scale target (StatefulSet, ReplicationController, ReplicaSet). Kubernetes handles the request, updates the Scale structure, and the target's pod count is refreshed. Once the scale target has been modified, its own controller adds or removes pods through the usual list/watch mechanism, achieving dynamic scaling.
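
The "scale subobject" mentioned above is the scale subresource that every scalable workload exposes. As a minimal sketch of the idea (the Deployment name php-apache, the namespace, and the kubeconfig path are illustrative), this is how the subresource can be read and written through client-go's typed API; conceptually the HPA controller does the same thing through the generic scale client shown later:

Code language: go
package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", "./kubeconfig") // illustrative path
    if err != nil {
        panic(err)
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    // Read the scale subresource of a Deployment (an autoscaling/v1 Scale object).
    scale, err := client.AppsV1().Deployments("default").GetScale(context.TODO(), "php-apache", metav1.GetOptions{})
    if err != nil {
        panic(err)
    }
    fmt.Println("current replicas:", scale.Spec.Replicas)

    // Changing Spec.Replicas on the scale subresource is what causes the
    // workload's own controller to add or remove pods.
    scale.Spec.Replicas = 3
    if _, err := client.AppsV1().Deployments("default").UpdateScale(context.TODO(), "php-apache", scale, metav1.UpdateOptions{}); err != nil {
        panic(err)
    }
}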

  • For per-pod resource metrics (such as CPU), the controller fetches the metrics of every pod targeted by the HorizontalPodAutoscaler from the resource metrics API. If a target utilization is set, the controller looks up the resource usage of each pod's containers and computes the utilization; if a raw target value is used, the raw data is used directly to compute the desired replica count (a minimal sketch of this calculation follows this list). Note that if some containers in a pod do not expose resource metrics, the controller will not use that pod's CPU utilization at all.
  • For per-pod custom metrics, the controller works much like it does for resource metrics, except that custom metrics only use raw values rather than utilization.
  • For object metrics and external metrics (each metric describes a single object), the metric is compared directly against the configured target to produce the scale ratio described above. In the autoscaling/v2beta2 API the metric can also be divided by the pod count before the comparison. In general, the controller obtains metric data from a set of aggregated APIs (metrics.k8s.io, custom.metrics.k8s.io, and external.metrics.k8s.io); the metrics.k8s.io API is usually served by metrics-server, which has to be deployed separately.
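
Here is a minimal, self-contained sketch of the utilization and ratio math described in the first bullet, simplified from the controller's metrics helpers; the pod names and numbers are made up:

Code language: go
package main

import "fmt"

// podUsage holds one pod's summed container usage for a resource,
// in milli-units (e.g. millicores for CPU), keyed by pod name.
type podUsage map[string]int64

// utilizationRatio mirrors the idea behind the HPA's resource-utilization math:
// current utilization = total usage * 100 / total requests, which is then
// compared against the target utilization to produce a scale ratio.
func utilizationRatio(usage podUsage, requests map[string]int64, targetUtilization int32) (ratio float64, currentUtilization int32) {
    var usageTotal, requestsTotal int64
    for pod, u := range usage {
        req, ok := requests[pod]
        if !ok {
            // Pods without a known request are skipped, much like pods whose
            // metrics cannot be collected are skipped by the controller.
            continue
        }
        usageTotal += u
        requestsTotal += req
    }
    if requestsTotal == 0 {
        return 0, 0
    }
    currentUtilization = int32(usageTotal * 100 / requestsTotal)
    return float64(currentUtilization) / float64(targetUtilization), currentUtilization
}

func main() {
    usage := podUsage{"web-1": 400, "web-2": 600}             // millicores in use
    requests := map[string]int64{"web-1": 500, "web-2": 500}  // millicores requested
    ratio, cur := utilizationRatio(usage, requests, 50)
    fmt.Printf("current utilization: %d%%, ratio vs. 50%% target: %.2f\n", cur, ratio)
    // current utilization: 100%, ratio 2.00 -> the HPA would propose doubling the replicas.
}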

Now for the source code itself. The HPA is driven by a controller, so the entry point is cmd/kube-controller-manager/app/controllermanager.go:

Code language: go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
  controllers := map[string]InitFunc{}
  controllers["endpoint"] = startEndpointController
  controllers["endpointslice"] = startEndpointSliceController
  controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
  controllers["replicationcontroller"] = startReplicationController
  controllers["podgc"] = startPodGCController
  controllers["resourcequota"] = startResourceQuotaController
  controllers["namespace"] = startNamespaceController
  controllers["serviceaccount"] = startServiceAccountController
  controllers["garbagecollector"] = startGarbageCollectorController
  controllers["daemonset"] = startDaemonSetController
  controllers["job"] = startJobController
  controllers["deployment"] = startDeploymentController
  controllers["replicaset"] = startReplicaSetController
  controllers["horizontalpodautoscaling"] = startHPAController
}

As shown above, the HPA controller is registered in the NewControllerInitializers method just like every other controller, and is started via startHPAController.

Following the start function through several layers of calls, we eventually arrive at the Run method of HorizontalController:

Code language: go
// Run begins watching and syncing.
func (a *HorizontalController) Run(ctx context.Context) {
   defer utilruntime.HandleCrash()
   defer a.queue.ShutDown()

   klog.Infof("Starting HPA controller")
   defer klog.Infof("Shutting down HPA controller")

   if !cache.WaitForNamedCacheSync("HPA", ctx.Done(), a.hpaListerSynced, a.podListerSynced) {
      return
   }

   // start a single worker (we may wish to start more in the future)
   go wait.UntilWithContext(ctx, a.worker, time.Second)

   <-ctx.Done()
}

As shown above, once the informer caches have synced, Run starts a single asynchronous worker goroutine, and wait.UntilWithContext invokes the worker method once per second.

Next, the worker method. Through a couple of intermediate calls (sketched below), it eventually lands in the main reconcile logic:
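
For reference, those intermediate calls look roughly like the following, abridged from pkg/controller/podautoscaler/horizontal.go (error handling and logging trimmed; details vary slightly between releases):

Code language: go
// Abridged sketch of the worker call chain.
func (a *HorizontalController) worker(ctx context.Context) {
    // Drain the work queue; each item is the namespace/name key of an HPA object.
    for a.processNextWorkItem(ctx) {
    }
}

func (a *HorizontalController) processNextWorkItem(ctx context.Context) bool {
    key, quit := a.queue.Get()
    if quit {
        return false
    }
    defer a.queue.Done(key)

    deleted, err := a.reconcileKey(ctx, key.(string))
    if err != nil {
        utilruntime.HandleError(err)
    }
    // Re-enqueue the HPA with the resync delay (default 15s) so it is
    // re-evaluated periodically.
    if !deleted {
        a.queue.AddRateLimited(key)
    }
    return true
}

func (a *HorizontalController) reconcileKey(ctx context.Context, key string) (deleted bool, err error) {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return true, err
    }
    hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
    if errors.IsNotFound(err) {
        // The HPA was deleted; nothing to reconcile.
        return true, nil
    }
    if err != nil {
        return false, err
    }
    return false, a.reconcileAutoscaler(ctx, hpa, key)
}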

Code language: go
func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) error {
   // make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
   hpa := hpaShared.DeepCopy()
   hpaStatusOriginal := hpa.Status.DeepCopy()

   reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)

   targetGV, err := schema.ParseGroupVersion(hpa.Spec.ScaleTargetRef.APIVersion)
   if err != nil {
      a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
      setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
      a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
      return fmt.Errorf("invalid API version in scale target reference: %v", err)
   }

   targetGK := schema.GroupKind{
      Group: targetGV.Group,
      Kind: hpa.Spec.ScaleTargetRef.Kind,
   }

   mappings, err := a.mapper.RESTMappings(targetGK)
   if err != nil {
      a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
      setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
      a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
      return fmt.Errorf("unable to determine resource for scale target reference: %v", err)
   }

   scale, targetGR, err := a.scaleForResourceMappings(ctx, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
   if err != nil {
      a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
      setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
      a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
      return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
   }
   setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
   currentReplicas := scale.Spec.Replicas
   a.recordInitialRecommendation(currentReplicas, key)

   var (
      metricStatuses []autoscalingv2.MetricStatus
      metricDesiredReplicas int32
      metricName string
   )

   desiredReplicas := int32(0)
   rescaleReason := ""

   var minReplicas int32

   if hpa.Spec.MinReplicas != nil {
      minReplicas = *hpa.Spec.MinReplicas
   } else {
      // Default value
      minReplicas = 1
   }

   rescale := true

   if scale.Spec.Replicas == 0 && minReplicas != 0 {
      // Autoscaling is disabled for this resource
      desiredReplicas = 0
      rescale = false
      setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
   } else if currentReplicas > hpa.Spec.MaxReplicas {
      rescaleReason = "Current number of replicas above Spec.MaxReplicas"
      desiredReplicas = hpa.Spec.MaxReplicas
   } else if currentReplicas < minReplicas {
      rescaleReason = "Current number of replicas below Spec.MinReplicas"
      desiredReplicas = minReplicas
   } else {
      var metricTimestamp time.Time
      metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
      if err != nil {
         a.setCurrentReplicasInStatus(hpa, currentReplicas)
         if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
            utilruntime.HandleError(err)
         }
         a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
         return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
      }

      klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)

      rescaleMetric := ""
      if metricDesiredReplicas > desiredReplicas {
         desiredReplicas = metricDesiredReplicas
         rescaleMetric = metricName
      }
      if desiredReplicas > currentReplicas {
         rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
      }
      if desiredReplicas < currentReplicas {
         rescaleReason = "All metrics below target"
      }
      if hpa.Spec.Behavior == nil {
         desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
      } else {
         desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
      }
      rescale = desiredReplicas != currentReplicas
   }

   if rescale {
      scale.Spec.Replicas = desiredReplicas
      _, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(ctx, targetGR, scale, metav1.UpdateOptions{})
      if err != nil {
         a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
         setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
         a.setCurrentReplicasInStatus(hpa, currentReplicas)
         if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
            utilruntime.HandleError(err)
         }
         return fmt.Errorf("failed to rescale %s: %v", reference, err)
      }
      setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
      a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
      a.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
      klog.Infof("Successful rescale of %s, old size: %d, new size: %d, reason: %s",
         hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
   } else {
      klog.V(4).Infof("decided not to scale %s to %v (last scale time was %s)", reference, desiredReplicas, hpa.Status.LastScaleTime)
      desiredReplicas = currentReplicas
   }

   a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
   return a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
}

The main HPA scaling flow is as follows:

1) Check whether the current pod count lies within the replica range configured on the HPA. If it does not, return the minimum when it is too small or the maximum when it is too large, and end this scaling pass.

2) Determine the metric type and send the corresponding request to the API server to obtain the configured metric. Metrics generally come from the aggregated APIs metrics.k8s.io, custom.metrics.k8s.io, and external.metrics.k8s.io. metrics.k8s.io is usually served by the Kubernetes metrics-server and mainly provides CPU and memory usage; the other two require a third-party adapter. custom.metrics.k8s.io serves custom metrics that are usually related to the Kubernetes cluster, for example tied to specific pods. external.metrics.k8s.io also serves custom metrics, but ones that are usually unrelated to the cluster. Many well-known third-party monitoring platforms provide adapters that implement these APIs (Prometheus, for example); the monitoring stack and the adapter can be deployed together in the Kubernetes cluster, and can even replace the original metrics-server to serve all three metric APIs, allowing deeply customized monitoring data.

3) Using the metrics obtained, compute a scaling ratio with the relevant algorithm and multiply it by the current pod count to get the desired pod count. The ratio is the current metric value divided by the target value: greater than 1 means scale out, less than 1 means scale in. Target values come in three flavors, average value (AverageValue), average utilization (Utilization), and raw value (Value), each with its own calculation. Two points to note: if the result has a fractional part it is always rounded up, and if the ratio is within a tolerance of 1 (0.1 by default), the HPA considers the change too small and ignores it. A small worked example follows step 4 below.

  • The HPA scaling algorithm is fairly conservative: if a pod's metrics cannot be obtained, the pod is counted at the minimum when scaling out and at the maximum when scaling in. When an average has to be computed and some pods are not yet ready, we conservatively assume that the not-yet-ready pods consume 0% of the desired metric, which further dampens the scaling.
  • A single HPA can watch multiple metrics: the HPA iterates over all of them, computes a desired pod count for each, and takes the largest result as the final scaling target. A scale target is allowed to be referenced by multiple HPAs, but only in the sense that Kubernetes does not reject it; the HPAs are unaware that they are watching the same target, and its pods will be pointlessly scaled back and forth by them, adding load on the system. If you want scaling driven by several metrics, add multiple metrics to a single HPA as described above.

4) Check whether the final pod count lies within the HPA's configured range; if it exceeds the maximum or falls below the minimum, it is clamped to that bound. The controller then sends a request to Kubernetes to modify the pod count on the scale subobject of the target, finishing this HPA's check, and moves on to the next HPA, completing one scaling pass.
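
To make steps 3 and 4 concrete, here is a small self-contained sketch of the core formula, the 0.1 tolerance, rounding up, taking the maximum across metrics, and clamping to the configured range. All numbers are made up; this illustrates the algorithm rather than reproducing the controller's actual code:

Code language: go
package main

import (
    "fmt"
    "math"
)

// proposedReplicas applies the core HPA formula for one metric:
// desired = ceil(currentReplicas * currentValue / targetValue),
// ignoring the change when the ratio is within the tolerance.
func proposedReplicas(currentReplicas int32, currentValue, targetValue, tolerance float64) int32 {
    ratio := currentValue / targetValue
    if math.Abs(1.0-ratio) <= tolerance {
        return currentReplicas // change too small: keep the current count
    }
    return int32(math.Ceil(ratio * float64(currentReplicas)))
}

func main() {
    current, tolerance := int32(4), 0.1
    minReplicas, maxReplicas := int32(1), int32(10)

    // Two metrics on one HPA: CPU utilization at 90% vs. a 50% target,
    // and a pods metric at 400 vs. a 1000 target.
    cpu := proposedReplicas(current, 90, 50, tolerance)     // ceil(4 * 1.8) = 8
    pods := proposedReplicas(current, 400, 1000, tolerance) // ceil(4 * 0.4) = 2

    // The HPA evaluates every metric and takes the largest proposal...
    desired := cpu
    if pods > desired {
        desired = pods
    }
    // ...and finally clamps the result to [minReplicas, maxReplicas].
    if desired < minReplicas {
        desired = minReplicas
    }
    if desired > maxReplicas {
        desired = maxReplicas
    }
    fmt.Printf("cpu proposes %d, pods metric proposes %d -> desired replicas: %d\n", cpu, pods, desired)
}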

Next, the replica computation logic:

Code language: go
func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
   specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
   timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {

   switch spec.Type {
   case autoscalingv2.ObjectMetricSourceType:
      metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
      if err != nil {
         condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
      }
      replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, selector, status, metricSelector)
      if err != nil {
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
      }
   case autoscalingv2.PodsMetricSourceType:
      metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
      if err != nil {
         condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
      }
      replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
      if err != nil {
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
      }
   case autoscalingv2.ResourceMetricSourceType:
      replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
      if err != nil {
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s resource metric value: %v", spec.Resource.Name, err)
      }
   case autoscalingv2.ContainerResourceMetricSourceType:
      replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForContainerResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
      if err != nil {
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s container metric value: %v", spec.ContainerResource.Container, err)
      }
   case autoscalingv2.ExternalMetricSourceType:
      replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForExternalMetric(specReplicas, statusReplicas, spec, hpa, selector, status)
      if err != nil {
         return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s external metric value: %v", spec.External.Metric.Name, err)
      }
   default:
      errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
      err = fmt.Errorf(errMsg)
      condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
      return 0, "", time.Time{}, condition, err
   }
   return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

This code dispatches on the metric types declared in the HPA manifest and calls a different metrics REST client for each, fetching the metric from the corresponding API server. The various metric sources are retrieved through a single unified interface:

Code language: go
// MetricsClient knows how to query a remote interface to retrieve container-level
// resource metrics as well as pod-level arbitrary metrics
type MetricsClient interface {
   // GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
   // for all pods matching the specified selector in the given namespace
   GetResourceMetric(resource v1.ResourceName, namespace string, selector labels.Selector) (PodMetricsInfo, time.Time, error)
   // GetRawMetric gets the given metric (and an associated oldest timestamp)
   // for all pods matching the specified selector in the given namespace
   GetRawMetric(metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (PodMetricsInfo, time.Time, error)
   // GetObjectMetric gets the given metric (and an associated timestamp) for the given
   // object in the given namespace
   GetObjectMetric(metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference, metricSelector labels.Selector) (int64, time.Time, error)
   // GetExternalMetric gets all the values of a given external metric
   // that match the specified selector.
   GetExternalMetric(metricName string, namespace string, selector labels.Selector) ([]int64, time.Time, error)
}

GetResourceMetric queries metrics-server and typically fetches the per-pod CPU/memory usage of a workload. GetRawMetric and GetObjectMetric talk to a custom-metrics API server, while GetExternalMetric talks to an external-metrics API server; each has a corresponding adapter mechanism that exposes a standard interface for retrieving metrics. Developers only need to implement those interfaces with their own logic and deploy the server following the APIService registration pattern. For reasons of space, developing a custom-metrics-apiserver is not covered in detail here; if you are interested, see http://www.github.com/kubernetes-incubator/custom-metrics-apiserver and https://github.com/DirectXMan12/k8s-prometheus-adapter.git.
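
To get a feel for the data GetResourceMetric consumes, the metrics.k8s.io API can be queried directly with the metrics client library, summing per-pod container CPU the same way the controller does. A minimal sketch; the kubeconfig path, namespace, and label selector are illustrative:

Code language: go
package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/tools/clientcmd"
    metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
)

func main() {
    cfg, err := clientcmd.BuildConfigFromFlags("", "./kubeconfig") // illustrative path
    if err != nil {
        panic(err)
    }
    mc, err := metricsclient.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    // List pod metrics from metrics.k8s.io (served by metrics-server).
    podMetrics, err := mc.MetricsV1beta1().PodMetricses("default").List(context.TODO(),
        metav1.ListOptions{LabelSelector: "run=php-apache"})
    if err != nil {
        panic(err)
    }

    for _, pm := range podMetrics.Items {
        // Sum CPU usage over all containers in the pod, in millicores; this is
        // the raw material for the controller's utilization calculation.
        var cpuMilli int64
        for _, c := range pm.Containers {
            cpuMilli += c.Usage.Cpu().MilliValue()
        }
        fmt.Printf("%s: %dm CPU\n", pm.Name, cpuMilli)
    }
}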
