Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 51 additions & 47 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ limitations under the License.
package aws

import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"time"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/ec2"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go-v2/aws"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go-v2/service/autoscaling"
autoscalingtypes "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go-v2/service/autoscaling/types"
ec2types "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go-v2/service/ec2/types"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
klog "k8s.io/klog/v2"
)
Expand All @@ -41,7 +43,7 @@ type asgCache struct {
asgToInstances map[AwsRef][]AwsInstanceRef
instanceToAsg map[AwsInstanceRef]*asg
instanceStatus map[AwsInstanceRef]*string
instanceLifecycle map[AwsInstanceRef]*string
instanceLifecycle map[AwsInstanceRef]autoscalingtypes.LifecycleState
asgInstanceTypeCache *instanceTypeExpirationStore
mutex sync.Mutex
awsService *awsWrapper
Expand All @@ -60,8 +62,8 @@ type launchTemplate struct {
// mixedInstancesPolicy groups the launch template of an ASG's mixed instances
// policy with the instance-type overrides and instance-requirements data
// attached to it. It is referenced from asg.MixedInstancesPolicy.
//
// NOTE(review): this span is taken from a rendered diff, so the two
// instanceRequirements* fields each appear twice: first with the aws-sdk-go
// types (autoscaling.InstanceRequirements / ec2.InstanceRequirements), then
// with the aws-sdk-go-v2 types (autoscalingtypes.InstanceRequirements /
// ec2types.InstanceRequirements). Presumably only the second pair remains
// after the change — confirm against the merged file.
type mixedInstancesPolicy struct {
launchTemplate *launchTemplate
instanceTypesOverrides []string
instanceRequirementsOverrides *autoscaling.InstanceRequirements
instanceRequirements *ec2.InstanceRequirements
instanceRequirementsOverrides *autoscalingtypes.InstanceRequirements
instanceRequirements *ec2types.InstanceRequirements
}

type asg struct {
Expand All @@ -76,7 +78,7 @@ type asg struct {
LaunchConfigurationName string
LaunchTemplate *launchTemplate
MixedInstancesPolicy *mixedInstancesPolicy
Tags []*autoscaling.TagDescription
Tags []autoscalingtypes.TagDescription
}

func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySpecs []asgAutoDiscoveryConfig) (*asgCache, error) {
Expand All @@ -86,7 +88,7 @@ func newASGCache(awsService *awsWrapper, explicitSpecs []string, autoDiscoverySp
asgToInstances: make(map[AwsRef][]AwsInstanceRef),
instanceToAsg: make(map[AwsInstanceRef]*asg),
instanceStatus: make(map[AwsInstanceRef]*string),
instanceLifecycle: make(map[AwsInstanceRef]*string),
instanceLifecycle: make(map[AwsInstanceRef]autoscalingtypes.LifecycleState),
asgInstanceTypeCache: newAsgInstanceTypeCache(awsService),
interrupt: make(chan struct{}),
asgAutoDiscoverySpecs: autoDiscoverySpecs,
Expand Down Expand Up @@ -243,12 +245,12 @@ func (m *asgCache) InstanceStatus(ref AwsInstanceRef) (*string, error) {
return nil, fmt.Errorf("could not find instance %v", ref)
}

// findInstanceLifecycle looks ref up in m.instanceLifecycle and returns the
// stored lifecycle value. If ref is not present in the map it returns an
// error of the form "could not find instance <ref>".
//
// The visible body does not lock m.mutex itself; presumably callers already
// hold it — confirm against the call sites.
//
// NOTE(review): this span is taken from a rendered diff. The first signature
// and the `return nil, ...` line are the *string form; the second signature
// and the `return "", ...` line are the autoscalingtypes.LifecycleState form,
// matching the new instanceLifecycle map value type. Presumably only the
// second form remains after the change — confirm against the merged file.
func (m *asgCache) findInstanceLifecycle(ref AwsInstanceRef) (*string, error) {
func (m *asgCache) findInstanceLifecycle(ref AwsInstanceRef) (autoscalingtypes.LifecycleState, error) {
if lifecycle, found := m.instanceLifecycle[ref]; found {
return lifecycle, nil
}

// Not cached: report the miss as an error rather than a zero value alone.
return nil, fmt.Errorf("could not find instance %v", ref)
return "", fmt.Errorf("could not find instance %v", ref)
}

func (m *asgCache) SetAsgSize(asg *asg, size int) error {
Expand All @@ -261,12 +263,12 @@ func (m *asgCache) SetAsgSize(asg *asg, size int) error {
func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
params := &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asg.Name),
DesiredCapacity: aws.Int64(int64(size)),
DesiredCapacity: aws.Int32(int32(size)),
HonorCooldown: aws.Bool(false),
}
klog.V(0).Infof("Setting asg %s size to %d", asg.Name, size)
start := time.Now()
_, err := m.awsService.SetDesiredCapacity(params)
_, err := m.awsService.SetDesiredCapacity(context.Background(), params)
observeAWSRequest("SetDesiredCapacity", err, start)
if err != nil {
return err
Expand Down Expand Up @@ -356,12 +358,11 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
return err
}

if lifecycle != nil &&
*lifecycle == autoscaling.LifecycleStateTerminated ||
*lifecycle == autoscaling.LifecycleStateTerminating ||
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
if lifecycle == autoscalingtypes.LifecycleStateTerminated ||
lifecycle == autoscalingtypes.LifecycleStateTerminating ||
lifecycle == autoscalingtypes.LifecycleStateTerminatingWait ||
lifecycle == autoscalingtypes.LifecycleStateTerminatingProceed {
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, lifecycle)
continue
}

Expand All @@ -370,12 +371,12 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
start := time.Now()
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(context.Background(), params)
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)
klog.V(4).Infof("Terminated instance %s in autoscaling group: %s", instance.Name, aws.ToString(resp.Activity.Description))

// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--
Expand Down Expand Up @@ -421,7 +422,7 @@ func (m *asgCache) regenerate() error {
newInstanceToAsgCache := make(map[AwsInstanceRef]*asg)
newAsgToInstancesCache := make(map[AwsRef][]AwsInstanceRef)
newInstanceStatusMap := make(map[AwsInstanceRef]*string)
newInstanceLifecycleMap := make(map[AwsInstanceRef]*string)
newInstanceLifecycleMap := make(map[AwsInstanceRef]autoscalingtypes.LifecycleState)

// Fetch details of all ASGs
refreshNames := m.buildAsgNames()
Expand All @@ -448,7 +449,7 @@ func (m *asgCache) regenerate() error {
// Register or update ASGs
exists := make(map[AwsRef]bool)
for _, group := range groups {
asg, err := m.buildAsgFromAWS(group)
asg, err := m.buildAsgFromAWS(&group)
if err != nil {
return err
}
Expand Down Expand Up @@ -497,19 +498,21 @@ func (m *asgCache) regenerate() error {
return nil
}

func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*autoscaling.Group) []*autoscaling.Group {
func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []autoscalingtypes.AutoScalingGroup) []autoscalingtypes.AutoScalingGroup {
var updatedGroups []autoscalingtypes.AutoScalingGroup
for _, g := range groups {
desired := *g.DesiredCapacity
realInstances := int64(len(g.Instances))
realInstances := int32(len(g.Instances))
if desired <= realInstances {
updatedGroups = append(updatedGroups, g)
continue
}

klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+
"Creating placeholder instances.", *g.AutoScalingGroupName, realInstances, desired)

healthStatus := ""
isAvailable, err := m.isNodeGroupAvailable(g)
isAvailable, err := m.isNodeGroupAvailable(&g)
if err != nil {
klog.V(4).Infof("Could not check instance availability, creating placeholder node anyways: %v", err)
} else if !isAvailable {
Expand All @@ -519,23 +522,24 @@ func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*aut

for i := realInstances; i < desired; i++ {
id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i)
g.Instances = append(g.Instances, &autoscaling.Instance{
g.Instances = append(g.Instances, autoscalingtypes.Instance{
InstanceId: &id,
AvailabilityZone: g.AvailabilityZones[0],
AvailabilityZone: aws.String(g.AvailabilityZones[0]),
HealthStatus: &healthStatus,
})
}
updatedGroups = append(updatedGroups, g)
}
return groups
return updatedGroups
}

func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error) {
func (m *asgCache) isNodeGroupAvailable(group *autoscalingtypes.AutoScalingGroup) (bool, error) {
input := &autoscaling.DescribeScalingActivitiesInput{
AutoScalingGroupName: group.AutoScalingGroupName,
}

start := time.Now()
response, err := m.awsService.DescribeScalingActivities(input)
response, err := m.awsService.DescribeScalingActivities(context.Background(), input)
observeAWSRequest("DescribeScalingActivities", err, start)
if err != nil {
return true, err // If we can't describe the scaling activities we assume the node group is available
Expand All @@ -547,8 +551,8 @@ func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error)
lut := a.lastUpdateTime
if activity.StartTime.Before(lut) {
break
} else if *activity.StatusCode == "Failed" {
klog.Warningf("ASG %s scaling failed with %s", asgRef.Name, *activity)
} else if activity.StatusCode == autoscalingtypes.ScalingActivityStatusCodeFailed {
klog.Warningf("ASG %s scaling failed with description: %s", asgRef.Name, aws.ToString(activity.Description))
return false, nil
}
} else {
Expand All @@ -558,11 +562,11 @@ func (m *asgCache) isNodeGroupAvailable(group *autoscaling.Group) (bool, error)
return true, nil
}

func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
func (m *asgCache) buildAsgFromAWS(g *autoscalingtypes.AutoScalingGroup) (*asg, error) {
spec := dynamic.NodeGroupSpec{
Name: aws.StringValue(g.AutoScalingGroupName),
MinSize: int(aws.Int64Value(g.MinSize)),
MaxSize: int(aws.Int64Value(g.MaxSize)),
Name: aws.ToString(g.AutoScalingGroupName),
MinSize: int(aws.ToInt32(g.MinSize)),
MaxSize: int(aws.ToInt32(g.MaxSize)),
SupportScaleToZero: scaleToZeroSupported,
}

Expand All @@ -575,9 +579,9 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
minSize: spec.MinSize,
maxSize: spec.MaxSize,

curSize: int(aws.Int64Value(g.DesiredCapacity)),
AvailabilityZones: aws.StringValueSlice(g.AvailabilityZones),
LaunchConfigurationName: aws.StringValue(g.LaunchConfigurationName),
curSize: int(aws.ToInt32(g.DesiredCapacity)),
AvailabilityZones: g.AvailabilityZones,
LaunchConfigurationName: aws.ToString(g.LaunchConfigurationName),
Tags: g.Tags,
}

Expand All @@ -586,8 +590,8 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
}

if g.MixedInstancesPolicy != nil {
getInstanceTypes := func(overrides []*autoscaling.LaunchTemplateOverrides) []string {
res := []string{}
getInstanceTypes := func(overrides []autoscalingtypes.LaunchTemplateOverrides) []string {
var res []string
for _, override := range overrides {
if override.InstanceType != nil {
res = append(res, *override.InstanceType)
Expand All @@ -596,7 +600,7 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
return res
}

getInstanceTypeRequirements := func(overrides []*autoscaling.LaunchTemplateOverrides) *autoscaling.InstanceRequirements {
getInstanceTypeRequirements := func(overrides []autoscalingtypes.LaunchTemplateOverrides) *autoscalingtypes.InstanceRequirements {
if len(overrides) == 1 && overrides[0].InstanceRequirements != nil {
return overrides[0].InstanceRequirements
}
Expand Down Expand Up @@ -625,8 +629,8 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
return asg, nil
}

func (m *asgCache) getInstanceRequirementsFromMixedInstancesPolicy(policy *mixedInstancesPolicy) (*ec2.InstanceRequirements, error) {
instanceRequirements := &ec2.InstanceRequirements{}
func (m *asgCache) getInstanceRequirementsFromMixedInstancesPolicy(policy *mixedInstancesPolicy) (*ec2types.InstanceRequirements, error) {
instanceRequirements := &ec2types.InstanceRequirements{}
if policy.instanceRequirementsOverrides != nil {
var err error
instanceRequirements, err = m.awsService.getEC2RequirementsFromAutoscaling(policy.instanceRequirementsOverrides)
Expand All @@ -646,11 +650,11 @@ func (m *asgCache) getInstanceRequirementsFromMixedInstancesPolicy(policy *mixed
return instanceRequirements, nil
}

// buildInstanceRefFromAWS converts an Auto Scaling instance record into an
// AwsInstanceRef. ProviderID is formatted as
// "aws:///<availability-zone>/<instance-id>" and Name is the instance ID.
//
// NOTE(review): this span is taken from a rendered diff. Lines using
// *autoscaling.Instance and aws.StringValue are the aws-sdk-go form; lines
// using autoscalingtypes.Instance (passed by value) and aws.ToString are the
// aws-sdk-go-v2 form. Presumably only the v2 lines remain after the change —
// confirm against the merged file.
func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsInstanceRef {
providerID := fmt.Sprintf("aws:///%s/%s", aws.StringValue(instance.AvailabilityZone), aws.StringValue(instance.InstanceId))
func (m *asgCache) buildInstanceRefFromAWS(instance autoscalingtypes.Instance) AwsInstanceRef {
providerID := fmt.Sprintf("aws:///%s/%s", aws.ToString(instance.AvailabilityZone), aws.ToString(instance.InstanceId))
return AwsInstanceRef{
ProviderID: providerID,
Name: aws.StringValue(instance.InstanceId),
Name: aws.ToString(instance.InstanceId),
}
}

Expand Down
Loading
Loading