
Commit bb046ee

CA: stop passing NotStarted nodes as scale-down candidates
Without this, scale-down with aggressive settings could remove NotStarted nodes before they have a chance to become ready (and how long that takes should be unrelated to the scale-down settings).
1 parent: 6978ff8
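
In essence, the change (shown in full in the static_autoscaler.go diff below) filters the NotStarted node names reported by the cluster state registry out of the node list that scale-down sees, and removes the same nodes from the cluster snapshot so the two stay consistent. A condensed sketch paraphrased from the diff (the committed code additionally returns an InternalError when snapshot removal fails):

    // Condensed from the diff below; early return on error omitted.
    notStarted := a.clusterStateRegistry.GetClusterReadiness().NotStarted
    allNodes = subtractNodesByName(allNodes, notStarted)
    for _, name := range notStarted {
        if err := a.ClusterSnapshot.RemoveNode(name); err != nil {
            klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", name, err)
        }
    }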

File tree

cluster-autoscaler/core/static_autoscaler.go
cluster-autoscaler/core/static_autoscaler_test.go

2 files changed: +199 / -14 lines changed

cluster-autoscaler/core/static_autoscaler.go

Lines changed: 32 additions & 6 deletions
@@ -467,6 +467,20 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
             return errors.ToAutoscalerError(errors.InternalError, err)
         }
     }
+    // Nodes that are registered but not yet ready are classified as NotStarted by CSR. Above, we inject "upcoming" replacement
+    // nodes for them that are faked to appear ready, so that we can pack unschedulable pods on them and not trigger another scale-up.
+    // The initial nodes have to be filtered out of the all nodes list so that scale-down can't consider them as candidates. Otherwise, with
+    // aggressive scale-down settings, we could be removing the nodes before they have a chance to first become ready (the duration
+    // of which should be unrelated to the scale-down settings).
+    allNodes = subtractNodesByName(allNodes, a.clusterStateRegistry.GetClusterReadiness().NotStarted)
+    // Remove the nodes from the snapshot as well so that the state is consistent.
+    for _, notStartedNodeName := range a.clusterStateRegistry.GetClusterReadiness().NotStarted {
+        err := a.ClusterSnapshot.RemoveNode(notStartedNodeName)
+        if err != nil {
+            klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
+            return errors.ToAutoscalerError(errors.InternalError, err)
+        }
+    }
 
     l, err := a.ClusterSnapshot.NodeInfos().List()
     if err != nil {
@@ -921,17 +935,29 @@ func countsByReason(nodes []*simulator.UnremovableNode) map[simulator.Unremovabl
     return counts
 }
 
-func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
+func subtractNodesByName(nodes []*apiv1.Node, namesToRemove []string) []*apiv1.Node {
     var c []*apiv1.Node
-    namesToDrop := make(map[string]bool)
-    for _, n := range b {
-        namesToDrop[n.Name] = true
+    removeSet := make(map[string]bool)
+    for _, name := range namesToRemove {
+        removeSet[name] = true
     }
-    for _, n := range a {
-        if namesToDrop[n.Name] {
+    for _, n := range nodes {
+        if removeSet[n.Name] {
             continue
         }
         c = append(c, n)
     }
     return c
 }
+
+func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
+    return subtractNodesByName(a, nodeNames(b))
+}
+
+func nodeNames(ns []*apiv1.Node) []string {
+    names := make([]string, len(ns))
+    for i, node := range ns {
+        names[i] = node.Name
+    }
+    return names
+}
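
For reference, a minimal usage sketch of the new subtractNodesByName helper. This is a hypothetical standalone test, not part of this commit, and it assumes it runs in the same core package as the helper:

    package core

    import (
        "testing"

        "github.com/stretchr/testify/assert"
        apiv1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // Hypothetical example: only the nodes whose names appear in the second
    // argument are dropped; the relative order of the remaining nodes is preserved.
    func TestSubtractNodesByNameSketch(t *testing.T) {
        node := func(name string) *apiv1.Node {
            return &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}
        }
        all := []*apiv1.Node{node("ready-1"), node("ready-2"), node("not-started-1")}
        got := subtractNodesByName(all, []string{"not-started-1"})
        assert.Len(t, got, 2)
        assert.Equal(t, "ready-1", got[0].Name)
        assert.Equal(t, "ready-2", got[1].Name)
    }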

cluster-autoscaler/core/static_autoscaler_test.go

Lines changed: 167 additions & 8 deletions
@@ -42,6 +42,9 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/estimator"
     ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
+    "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+    "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
+    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
     "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -1273,6 +1276,167 @@ func TestStaticAutoscalerInstanceCreationErrors(t *testing.T) {
     nodeGroupC.AssertNumberOfCalls(t, "DeleteNodes", 0)
 }
 
+type candidateTrackingFakePlanner struct {
+    lastCandidateNodes map[string]bool
+}
+
+func (f *candidateTrackingFakePlanner) UpdateClusterState(podDestinations, scaleDownCandidates []*apiv1.Node, as scaledown.ActuationStatus, pdb []*policyv1.PodDisruptionBudget, currentTime time.Time) errors.AutoscalerError {
+    f.lastCandidateNodes = map[string]bool{}
+    for _, node := range scaleDownCandidates {
+        f.lastCandidateNodes[node.Name] = true
+    }
+    return nil
+}
+
+func (f *candidateTrackingFakePlanner) CleanUpUnneededNodes() {
+}
+
+func (f *candidateTrackingFakePlanner) NodesToDelete(currentTime time.Time) (empty, needDrain []*apiv1.Node) {
+    return nil, nil
+}
+
+func (f *candidateTrackingFakePlanner) UnneededNodes() []*apiv1.Node {
+    return nil
+}
+
+func (f *candidateTrackingFakePlanner) UnremovableNodes() []*simulator.UnremovableNode {
+    return nil
+}
+
+func (f *candidateTrackingFakePlanner) NodeUtilizationMap() map[string]utilization.Info {
+    return nil
+}
+
+func assertSnapshotNodeCount(t *testing.T, snapshot clustersnapshot.ClusterSnapshot, wantCount int) {
+    nodeInfos, err := snapshot.NodeInfos().List()
+    assert.NoError(t, err)
+    assert.Len(t, nodeInfos, wantCount)
+}
+
+func assertNodesNotInSnapshot(t *testing.T, snapshot clustersnapshot.ClusterSnapshot, nodeNames map[string]bool) {
+    nodeInfos, err := snapshot.NodeInfos().List()
+    assert.NoError(t, err)
+    for _, nodeInfo := range nodeInfos {
+        assert.NotContains(t, nodeNames, nodeInfo.Node().Name)
+    }
+}
+
+func assertNodesInSnapshot(t *testing.T, snapshot clustersnapshot.ClusterSnapshot, nodeNames map[string]bool) {
+    nodeInfos, err := snapshot.NodeInfos().List()
+    assert.NoError(t, err)
+    snapshotNodeNames := map[string]bool{}
+    for _, nodeInfo := range nodeInfos {
+        snapshotNodeNames[nodeInfo.Node().Name] = true
+    }
+    for nodeName := range nodeNames {
+        assert.Contains(t, snapshotNodeNames, nodeName)
+    }
+}
+
+func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) {
+    startTime := time.Time{}
+
+    // Generate a number of ready and unready nodes created at startTime, spread across multiple node groups.
+    provider := testprovider.NewTestCloudProvider(nil, nil)
+    allNodeNames := map[string]bool{}
+    readyNodeNames := map[string]bool{}
+    notReadyNodeNames := map[string]bool{}
+    var allNodes []*apiv1.Node
+    var readyNodes []*apiv1.Node
+
+    readyNodesCount := 4
+    unreadyNodesCount := 2
+    nodeGroupCount := 2
+    for ngNum := 0; ngNum < nodeGroupCount; ngNum++ {
+        ngName := fmt.Sprintf("ng-%d", ngNum)
+        provider.AddNodeGroup(ngName, 0, 1000, readyNodesCount+unreadyNodesCount)
+
+        for i := 0; i < readyNodesCount; i++ {
+            node := BuildTestNode(fmt.Sprintf("%s-ready-node-%d", ngName, i), 2000, 1000)
+            node.CreationTimestamp = metav1.NewTime(startTime)
+            SetNodeReadyState(node, true, startTime)
+            provider.AddNode(ngName, node)
+
+            allNodes = append(allNodes, node)
+            allNodeNames[node.Name] = true
+
+            readyNodes = append(readyNodes, node)
+            readyNodeNames[node.Name] = true
+        }
+        for i := 0; i < unreadyNodesCount; i++ {
+            node := BuildTestNode(fmt.Sprintf("%s-unready-node-%d", ngName, i), 2000, 1000)
+            node.CreationTimestamp = metav1.NewTime(startTime)
+            SetNodeReadyState(node, false, startTime)
+            provider.AddNode(ngName, node)
+
+            allNodes = append(allNodes, node)
+            allNodeNames[node.Name] = true
+
+            notReadyNodeNames[node.Name] = true
+        }
+    }
+
+    // Create fake listers for the generated nodes, nothing returned by the rest (but the ones used in the tested path have to be defined).
+    allNodeLister := kubernetes.NewTestNodeLister(allNodes)
+    readyNodeLister := kubernetes.NewTestNodeLister(readyNodes)
+    daemonSetLister, err := kubernetes.NewTestDaemonSetLister(nil)
+    assert.NoError(t, err)
+    listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, kubernetes.NewTestPodLister(nil), kubernetes.NewTestPodLister(nil), kubernetes.NewTestPodDisruptionBudgetLister(nil), daemonSetLister, nil, nil, nil, nil)
+
+    // Create context with minimal options that guarantee we reach the tested logic.
+    // We're only testing the input to UpdateClusterState which should be called whenever scale-down is enabled, other options shouldn't matter.
+    options := config.AutoscalingOptions{ScaleDownEnabled: true}
+    processorCallbacks := newStaticAutoscalerProcessorCallbacks()
+    ctx, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listerRegistry, provider, processorCallbacks, nil)
+    assert.NoError(t, err)
+
+    // Create CSR with unhealthy cluster protection effectively disabled, to guarantee we reach the tested logic.
+    csrConfig := clusterstate.ClusterStateRegistryConfig{OkTotalUnreadyCount: nodeGroupCount * unreadyNodesCount}
+    csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff())
+
+    // Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test.
+    actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), simulator.NodeDeleteOptions{})
+    ctx.ScaleDownActuator = actuator
+
+    // Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
+    planner := &candidateTrackingFakePlanner{}
+
+    autoscaler := &StaticAutoscaler{
+        AutoscalingContext:   &ctx,
+        clusterStateRegistry: csr,
+        scaleDownActuator:    actuator,
+        scaleDownPlanner:     planner,
+        processors:           NewTestProcessors(&ctx),
+        processorCallbacks:   processorCallbacks,
+    }
+
+    // RunOnce run right when the nodes are created. Ready nodes should be passed as scale-down candidates, unready nodes should be classified as
+    // NotStarted and not passed as scale-down candidates (or inserted into the cluster snapshot). The fake upcoming nodes also shouldn't be passed,
+    // but they should be inserted into the snapshot.
+    err = autoscaler.RunOnce(startTime)
+    assert.NoError(t, err)
+    assert.Equal(t, readyNodeNames, planner.lastCandidateNodes)
+    assertNodesInSnapshot(t, autoscaler.ClusterSnapshot, readyNodeNames)
+    assertNodesNotInSnapshot(t, autoscaler.ClusterSnapshot, notReadyNodeNames)
+    assertSnapshotNodeCount(t, autoscaler.ClusterSnapshot, len(allNodeNames)) // Ready nodes + fake upcoming copies for unready nodes.
+
+    // RunOnce run in the last moment when unready nodes are still classified as NotStarted - assertions the same as above.
+    err = autoscaler.RunOnce(startTime.Add(clusterstate.MaxNodeStartupTime).Add(-time.Second))
+    assert.NoError(t, err)
+    assert.Equal(t, readyNodeNames, planner.lastCandidateNodes)
+    assertNodesInSnapshot(t, autoscaler.ClusterSnapshot, readyNodeNames)
+    assertNodesNotInSnapshot(t, autoscaler.ClusterSnapshot, notReadyNodeNames)
+    assertSnapshotNodeCount(t, autoscaler.ClusterSnapshot, len(allNodeNames)) // Ready nodes + fake upcoming copies for unready nodes.
+
+    // RunOnce run in the first moment when unready nodes exceed the startup threshold, stop being classified as NotStarted, and start being classified
+    // Unready instead. The unready nodes should be passed as scale-down candidates at this point, and inserted into the snapshot. Fake upcoming
+    // nodes should no longer be inserted.
+    err = autoscaler.RunOnce(startTime.Add(clusterstate.MaxNodeStartupTime).Add(time.Second))
+    assert.Equal(t, allNodeNames, planner.lastCandidateNodes)
+    assertNodesInSnapshot(t, autoscaler.ClusterSnapshot, allNodeNames)
+    assertSnapshotNodeCount(t, autoscaler.ClusterSnapshot, len(allNodeNames)) // Ready nodes + actual unready nodes.
+}
+
 func TestStaticAutoscalerProcessorCallbacks(t *testing.T) {
     processorCallbacks := newStaticAutoscalerProcessorCallbacks()
     assert.Equal(t, false, processorCallbacks.disableScaleDownForLoop)
@@ -1426,6 +1590,9 @@ func TestSubtractNodes(t *testing.T) {
     for _, tc := range testCases {
         got := subtractNodes(tc.a, tc.b)
         assert.Equal(t, nodeNames(got), nodeNames(tc.c))
+
+        got = subtractNodesByName(tc.a, nodeNames(tc.b))
+        assert.Equal(t, nodeNames(got), nodeNames(tc.c))
     }
 }
 
@@ -1526,14 +1693,6 @@ func TestFilterOutYoungPods(t *testing.T) {
     }
 }
 
-func nodeNames(ns []*apiv1.Node) []string {
-    names := make([]string, len(ns))
-    for i, node := range ns {
-        names[i] = node.Name
-    }
-    return names
-}
-
 func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) {
     select {
     case <-deleteFinished:
