Skip to content

Commit b2943ad

Browse files
Add expire time for nodeInfo cache items
1 parent 994fbac commit b2943ad

File tree

7 files changed

+94
-23
lines changed

7 files changed

+94
-23
lines changed

cluster-autoscaler/core/scale_test_common.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func NewTestProcessors() *processors.AutoscalingProcessors {
148148
AutoscalingStatusProcessor: &status.NoOpAutoscalingStatusProcessor{},
149149
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
150150
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
151-
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(),
151+
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil),
152152
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
153153
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
154154
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),

cluster-autoscaler/core/scale_up_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul
530530
}
531531
context.ExpanderStrategy = expander
532532

533-
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
533+
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
534534
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
535535
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
536536

@@ -691,7 +691,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
691691
assert.NoError(t, err)
692692

693693
nodes := []*apiv1.Node{n1, n2}
694-
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
694+
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
695695
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
696696
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
697697
p3 := BuildTestPod("p-new", 550, 0)
@@ -732,7 +732,7 @@ func TestScaleUpNoHelp(t *testing.T) {
732732
assert.NoError(t, err)
733733

734734
nodes := []*apiv1.Node{n1}
735-
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
735+
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
736736
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
737737
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
738738
p3 := BuildTestPod("p-new", 500, 0)
@@ -799,7 +799,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
799799
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
800800
assert.NoError(t, err)
801801

802-
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
802+
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, now)
803803
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
804804
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
805805

@@ -867,7 +867,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
867867
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 0}
868868

869869
nodes := []*apiv1.Node{}
870-
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
870+
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
871871

872872
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
873873
assert.NoError(t, err)
@@ -920,7 +920,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
920920
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t, 2}
921921

922922
nodes := []*apiv1.Node{}
923-
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider().Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
923+
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())
924924

925925
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, nil)
926926
assert.NoError(t, err)

cluster-autoscaler/main.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"k8s.io/autoscaler/cluster-autoscaler/metrics"
4747
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
4848
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
49+
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
4950
"k8s.io/autoscaler/cluster-autoscaler/simulator"
5051
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
5152
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -185,6 +186,7 @@ var (
185186

186187
emitPerNodeGroupMetrics = flag.Bool("emit-per-nodegroup-metrics", false, "If true, emit per node group metrics.")
187188
debuggingSnapshotEnabled = flag.Bool("debugging-snapshot-enabled", false, "Whether the debugging snapshot of cluster autoscaler feature is enabled")
189+
nodeInfoCacheExpireTime = flag.Duration("node-info-cache-expire-time", -1*time.Minute, "Node Info cache expire time for each item. If the value isn't set or negative, the cache items won't be expired")
188190
)
189191

190192
func createAutoscalingOptions() config.AutoscalingOptions {
@@ -322,6 +324,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
322324
}
323325

324326
opts.Processors = ca_processors.DefaultProcessors()
327+
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime)
325328
opts.Processors.PodListProcessor = core.NewFilterOutSchedulablePodListProcessor()
326329

327330
nodeInfoComparatorBuilder := nodegroupset.CreateGenericNodeInfoComparator

cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,31 @@ import (
3636

3737
const stabilizationDelay = 1 * time.Minute
3838

39+
type cacheItem struct {
40+
nodeInfo *schedulerframework.NodeInfo
41+
added time.Time
42+
}
43+
3944
// MixedTemplateNodeInfoProvider build nodeInfos from the cluster's nodes and node groups.
4045
type MixedTemplateNodeInfoProvider struct {
41-
nodeInfoCache map[string]*schedulerframework.NodeInfo
46+
nodeInfoCache map[string]cacheItem
47+
ttl *time.Duration
4248
}
4349

4450
// NewMixedTemplateNodeInfoProvider returns a NodeInfoProvider processor building
4551
// NodeInfos from real-world nodes when available, otherwise from node groups templates.
46-
func NewMixedTemplateNodeInfoProvider() *MixedTemplateNodeInfoProvider {
52+
func NewMixedTemplateNodeInfoProvider(ttl *time.Duration) *MixedTemplateNodeInfoProvider {
4753
return &MixedTemplateNodeInfoProvider{
48-
nodeInfoCache: make(map[string]*schedulerframework.NodeInfo),
54+
nodeInfoCache: make(map[string]cacheItem),
55+
ttl: ttl,
56+
}
57+
}
58+
59+
func (p *MixedTemplateNodeInfoProvider) isCacheItemExpired(added time.Time) bool {
60+
if p.ttl == nil || *p.ttl < 0 {
61+
return false
4962
}
63+
return time.Now().Sub(added) > *p.ttl
5064
}
5165

5266
// CleanUp cleans up processor's internal structures.
@@ -102,7 +116,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
102116
}
103117
if added && p.nodeInfoCache != nil {
104118
if nodeInfoCopy, err := utils.DeepCopyNodeInfo(result[id]); err == nil {
105-
p.nodeInfoCache[id] = nodeInfoCopy
119+
p.nodeInfoCache[id] = cacheItem{nodeInfo: nodeInfoCopy, added: time.Now()}
106120
}
107121
}
108122
}
@@ -115,8 +129,10 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext,
115129

116130
// No good template, check cache of previously running nodes.
117131
if p.nodeInfoCache != nil {
118-
if nodeInfo, found := p.nodeInfoCache[id]; found {
119-
if nodeInfoCopy, err := utils.DeepCopyNodeInfo(nodeInfo); err == nil {
132+
if cacheItem, found := p.nodeInfoCache[id]; found {
133+
if p.isCacheItemExpired(cacheItem.added) {
134+
delete(p.nodeInfoCache, id)
135+
} else if nodeInfoCopy, err := utils.DeepCopyNodeInfo(cacheItem.nodeInfo); err == nil {
120136
result[id] = nodeInfoCopy
121137
continue
122138
}

cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ import (
3232
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
3333
)
3434

35+
var (
36+
cacheTtl = 1 * time.Second
37+
)
38+
3539
func TestGetNodeInfosForGroups(t *testing.T) {
3640
now := time.Now()
3741
ready1 := BuildTestNode("n1", 1000, 1000)
@@ -81,7 +85,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
8185
ListerRegistry: registry,
8286
},
8387
}
84-
res, err := NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
88+
res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
8589
assert.NoError(t, err)
8690
assert.Equal(t, 5, len(res))
8791
info, found := res["ng1"]
@@ -108,7 +112,7 @@ func TestGetNodeInfosForGroups(t *testing.T) {
108112
ListerRegistry: registry,
109113
},
110114
}
111-
res, err = NewMixedTemplateNodeInfoProvider().Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now)
115+
res, err = NewMixedTemplateNodeInfoProvider(&cacheTtl).Process(&ctx, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil, now)
112116
assert.NoError(t, err)
113117
assert.Equal(t, 0, len(res))
114118
}
@@ -167,7 +171,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
167171
ListerRegistry: registry,
168172
},
169173
}
170-
niProcessor := NewMixedTemplateNodeInfoProvider()
174+
niProcessor := NewMixedTemplateNodeInfoProvider(&cacheTtl)
171175
res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
172176
assert.NoError(t, err)
173177
// Check results
@@ -187,10 +191,10 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
187191
// Check cache
188192
cachedInfo, found := niProcessor.nodeInfoCache["ng1"]
189193
assert.True(t, found)
190-
assertEqualNodeCapacities(t, ready1, cachedInfo.Node())
194+
assertEqualNodeCapacities(t, ready1, cachedInfo.nodeInfo.Node())
191195
cachedInfo, found = niProcessor.nodeInfoCache["ng2"]
192196
assert.True(t, found)
193-
assertEqualNodeCapacities(t, ready2, cachedInfo.Node())
197+
assertEqualNodeCapacities(t, ready2, cachedInfo.nodeInfo.Node())
194198
cachedInfo, found = niProcessor.nodeInfoCache["ng3"]
195199
assert.False(t, found)
196200
cachedInfo, found = niProcessor.nodeInfoCache["ng4"]
@@ -216,14 +220,14 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
216220
// Check cache
217221
cachedInfo, found = niProcessor.nodeInfoCache["ng2"]
218222
assert.True(t, found)
219-
assertEqualNodeCapacities(t, ready2, cachedInfo.Node())
223+
assertEqualNodeCapacities(t, ready2, cachedInfo.nodeInfo.Node())
220224
cachedInfo, found = niProcessor.nodeInfoCache["ng4"]
221225
assert.False(t, found)
222226

223227
// Fill cache manually
224228
infoNg4Node6 := schedulerframework.NewNodeInfo()
225229
infoNg4Node6.SetNode(ready6.DeepCopy())
226-
niProcessor.nodeInfoCache = map[string]*schedulerframework.NodeInfo{"ng4": infoNg4Node6}
230+
niProcessor.nodeInfoCache = map[string]cacheItem{"ng4": {nodeInfo: infoNg4Node6, added: now}}
227231
res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, nil, now)
228232
// Check if cache was used
229233
assert.NoError(t, err)
@@ -236,6 +240,55 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) {
236240
assertEqualNodeCapacities(t, ready6, info.Node())
237241
}
238242

243+
func TestGetNodeInfosCacheExpired(t *testing.T) {
244+
now := time.Now()
245+
ready1 := BuildTestNode("n1", 1000, 1000)
246+
SetNodeReadyState(ready1, true, now.Add(-2*time.Minute))
247+
248+
// Cloud provider with TemplateNodeInfo not implemented.
249+
provider := testprovider.NewTestAutoprovisioningCloudProvider(nil, nil, nil, nil, nil, nil)
250+
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
251+
registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
252+
predicateChecker, err := simulator.NewTestPredicateChecker()
253+
assert.NoError(t, err)
254+
255+
ctx := context.AutoscalingContext{
256+
CloudProvider: provider,
257+
PredicateChecker: predicateChecker,
258+
AutoscalingKubeClients: context.AutoscalingKubeClients{
259+
ListerRegistry: registry,
260+
},
261+
}
262+
tn := BuildTestNode("tn", 5000, 5000)
263+
tni := schedulerframework.NewNodeInfo()
264+
tni.SetNode(tn)
265+
// Cache expire time is set.
266+
niProcessor1 := NewMixedTemplateNodeInfoProvider(&cacheTtl)
267+
niProcessor1.nodeInfoCache = map[string]cacheItem{
268+
"ng1": {nodeInfo: tni, added: now.Add(-2 * time.Second)},
269+
"ng2": {nodeInfo: tni, added: now.Add(-2 * time.Second)},
270+
}
271+
provider.AddNodeGroup("ng1", 1, 10, 1)
272+
provider.AddNode("ng1", ready1)
273+
274+
assert.Equal(t, 2, len(niProcessor1.nodeInfoCache))
275+
_, err = niProcessor1.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, nil, now)
276+
assert.NoError(t, err)
277+
assert.Equal(t, 1, len(niProcessor1.nodeInfoCache))
278+
279+
// Cache expire time isn't set.
280+
niProcessor2 := NewMixedTemplateNodeInfoProvider(nil)
281+
niProcessor2.nodeInfoCache = map[string]cacheItem{
282+
"ng1": {nodeInfo: tni, added: now.Add(-2 * time.Second)},
283+
"ng2": {nodeInfo: tni, added: now.Add(-2 * time.Second)},
284+
}
285+
assert.Equal(t, 2, len(niProcessor2.nodeInfoCache))
286+
_, err = niProcessor1.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, nil, now)
287+
assert.NoError(t, err)
288+
assert.Equal(t, 2, len(niProcessor2.nodeInfoCache))
289+
290+
}
291+
239292
func assertEqualNodeCapacities(t *testing.T, expected, actual *apiv1.Node) {
240293
t.Helper()
241294
assert.NotEqual(t, actual.Status, nil, "")

cluster-autoscaler/processors/nodeinfosprovider/node_info_provider_processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,6 @@ type TemplateNodeInfoProvider interface {
3737
}
3838

3939
// NewDefaultTemplateNodeInfoProvider returns a default TemplateNodeInfoProvider.
40-
func NewDefaultTemplateNodeInfoProvider() TemplateNodeInfoProvider {
41-
return NewMixedTemplateNodeInfoProvider()
40+
func NewDefaultTemplateNodeInfoProvider(time *time.Duration) TemplateNodeInfoProvider {
41+
return NewMixedTemplateNodeInfoProvider(time)
4242
}

cluster-autoscaler/processors/processors.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ func DefaultProcessors() *AutoscalingProcessors {
7777
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
7878
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
7979
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
80-
TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(),
8180
ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(),
8281
}
8382
}

0 commit comments

Comments
 (0)