K8S-Node自动扩容项目CA源码分析(上)

2022-07-02 21:58:09 浏览数 (1)

一、概述

上一篇文章介绍了 k8s 自动扩缩容的三种方式:HPA、VPA、CA,以及各自的使用场景和架构。本文针对 CA 做源码分析。

1.1 CA架构回顾

参考

CA由一下几个模块组成:

  • autoscaler:核心模块,负责整体扩缩容功能
  • Estimator:负责评估计算扩容节点
  • Simulator:负责模拟调度,计算缩容节点
  • Cloud Provider:与云上 IaaS 层交互,负责增删改查节点。云厂商需要实现相关接口。
caca

1.2 仓库代码结构

源码地址

CA 代码在 k8s 官方的 autoscaler 仓库下,该仓库存放自动扩缩容相关组件,包括前文介绍的 VPA、今天的主角CA、还有一个VPA修改pod资源的插件 Addon Resizer。使用的版本是 cluster-autoscaler-release-1.21,目录结构如下

代码语言:shell复制
.
├── CONTRIBUTING.md
├── LICENSE
├── OWNERS
├── README.md
├── SECURITY_CONTACTS
├── addon-resizer       # addon-resizer 代码
├── builder
├── charts
├── cluster-autoscaler  # CA 代码
├── code-of-conduct.md
├── hack
└── vertical-pod-autoscaler # VPA 代码

1.3 CA 代码结构

代码语言:shell复制
.
├── FAQ.md															# FAQ,里面有很多关于 CA 原理和使用的说明
├── cloudprovider												# cloud provider模块,包括接口和各个云厂商的实现
│   ├── alicloud
│   ├── aws
│   ├── azure
│   ├── baiducloud
│   ├── builder
│   ├── cloud_provider.go								# cloud provider 接口,要对接自己的云,需要实现该接口操作 IaaS 资源
│   ├── gce
│   ├── huaweicloud
├── core                                # CA 核心模块
│   ├── autoscaler.go										# 定义 Autoscaler 接口
│   ├── equivalence_groups.go						# 资源不足的 pod 按扩容类型分类的处理逻辑
│   ├── filter_out_schedulable.go
│   ├── scale_down.go     							# 缩容
│   ├── scale_up.go											# 扩容
│   ├── static_autoscaler.go					  # Autoscaler 的实现类
│   └── utils
├── estimator														# Estimator 模块,评估扩容节点
│   ├── binpacking_estimator.go					# Estimator 实现类,实现首次适应背包算法(装箱算法)
│   └── estimator.go										# 定义 Estimator 接口
├── expander				
│   ├── expander.go											# expander 定义了选择多个符合条件的 NodeGroup 的策略接口
│   ├── factory													# 根据传入的不同策略名称,创建对应的实现类
│   ├── mostpods												# mostpods 策略:调度最多的 pod
│   ├── price														# price 策略:价格最低
│   ├── priority												# priority 策略:根据 NodeGroup 的优先级选择
│   ├── random													# random 策略:随机选择符合条件的 NodeGroup 中的一个
│   └── waste														# waste 策略:资源利用率最高
├── go.mod
├── go.sum															
├── main.go															# main 方法
├── metrics															# 指标采集
├── processors
│   ├── callbacks
│   ├── customresources
│   ├── nodegroupconfig
│   ├── nodegroups
│   ├── nodegroupset
│   ├── nodeinfos
│   ├── nodes
│   ├── pods
│   ├── processors.go
│   └── status
├── proposals													# 提案,设计文档信息
│   ├── balance_similar.md
│   ├── circumvent-tag-limit-aws.md
│   ├── clusterstate.md
│   ├── images
│   ├── kubemark_integration.md
│   ├── metrics.md
│   ├── min_at_zero_gcp.md
│   ├── node_autoprovisioning.md
│   ├── plugable-provider-grpc.md
│   ├── pricing.md
├── simulator														# 模拟调度模块,主要用于缩容
│   ├── basic_cluster_snapshot.go
│   ├── cluster.go
│   ├── cluster_snapshot.go
│   ├── delegating_shared_lister.go
│   ├── delta_cluster_snapshot.go
│   ├── drain.go
│   ├── nodes.go
│   ├── nodes_test.go
│   ├── predicate_error.go
│   ├── predicates_checker_interface.go
│   ├── scheduler_based_predicates_checker.go
│   ├── test_predicates_checker.go
│   ├── test_utils.go
│   ├── tracker.go

二、CloudProvider 模块

2.1 CloudProvider 接口

包含配置信息、与云厂商交互的方法。核心方法有:

  • Name():提供唯一的名称
  • Refresh():刷新云厂商资源信息
  • NodeGroups():获取所有的节点组
  • NodeGroupForNode(...):获取某个节点所属的节点组
代码语言:go复制
type CloudProvider interface {
  // cloud provider 名称
	Name() string
	// 返回 cloud privider 配置的所有 node group
	NodeGroups() []NodeGroup
	// 返回给定 Node 节点所属的 NodeGroup
  // 如果节点不应该被 autoscaler 处理,应该返回 nil
	NodeGroupForNode(*apiv1.Node) (NodeGroup, error)
  // 可选实现,价格模型
	Pricing() (PricingModel, errors.AutoscalerError)
  // 可选实现,获取 cloud provider 支持的所有机器型号
	GetAvailableMachineTypes() ([]string, error)
  // 基于定义的 node,构建理论的 node group
  // 阻塞方法,直到 node group 创建出来
  // 可选实现
	NewNodeGroup(machineType string, labels map[string]string, systemLabels map[string]string,
		taints []apiv1.Taint, extraResources map[string]resource.Quantity) (NodeGroup, error)
	// 返回结构化资源限额
  GetResourceLimiter() (*ResourceLimiter, error)
	// 返回添加到 Node 上的 GPU 资源标签
	GPULabel() string
	// 返回所有可用的 GPU 类型
	GetAvailableGPUTypes() map[string]struct{}
	// 清理工作,比如:go 协程
	Cleanup() error
	// 在每次主循环之前调用,并且用于动态更新 cloud provider 状态
  // 尤其是由 NodeGroups 改变,导致返回一个 node group 列表
	Refresh() error
}

2.2 NodeGroup 节点组

NodeGroup提供相关接口,操作具有相同容量和标签的一组节点。核心方法有:

  • MaxSize():节点组允许的最大扩容数量
  • MinSize():节点组允许的最小缩容数量
  • TargetSize():节点组当前数量
  • IncreaseSize(delta int):新增 delta 个节点的方法
  • DecreaseTargetSize(delta int):减少节点的方法
  • DeleteNodes(...):删除实例的方法
  • TemplateNodeInfo():

包含配置信息和方法控制

代码语言:go复制
type NodeGroup interface {
  // 返回 node group 的最大数量
	MaxSize() int
	// 返回 node group 的最小数量
	MinSize() int
  // 必须实现该方法。
  // 返回当前目标数量,必须实现该方法
  // 有可能 k8s 节点数量和这个值不相等,但是一旦一切稳定(node完成启动和注册、节点彻底删除)就应该等于 Size()
	TargetSize() (int, error)
  // 必须实现该方法。
  // 增加 node group 的数量,为了删除节点你需要显示指定名称并调用 DeleteNode 方法
  // 该方法会阻塞知道 node group 数量更新
	IncreaseSize(delta int) error
  // 必须实现该方法。
  // 从 node group 中删除节点。
  // 如果失败或者 node 不属于 node group 将会报错。
  // 该方法会阻塞知道 node group 数量更新
	DeleteNodes([]*apiv1.Node) error
  // 从 Node group 中减少目标数量
  // 该方法不允许删除任何节点,仅仅用于减少没有完全填充的新节点
  // 参数 delta 必须是负数,假定 cloud provider 在调整目标数量时,将不会删除存在的节点
	DecreaseTargetSize(delta int) error
	// 返回 node group 唯一标识
	Id() string
	// 返回调试信息
	Debug() string
  // 返回所有属于 node group 的节点列表
  // Instance 对象必须包含 id 字段,其他字段值可选
  // 列表也包含不会成为 k8s 的那些节点
	Nodes() ([]Instance, error)

	// 可选实现
  // 返回包含空 node 新的的调度结构体,将被用于扩容仿真,以预测一个新的扩容节点是什么样的
  // 返回的 NodeInfo 信息包含完整的 Node 对象信N,包括:标签、容量、能分配的 pod 信息(类似kube-proxy)
	TemplateNodeInfo() (*schedulerframework.NodeInfo, error)
  // 必须实现。返回 node group 是否存在
	Exist() bool
	// 可选实现。创建 node group
	Create() (NodeGroup, error)
  // 可选实现,删除 node group
	Delete() error
  // 是否支持自动供应
	Autoprovisioned() bool
  // 可选实现,返回配置参数
	GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error)
}

2.3 CloudProvider 实现的厂商

大部分云厂商都实现了该接口,参考

  • 国外的:亚马逊AWS、谷歌GCE、微软Azure
  • 国内的:阿里云、华为云、腾讯云、百度云
  • 其他:......

2.4 AWS 接口实现

以 aws 为例分析实现实现逻辑,代码结构如下

代码语言:shell复制
.
├── auto_scaling.go
├── auto_scaling_groups.go	 # 获取 asg 信息,保存在 asgCache 缓存中
├── aws_cloud_provider.go		 # awsCloudProvider 实现 CloudProvider 接口里的方法
├── aws_manager.go					 # 根据账号密码,构建 awsManager 对象来操作 aws 资源
├── aws_util.go							 # 获取机型、可用区等信息
├── ec2.go
├── ec2_instance_types			
│   └── gen.go
├── ec2_instance_types.go		 # 默认机型列表
└── examples
    ├── cluster-autoscaler-autodiscover.yaml	# 自动发现模式部署 ca
    ├── cluster-autoscaler-multi-asg.yaml			# 多 asg 模式部署 ca
    ├── cluster-autoscaler-one-asg.yaml				# 单 asg 模式部署 ca
    └── cluster-autoscaler-run-on-control-plane.yaml
2.4.1 Name

返回 provider 的名称 aws

代码语言:go复制
// AwsProviderName = "aws"
func (aws *awsCloudProvider) Name() string {
   return cloudprovider.AwsProviderName
}
2.4.2 Refresh

调用链:CloudProvider -> AwsManager -> asgCache

refresh 的功能是获取 aws 中的 asg,以及模板、实例等信息保存到缓存中

代码语言:go复制
func (aws *awsCloudProvider) Refresh() error {
	// 调用 awsManager 的 Refresh 方法
  return aws.awsManager.Refresh()
}

func (m *AwsManager) Refresh() error {
	...
  // 调用 forceRefresh
	return m.forceRefresh()
}

func (m *AwsManager) forceRefresh() error {
  // 调用 asgCache 的 regenerate
	if err := m.asgCache.regenerate(); err != nil {
		...
	}
	...
	return nil
}

func (m *asgCache) regenerate() error {
	...
	newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
	newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)

	// Build list of knowns ASG names
  // 获取所有的 asg name
	refreshNames, err := m.buildAsgNames()

  // 根据 asg name 获取 asg 对象
  // 调用 aws-sdk-go 中 AutoScaling 的 DescribeAutoScalingGroupsPages
	groups, err := m.service.getAutoscalingGroupsByNames(refreshNames)
	// 填充 asg 启动配置的实例模板
	err = m.service.populateLaunchConfigurationInstanceTypeCache(groups)
	if err != nil {
		klog.Warningf("Failed to fully populate all launchConfigurations: %v", err)
	}

	// If currently any ASG has more Desired than running Instances, introduce placeholders
	// for the instances to come up. This is required to track Desired instances that
	// will never come up, like with Spot Request that can't be fulfilled
	groups = m.createPlaceholdersForDesiredNonStartedInstances(groups)

	// Register or update ASGs
	exists := make(map[AwsRef]bool)
	for _, group := range groups {
		asg, err := m.buildAsgFromAWS(group)
		if err != nil {
			return err
		}
		exists[asg.AwsRef] = true
		// 注册 asg
		asg = m.register(asg)

		newAsgToInstancesCache[asg.AwsRef] = make([]AwsInstanceRef, len(group.Instances))

    // 将 group 中所有的实例信息保存到缓存中
		for i, instance := range group.Instances {
      // 根据 group 的 instance 信息构造 instance
			ref := m.buildInstanceRefFromAWS(instance)
			newInstanceToAsgCache[ref] = asg
			newAsgToInstancesCache[asg.AwsRef][i] = ref
		}
	}

  // 注销不存在的 asg
	for _, asg := range m.registeredAsgs {
		if !exists[asg.AwsRef] && !m.explicitlyConfigured[asg.AwsRef] {
			m.unregister(asg)
		}
	}

  // 生成 asg -> instance 的缓存
	m.asgToInstances = newAsgToInstancesCache
  // 生成 instance -> asg 的缓存
	m.instanceToAsg = newInstanceToAsgCache
	return nil
}
2.4.3 NodeGroups

调用 awsManager 获取所有的 asg

代码语言:go复制
func (aws *awsCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
  // 调用 awsManager 获取所有的 asg 
  asgs := aws.awsManager.getAsgs()
   ngs := make([]cloudprovider.NodeGroup, len(asgs))
   for i, asg := range asgs {
      ngs[i] = &AwsNodeGroup{
         asg:        asg,
         awsManager: aws.awsManager,
      }
   }

   return ngs
}
2.4.4 NodeGroupForNode
  • 从 Node.Spec.ProviderID 中获取 id
  • 调用 awsManager 获取 id 对应的 asg
代码语言:go复制
func (aws *awsCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.NodeGroup, error) {
   if len(node.Spec.ProviderID) == 0 {
      klog.Warningf("Node %v has no providerId", node.Name)
      return nil, nil
   }
   // 从 Node.Spec.ProviderID 中获取  id
   ref, err := AwsRefFromProviderId(node.Spec.ProviderID)
   if err != nil {
      return nil, err
   }
   // 获取 asg
   asg := aws.awsManager.GetAsgForInstance(*ref)

   if asg == nil {
      return nil, nil
   }

   return &AwsNodeGroup{
      asg:        asg,
      awsManager: aws.awsManager,
   }, nil
}
2.4.5 IncreaseSize

调用链:CloudProvider -> AwsManager -> asgCache -> aws-sdk-go

通过传入操作 aws asg 的参数,调用 aws-sdk-go 的 asg 接口,实现新增节点的效果

代码语言:go复制
func (ng *AwsNodeGroup) IncreaseSize(delta int) error {
  // 增量不能小于 0
	if delta <= 0 {
		return fmt.Errorf("size increase must be positive")
	}
	size := ng.asg.curSize
  // 增加后不能超过最大值
	if size delta > ng.asg.maxSize {
		return fmt.Errorf("size increase too large - desired:%d max:%d", size delta, ng.asg.maxSize)
	}
  // 调用 awsManager 设置 size 为当前 size   delta
	return ng.awsManager.SetAsgSize(ng.asg, size delta)
}

// SetAsgSize sets ASG size.
func (m *AwsManager) SetAsgSize(asg *asg, size int) error {
	return m.asgCache.SetAsgSize(asg, size)
}

// 加锁操作
func (m *asgCache) SetAsgSize(asg *asg, size int) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	return m.setAsgSizeNoLock(asg, size)
}

func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
  // 拼接参数:name、size、honorCooldown
	params := &autoscaling.SetDesiredCapacityInput{
		AutoScalingGroupName: aws.String(asg.Name),
		DesiredCapacity:      aws.Int64(int64(size)),
		HonorCooldown:        aws.Bool(false),
	}
	klog.V(0).Infof("Setting asg %s size to %d", asg.Name, size)
  // 调用 aws-sdk-go 操作 ASG 的 AutoScaling 接口完成操作
	_, err := m.service.SetDesiredCapacity(params)
	if err != nil {
		return err
	}

	// Proactively set the ASG size so autoscaler makes better decisions
	asg.curSize = size

	return nil
}
2.4.6 DecreaseTargetSize

执行代码同 IncreaseSize,不同的仅仅是参数 delta是负数。

代码语言:go复制
func (ng *AwsNodeGroup) DecreaseTargetSize(delta int) error {
	// 增量必须为负数
  if delta >= 0 {
		return fmt.Errorf("size decrease size must be negative")
	}

	size := ng.asg.curSize
  // 查询目前 ASG 的节点数
	nodes, err := ng.awsManager.GetAsgNodes(ng.asg.AwsRef)
	if err != nil {
		return err
	}
  // 删除的数量不能超过已有数量
	if int(size) delta < len(nodes) {
		return fmt.Errorf("attempt to delete existing nodes targetSize:%d delta:%d existingNodes: %d",
			size, delta, len(nodes))
	}
  // 方法同 IncreaseSize 中一样
	return ng.awsManager.SetAsgSize(ng.asg, size delta)
}
2.4.7 DeleteNodes

调用链:CloudProvider -> AwsManager -> aws-sdk-go

通过 Node.Spec.ProviderID 拿到实例唯一 id,调用 SDK 时传入 id,执行删除操作

代码语言:go复制
func (ng *AwsNodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
	size := ng.asg.curSize
	if int(size) <= ng.MinSize() {
		return fmt.Errorf("min size reached, nodes will not be deleted")
	}
	refs := make([]*AwsInstanceRef, 0, len(nodes))
	for _, node := range nodes {
    // 判断待删除 Node 是否是当前 ASG
		belongs, err := ng.Belongs(node)
		if err != nil {
			return err
		}
		if belongs != true {
			return fmt.Errorf("%s belongs to a different asg than %s", node.Name, ng.Id())
		}
    // 获取 Node 的 aws 唯一凭证信息
    // providerID 保存在 node.Spec.ProviderID 字段中
		awsref, err := AwsRefFromProviderId(node.Spec.ProviderID)
		if err != nil {
			return err
		}
		refs = append(refs, awsref)
	}
  // 调用 AwsManager 的删除实例方法
	return ng.awsManager.DeleteInstances(refs)
}

// providerID 的格式是:aws:///zone/name
func AwsRefFromProviderId(id string) (*AwsInstanceRef, error) {
	if validAwsRefIdRegex.FindStringSubmatch(id) == nil {
		return nil, fmt.Errorf("wrong id: expected format aws:///<zone>/<name>, got %v", id)
	}
	splitted := strings.Split(id[7:], "/")
	return &AwsInstanceRef{
		ProviderID: id,
		Name:       splitted[1],
	}, nil
}
2.4.8 TemplateNodeInfo
  • getAsgTemplate:获取 template
代码语言:go复制
func (ng *AwsNodeGroup) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) {
	// 获取 asg 的模板信息
  template, err := ng.awsManager.getAsgTemplate(ng.asg)
	
	// 通过模板构造 Node 对象
	node, err := ng.awsManager.buildNodeFromTemplate(ng.asg, template)
	// 通过调度框架接口构造调度对象,用于 CA 模拟调度
	nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(ng.asg.Name))
	nodeInfo.SetNode(node)
	return nodeInfo, nil
}

2.5 AwsManager 实现

通过前面的分析发现,aws接口实现中和底层IaaS操作的很多逻辑都封装到了 AwsManager 中,这里专门针对 AwsManager做分析。

2.5.1 getAsgTemplate

获取 asg 中第一个可用区的模板信息

代码语言:go复制
func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) {
	// 判断是否有可用区信息
  if len(asg.AvailabilityZones) < 1 {
		return nil, fmt.Errorf("unable to get first AvailabilityZone for ASG %q", asg.Name)
	}

  // asg可配置多个az信息, 默认选择 asg 中第一个可用 az
	az := asg.AvailabilityZones[0]
	region := az[0 : len(az)-1]

	if len(asg.AvailabilityZones) > 1 {
		klog.V(4).Infof("Found multiple availability zones for ASG %q; using %s for %s labeln", asg.Name, az, apiv1.LabelFailureDomainBetaZone)
	}
	// 获取可用机型,通过调用 aws-sdk-go 获取
	instanceTypeName, err := m.buildInstanceType(asg)
	
	if t, ok := m.instanceTypes[instanceTypeName]; ok {
		return &asgTemplate{
			InstanceType: t,
			Region:       region,
			Zone:         az,
			Tags:         asg.Tags,
		}, nil
	}
	return nil, fmt.Errorf("ASG %q uses the unknown EC2 instance type %q", asg.Name, instanceTypeName)
}
2.5.2 buildNodeFromTemplate

根据 template 构造 node 对象,预调度就是通过虚拟出来的 Node 对象,传给调度框架来实现。

Node 数据的来源包括:

  • asg 模板:提供node 的 cpu、memory、机型、az等信息
  • asg tag:提供node 的 taint、label 信息
代码语言:go复制
func (m *AwsManager) buildNodeFromTemplate(asg *asg, template *asgTemplate) (*apiv1.Node, error) {
	node := apiv1.Node{}
  // 生成随机字符串,拼接上 {asgName}-asg-{rand.Int63} 作为主机名
	nodeName := fmt.Sprintf("%s-asg-%d", asg.Name, rand.Int63())

	node.ObjectMeta = metav1.ObjectMeta{
		Name:     nodeName,
		SelfLink: fmt.Sprintf("/api/v1/nodes/%s", nodeName),
		Labels:   map[string]string{},
	}
	/
	node.Status = apiv1.NodeStatus{
		Capacity: apiv1.ResourceList{},
	}

  // 将 asg 返回的机器规格信息填充到 node.Status 中,便于后续调度
	node.Status.Capacity[apiv1.ResourcePods] = *resource.NewQuantity(110, resource.DecimalSI)
  // 构造 node 的 cpu 信息
	node.Status.Capacity[apiv1.ResourceCPU] = *resource.NewQuantity(template.InstanceType.VCPU, resource.DecimalSI)
  // 构造 node 的 gpu 信息
	node.Status.Capacity[gpu.ResourceNvidiaGPU] = *resource.NewQuantity(template.InstanceType.GPU, resource.DecimalSI)
  // 构造 node 的 memory 信息
  // asg 的实例类型的 MemroyMb * 1024 * 1024 作为 node 的 memory
	node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(template.InstanceType.MemoryMb*1024*1024, resource.DecimalSI)

	resourcesFromTags := extractAllocatableResourcesFromAsg(template.Tags)
	for resourceName, val := range resourcesFromTags {
		node.Status.Capacity[apiv1.ResourceName(resourceName)] = *val
	}

	// TODO: use proper allocatable!!
	node.Status.Allocatable = node.Status.Capacity

	// 生成 node 的 generic 信息,填充到 label
  // - "kubernetes.io/arch":asg instance 的机型
  // - "kubernetes.io/os":linux
	// - "node.kubernetes.io/instance-type"
	// - "topology.kubernetes.io/region"
  // - "topology.kubernetes.io/zone"
	// - "kubernetes.io/hostname"
	node.Labels = cloudprovider.JoinStringMaps(node.Labels, buildGenericLabels(template, nodeName))

  // 填充 node.Label 信息
	// 读取所有的 k8s.io/cluster-autoscaler/node-template/label/ 前缀的 tags
	node.Labels = cloudprovider.JoinStringMaps(node.Labels, extractLabelsFromAsg(template.Tags))
	// 填充 node.Spec.Taints 信息
  // 读取所有的 k8s.io/cluster-autoscaler/node-template/taint/ 前缀
  // 且满足正则 "(.*):(?:NoSchedule|NoExecute|PreferNoSchedule)" 的 tags
	node.Spec.Taints = extractTaintsFromAsg(template.Tags)
	// 填充 node.Status.Conditions
	node.Status.Conditions = cloudprovider.BuildReadyConditions()
	return &node, nil
}

2.6 asgCache

asgCache用于缓存 aws 中所有的 asg 信息

代码语言:go复制
// asgCache 保存 aws 当前所有的 asg 缓存信息
type asgCache struct {
  // 所有的 asg 列表
	registeredAsgs []*asg
  // asg -> instance 的映射
	asgToInstances map[AwsRef][]AwsInstanceRef
  // instance -> asg 的映射
	instanceToAsg  map[AwsInstanceRef]*asg
	mutex          sync.Mutex
	service        autoScalingWrapper
	interrupt      chan struct{}

	asgAutoDiscoverySpecs []asgAutoDiscoveryConfig
	explicitlyConfigured  map[AwsRef]bool
}

// asg 对应 aws 的 AutoScalingGroup
type asg struct {
	AwsRef

	minSize int
	maxSize int
	curSize int

	AvailabilityZones       []string
	LaunchConfigurationName string
	LaunchTemplate          *launchTemplate
	MixedInstancesPolicy    *mixedInstancesPolicy
	Tags                    []*autoscaling.TagDescription
}
2.6.1 regenerate

CA 配置自动发现 asg 机制后,该方法会查找所有打了响应标签的 asg,并将asg的基本信息、实例信息同步到本地内存

代码语言:go复制
func (m *asgCache) regenerate() error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
	newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)

	// 通过 CA 启动参数中配置的标签,查找符合条件的所有 asg
	refreshNames, err := m.buildAsgNames()
	
  // 根据 asg 名称,查找完整的 asg 详细信息
	groups, err := m.service.getAutoscalingGroupsByNames(refreshNames)
	
 
	for _, group := range groups {
		asg, err := m.buildAsgFromAWS(group)
		if err != nil {
			return err
		}
		exists[asg.AwsRef] = true
		// 注册 asg 到缓存
		asg = m.register(asg)

		newAsgToInstancesCache[asg.AwsRef] = make([]AwsInstanceRef, len(group.Instances))
		// 建立映射关系
		for i, instance := range group.Instances {
			ref := m.buildInstanceRefFromAWS(instance)
			newInstanceToAsgCache[ref] = asg
			newAsgToInstancesCache[asg.AwsRef][i] = ref
		}
	}

	// Unregister no longer existing auto-discovered ASGs
	for _, asg := range m.registeredAsgs {
		if !exists[asg.AwsRef] && !m.explicitlyConfigured[asg.AwsRef] {
			m.unregister(asg)
		}
	}

	m.asgToInstances = newAsgToInstancesCache
	m.instanceToAsg = newInstanceToAsgCache
	return nil
}

2.7 Aws provider 创建的流程

调用链路: NewCloudProvider -> buildCloudProvider -> BuildAWS -> CreateAwsManager -> BuildAwsCloudProvider

  • AWS账号初始化
    • 读取配置文件
    • 根据配置文件创建 AWSSDKProvider
    • 创建 Session,创建 AwsService
  • 构造 asgCache
    • 解析自动发现 asg 的tag等入参信息
    • 自动同步符合 tag 的 aws asg 到本地 asgCache
  • 初始化 awsManager
代码语言:go复制
func initializeDefaultOptions(opts *AutoscalerOptions) error {
  ...
  if opts.CloudProvider == nil {
    // 创建一个 CloudProvider
		opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions)
	}
  ...
}

func NewCloudProvider(opts config.AutoscalingOptions) cloudprovider.CloudProvider {
	...
  // 根据 options参数,构建 provider
	provider := buildCloudProvider(opts, do, rl)
	if provider != nil {
		return provider
	}
}

func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
	switch opts.CloudProviderName {
	...
  // aws 的 provider
	case cloudprovider.AwsProviderName:
		return aws.BuildAWS(opts, do, rl)
  ...
 }
}

func BuildAWS(...) {
  ...
  // 初始化 awsManager
  manager, err := CreateAwsManager(config, do, instanceTypes)
	// 初始化 provider
	provider, err := BuildAwsCloudProvider(manager, rl)
	
	return provider
}

func CreateAwsManager(...) (*AwsManager, error) {
	return createAWSManagerInternal(configReader, discoveryOpts, nil, nil, instanceTypes)
}

func createAWSManagerInternal(
	configReader io.Reader,
	discoveryOpts cloudprovider.NodeGroupDiscoveryOptions,
	autoScalingService *autoScalingWrapper,
	ec2Service *ec2Wrapper,
	instanceTypes map[string]*InstanceType,
) (*AwsManager, error) {
	// 读取配置文件
	cfg, err := readAWSCloudConfig(configReader)
	...
  // 解析自动发现 asg 的入参信息
	specs, err := parseASGAutoDiscoverySpecs(discoveryOpts)
	...
  // 初始化 asgCache
	cache, err := newASGCache(*autoScalingService, discoveryOpts.NodeGroupSpecs, specs)

  // 初始化 awsManager
	manager := &AwsManager{
		autoScalingService: *autoScalingService,
		ec2Service:         *ec2Service,
		asgCache:           cache,
		instanceTypes:      instanceTypes,
	}
	// 执行刷新操作,将 aws 的 asg 信息同步到本地 asgCache
	if err := manager.forceRefresh(); err != nil {
		return nil, err
	}

	return manager, nil
}
  
 
2.7.1 BuildAWS
代码语言:go复制
func BuildAWS(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
	// 读取参数中配置相关的 CloudConfig 文件
  var config io.ReadCloser
	if opts.CloudConfig != "" {
		var err error
		config, err = os.Open(opts.CloudConfig)
		if err != nil {
			klog.Fatalf("Couldn't open cloud provider configuration %s: %#v", opts.CloudConfig, err)
		}
		defer config.Close()
	}

	// 获取机型
	instanceTypes, lastUpdateTime := GetStaticEC2InstanceTypes()
  // 获取静态机型,可能会过时
	if opts.AWSUseStaticInstanceList {
		klog.Warningf("Using static EC2 Instance Types, this list could be outdated. Last update time: %s", lastUpdateTime)
	} else {
    // 实时当前可用区
    // 先读取环境变量:AWS_REGION,找不到再调接口获取
		region, err := GetCurrentAwsRegion()
		...
    // 获取机型
		generatedInstanceTypes, err := GenerateEC2InstanceTypes(region)
		...
	}
	// 创建 AwsManager
  // AwsManager 用于操作 aws 资源
	manager, err := CreateAwsManager(config, do, instanceTypes)
	...
  // 创建 provider
	provider, err := BuildAwsCloudProvider(manager, rl)
	...
  // 注册指标
	RegisterMetrics()
	return provider
}

0 人点赞