简介
kubectl scale
命令可以来实现 Pod 的扩缩容功能,但是这个毕竟是完全手动操作的,要应对线上的各种复杂情况,我们需要能够做到自动化去感知业务,来自动进行扩缩容。为此,Kubernetes 也为我们提供了这样的一个资源对象: Horizontal Pod Autoscaling(Pod 水平自动伸缩)
,简称 HPA
,HPA 通过监控分析一些控制器控制的所有 Pod 的负载变化情况来确定是否需要调整 Pod 的副本数量
HPA(Horizontal Pod Autoscaler)是kubernetes的一种资源对象,能够根据某些指标对在statefulset、replicacontroller、replicaset等集合中的pod数量进行动态伸缩,使运行在上面的服务对指标的变化有一定的自适应能力。
HPA目前支持四种类型的指标,分别是Resource、Object、External、Pods。其中在稳定版本autoscaling/v1只支持对CPU指标的动态伸缩,在测试版本autoscaling/v2beta2中支持memory和自定义指标的动态伸缩,并以annotation的方式工作在autoscaling/v1版本中。 注意:Pod的自动缩放不适用于无法缩放的对象。
设置
可以通过使用kubectl来创建HPA。如通过 kubectl create 命令创建一个 HPA 对象,也可以通过kubectl autoscale来创建 HPA 对象。 例如,命令 kubectl autoscale rs foo --min=2 --max=5 --cpu-percent=80 将会为名 为 foo 的 ReplicationSet 创建一个 HPA 对象, 目标 CPU 使用率为 80%,副本数量配置为 2 到 5 之间。
如果指标变化太频繁,我们也可以使用 --horizontal-pod-autoscaler-downscale-stabilization
指令设置扩缩容延迟时间,表示的是自从上次缩容执行结束后,多久可以再次执行缩容,默认是5m。
配置
代码语言:javascript复制apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: php-apache
namespace: default
spec:
# HPA的伸缩对象描述,HPA会动态修改该对象的pod数量
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: php-apache
# HPA的最小pod数量和最大pod数量
minReplicas: 1
maxReplicas: 10
# 监控的指标数组,支持多种类型的指标共存
metrics:
# Object类型的指标
- type: Object
object:
metric:
# 指标名称
name: requests-per-second
# 监控指标的对象描述,指标数据来源于该对象
describedObject:
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
name: main-route
# Value类型的目标值,Object类型的指标只支持Value和AverageValue类型的目标值
target:
type: Value
value: 10k
# Resource类型的指标
- type: Resource
resource:
name: cpu
# Utilization类型的目标值,Resource类型的指标只支持Utilization和AverageValue类型的目标值
target:
type: Utilization
averageUtilization: 50
# Pods类型的指标
- type: Pods
pods:
metric:
name: packets-per-second
# AverageValue类型的目标值,Pods指标类型下只支持AverageValue类型的目标值
target:
type: AverageValue
averageValue: 1k
# External类型的指标
- type: External
external:
metric:
name: queue_messages_ready
# 该字段与第三方的指标标签相关联
selector:
matchLabels:
env: "stage"
app: "myapp"
# External指标类型下只支持Value和AverageValue类型的目标值
target:
type: AverageValue
averageValue: 30
源码分析
先上结论: HPA在kubernetes中也由一个controller控制,controller会间隔循环HPA,检查每个HPA中监控的指标是否触发伸缩条件,默认的间隔时间为15s。一旦触发伸缩条件,controller会向kubernetes发送请求,修改伸缩对象(statefulSet、replicaController、replicaSet)子对象scale中控制pod数量的字段。kubernetess响应请求,修改scale结构体,然后会刷新一次伸缩对象的pod数量。伸缩对象被修改后,自然会通过list/watch机制增加或减少pod数量,达到动态伸缩的目的。
- 对于每个pod的资源指标(如CPU),控制器从资源指标API中获取每一个HorizontalPodAutoscaler指定的pod的指标,如果设置了目标使用率,控制器会获取每个Pod中的容器资源使用情况,并计算资源使用率。如果使用原始值,将直接使用原始数据,进而计算出目标副本数。这里注意的是,如果Pod某些容器不支持资源采集,那么该控制器将不会使用该pod的CPU使用率。
- 如果pod使用自定义指标,控制器机制与资源指标类型,区别在于自定义的指标只适用原始值,而不是利用率。
- 如果pod使用的对象指标和外部指标(每个指标描述一个对象信息),这个指标将直接跟目标指标设定值相比较,并生成一个上述的缩放比例。在最新的autoscaling/v2beta2版本API中,这个指标也可以根据pod数量平分后再进行计算。通常情况,控制器从一系列的聚合API(metrics.k8s.io,custom.metrics.k8s.io和external.metrics.k8s.io)中获取指标数据。metrics.k8s.io API通常由metrics-server(这里需要额外启动)提供。
下面来看下具体的源码分析 hpa也是由一个controller控制的,因此: 入口:cmd/kube-controller-manager/app/controllermanager.go
代码语言:javascript复制func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["endpointslice"] = startEndpointSliceController
controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startPodGCController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
controllers["serviceaccount"] = startServiceAccountController
controllers["garbagecollector"] = startGarbageCollectorController
controllers["daemonset"] = startDaemonSetController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
}
从上面可以看出:HPA Controller和其他的Controller一样,都在NewControllerInitializers方法中进行注册,然后通过startHPAController来启动。
根据启动函数,层层调用,最终会走到HorizontalController
的run方法
// Run begins watching and syncing.
func (a *HorizontalController) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
defer a.queue.ShutDown()
klog.Infof("Starting HPA controller")
defer klog.Infof("Shutting down HPA controller")
if !cache.WaitForNamedCacheSync("HPA", ctx.Done(), a.hpaListerSynced, a.podListerSynced) {
return
}
// start a single worker (we may wish to start more in the future)
go wait.UntilWithContext(ctx, a.worker, time.Second)
<-ctx.Done()
}
从上面可以看出:
是启动了一个异步线程,每秒执行一次worker方法
下面来看下worker方法,经过层层调用,最终走到下面的主逻辑方法中:
代码语言:javascript复制func (a *HorizontalController) reconcileAutoscaler(ctx context.Context, hpaShared *autoscalingv2.HorizontalPodAutoscaler, key string) error {
// make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
hpa := hpaShared.DeepCopy()
hpaStatusOriginal := hpa.Status.DeepCopy()
reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)
targetGV, err := schema.ParseGroupVersion(hpa.Spec.ScaleTargetRef.APIVersion)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
return fmt.Errorf("invalid API version in scale target reference: %v", err)
}
targetGK := schema.GroupKind{
Group: targetGV.Group,
Kind: hpa.Spec.ScaleTargetRef.Kind,
}
mappings, err := a.mapper.RESTMappings(targetGK)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
return fmt.Errorf("unable to determine resource for scale target reference: %v", err)
}
scale, targetGR, err := a.scaleForResourceMappings(ctx, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
if err != nil {
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
}
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
currentReplicas := scale.Spec.Replicas
a.recordInitialRecommendation(currentReplicas, key)
var (
metricStatuses []autoscalingv2.MetricStatus
metricDesiredReplicas int32
metricName string
)
desiredReplicas := int32(0)
rescaleReason := ""
var minReplicas int32
if hpa.Spec.MinReplicas != nil {
minReplicas = *hpa.Spec.MinReplicas
} else {
// Default value
minReplicas = 1
}
rescale := true
if scale.Spec.Replicas == 0 && minReplicas != 0 {
// Autoscaling is disabled for this resource
desiredReplicas = 0
rescale = false
setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
} else if currentReplicas > hpa.Spec.MaxReplicas {
rescaleReason = "Current number of replicas above Spec.MaxReplicas"
desiredReplicas = hpa.Spec.MaxReplicas
} else if currentReplicas < minReplicas {
rescaleReason = "Current number of replicas below Spec.MinReplicas"
desiredReplicas = minReplicas
} else {
var metricTimestamp time.Time
metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(ctx, hpa, scale, hpa.Spec.Metrics)
if err != nil {
a.setCurrentReplicasInStatus(hpa, currentReplicas)
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
}
klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)
rescaleMetric := ""
if metricDesiredReplicas > desiredReplicas {
desiredReplicas = metricDesiredReplicas
rescaleMetric = metricName
}
if desiredReplicas > currentReplicas {
rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
}
if desiredReplicas < currentReplicas {
rescaleReason = "All metrics below target"
}
if hpa.Spec.Behavior == nil {
desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
} else {
desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
}
rescale = desiredReplicas != currentReplicas
}
if rescale {
scale.Spec.Replicas = desiredReplicas
_, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(ctx, targetGR, scale, metav1.UpdateOptions{})
if err != nil {
a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
a.setCurrentReplicasInStatus(hpa, currentReplicas)
if err := a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa); err != nil {
utilruntime.HandleError(err)
}
return fmt.Errorf("failed to rescale %s: %v", reference, err)
}
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
a.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
klog.Infof("Successful rescale of %s, old size: %d, new size: %d, reason: %s",
hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
} else {
klog.V(4).Infof("decided not to scale %s to %v (last scale time was %s)", reference, desiredReplicas, hpa.Status.LastScaleTime)
desiredReplicas = currentReplicas
}
a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
return a.updateStatusIfNeeded(ctx, hpaStatusOriginal, hpa)
}
HPA的主要伸缩流程如下:
1)判断当前Pod数量是否在HPA设定的Pod数量空间中,如果不在,过小返回最小值,过大返回最大值,结束伸缩。
2)判断指标的类型,并向api server发送对应的请求,拿到设定的监控指标。一般来说指标会从下面系列聚合API中获取(metrics.k8s.io,custom.metrics.k8s.io和external.metrics.k8s.io)。其中metrics.k8s.io一般由kubernetes自带的metrics-server来提供,主要是cpu、memory使用率指标。另外两种需要第三方的adapter来提供。custom.metrics.k8s.io提供的自定义指标数据,一般与kubernetes集群有关,比如跟特定的pod相关。external.metrics.k8s.io同样提供自定义指标数据,但一般与kubernetes集群无关,许多知名的第三方监控平台提供了adapter实现上述api(如prometheus),可以将监控和adapter一同部署在kubenetes集群中提供服务。甚至能够替换原来的metrics-server来提供上述三类api指标,达到深度定制监控数据的目标。
3)根据获取的指标,使用相关的算法计算出一个伸缩系数,并乘以当前pod数量以获得期望的pod数量。这里系数是指标的期望值与目前值的比值,如果大于1表示扩容,小于1表示缩容。指数数值有平均值(AverageValue)、平均使用率(Utilization)、裸值(Value)三种类型 每种类型的数值都有对应的算法。注意下面事项:如果系数有小数点,统一进一;系数如果未达到某个容忍值,HPA认为变化太小,会忽略这次变化,容忍值默认为0.1。
- 这里HPA扩容算法比较保守,如果出现获取不到指标的情况,扩容时算最小值,缩容时算最大值。如果需要计算平均值,出现pod没准备好的情况,我们保守地假设尚未就绪的pods消耗了试题指标的0%,从而进一步降低了伸缩的幅度。
- 一个HPA支持多个指标的监控,HPA会循环获取所有的指标,并计算期望的pod数量,并从期望结果中获得最大的pod数量作为最终的伸缩的pod数量。一个伸缩对象在k8s中允许对应多个HPA,但是只是k8s不会报错而已,事实上HPA彼此不知道自己监控的是同一个伸缩对象,在这个伸缩对象中的pod会被多个HPA无意义地来回修改pod数量,给系统增加消耗,如果想要指定多个监控指标,可以如上述所说,在一个HPA中添加多个监控指标。
- 4)检查最终pod数量是否在HPA设定的pod数量范围的区间,如果超过最大值或不足最小值都会修改为最大值或者最小值。然后会向kubernetes发出请求,修改伸缩对象的子对象scale的pod数量,结束一个HPA的检查,获取下一个HPA,完成一个伸缩流程。
下面来看下计算副本的逻辑:
代码语言:javascript复制func (a *HorizontalController) computeReplicasForMetric(ctx context.Context, hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
switch spec.Type {
case autoscalingv2.ObjectMetricSourceType:
metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
if err != nil {
condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
}
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, selector, status, metricSelector)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
}
case autoscalingv2.PodsMetricSourceType:
metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
if err != nil {
condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
}
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
}
case autoscalingv2.ResourceMetricSourceType:
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s resource metric value: %v", spec.Resource.Name, err)
}
case autoscalingv2.ContainerResourceMetricSourceType:
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForContainerResourceMetric(ctx, specReplicas, spec, hpa, selector, status)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s container metric value: %v", spec.ContainerResource.Container, err)
}
case autoscalingv2.ExternalMetricSourceType:
replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForExternalMetric(specReplicas, statusReplicas, spec, hpa, selector, status)
if err != nil {
return 0, "", time.Time{}, condition, fmt.Errorf("failed to get %s external metric value: %v", spec.External.Metric.Name, err)
}
default:
errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
err = fmt.Errorf(errMsg)
condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
return 0, "", time.Time{}, condition, err
}
return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}
这段代码就是根据hpa yaml文件中不同类型的metrics,调用不同的metrics restclient从对应的apiserver中获取指标,参见图2-1及说明。三种资源类型由统一接口分别调用获取对应指标。
代码语言:javascript复制// MetricsClient knows how to query a remote interface to retrieve container-level
// resource metrics as well as pod-level arbitrary metrics
type MetricsClient interface {
// GetResourceMetric gets the given resource metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
GetResourceMetric(resource v1.ResourceName, namespace string, selector labels.Selector) (PodMetricsInfo, time.Time, error)
// GetRawMetric gets the given metric (and an associated oldest timestamp)
// for all pods matching the specified selector in the given namespace
GetRawMetric(metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (PodMetricsInfo, time.Time, error)
// GetObjectMetric gets the given metric (and an associated timestamp) for the given
// object in the given namespace
GetObjectMetric(metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference, metricSelector labels.Selector) (int64, time.Time, error)
// GetExternalMetric gets all the values of a given external metric
// that match the specified selector.
GetExternalMetric(metricName string, namespace string, selector labels.Selector) ([]int64, time.Time, error)
}
GetResourceMetric请求的是metrics-server通常获取workload的每个pod cpu/mem指标使用信息。GetRawMetric 、GetObjectMetric 对接的是custom-metrics-server,GetExternalMetric对接的是external-metrics-apiserver,两者分别有对应的adapter方法提供获取指标的标准接口,开发者只需按照自己的逻辑去实现对应接口,以及按照apiservice服务注册的方式进行部署。限于篇幅,在此不对custom-metrics-apiserver开发作详细讲解。有兴趣可参考 http://www.github.com/kubernetes-incubator/custom-metrics-apiserver 、https://github.com/ DirectXMan12/k8s-prometheus-adapter.git。