Merged
Limit the rate of adding new unneeded nodes
This is an optimization to avoid spending too much time in scale down
simulation. The main idea is that it doesn't make sense to add new
unneeded nodes at a rate that is higher than the rate at which we can
delete them afterwards.
x13n committed Mar 2, 2023
commit 5e59ae7c601fa20665a7f5856c3c46698d1ddeb6
60 changes: 57 additions & 3 deletions cluster-autoscaler/core/scaledown/planner/planner.go
@@ -40,8 +40,6 @@ import (
klog "k8s.io/klog/v2"
)

const unneededNodesLimit = 1000

type eligibilityChecker interface {
FilterOutUnremovable(context *context.AutoscalingContext, scaleDownCandidates []*apiv1.Node, timestamp time.Time, unremovableNodes *unremovable.Nodes) ([]string, map[string]utilization.Info, []*simulator.UnremovableNode)
}
@@ -68,6 +66,7 @@ type Planner struct {
rs removalSimulator
actuationInjector *scheduling.HintingSimulator
latestUpdate time.Time
minUpdateInterval time.Duration
eligibilityChecker eligibilityChecker
nodeUtilizationMap map[string]utilization.Info
actuationStatus scaledown.ActuationStatus
@@ -79,6 +78,10 @@ type Planner struct {
// New creates a new Planner object.
func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions simulator.NodeDeleteOptions) *Planner {
resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
minUpdateInterval := context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime
if minUpdateInterval == 0*time.Nanosecond {
minUpdateInterval = 1 * time.Nanosecond
}
return &Planner{
context: context,
unremovableNodes: unremovable.NewNodes(),
@@ -90,13 +93,18 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
resourceLimitsFinder: resourceLimitsFinder,
cc: newControllerReplicasCalculator(context.ListerRegistry),
scaleDownSetProcessor: processors.ScaleDownSetProcessor,
minUpdateInterval: minUpdateInterval,
}
}

// UpdateClusterState needs to be periodically invoked to provide Planner with
// up-to-date information about the cluster.
// Planner will evaluate scaleDownCandidates in the order provided here.
func (p *Planner) UpdateClusterState(podDestinations, scaleDownCandidates []*apiv1.Node, as scaledown.ActuationStatus, currentTime time.Time) errors.AutoscalerError {
updateInterval := currentTime.Sub(p.latestUpdate)
if updateInterval < p.minUpdateInterval {
p.minUpdateInterval = updateInterval
}
p.latestUpdate = currentTime
p.actuationStatus = as
// Avoid persisting changes done by the simulation.
@@ -254,10 +262,14 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
timer := time.NewTimer(p.context.ScaleDownSimulationTimeout)

for i, node := range currentlyUnneededNodeNames {
if timedOut(timer) || len(removableList) >= unneededNodesLimit {
if timedOut(timer) {
klog.Warningf("%d out of %d nodes skipped in scale down simulation due to timeout.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames))
break
}
if len(removableList) >= p.unneededNodesLimit() {
klog.V(4).Infof("%d out of %d nodes skipped in scale down simulation: there are already %d unneeded nodes so no point in looking for more.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames), len(removableList))
break
}
removable, unremovable := p.rs.SimulateNodeRemoval(node, podDestinations, p.latestUpdate, p.context.RemainingPdbTracker.GetPdbs())
if removable != nil {
_, inParallel, _ := p.context.RemainingPdbTracker.CanRemovePods(removable.PodsToReschedule)
@@ -279,6 +291,48 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
}
}

// unneededNodesLimit returns the number of nodes after which calculating more
// unneeded nodes is a waste of time. The reasoning behind it is essentially as
// follows.
// If the nodes are being removed instantly, then during each iteration we're
// going to delete up to MaxScaleDownParallelism nodes. Therefore, it doesn't
// really make sense to add more unneeded nodes than that.
// Let N = MaxScaleDownParallelism. When there are no unneeded nodes, we only
// need to find N of them in the first iteration. Once the unneeded time
// accumulates for them, only up to N will get deleted in a single iteration.
// When there are >0 unneeded nodes, we only need to add N more: once the first
// N will be deleted, we'll need another iteration for the next N nodes to get
// deleted.
// Of course, a node may stop being unneeded at any given time. To prevent
// slowdown stemming from having too few unneeded nodes, we're adding an
// extra buffer of N nodes. Note that we don't have to be super precise about
// the buffer size - if it is too small, we'll simply remove less than N nodes
// in one iteration.
// Finally, we know that in practice nodes are not removed instantly,
// especially when they require draining, so incrementing the limit by N every
// loop may in practice lead the limit to increase too much after a number of
// loops. To help with that, we can put another, not incremental upper bound on
// the limit: with max unneeded time U and loop interval I, we're going to have
// up to U/I loops before a node is removed. This means that the total number
// of unneeded nodes shouldn't really exceed N*U/I - scale down will not be
// able to keep up with removing them anyway.
func (p *Planner) unneededNodesLimit() int {
n := p.context.AutoscalingOptions.MaxScaleDownParallelism
extraBuffer := n
limit := len(p.unneededNodes.AsList()) + n + extraBuffer
// TODO(x13n): Use moving average instead of min.
loopInterval := int64(p.minUpdateInterval)
u := int64(p.context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime)
if u < loopInterval {
u = loopInterval
}
upperBound := n*int(u/loopInterval) + extraBuffer
if upperBound < limit {
return upperBound
}
return limit
}
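
For illustration only, here is a standalone sketch (an editor's addition, not part of this commit) that re-derives the cap using the default values exercised in the tests further down: MaxScaleDownParallelism = 10, ScaleDownUnneededTime = 1m, and a 10s loop interval. It reproduces the upper bound of 70 unneeded nodes that the test expectations assume.

package main

import (
	"fmt"
	"time"
)

func main() {
	n := 10                          // MaxScaleDownParallelism
	alreadyUnneeded := 70            // nodes marked unneeded in the previous loop
	unneededTime := 1 * time.Minute  // ScaleDownUnneededTime (U)
	loopInterval := 10 * time.Second // observed minimum update interval (I)

	extraBuffer := n
	// Incremental limit: current unneeded + N + buffer = 70 + 10 + 10 = 90.
	limit := alreadyUnneeded + n + extraBuffer
	// Non-incremental upper bound: N*(U/I) + buffer = 10*6 + 10 = 70.
	upperBound := n*int(unneededTime/loopInterval) + extraBuffer
	if upperBound < limit {
		limit = upperBound
	}
	fmt.Println(limit) // prints 70, matching wantUnneeded in the "max unneeded" test case
}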

// getKnownOwnerRef returns ownerRef that is known by CA and CA knows the logic of how this controller recreates pods.
func getKnownOwnerRef(ownerRefs []metav1.OwnerReference) *metav1.OwnerReference {
for _, ownerRef := range ownerRefs {
116 changes: 115 additions & 1 deletion cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package planner

import (
"fmt"
"testing"
"time"

@@ -480,7 +481,13 @@ func TestUpdateClusterState(t *testing.T) {
assert.NoError(t, err)
registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, rsLister, nil)
provider := testprovider.NewTestCloudProvider(nil, nil)
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{ScaleDownSimulationTimeout: 1 * time.Second}, &fake.Clientset{}, registry, provider, nil, nil)
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: 10 * time.Minute,
},
ScaleDownSimulationTimeout: 1 * time.Second,
MaxScaleDownParallelism: 10,
}, &fake.Clientset{}, registry, provider, nil, nil)
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
deleteOptions := simulator.NodeDeleteOptions{}
@@ -506,6 +513,113 @@ func TestUpdateClusterState(t *testing.T) {
}
}

func TestUpdateClusterStateUnneededNodesLimit(t *testing.T) {
testCases := []struct {
name string
previouslyUnneeded int
nodes int
maxParallelism int
maxUnneededTime time.Duration
updateInterval time.Duration
wantUnneeded int
}{
{
name: "no unneeded, default settings",
previouslyUnneeded: 0,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 1 * time.Minute,
updateInterval: 10 * time.Second,
wantUnneeded: 20,
},
{
name: "some unneeded, default settings",
previouslyUnneeded: 3,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 1 * time.Minute,
updateInterval: 10 * time.Second,
wantUnneeded: 23,
},
{
name: "max unneeded, default settings",
previouslyUnneeded: 70,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 1 * time.Minute,
updateInterval: 10 * time.Second,
wantUnneeded: 70,
},
{
name: "too many unneeded, default settings",
previouslyUnneeded: 77,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 1 * time.Minute,
updateInterval: 10 * time.Second,
wantUnneeded: 70,
},
{
name: "instant kill nodes",
previouslyUnneeded: 0,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 0 * time.Minute,
updateInterval: 10 * time.Second,
wantUnneeded: 20,
},
{
name: "quick loops",
previouslyUnneeded: 13,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 1 * time.Minute,
updateInterval: 1 * time.Second,
wantUnneeded: 33,
},
{
name: "slow loops",
previouslyUnneeded: 13,
nodes: 100,
maxParallelism: 10,
maxUnneededTime: 1 * time.Minute,
updateInterval: 30 * time.Second,
wantUnneeded: 30,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
nodes := make([]*apiv1.Node, tc.nodes)
for i := 0; i < tc.nodes; i++ {
nodes[i] = BuildTestNode(fmt.Sprintf("n%d", i), 1000, 10)
}
previouslyUnneeded := make([]simulator.NodeToBeRemoved, tc.previouslyUnneeded)
for i := 0; i < tc.previouslyUnneeded; i++ {
previouslyUnneeded[i] = simulator.NodeToBeRemoved{Node: nodes[i]}
}
provider := testprovider.NewTestCloudProvider(nil, nil)
context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{
NodeGroupDefaults: config.NodeGroupAutoscalingOptions{
ScaleDownUnneededTime: tc.maxUnneededTime,
},
ScaleDownSimulationTimeout: 1 * time.Hour,
MaxScaleDownParallelism: tc.maxParallelism,
}, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
deleteOptions := simulator.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions)
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))}
p.minUpdateInterval = tc.updateInterval
p.unneededNodes.Update(previouslyUnneeded, time.Now())
assert.NoError(t, p.UpdateClusterState(nodes, nodes, &fakeActuationStatus{}, time.Now()))
assert.Equal(t, tc.wantUnneeded, len(p.unneededNodes.AsList()))
})
}
}

func generateReplicaSets(name string, replicas int32) []*appsv1.ReplicaSet {
return []*appsv1.ReplicaSet{
{