一、概述
上一篇文章介绍了 k8s 自动扩缩容的三种方式:HPA、VPA、CA,以及各自的使用场景和架构。本文针对 CA 做源码分析。
1.1 CA架构回顾
参考
CA由一下几个模块组成:
- autoscaler:核心模块,负责整体扩缩容功能
- Estimator:负责评估计算扩容节点
- Simulator:负责模拟调度,计算缩容节点
- Cloud Provider:与云上 IaaS 层交互,负责增删改查节点。云厂商需要实现相关接口。
1.2 仓库代码结构
源码地址
CA 代码在 k8s 官方的 autoscaler 仓库下,该仓库存放自动扩缩容相关组件,包括前文介绍的 VPA、今天的主角CA、还有一个VPA修改pod资源的插件 Addon Resizer。使用的版本是 cluster-autoscaler-release-1.21,目录结构如下
代码语言:shell复制.
├── CONTRIBUTING.md
├── LICENSE
├── OWNERS
├── README.md
├── SECURITY_CONTACTS
├── addon-resizer # addon-resizer 代码
├── builder
├── charts
├── cluster-autoscaler # CA 代码
├── code-of-conduct.md
├── hack
└── vertical-pod-autoscaler # VPA 代码
1.3 CA 代码结构
代码语言:shell复制.
├── FAQ.md # FAQ,里面有很多关于 CA 原理和使用的说明
├── cloudprovider # cloud provider模块,包括接口和各个云厂商的实现
│ ├── alicloud
│ ├── aws
│ ├── azure
│ ├── baiducloud
│ ├── builder
│ ├── cloud_provider.go # cloud provider 接口,要对接自己的云,需要实现该接口操作 IaaS 资源
│ ├── gce
│ ├── huaweicloud
├── core # CA 核心模块
│ ├── autoscaler.go # 定义 Autoscaler 接口
│ ├── equivalence_groups.go # 资源不足的 pod 按扩容类型分类的处理逻辑
│ ├── filter_out_schedulable.go
│ ├── scale_down.go # 缩容
│ ├── scale_up.go # 扩容
│ ├── static_autoscaler.go # Autoscaler 的实现类
│ └── utils
├── estimator # Estimator 模块,评估扩容节点
│ ├── binpacking_estimator.go # Estimator 实现类,实现首次适应背包算法(装箱算法)
│ └── estimator.go # 定义 Estimator 接口
├── expander
│ ├── expander.go # expander 定义了选择多个符合条件的 NodeGroup 的策略接口
│ ├── factory # 根据传入的不同策略名称,创建对应的实现类
│ ├── mostpods # mostpods 策略:调度最多的 pod
│ ├── price # price 策略:价格最低
│ ├── priority # priority 策略:根据 NodeGroup 的优先级选择
│ ├── random # random 策略:随机选择符合条件的 NodeGroup 中的一个
│ └── waste # waste 策略:资源利用率最高
├── go.mod
├── go.sum
├── main.go # main 方法
├── metrics # 指标采集
├── processors
│ ├── callbacks
│ ├── customresources
│ ├── nodegroupconfig
│ ├── nodegroups
│ ├── nodegroupset
│ ├── nodeinfos
│ ├── nodes
│ ├── pods
│ ├── processors.go
│ └── status
├── proposals # 提案,设计文档信息
│ ├── balance_similar.md
│ ├── circumvent-tag-limit-aws.md
│ ├── clusterstate.md
│ ├── images
│ ├── kubemark_integration.md
│ ├── metrics.md
│ ├── min_at_zero_gcp.md
│ ├── node_autoprovisioning.md
│ ├── plugable-provider-grpc.md
│ ├── pricing.md
├── simulator # 模拟调度模块,主要用于缩容
│ ├── basic_cluster_snapshot.go
│ ├── cluster.go
│ ├── cluster_snapshot.go
│ ├── delegating_shared_lister.go
│ ├── delta_cluster_snapshot.go
│ ├── drain.go
│ ├── nodes.go
│ ├── nodes_test.go
│ ├── predicate_error.go
│ ├── predicates_checker_interface.go
│ ├── scheduler_based_predicates_checker.go
│ ├── test_predicates_checker.go
│ ├── test_utils.go
│ ├── tracker.go
二、CloudProvider 模块
2.1 CloudProvider 接口
包含配置信息、与云厂商交互的方法。核心方法有:
- Name():提供唯一的名称
- Refresh():刷新云厂商资源信息
- NodeGroups():获取所有的节点组
- NodeGroupForNode(...):获取某个节点所属的节点组
type CloudProvider interface {
// cloud provider 名称
Name() string
// 返回 cloud privider 配置的所有 node group
NodeGroups() []NodeGroup
// 返回给定 Node 节点所属的 NodeGroup
// 如果节点不应该被 autoscaler 处理,应该返回 nil
NodeGroupForNode(*apiv1.Node) (NodeGroup, error)
// 可选实现,价格模型
Pricing() (PricingModel, errors.AutoscalerError)
// 可选实现,获取 cloud provider 支持的所有机器型号
GetAvailableMachineTypes() ([]string, error)
// 基于定义的 node,构建理论的 node group
// 阻塞方法,直到 node group 创建出来
// 可选实现
NewNodeGroup(machineType string, labels map[string]string, systemLabels map[string]string,
taints []apiv1.Taint, extraResources map[string]resource.Quantity) (NodeGroup, error)
// 返回结构化资源限额
GetResourceLimiter() (*ResourceLimiter, error)
// 返回添加到 Node 上的 GPU 资源标签
GPULabel() string
// 返回所有可用的 GPU 类型
GetAvailableGPUTypes() map[string]struct{}
// 清理工作,比如:go 协程
Cleanup() error
// 在每次主循环之前调用,并且用于动态更新 cloud provider 状态
// 尤其是由 NodeGroups 改变,导致返回一个 node group 列表
Refresh() error
}
2.2 NodeGroup 节点组
NodeGroup提供相关接口,操作具有相同容量和标签的一组节点。核心方法有:
- MaxSize():节点组允许的最大扩容数量
- MinSize():节点组允许的最小缩容数量
- TargetSize():节点组当前数量
- IncreaseSize(delta int):新增 delta 个节点的方法
- DecreaseTargetSize(delta int):减少节点的方法
- DeleteNodes(...):删除实例的方法
- TemplateNodeInfo():
包含配置信息和方法控制
代码语言:go复制type NodeGroup interface {
// 返回 node group 的最大数量
MaxSize() int
// 返回 node group 的最小数量
MinSize() int
// 必须实现该方法。
// 返回当前目标数量,必须实现该方法
// 有可能 k8s 节点数量和这个值不相等,但是一旦一切稳定(node完成启动和注册、节点彻底删除)就应该等于 Size()
TargetSize() (int, error)
// 必须实现该方法。
// 增加 node group 的数量,为了删除节点你需要显示指定名称并调用 DeleteNode 方法
// 该方法会阻塞知道 node group 数量更新
IncreaseSize(delta int) error
// 必须实现该方法。
// 从 node group 中删除节点。
// 如果失败或者 node 不属于 node group 将会报错。
// 该方法会阻塞知道 node group 数量更新
DeleteNodes([]*apiv1.Node) error
// 从 Node group 中减少目标数量
// 该方法不允许删除任何节点,仅仅用于减少没有完全填充的新节点
// 参数 delta 必须是负数,假定 cloud provider 在调整目标数量时,将不会删除存在的节点
DecreaseTargetSize(delta int) error
// 返回 node group 唯一标识
Id() string
// 返回调试信息
Debug() string
// 返回所有属于 node group 的节点列表
// Instance 对象必须包含 id 字段,其他字段值可选
// 列表也包含不会成为 k8s 的那些节点
Nodes() ([]Instance, error)
// 可选实现
// 返回包含空 node 新的的调度结构体,将被用于扩容仿真,以预测一个新的扩容节点是什么样的
// 返回的 NodeInfo 信息包含完整的 Node 对象信N,包括:标签、容量、能分配的 pod 信息(类似kube-proxy)
TemplateNodeInfo() (*schedulerframework.NodeInfo, error)
// 必须实现。返回 node group 是否存在
Exist() bool
// 可选实现。创建 node group
Create() (NodeGroup, error)
// 可选实现,删除 node group
Delete() error
// 是否支持自动供应
Autoprovisioned() bool
// 可选实现,返回配置参数
GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error)
}
2.3 CloudProvider 实现的厂商
大部分云厂商都实现了该接口,参考
- 国外的:亚马逊AWS、谷歌GCE、微软Azure
- 国内的:阿里云、华为云、腾讯云、百度云
- 其他:......
2.4 AWS 接口实现
以 aws 为例分析实现实现逻辑,代码结构如下
代码语言:shell复制.
├── auto_scaling.go
├── auto_scaling_groups.go # 获取 asg 信息,保存在 asgCache 缓存中
├── aws_cloud_provider.go # awsCloudProvider 实现 CloudProvider 接口里的方法
├── aws_manager.go # 根据账号密码,构建 awsManager 对象来操作 aws 资源
├── aws_util.go # 获取机型、可用区等信息
├── ec2.go
├── ec2_instance_types
│ └── gen.go
├── ec2_instance_types.go # 默认机型列表
└── examples
├── cluster-autoscaler-autodiscover.yaml # 自动发现模式部署 ca
├── cluster-autoscaler-multi-asg.yaml # 多 asg 模式部署 ca
├── cluster-autoscaler-one-asg.yaml # 单 asg 模式部署 ca
└── cluster-autoscaler-run-on-control-plane.yaml
2.4.1 Name
返回 provider 的名称 aws
代码语言:go复制// AwsProviderName = "aws"
func (aws *awsCloudProvider) Name() string {
return cloudprovider.AwsProviderName
}
2.4.2 Refresh
调用链:CloudProvider -> AwsManager -> asgCache
refresh 的功能是获取 aws 中的 asg,以及模板、实例等信息保存到缓存中
代码语言:go复制func (aws *awsCloudProvider) Refresh() error {
// 调用 awsManager 的 Refresh 方法
return aws.awsManager.Refresh()
}
func (m *AwsManager) Refresh() error {
...
// 调用 forceRefresh
return m.forceRefresh()
}
func (m *AwsManager) forceRefresh() error {
// 调用 asgCache 的 regenerate
if err := m.asgCache.regenerate(); err != nil {
...
}
...
return nil
}
func (m *asgCache) regenerate() error {
...
newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)
// Build list of knowns ASG names
// 获取所有的 asg name
refreshNames, err := m.buildAsgNames()
// 根据 asg name 获取 asg 对象
// 调用 aws-sdk-go 中 AutoScaling 的 DescribeAutoScalingGroupsPages
groups, err := m.service.getAutoscalingGroupsByNames(refreshNames)
// 填充 asg 启动配置的实例模板
err = m.service.populateLaunchConfigurationInstanceTypeCache(groups)
if err != nil {
klog.Warningf("Failed to fully populate all launchConfigurations: %v", err)
}
// If currently any ASG has more Desired than running Instances, introduce placeholders
// for the instances to come up. This is required to track Desired instances that
// will never come up, like with Spot Request that can't be fulfilled
groups = m.createPlaceholdersForDesiredNonStartedInstances(groups)
// Register or update ASGs
exists := make(map[AwsRef]bool)
for _, group := range groups {
asg, err := m.buildAsgFromAWS(group)
if err != nil {
return err
}
exists[asg.AwsRef] = true
// 注册 asg
asg = m.register(asg)
newAsgToInstancesCache[asg.AwsRef] = make([]AwsInstanceRef, len(group.Instances))
// 将 group 中所有的实例信息保存到缓存中
for i, instance := range group.Instances {
// 根据 group 的 instance 信息构造 instance
ref := m.buildInstanceRefFromAWS(instance)
newInstanceToAsgCache[ref] = asg
newAsgToInstancesCache[asg.AwsRef][i] = ref
}
}
// 注销不存在的 asg
for _, asg := range m.registeredAsgs {
if !exists[asg.AwsRef] && !m.explicitlyConfigured[asg.AwsRef] {
m.unregister(asg)
}
}
// 生成 asg -> instance 的缓存
m.asgToInstances = newAsgToInstancesCache
// 生成 instance -> asg 的缓存
m.instanceToAsg = newInstanceToAsgCache
return nil
}
2.4.3 NodeGroups
调用 awsManager 获取所有的 asg
代码语言:go复制func (aws *awsCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
// 调用 awsManager 获取所有的 asg
asgs := aws.awsManager.getAsgs()
ngs := make([]cloudprovider.NodeGroup, len(asgs))
for i, asg := range asgs {
ngs[i] = &AwsNodeGroup{
asg: asg,
awsManager: aws.awsManager,
}
}
return ngs
}
2.4.4 NodeGroupForNode
- 从 Node.Spec.ProviderID 中获取 id
- 调用 awsManager 获取 id 对应的 asg
func (aws *awsCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.NodeGroup, error) {
if len(node.Spec.ProviderID) == 0 {
klog.Warningf("Node %v has no providerId", node.Name)
return nil, nil
}
// 从 Node.Spec.ProviderID 中获取 id
ref, err := AwsRefFromProviderId(node.Spec.ProviderID)
if err != nil {
return nil, err
}
// 获取 asg
asg := aws.awsManager.GetAsgForInstance(*ref)
if asg == nil {
return nil, nil
}
return &AwsNodeGroup{
asg: asg,
awsManager: aws.awsManager,
}, nil
}
2.4.5 IncreaseSize
调用链:CloudProvider -> AwsManager -> asgCache -> aws-sdk-go
通过传入操作 aws asg 的参数,调用 aws-sdk-go 的 asg 接口,实现新增节点的效果
代码语言:go复制func (ng *AwsNodeGroup) IncreaseSize(delta int) error {
// 增量不能小于 0
if delta <= 0 {
return fmt.Errorf("size increase must be positive")
}
size := ng.asg.curSize
// 增加后不能超过最大值
if size delta > ng.asg.maxSize {
return fmt.Errorf("size increase too large - desired:%d max:%d", size delta, ng.asg.maxSize)
}
// 调用 awsManager 设置 size 为当前 size delta
return ng.awsManager.SetAsgSize(ng.asg, size delta)
}
// SetAsgSize sets ASG size.
func (m *AwsManager) SetAsgSize(asg *asg, size int) error {
return m.asgCache.SetAsgSize(asg, size)
}
// 加锁操作
func (m *asgCache) SetAsgSize(asg *asg, size int) error {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.setAsgSizeNoLock(asg, size)
}
func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
// 拼接参数:name、size、honorCooldown
params := &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asg.Name),
DesiredCapacity: aws.Int64(int64(size)),
HonorCooldown: aws.Bool(false),
}
klog.V(0).Infof("Setting asg %s size to %d", asg.Name, size)
// 调用 aws-sdk-go 操作 ASG 的 AutoScaling 接口完成操作
_, err := m.service.SetDesiredCapacity(params)
if err != nil {
return err
}
// Proactively set the ASG size so autoscaler makes better decisions
asg.curSize = size
return nil
}
2.4.6 DecreaseTargetSize
执行代码同 IncreaseSize,不同的仅仅是参数 delta是负数。
代码语言:go复制func (ng *AwsNodeGroup) DecreaseTargetSize(delta int) error {
// 增量必须为负数
if delta >= 0 {
return fmt.Errorf("size decrease size must be negative")
}
size := ng.asg.curSize
// 查询目前 ASG 的节点数
nodes, err := ng.awsManager.GetAsgNodes(ng.asg.AwsRef)
if err != nil {
return err
}
// 删除的数量不能超过已有数量
if int(size) delta < len(nodes) {
return fmt.Errorf("attempt to delete existing nodes targetSize:%d delta:%d existingNodes: %d",
size, delta, len(nodes))
}
// 方法同 IncreaseSize 中一样
return ng.awsManager.SetAsgSize(ng.asg, size delta)
}
2.4.7 DeleteNodes
调用链:CloudProvider -> AwsManager -> aws-sdk-go
通过 Node.Spec.ProviderID 拿到实例唯一 id,调用 SDK 时传入 id,执行删除操作
代码语言:go复制func (ng *AwsNodeGroup) DeleteNodes(nodes []*apiv1.Node) error {
size := ng.asg.curSize
if int(size) <= ng.MinSize() {
return fmt.Errorf("min size reached, nodes will not be deleted")
}
refs := make([]*AwsInstanceRef, 0, len(nodes))
for _, node := range nodes {
// 判断待删除 Node 是否是当前 ASG
belongs, err := ng.Belongs(node)
if err != nil {
return err
}
if belongs != true {
return fmt.Errorf("%s belongs to a different asg than %s", node.Name, ng.Id())
}
// 获取 Node 的 aws 唯一凭证信息
// providerID 保存在 node.Spec.ProviderID 字段中
awsref, err := AwsRefFromProviderId(node.Spec.ProviderID)
if err != nil {
return err
}
refs = append(refs, awsref)
}
// 调用 AwsManager 的删除实例方法
return ng.awsManager.DeleteInstances(refs)
}
// providerID 的格式是:aws:///zone/name
func AwsRefFromProviderId(id string) (*AwsInstanceRef, error) {
if validAwsRefIdRegex.FindStringSubmatch(id) == nil {
return nil, fmt.Errorf("wrong id: expected format aws:///<zone>/<name>, got %v", id)
}
splitted := strings.Split(id[7:], "/")
return &AwsInstanceRef{
ProviderID: id,
Name: splitted[1],
}, nil
}
2.4.8 TemplateNodeInfo
- getAsgTemplate:获取 template
func (ng *AwsNodeGroup) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) {
// 获取 asg 的模板信息
template, err := ng.awsManager.getAsgTemplate(ng.asg)
// 通过模板构造 Node 对象
node, err := ng.awsManager.buildNodeFromTemplate(ng.asg, template)
// 通过调度框架接口构造调度对象,用于 CA 模拟调度
nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(ng.asg.Name))
nodeInfo.SetNode(node)
return nodeInfo, nil
}
2.5 AwsManager 实现
通过前面的分析发现,aws接口实现中和底层IaaS操作的很多逻辑都封装到了 AwsManager 中,这里专门针对 AwsManager做分析。
2.5.1 getAsgTemplate
获取 asg 中第一个可用区的模板信息
代码语言:go复制func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) {
// 判断是否有可用区信息
if len(asg.AvailabilityZones) < 1 {
return nil, fmt.Errorf("unable to get first AvailabilityZone for ASG %q", asg.Name)
}
// asg可配置多个az信息, 默认选择 asg 中第一个可用 az
az := asg.AvailabilityZones[0]
region := az[0 : len(az)-1]
if len(asg.AvailabilityZones) > 1 {
klog.V(4).Infof("Found multiple availability zones for ASG %q; using %s for %s labeln", asg.Name, az, apiv1.LabelFailureDomainBetaZone)
}
// 获取可用机型,通过调用 aws-sdk-go 获取
instanceTypeName, err := m.buildInstanceType(asg)
if t, ok := m.instanceTypes[instanceTypeName]; ok {
return &asgTemplate{
InstanceType: t,
Region: region,
Zone: az,
Tags: asg.Tags,
}, nil
}
return nil, fmt.Errorf("ASG %q uses the unknown EC2 instance type %q", asg.Name, instanceTypeName)
}
2.5.2 buildNodeFromTemplate
根据 template 构造 node 对象,预调度就是通过虚拟出来的 Node 对象,传给调度框架来实现。
Node 数据的来源包括:
- asg 模板:提供node 的 cpu、memory、机型、az等信息
- asg tag:提供node 的 taint、label 信息
func (m *AwsManager) buildNodeFromTemplate(asg *asg, template *asgTemplate) (*apiv1.Node, error) {
node := apiv1.Node{}
// 生成随机字符串,拼接上 {asgName}-asg-{rand.Int63} 作为主机名
nodeName := fmt.Sprintf("%s-asg-%d", asg.Name, rand.Int63())
node.ObjectMeta = metav1.ObjectMeta{
Name: nodeName,
SelfLink: fmt.Sprintf("/api/v1/nodes/%s", nodeName),
Labels: map[string]string{},
}
/
node.Status = apiv1.NodeStatus{
Capacity: apiv1.ResourceList{},
}
// 将 asg 返回的机器规格信息填充到 node.Status 中,便于后续调度
node.Status.Capacity[apiv1.ResourcePods] = *resource.NewQuantity(110, resource.DecimalSI)
// 构造 node 的 cpu 信息
node.Status.Capacity[apiv1.ResourceCPU] = *resource.NewQuantity(template.InstanceType.VCPU, resource.DecimalSI)
// 构造 node 的 gpu 信息
node.Status.Capacity[gpu.ResourceNvidiaGPU] = *resource.NewQuantity(template.InstanceType.GPU, resource.DecimalSI)
// 构造 node 的 memory 信息
// asg 的实例类型的 MemroyMb * 1024 * 1024 作为 node 的 memory
node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(template.InstanceType.MemoryMb*1024*1024, resource.DecimalSI)
resourcesFromTags := extractAllocatableResourcesFromAsg(template.Tags)
for resourceName, val := range resourcesFromTags {
node.Status.Capacity[apiv1.ResourceName(resourceName)] = *val
}
// TODO: use proper allocatable!!
node.Status.Allocatable = node.Status.Capacity
// 生成 node 的 generic 信息,填充到 label
// - "kubernetes.io/arch":asg instance 的机型
// - "kubernetes.io/os":linux
// - "node.kubernetes.io/instance-type"
// - "topology.kubernetes.io/region"
// - "topology.kubernetes.io/zone"
// - "kubernetes.io/hostname"
node.Labels = cloudprovider.JoinStringMaps(node.Labels, buildGenericLabels(template, nodeName))
// 填充 node.Label 信息
// 读取所有的 k8s.io/cluster-autoscaler/node-template/label/ 前缀的 tags
node.Labels = cloudprovider.JoinStringMaps(node.Labels, extractLabelsFromAsg(template.Tags))
// 填充 node.Spec.Taints 信息
// 读取所有的 k8s.io/cluster-autoscaler/node-template/taint/ 前缀
// 且满足正则 "(.*):(?:NoSchedule|NoExecute|PreferNoSchedule)" 的 tags
node.Spec.Taints = extractTaintsFromAsg(template.Tags)
// 填充 node.Status.Conditions
node.Status.Conditions = cloudprovider.BuildReadyConditions()
return &node, nil
}
2.6 asgCache
asgCache用于缓存 aws 中所有的 asg 信息
代码语言:go复制// asgCache 保存 aws 当前所有的 asg 缓存信息
type asgCache struct {
// 所有的 asg 列表
registeredAsgs []*asg
// asg -> instance 的映射
asgToInstances map[AwsRef][]AwsInstanceRef
// instance -> asg 的映射
instanceToAsg map[AwsInstanceRef]*asg
mutex sync.Mutex
service autoScalingWrapper
interrupt chan struct{}
asgAutoDiscoverySpecs []asgAutoDiscoveryConfig
explicitlyConfigured map[AwsRef]bool
}
// asg 对应 aws 的 AutoScalingGroup
type asg struct {
AwsRef
minSize int
maxSize int
curSize int
AvailabilityZones []string
LaunchConfigurationName string
LaunchTemplate *launchTemplate
MixedInstancesPolicy *mixedInstancesPolicy
Tags []*autoscaling.TagDescription
}
2.6.1 regenerate
CA 配置自动发现 asg 机制后,该方法会查找所有打了响应标签的 asg,并将asg的基本信息、实例信息同步到本地内存
代码语言:go复制func (m *asgCache) regenerate() error {
m.mutex.Lock()
defer m.mutex.Unlock()
newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)
// 通过 CA 启动参数中配置的标签,查找符合条件的所有 asg
refreshNames, err := m.buildAsgNames()
// 根据 asg 名称,查找完整的 asg 详细信息
groups, err := m.service.getAutoscalingGroupsByNames(refreshNames)
for _, group := range groups {
asg, err := m.buildAsgFromAWS(group)
if err != nil {
return err
}
exists[asg.AwsRef] = true
// 注册 asg 到缓存
asg = m.register(asg)
newAsgToInstancesCache[asg.AwsRef] = make([]AwsInstanceRef, len(group.Instances))
// 建立映射关系
for i, instance := range group.Instances {
ref := m.buildInstanceRefFromAWS(instance)
newInstanceToAsgCache[ref] = asg
newAsgToInstancesCache[asg.AwsRef][i] = ref
}
}
// Unregister no longer existing auto-discovered ASGs
for _, asg := range m.registeredAsgs {
if !exists[asg.AwsRef] && !m.explicitlyConfigured[asg.AwsRef] {
m.unregister(asg)
}
}
m.asgToInstances = newAsgToInstancesCache
m.instanceToAsg = newInstanceToAsgCache
return nil
}
2.7 Aws provider 创建的流程
调用链路: NewCloudProvider -> buildCloudProvider -> BuildAWS -> CreateAwsManager -> BuildAwsCloudProvider
- AWS账号初始化
- 读取配置文件
- 根据配置文件创建 AWSSDKProvider
- 创建 Session,创建 AwsService
- 构造 asgCache
- 解析自动发现 asg 的tag等入参信息
- 自动同步符合 tag 的 aws asg 到本地 asgCache
- 初始化 awsManager
func initializeDefaultOptions(opts *AutoscalerOptions) error {
...
if opts.CloudProvider == nil {
// 创建一个 CloudProvider
opts.CloudProvider = cloudBuilder.NewCloudProvider(opts.AutoscalingOptions)
}
...
}
func NewCloudProvider(opts config.AutoscalingOptions) cloudprovider.CloudProvider {
...
// 根据 options参数,构建 provider
provider := buildCloudProvider(opts, do, rl)
if provider != nil {
return provider
}
}
func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
switch opts.CloudProviderName {
...
// aws 的 provider
case cloudprovider.AwsProviderName:
return aws.BuildAWS(opts, do, rl)
...
}
}
func BuildAWS(...) {
...
// 初始化 awsManager
manager, err := CreateAwsManager(config, do, instanceTypes)
// 初始化 provider
provider, err := BuildAwsCloudProvider(manager, rl)
return provider
}
func CreateAwsManager(...) (*AwsManager, error) {
return createAWSManagerInternal(configReader, discoveryOpts, nil, nil, instanceTypes)
}
func createAWSManagerInternal(
configReader io.Reader,
discoveryOpts cloudprovider.NodeGroupDiscoveryOptions,
autoScalingService *autoScalingWrapper,
ec2Service *ec2Wrapper,
instanceTypes map[string]*InstanceType,
) (*AwsManager, error) {
// 读取配置文件
cfg, err := readAWSCloudConfig(configReader)
...
// 解析自动发现 asg 的入参信息
specs, err := parseASGAutoDiscoverySpecs(discoveryOpts)
...
// 初始化 asgCache
cache, err := newASGCache(*autoScalingService, discoveryOpts.NodeGroupSpecs, specs)
// 初始化 awsManager
manager := &AwsManager{
autoScalingService: *autoScalingService,
ec2Service: *ec2Service,
asgCache: cache,
instanceTypes: instanceTypes,
}
// 执行刷新操作,将 aws 的 asg 信息同步到本地 asgCache
if err := manager.forceRefresh(); err != nil {
return nil, err
}
return manager, nil
}
2.7.1 BuildAWS
代码语言:go复制func BuildAWS(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
// 读取参数中配置相关的 CloudConfig 文件
var config io.ReadCloser
if opts.CloudConfig != "" {
var err error
config, err = os.Open(opts.CloudConfig)
if err != nil {
klog.Fatalf("Couldn't open cloud provider configuration %s: %#v", opts.CloudConfig, err)
}
defer config.Close()
}
// 获取机型
instanceTypes, lastUpdateTime := GetStaticEC2InstanceTypes()
// 获取静态机型,可能会过时
if opts.AWSUseStaticInstanceList {
klog.Warningf("Using static EC2 Instance Types, this list could be outdated. Last update time: %s", lastUpdateTime)
} else {
// 实时当前可用区
// 先读取环境变量:AWS_REGION,找不到再调接口获取
region, err := GetCurrentAwsRegion()
...
// 获取机型
generatedInstanceTypes, err := GenerateEC2InstanceTypes(region)
...
}
// 创建 AwsManager
// AwsManager 用于操作 aws 资源
manager, err := CreateAwsManager(config, do, instanceTypes)
...
// 创建 provider
provider, err := BuildAwsCloudProvider(manager, rl)
...
// 注册指标
RegisterMetrics()
return provider
}