Skip to content

Commit b35497d

Browse files
authored
Merge pull request #7012 from kmsarabu/cluster-autoscaler-release-1.28
PR#6911 Backport for 1.28: Fix/aws asg unsafe decommission #5829
2 parents 0c97874 + 423d0e6 commit b35497d

File tree

2 files changed

+192
-37
lines changed

2 files changed

+192
-37
lines changed

cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go

Lines changed: 80 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -308,45 +308,78 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
308308
}
309309
}
310310

311+
placeHolderInstancesCount := m.GetPlaceHolderInstancesCount(instances)
312+
// Check if there are any placeholder instances in the list.
313+
if placeHolderInstancesCount > 0 {
314+
// Log the check for placeholders in the ASG.
315+
klog.V(4).Infof("Detected %d placeholder instance(s) in ASG %s",
316+
placeHolderInstancesCount, commonAsg.Name)
317+
318+
asgNames := []string{commonAsg.Name}
319+
asgDetail, err := m.awsService.getAutoscalingGroupsByNames(asgNames)
320+
321+
if err != nil {
322+
klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err)
323+
return err
324+
}
325+
326+
activeInstancesInAsg := len(asgDetail[0].Instances)
327+
desiredCapacityInAsg := int(*asgDetail[0].DesiredCapacity)
328+
klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d. updating ASG to match active instances count",
329+
commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg)
330+
331+
// Placeholders stand for requested instances that were never created, so their
332+
// presence means the ASG's desired capacity is not being reached.
333+
// In this case, we reduce the size of the ASG to drop the unprovisioned instances:
334+
// the new size is the total count of active instances currently in the ASG.
335+
336+
err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg)
337+
338+
if err != nil {
339+
klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err)
340+
return err
341+
}
342+
}
343+
311344
for _, instance := range instances {
312-
// check if the instance is a placeholder - a requested instance that was never created by the node group
313-
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
314-
if m.isPlaceholderInstance(instance) {
315-
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
316-
"of deleting instance", instance.Name)
317-
m.decreaseAsgSizeByOneNoLock(commonAsg)
318-
} else {
319-
// check if the instance is already terminating - if it is, don't bother terminating again
320-
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
321-
// unnecessarily.
322-
lifecycle, err := m.findInstanceLifecycle(*instance)
323-
if err != nil {
324-
return err
325-
}
326345

327-
if lifecycle != nil &&
328-
*lifecycle == autoscaling.LifecycleStateTerminating ||
329-
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
330-
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
331-
klog.V(2).Infof("instance %s is already terminating, will skip instead", instance.Name)
332-
continue
333-
}
346+
if m.isPlaceholderInstance(instance) {
347+
// skipping placeholder as placeholder instances don't exist
348+
// and we have already reduced ASG size during placeholder check.
349+
continue
350+
}
351+
// check if the instance is already terminating - if it is, don't bother terminating again
352+
// as doing so causes unnecessary API calls and can cause the curSize cached value to decrement
353+
// unnecessarily.
354+
lifecycle, err := m.findInstanceLifecycle(*instance)
355+
if err != nil {
356+
return err
357+
}
334358

335-
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
336-
InstanceId: aws.String(instance.Name),
337-
ShouldDecrementDesiredCapacity: aws.Bool(true),
338-
}
339-
start := time.Now()
340-
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
341-
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
342-
if err != nil {
343-
return err
344-
}
345-
klog.V(4).Infof(*resp.Activity.Description)
359+
if lifecycle != nil &&
360+
*lifecycle == autoscaling.LifecycleStateTerminated ||
361+
*lifecycle == autoscaling.LifecycleStateTerminating ||
362+
*lifecycle == autoscaling.LifecycleStateTerminatingWait ||
363+
*lifecycle == autoscaling.LifecycleStateTerminatingProceed {
364+
klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle)
365+
continue
366+
}
346367

347-
// Proactively decrement the size so autoscaler makes better decisions
348-
commonAsg.curSize--
368+
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
369+
InstanceId: aws.String(instance.Name),
370+
ShouldDecrementDesiredCapacity: aws.Bool(true),
371+
}
372+
start := time.Now()
373+
resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params)
374+
observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start)
375+
if err != nil {
376+
return err
349377
}
378+
klog.V(4).Infof(*resp.Activity.Description)
379+
380+
// Proactively decrement the size so autoscaler makes better decisions
381+
commonAsg.curSize--
382+
350383
}
351384
return nil
352385
}
@@ -623,3 +656,16 @@ func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsIn
623656
func (m *asgCache) Cleanup() {
624657
close(m.interrupt)
625658
}
659+
660+
// GetPlaceHolderInstancesCount returns count of placeholder instances in the cache
661+
func (m *asgCache) GetPlaceHolderInstancesCount(instances []*AwsInstanceRef) int {
662+
663+
placeholderInstancesCount := 0
664+
for _, instance := range instances {
665+
if strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) {
666+
placeholderInstancesCount++
667+
668+
}
669+
}
670+
return placeholderInstancesCount
671+
}

cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go

Lines changed: 112 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ limitations under the License.
1717
package aws
1818

1919
import (
20-
"testing"
21-
20+
"fmt"
2221
"github.com/stretchr/testify/assert"
2322
"github.com/stretchr/testify/mock"
2423
apiv1 "k8s.io/api/core/v1"
@@ -27,6 +26,7 @@ import (
2726
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws"
2827
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling"
2928
"k8s.io/autoscaler/cluster-autoscaler/config"
29+
"testing"
3030
)
3131

3232
var testAwsManager = &AwsManager{
@@ -550,7 +550,7 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) {
550550
err = asgs[0].DeleteNodes([]*apiv1.Node{node})
551551
assert.NoError(t, err)
552552
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
553-
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)
553+
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)
554554

555555
newSize, err := asgs[0].TargetSize()
556556
assert.NoError(t, err)
@@ -595,6 +595,115 @@ func TestDeleteNodesAfterMultipleRefreshes(t *testing.T) {
595595
assert.NoError(t, err)
596596
}
597597

598+
func TestDeleteNodesWithPlaceholderAndStaleCache(t *testing.T) {
599+
// This test validates the scenario where the ASG cache is not in sync with the Autoscaling configuration.
600+
// Example: the ASG size is 10 and the cache holds 3 real instances "i-0000", "i-0001" and "i-0002" (plus placeholders),
601+
// but the mocked ASG reports 6 instances, i-0000 to i-0005. When DeleteNodes is called with 2 instances ("i-0000", "i-0001")
602+
// and the placeholders, CAS is expected to set the ASG size to 6 once and then terminate only these 2 instances.
603+
604+
a := &autoScalingMock{}
605+
provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:10:test-asg"}))
606+
asgs := provider.NodeGroups()
607+
commonAsg := &asg{
608+
AwsRef: AwsRef{Name: asgs[0].Id()},
609+
minSize: asgs[0].MinSize(),
610+
maxSize: asgs[0].MaxSize(),
611+
}
612+
613+
// desired capacity is expected to be set to 6, the number of instances the mocked ASG reports
614+
a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{
615+
AutoScalingGroupName: aws.String(asgs[0].Id()),
616+
DesiredCapacity: aws.Int64(6),
617+
HonorCooldown: aws.Bool(false),
618+
}).Return(&autoscaling.SetDesiredCapacityOutput{})
619+
620+
// Look up the current number of instances...
621+
var expectedInstancesCount int64 = 10
622+
a.On("DescribeAutoScalingGroupsPages",
623+
&autoscaling.DescribeAutoScalingGroupsInput{
624+
AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}),
625+
MaxRecords: aws.Int64(maxRecordsReturnedByAPI),
626+
},
627+
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
628+
).Run(func(args mock.Arguments) {
629+
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
630+
fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "i-0000", "i-0001", "i-0002", "i-0003", "i-0004", "i-0005"), false)
631+
632+
// the first call is answered with 10; every later call is answered with 4
expectedInstancesCount = 4
633+
}).Return(nil)
634+
635+
a.On("DescribeScalingActivities",
636+
&autoscaling.DescribeScalingActivitiesInput{
637+
AutoScalingGroupName: aws.String("test-asg"),
638+
},
639+
).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil)
640+
641+
provider.Refresh()
642+
643+
initialSize, err := asgs[0].TargetSize()
644+
assert.NoError(t, err)
645+
assert.Equal(t, 10, initialSize)
646+
647+
var awsInstanceRefs []AwsInstanceRef
648+
instanceToAsg := make(map[AwsInstanceRef]*asg)
649+
650+
var nodes []*apiv1.Node
651+
// build 7 placeholder nodes (indices 3 through 9); all of them are passed to DeleteNodes and put in the cache
for i := 3; i <= 9; i++ {
652+
providerId := fmt.Sprintf("aws:///us-east-1a/i-placeholder-test-asg-%d", i)
653+
node := &apiv1.Node{
654+
Spec: apiv1.NodeSpec{
655+
ProviderID: providerId,
656+
},
657+
}
658+
nodes = append(nodes, node)
659+
awsInstanceRef := AwsInstanceRef{
660+
ProviderID: providerId,
661+
Name: fmt.Sprintf("i-placeholder-test-asg-%d", i),
662+
}
663+
awsInstanceRefs = append(awsInstanceRefs, awsInstanceRef)
664+
instanceToAsg[awsInstanceRef] = commonAsg
665+
}
666+
667+
// build the 3 real instances i-0000..i-0002; all 3 go into the cache
for i := 0; i <= 2; i++ {
668+
providerId := fmt.Sprintf("aws:///us-east-1a/i-000%d", i)
669+
node := &apiv1.Node{
670+
Spec: apiv1.NodeSpec{
671+
ProviderID: providerId,
672+
},
673+
}
674+
// only setting 2 instances to be terminated out of 3 active instances
675+
if i < 2 {
676+
nodes = append(nodes, node)
677+
a.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{
678+
InstanceId: aws.String(fmt.Sprintf("i-000%d", i)),
679+
ShouldDecrementDesiredCapacity: aws.Bool(true),
680+
}).Return(&autoscaling.TerminateInstanceInAutoScalingGroupOutput{
681+
Activity: &autoscaling.Activity{Description: aws.String("Deleted instance")},
682+
})
683+
}
684+
awsInstanceRef := AwsInstanceRef{
685+
ProviderID: providerId,
686+
Name: fmt.Sprintf("i-000%d", i),
687+
}
688+
awsInstanceRefs = append(awsInstanceRefs, awsInstanceRef)
689+
instanceToAsg[awsInstanceRef] = commonAsg
690+
}
691+
692+
// overwriting the provider's cache so that it disagrees with what the mocked ASG reports
693+
provider.awsManager.asgCache.asgToInstances[AwsRef{Name: "test-asg"}] = awsInstanceRefs
694+
provider.awsManager.asgCache.instanceToAsg = instanceToAsg
695+
696+
// calling DeleteNodes with the 2 real nodes and all of the placeholder nodes
697+
err = asgs[0].DeleteNodes(nodes)
698+
assert.NoError(t, err)
699+
a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1)
700+
a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)
701+
702+
// This ensures only 2 instances are terminated which are mocked in this unit test
703+
a.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 2)
704+
705+
}
706+
598707
func TestGetResourceLimiter(t *testing.T) {
599708
mockAutoScaling := &autoScalingMock{}
600709
mockEC2 := &ec2Mock{}

0 commit comments

Comments
 (0)