Skip to content

Commit 37b3da4

Browse files
authored
Merge pull request #7529 from towca/jtuznik/dra-prep
CA: prepare for DRA integration
2 parents f6c990e + 0691512 commit 37b3da4

29 files changed

+452
-191
lines changed

cluster-autoscaler/config/autoscaling_options.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,8 @@ type AutoscalingOptions struct {
309309
CheckCapacityProvisioningRequestBatchTimebox time.Duration
310310
// ForceDeleteLongUnregisteredNodes is used to enable/disable ignoring min size constraints during removal of long unregistered nodes
311311
ForceDeleteLongUnregisteredNodes bool
312+
// DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled.
313+
DynamicResourceAllocationEnabled bool
312314
}
313315

314316
// KubeClientOptions specify options for kube client

cluster-autoscaler/core/autoscaler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,14 +117,14 @@ func initializeDefaultOptions(opts *AutoscalerOptions, informerFactory informers
117117
opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
118118
}
119119
if opts.FrameworkHandle == nil {
120-
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig)
120+
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled)
121121
if err != nil {
122122
return err
123123
}
124124
opts.FrameworkHandle = fwHandle
125125
}
126126
if opts.ClusterSnapshot == nil {
127-
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle)
127+
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled)
128128
}
129129
if opts.RemainingPdbTracker == nil {
130130
opts.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker()

cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"k8s.io/autoscaler/cluster-autoscaler/config"
2727
"k8s.io/autoscaler/cluster-autoscaler/context"
2828
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
29+
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
2930
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
3031
"k8s.io/autoscaler/cluster-autoscaler/utils/test"
3132
)
@@ -110,7 +111,7 @@ func TestFilterOutExpendable(t *testing.T) {
110111
t.Run(tc.name, func(t *testing.T) {
111112
processor := NewFilterOutExpendablePodListProcessor()
112113
snapshot := testsnapshot.NewTestSnapshotOrDie(t)
113-
err := snapshot.SetClusterState(tc.nodes, nil)
114+
err := snapshot.SetClusterState(tc.nodes, nil, drasnapshot.Snapshot{})
114115
assert.NoError(t, err)
115116

116117
pods, err := processor.Process(&context.AutoscalingContext{

cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
2828
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
2929
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/testsnapshot"
30+
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
3031
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
3132
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
3233
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -280,7 +281,7 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
280281
}
281282

282283
clusterSnapshot := snapshotFactory()
283-
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil {
284+
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods, drasnapshot.Snapshot{}); err != nil {
284285
assert.NoError(b, err)
285286
}
286287

cluster-autoscaler/core/scaledown/actuation/actuator.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
3737
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
3838
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
39+
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
3940
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
4041
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
4142
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -358,7 +359,7 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
358359
}
359360

360361
func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
361-
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.ctx.FrameworkHandle)
362+
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.ctx.FrameworkHandle, a.ctx.DynamicResourceAllocationEnabled)
362363
pods, err := a.ctx.AllPodLister().List()
363364
if err != nil {
364365
return nil, err
@@ -367,7 +368,7 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
367368
scheduledPods := kube_util.ScheduledPods(pods)
368369
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)
369370

370-
err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods)
371+
err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, drasnapshot.Snapshot{})
371372
if err != nil {
372373
return nil, err
373374
}

cluster-autoscaler/core/scaledown/eligibility/eligibility.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131

3232
apiv1 "k8s.io/api/core/v1"
3333
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
34-
klog "k8s.io/klog/v2"
34+
"k8s.io/klog/v2"
3535
)
3636

3737
const (

cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
4747
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
4848
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
49+
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
4950
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
5051
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
5152
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -1044,7 +1045,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
10441045
// build orchestrator
10451046
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
10461047
assert.NoError(t, err)
1047-
err = context.ClusterSnapshot.SetClusterState(nodes, kube_util.ScheduledPods(pods))
1048+
err = context.ClusterSnapshot.SetClusterState(nodes, kube_util.ScheduledPods(pods), drasnapshot.Snapshot{})
10481049
assert.NoError(t, err)
10491050
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
10501051
Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
@@ -1154,7 +1155,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
11541155
}
11551156
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
11561157
assert.NoError(t, err)
1157-
err = context.ClusterSnapshot.SetClusterState(nodes, pods)
1158+
err = context.ClusterSnapshot.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
11581159
assert.NoError(t, err)
11591160
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
11601161
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1197,7 +1198,7 @@ func TestBinpackingLimiter(t *testing.T) {
11971198

11981199
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
11991200
assert.NoError(t, err)
1200-
err = context.ClusterSnapshot.SetClusterState(nodes, nil)
1201+
err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
12011202
assert.NoError(t, err)
12021203
nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).
12031204
Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
@@ -1257,7 +1258,7 @@ func TestScaleUpNoHelp(t *testing.T) {
12571258
}
12581259
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
12591260
assert.NoError(t, err)
1260-
err = context.ClusterSnapshot.SetClusterState(nodes, pods)
1261+
err = context.ClusterSnapshot.SetClusterState(nodes, pods, drasnapshot.Snapshot{})
12611262
assert.NoError(t, err)
12621263
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
12631264
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1412,7 +1413,7 @@ func TestComputeSimilarNodeGroups(t *testing.T) {
14121413
listers := kube_util.NewListerRegistry(nil, nil, kube_util.NewTestPodLister(nil), nil, nil, nil, nil, nil, nil)
14131414
ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{BalanceSimilarNodeGroups: tc.balancingEnabled}, &fake.Clientset{}, listers, provider, nil, nil)
14141415
assert.NoError(t, err)
1415-
err = ctx.ClusterSnapshot.SetClusterState(nodes, nil)
1416+
err = ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
14161417
assert.NoError(t, err)
14171418
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
14181419
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1477,7 +1478,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
14771478
}
14781479
context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil)
14791480
assert.NoError(t, err)
1480-
err = context.ClusterSnapshot.SetClusterState(nodes, podList)
1481+
err = context.ClusterSnapshot.SetClusterState(nodes, podList, drasnapshot.Snapshot{})
14811482
assert.NoError(t, err)
14821483
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now)
14831484
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
@@ -1654,7 +1655,7 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
16541655
assert.NoError(t, err)
16551656

16561657
nodes := []*apiv1.Node{n1, n2}
1657-
err = context.ClusterSnapshot.SetClusterState(nodes, nil)
1658+
err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
16581659
assert.NoError(t, err)
16591660
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
16601661
processors := processorstest.NewTestProcessors(&context)

cluster-autoscaler/core/scaleup/resource/manager_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"k8s.io/autoscaler/cluster-autoscaler/core/test"
3333
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
3434
processorstest "k8s.io/autoscaler/cluster-autoscaler/processors/test"
35+
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
3536
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
3637
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
3738
utils_test "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -71,7 +72,7 @@ func TestDeltaForNode(t *testing.T) {
7172

7273
ng := testCase.nodeGroupConfig
7374
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
74-
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil)
75+
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
7576
assert.NoError(t, err)
7677
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
7778

@@ -114,7 +115,7 @@ func TestResourcesLeft(t *testing.T) {
114115

115116
ng := testCase.nodeGroupConfig
116117
_, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
117-
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil)
118+
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
118119
assert.NoError(t, err)
119120
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
120121

@@ -167,7 +168,7 @@ func TestApplyLimits(t *testing.T) {
167168

168169
ng := testCase.nodeGroupConfig
169170
group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem)
170-
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil)
171+
err := ctx.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
171172
assert.NoError(t, err)
172173
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
173174

@@ -234,7 +235,7 @@ func TestResourceManagerWithGpuResource(t *testing.T) {
234235
assert.NoError(t, err)
235236

236237
nodes := []*corev1.Node{n1}
237-
err = context.ClusterSnapshot.SetClusterState(nodes, nil)
238+
err = context.ClusterSnapshot.SetClusterState(nodes, nil, drasnapshot.Snapshot{})
238239
assert.NoError(t, err)
239240
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now())
240241

cluster-autoscaler/core/static_autoscaler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"k8s.io/autoscaler/cluster-autoscaler/simulator"
4747
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
4848
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
49+
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
4950
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
5051
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
5152
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
@@ -337,7 +338,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
337338
}
338339
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
339340
// Initialize cluster state to ClusterSnapshot
340-
if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil {
341+
if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, drasnapshot.Snapshot{}); err != nil {
341342
return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
342343
}
343344
// Initialize Pod Disruption Budget tracking

cluster-autoscaler/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ var (
280280
checkCapacityProvisioningRequestMaxBatchSize = flag.Int("check-capacity-provisioning-request-max-batch-size", 10, "Maximum number of provisioning requests to process in a single batch.")
281281
checkCapacityProvisioningRequestBatchTimebox = flag.Duration("check-capacity-provisioning-request-batch-timebox", 10*time.Second, "Maximum time to process a batch of provisioning requests.")
282282
forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group they belong to.")
283+
enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
283284
)
284285

285286
func isFlagPassed(name string) bool {
@@ -459,6 +460,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
459460
CheckCapacityProvisioningRequestMaxBatchSize: *checkCapacityProvisioningRequestMaxBatchSize,
460461
CheckCapacityProvisioningRequestBatchTimebox: *checkCapacityProvisioningRequestBatchTimebox,
461462
ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
463+
DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
462464
}
463465
}
464466

@@ -494,7 +496,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
494496
}
495497
informerFactory := informers.NewSharedInformerFactoryWithOptions(kubeClient, 0, informers.WithTransform(trim))
496498

497-
fwHandle, err := framework.NewHandle(informerFactory, autoscalingOptions.SchedulerConfig)
499+
fwHandle, err := framework.NewHandle(informerFactory, autoscalingOptions.SchedulerConfig, autoscalingOptions.DynamicResourceAllocationEnabled)
498500
if err != nil {
499501
return nil, nil, err
500502
}
@@ -504,7 +506,7 @@ func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot
504506
opts := core.AutoscalerOptions{
505507
AutoscalingOptions: autoscalingOptions,
506508
FrameworkHandle: fwHandle,
507-
ClusterSnapshot: predicate.NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle),
509+
ClusterSnapshot: predicate.NewPredicateSnapshot(store.NewDeltaSnapshotStore(), fwHandle, autoscalingOptions.DynamicResourceAllocationEnabled),
508510
KubeClient: kubeClient,
509511
InformerFactory: informerFactory,
510512
DebuggingSnapshotter: debuggingSnapshotter,

0 commit comments

Comments
 (0)