From 8b365111896819ffea55fdddaaaf405cefbc047c Mon Sep 17 00:00:00 2001
From: Patryk Bundyra <73306396+PBundyra@users.noreply.github.com>
Date: Tue, 16 Apr 2024 16:57:33 +0200
Subject: [PATCH] Introduce AdmissionCheckStrategy API, change how
 AdmissionChecks are assigned to a Workload (#1960)

Add validation webhook
Add TestReconcile unit tests
Add webhook unit tests
Add an additional layer to the API that wraps rules for AdmissionChecks
Add AdmissionCheckStrategy to the ClusterQueue's cache
Change ClusterQueue cache
Sort AdmissionChecks in a Workload
---
 apis/kueue/v1beta1/clusterqueue_types.go      |  25 +++-
 apis/kueue/v1beta1/zz_generated.deepcopy.go   |  43 ++++++
 .../crd/kueue.x-k8s.io_clusterqueues.yaml     |  35 ++++-
 .../kueue/v1beta1/admissionchecksstrategy.go  |  43 ++++++
 .../v1beta1/admissioncheckstrategyrule.go     |  53 +++++++
 .../kueue/v1beta1/clusterqueuespec.go         |  25 +++-
 client-go/applyconfiguration/utils.go         |   4 +
 .../bases/kueue.x-k8s.io_clusterqueues.yaml   |  35 ++++-
 pkg/cache/cache.go                            |   2 +-
 pkg/cache/cache_test.go                       |  69 ++++++---
 pkg/cache/clusterqueue.go                     |  10 +-
 pkg/cache/clusterqueue_test.go                | 138 +++++++++++++++++-
 pkg/cache/snapshot.go                         |   3 +-
 pkg/controller/core/workload_controller.go    |  42 +++---
 .../core/workload_controller_test.go          |  68 ++++++++-
 pkg/scheduler/scheduler.go                    |   6 +-
 pkg/scheduler/scheduler_test.go               |   3 +
 pkg/util/admissioncheck/admissioncheck.go     |  16 ++
 pkg/util/maps/maps.go                         |  11 ++
 pkg/util/testing/wrappers.go                  |  30 ++++
 pkg/webhooks/clusterqueue_webhook.go          |  23 +++
 pkg/webhooks/clusterqueue_webhook_test.go     |  24 +++
 pkg/workload/workload.go                      |  50 +++++++
 pkg/workload/workload_test.go                 |  70 +++++++++
 .../provisioning/provisioning-setup.yaml      |   5 +-
 .../scheduler/workload_controller_test.go     | 118 +++++++++++++++
 26 files changed, 885 insertions(+), 66 deletions(-)
 create mode 100644 client-go/applyconfiguration/kueue/v1beta1/admissionchecksstrategy.go
 create mode 100644 client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategyrule.go

diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go
index 807cabc6d6..812e31dc19 100644
--- a/apis/kueue/v1beta1/clusterqueue_types.go
+++ b/apis/kueue/v1beta1/clusterqueue_types.go
@@ -98,10 +98,16 @@ type ClusterQueueSpec struct {
 	// +kubebuilder:default={}
 	Preemption *ClusterQueuePreemption `json:"preemption,omitempty"`
 
-	// admissionChecks lists the AdmissionChecks required by this ClusterQueue
+	// admissionChecks lists the AdmissionChecks required by this ClusterQueue.
+	// Cannot be used along with AdmissionCheckStrategy.
 	// +optional
 	AdmissionChecks []string `json:"admissionChecks,omitempty"`
 
+	// admissionCheckStrategy defines a list of strategies to determine which ResourceFlavors require AdmissionChecks.
+	// This property cannot be used in conjunction with the 'admissionChecks' property.
+	// +optional
+	AdmissionChecksStrategy AdmissionChecksStrategy `json:"admissionChecksStrategy,omitempty"`
+
 	// stopPolicy - if set to a value different from None, the ClusterQueue is considered Inactive, no new reservation being
 	// made.
 	//
@@ -117,6 +123,23 @@ type ClusterQueueSpec struct {
 	StopPolicy *StopPolicy `json:"stopPolicy,omitempty"`
 }
 
+// AdmissionChecksStrategy defines a strategy for assigning AdmissionChecks to Workloads.
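+//
+// An illustrative manifest snippet ("sample-prov" is borrowed from the provisioning
+// example later in this PR; "budget-check" and the flavor name are hypothetical):
+//
+//	admissionChecksStrategy:
+//	  admissionChecks:
+//	  - name: "sample-prov"
+//	    onFlavors: ["on-demand"]  # runs only for workloads admitted under this flavor
+//	  - name: "budget-check"      # no onFlavors: runs for all workloads in the queue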
+type AdmissionChecksStrategy struct { + // admissionChecks is a list of strategies for AdmissionChecks + AdmissionChecks []AdmissionCheckStrategyRule `json:"admissionChecks,omitempty"` +} + +// AdmissionCheckStrategyRule defines rules for a single AdmissionCheck +type AdmissionCheckStrategyRule struct { + // name is an AdmissionCheck's name. + Name string `json:"name"` + + // onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for. + // If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue. + // +optional + OnFlavors []ResourceFlavorReference `json:"onFlavors"` +} + type QueueingStrategy string const ( diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go index 842c5abfd5..80a883d806 100644 --- a/apis/kueue/v1beta1/zz_generated.deepcopy.go +++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go @@ -192,6 +192,48 @@ func (in *AdmissionCheckStatus) DeepCopy() *AdmissionCheckStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AdmissionCheckStrategyRule) DeepCopyInto(out *AdmissionCheckStrategyRule) { + *out = *in + if in.OnFlavors != nil { + in, out := &in.OnFlavors, &out.OnFlavors + *out = make([]ResourceFlavorReference, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdmissionCheckStrategyRule. +func (in *AdmissionCheckStrategyRule) DeepCopy() *AdmissionCheckStrategyRule { + if in == nil { + return nil + } + out := new(AdmissionCheckStrategyRule) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AdmissionChecksStrategy) DeepCopyInto(out *AdmissionChecksStrategy) { + *out = *in + if in.AdmissionChecks != nil { + in, out := &in.AdmissionChecks, &out.AdmissionChecks + *out = make([]AdmissionCheckStrategyRule, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdmissionChecksStrategy. +func (in *AdmissionChecksStrategy) DeepCopy() *AdmissionChecksStrategy { + if in == nil { + return nil + } + out := new(AdmissionChecksStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BorrowWithinCohort) DeepCopyInto(out *BorrowWithinCohort) { *out = *in @@ -357,6 +399,7 @@ func (in *ClusterQueueSpec) DeepCopyInto(out *ClusterQueueSpec) { *out = make([]string, len(*in)) copy(*out, *in) } + in.AdmissionChecksStrategy.DeepCopyInto(&out.AdmissionChecksStrategy) if in.StopPolicy != nil { in, out := &in.StopPolicy, &out.StopPolicy *out = new(StopPolicy) diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml index a81c243db6..cdc1cdcbbe 100644 --- a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml +++ b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml @@ -74,11 +74,42 @@ spec: description: ClusterQueueSpec defines the desired state of ClusterQueue properties: admissionChecks: - description: admissionChecks lists the AdmissionChecks required by - this ClusterQueue + description: |- + admissionChecks lists the AdmissionChecks required by this ClusterQueue. 
+ Cannot be used along with AdmissionCheckStrategy. items: type: string type: array + admissionChecksStrategy: + description: |- + admissionCheckStrategy defines a list of strategies to determine which ResourceFlavors require AdmissionChecks. + This property cannot be used in conjunction with the 'admissionChecks' property. + properties: + admissionChecks: + description: admissionChecks is a list of strategies for AdmissionChecks + items: + description: AdmissionCheckStrategyRule defines rules for a + single AdmissionCheck + properties: + name: + description: name is an AdmissionCheck's name. + type: string + onFlavors: + description: |- + onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for. + If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue. + items: + description: ResourceFlavorReference is the name of the + ResourceFlavor. + maxLength: 253 + pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$ + type: string + type: array + required: + - name + type: object + type: array + type: object cohort: description: |- cohort that this ClusterQueue belongs to. CQs that belong to the diff --git a/client-go/applyconfiguration/kueue/v1beta1/admissionchecksstrategy.go b/client-go/applyconfiguration/kueue/v1beta1/admissionchecksstrategy.go new file mode 100644 index 0000000000..fb0c4ce292 --- /dev/null +++ b/client-go/applyconfiguration/kueue/v1beta1/admissionchecksstrategy.go @@ -0,0 +1,43 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1beta1 + +// AdmissionChecksStrategyApplyConfiguration represents an declarative configuration of the AdmissionChecksStrategy type for use +// with apply. +type AdmissionChecksStrategyApplyConfiguration struct { + AdmissionChecks []AdmissionCheckStrategyRuleApplyConfiguration `json:"admissionChecks,omitempty"` +} + +// AdmissionChecksStrategyApplyConfiguration constructs an declarative configuration of the AdmissionChecksStrategy type for use with +// apply. +func AdmissionChecksStrategy() *AdmissionChecksStrategyApplyConfiguration { + return &AdmissionChecksStrategyApplyConfiguration{} +} + +// WithAdmissionChecks adds the given value to the AdmissionChecks field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the AdmissionChecks field. 
+func (b *AdmissionChecksStrategyApplyConfiguration) WithAdmissionChecks(values ...*AdmissionCheckStrategyRuleApplyConfiguration) *AdmissionChecksStrategyApplyConfiguration { + for i := range values { + if values[i] == nil { + panic("nil value passed to WithAdmissionChecks") + } + b.AdmissionChecks = append(b.AdmissionChecks, *values[i]) + } + return b +} diff --git a/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategyrule.go b/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategyrule.go new file mode 100644 index 0000000000..84a8e6655b --- /dev/null +++ b/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategyrule.go @@ -0,0 +1,53 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1beta1 + +import ( + v1beta1 "sigs.k8s.io/kueue/apis/kueue/v1beta1" +) + +// AdmissionCheckStrategyRuleApplyConfiguration represents an declarative configuration of the AdmissionCheckStrategyRule type for use +// with apply. +type AdmissionCheckStrategyRuleApplyConfiguration struct { + Name *string `json:"name,omitempty"` + OnFlavors []v1beta1.ResourceFlavorReference `json:"onFlavors,omitempty"` +} + +// AdmissionCheckStrategyRuleApplyConfiguration constructs an declarative configuration of the AdmissionCheckStrategyRule type for use with +// apply. +func AdmissionCheckStrategyRule() *AdmissionCheckStrategyRuleApplyConfiguration { + return &AdmissionCheckStrategyRuleApplyConfiguration{} +} + +// WithName sets the Name field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Name field is set to the value of the last call. +func (b *AdmissionCheckStrategyRuleApplyConfiguration) WithName(value string) *AdmissionCheckStrategyRuleApplyConfiguration { + b.Name = &value + return b +} + +// WithOnFlavors adds the given value to the OnFlavors field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the OnFlavors field. +func (b *AdmissionCheckStrategyRuleApplyConfiguration) WithOnFlavors(values ...v1beta1.ResourceFlavorReference) *AdmissionCheckStrategyRuleApplyConfiguration { + for i := range values { + b.OnFlavors = append(b.OnFlavors, values[i]) + } + return b +} diff --git a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go index 564c24ab9a..1ae653aa53 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go +++ b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go @@ -25,14 +25,15 @@ import ( // ClusterQueueSpecApplyConfiguration represents an declarative configuration of the ClusterQueueSpec type for use // with apply. 
type ClusterQueueSpecApplyConfiguration struct { - ResourceGroups []ResourceGroupApplyConfiguration `json:"resourceGroups,omitempty"` - Cohort *string `json:"cohort,omitempty"` - QueueingStrategy *kueuev1beta1.QueueingStrategy `json:"queueingStrategy,omitempty"` - NamespaceSelector *v1.LabelSelector `json:"namespaceSelector,omitempty"` - FlavorFungibility *FlavorFungibilityApplyConfiguration `json:"flavorFungibility,omitempty"` - Preemption *ClusterQueuePreemptionApplyConfiguration `json:"preemption,omitempty"` - AdmissionChecks []string `json:"admissionChecks,omitempty"` - StopPolicy *kueuev1beta1.StopPolicy `json:"stopPolicy,omitempty"` + ResourceGroups []ResourceGroupApplyConfiguration `json:"resourceGroups,omitempty"` + Cohort *string `json:"cohort,omitempty"` + QueueingStrategy *kueuev1beta1.QueueingStrategy `json:"queueingStrategy,omitempty"` + NamespaceSelector *v1.LabelSelector `json:"namespaceSelector,omitempty"` + FlavorFungibility *FlavorFungibilityApplyConfiguration `json:"flavorFungibility,omitempty"` + Preemption *ClusterQueuePreemptionApplyConfiguration `json:"preemption,omitempty"` + AdmissionChecks []string `json:"admissionChecks,omitempty"` + AdmissionChecksStrategy *AdmissionChecksStrategyApplyConfiguration `json:"admissionChecksStrategy,omitempty"` + StopPolicy *kueuev1beta1.StopPolicy `json:"stopPolicy,omitempty"` } // ClusterQueueSpecApplyConfiguration constructs an declarative configuration of the ClusterQueueSpec type for use with @@ -104,6 +105,14 @@ func (b *ClusterQueueSpecApplyConfiguration) WithAdmissionChecks(values ...strin return b } +// WithAdmissionChecksStrategy sets the AdmissionChecksStrategy field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the AdmissionChecksStrategy field is set to the value of the last call. +func (b *ClusterQueueSpecApplyConfiguration) WithAdmissionChecksStrategy(value *AdmissionChecksStrategyApplyConfiguration) *ClusterQueueSpecApplyConfiguration { + b.AdmissionChecksStrategy = value + return b +} + // WithStopPolicy sets the StopPolicy field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the StopPolicy field is set to the value of the last call. 
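A quick sketch, not part of the patch, of how the generated builders above compose
when assembling a server-side-apply patch. The check and flavor names are made up;
ClusterQueueSpec() is the generated constructor documented in the context above.

    package main

    import (
    	"fmt"

    	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
    	kueuev1beta1 "sigs.k8s.io/kueue/client-go/applyconfiguration/kueue/v1beta1"
    )

    func main() {
    	// "prov-check" runs only for the "on-demand" flavor; "global-check" sets no
    	// onFlavors, so it applies to every workload admitted by the ClusterQueue.
    	spec := kueuev1beta1.ClusterQueueSpec().
    		WithAdmissionChecksStrategy(kueuev1beta1.AdmissionChecksStrategy().
    			WithAdmissionChecks(
    				kueuev1beta1.AdmissionCheckStrategyRule().
    					WithName("prov-check").
    					WithOnFlavors(kueue.ResourceFlavorReference("on-demand")),
    				kueuev1beta1.AdmissionCheckStrategyRule().
    					WithName("global-check")))
    	fmt.Println(len(spec.AdmissionChecksStrategy.AdmissionChecks)) // 2
    }
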
diff --git a/client-go/applyconfiguration/utils.go b/client-go/applyconfiguration/utils.go index 928d168488..8bd03f6f73 100644 --- a/client-go/applyconfiguration/utils.go +++ b/client-go/applyconfiguration/utils.go @@ -54,10 +54,14 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &kueuev1beta1.AdmissionCheckParametersReferenceApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("AdmissionCheckSpec"): return &kueuev1beta1.AdmissionCheckSpecApplyConfiguration{} + case v1beta1.SchemeGroupVersion.WithKind("AdmissionChecksStrategy"): + return &kueuev1beta1.AdmissionChecksStrategyApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("AdmissionCheckState"): return &kueuev1beta1.AdmissionCheckStateApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("AdmissionCheckStatus"): return &kueuev1beta1.AdmissionCheckStatusApplyConfiguration{} + case v1beta1.SchemeGroupVersion.WithKind("AdmissionCheckStrategyRule"): + return &kueuev1beta1.AdmissionCheckStrategyRuleApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("BorrowWithinCohort"): return &kueuev1beta1.BorrowWithinCohortApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("ClusterQueue"): diff --git a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml index ba32a2fbe8..7b39187ad6 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml @@ -59,11 +59,42 @@ spec: description: ClusterQueueSpec defines the desired state of ClusterQueue properties: admissionChecks: - description: admissionChecks lists the AdmissionChecks required by - this ClusterQueue + description: |- + admissionChecks lists the AdmissionChecks required by this ClusterQueue. + Cannot be used along with AdmissionCheckStrategy. items: type: string type: array + admissionChecksStrategy: + description: |- + admissionCheckStrategy defines a list of strategies to determine which ResourceFlavors require AdmissionChecks. + This property cannot be used in conjunction with the 'admissionChecks' property. + properties: + admissionChecks: + description: admissionChecks is a list of strategies for AdmissionChecks + items: + description: AdmissionCheckStrategyRule defines rules for a + single AdmissionCheck + properties: + name: + description: name is an AdmissionCheck's name. + type: string + onFlavors: + description: |- + onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for. + If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue. + items: + description: ResourceFlavorReference is the name of the + ResourceFlavor. + maxLength: 253 + pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$ + type: string + type: array + required: + - name + type: object + type: array + type: object cohort: description: |- cohort that this ClusterQueue belongs to. 
CQs that belong to the diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index f4b47314ae..a6ef5912e0 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -728,7 +728,7 @@ func (c *Cache) ClusterQueuesUsingAdmissionCheck(ac string) []string { var cqs []string for _, cq := range c.clusterQueues { - if cq.AdmissionChecks.Has(ac) { + if _, found := cq.AdmissionChecks[ac]; found { cqs = append(cqs, cq.Name) } } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 1f881d3dca..5e0a74f642 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -964,7 +964,10 @@ func TestCacheClusterQueueOperations(t *testing.T) { Preemption: defaultPreemption, AllocatableResourceGeneration: 1, FlavorFungibility: defaultFlavorFungibility, - AdmissionChecks: sets.New("check1", "check2"), + AdmissionChecks: map[string]sets.Set[string]{ + "check1": sets.New[string](), + "check2": sets.New[string](), + }, }, }, wantCohorts: map[string]sets.Set[string]{}, @@ -992,8 +995,10 @@ func TestCacheClusterQueueOperations(t *testing.T) { Preemption: defaultPreemption, AllocatableResourceGeneration: 1, FlavorFungibility: defaultFlavorFungibility, - AdmissionChecks: sets.New("check1", "check2"), - }, + AdmissionChecks: map[string]sets.Set[string]{ + "check1": sets.New[string](), + "check2": sets.New[string](), + }}, }, wantCohorts: map[string]sets.Set[string]{}, }, @@ -1021,8 +1026,10 @@ func TestCacheClusterQueueOperations(t *testing.T) { Preemption: defaultPreemption, AllocatableResourceGeneration: 1, FlavorFungibility: defaultFlavorFungibility, - AdmissionChecks: sets.New("check1", "check2"), - }, + AdmissionChecks: map[string]sets.Set[string]{ + "check1": sets.New[string](), + "check2": sets.New[string](), + }}, }, wantCohorts: map[string]sets.Set[string]{}, }, @@ -1050,8 +1057,10 @@ func TestCacheClusterQueueOperations(t *testing.T) { Preemption: defaultPreemption, AllocatableResourceGeneration: 1, FlavorFungibility: defaultFlavorFungibility, - AdmissionChecks: sets.New("check1", "check2"), - }, + AdmissionChecks: map[string]sets.Set[string]{ + "check1": sets.New[string](), + "check2": sets.New[string](), + }}, }, wantCohorts: map[string]sets.Set[string]{}, }, @@ -3385,45 +3394,61 @@ func messageOrEmpty(err error) string { } func TestClusterQueuesUsingAdmissionChecks(t *testing.T) { - admissionCheck1 := utiltesting.MakeAdmissionCheck("ac1").Obj() - admissionCheck2 := utiltesting.MakeAdmissionCheck("ac2").Obj() + checks := []*kueue.AdmissionCheck{ + utiltesting.MakeAdmissionCheck("ac1").Obj(), + utiltesting.MakeAdmissionCheck("ac2").Obj(), + utiltesting.MakeAdmissionCheck("ac3").Obj(), + } + fooCq := utiltesting.MakeClusterQueue("fooCq"). - AdmissionChecks(admissionCheck1.Name). + AdmissionChecks("ac1"). Obj() barCq := utiltesting.MakeClusterQueue("barCq").Obj() fizzCq := utiltesting.MakeClusterQueue("fizzCq"). - AdmissionChecks(admissionCheck1.Name, admissionCheck2.Name). + AdmissionChecks("ac1", "ac2"). + Obj() + strategyCq := utiltesting.MakeClusterQueue("strategyCq"). + AdmissionCheckStrategy( + *utiltesting.MakeAdmissionCheckStrategyRule("ac1").Obj(), + *utiltesting.MakeAdmissionCheckStrategyRule("ac3").Obj()). 
Obj()
 
 	cases := map[string]struct {
 		clusterQueues              []*kueue.ClusterQueue
 		wantInUseClusterQueueNames []string
+		check                      string
 	}{
 		"single clusterQueue with check in use": {
-			clusterQueues: []*kueue.ClusterQueue{
-				fooCq,
-			},
+			clusterQueues:              []*kueue.ClusterQueue{fooCq},
 			wantInUseClusterQueueNames: []string{fooCq.Name},
+			check:                      "ac1",
+		},
+		"single clusterQueue with AdmissionCheckStrategy in use": {
+			clusterQueues:              []*kueue.ClusterQueue{strategyCq},
+			wantInUseClusterQueueNames: []string{strategyCq.Name},
+			check:                      "ac3",
 		},
 		"single clusterQueue with no checks": {
-			clusterQueues: []*kueue.ClusterQueue{
-				barCq,
-			},
+			clusterQueues: []*kueue.ClusterQueue{barCq},
+			check:         "ac1",
 		},
 		"multiple clusterQueues with checks in use": {
 			clusterQueues: []*kueue.ClusterQueue{
 				fooCq,
 				barCq,
 				fizzCq,
+				strategyCq,
 			},
-			wantInUseClusterQueueNames: []string{fooCq.Name, fizzCq.Name},
+			wantInUseClusterQueueNames: []string{fooCq.Name, fizzCq.Name, strategyCq.Name},
+			check:                      "ac1",
 		},
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
 			cache := New(utiltesting.NewFakeClient())
-			cache.AddOrUpdateAdmissionCheck(admissionCheck1)
-			cache.AddOrUpdateAdmissionCheck(admissionCheck2)
+			for _, check := range checks {
+				cache.AddOrUpdateAdmissionCheck(check)
+			}
 
 			for _, cq := range tc.clusterQueues {
 				if err := cache.AddClusterQueue(context.Background(), cq); err != nil {
@@ -3431,11 +3456,11 @@ func TestClusterQueuesUsingAdmissionChecks(t *testing.T) {
 				}
 			}
 
-			cqs := cache.ClusterQueuesUsingAdmissionCheck(admissionCheck1.Name)
+			cqs := cache.ClusterQueuesUsingAdmissionCheck(tc.check)
 			if diff := cmp.Diff(tc.wantInUseClusterQueueNames, cqs, cmpopts.SortSlices(func(a, b string) bool {
 				return a < b
 			})); len(diff) != 0 {
-				t.Errorf("Unexpected flavor is in use by clusterQueues (-want,+got):\n%s", diff)
+				t.Errorf("Unexpected AdmissionCheck is in use by clusterQueues (-want,+got):\n%s", diff)
 			}
 		})
 	}
diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go
index 29c857ec8c..2f700dc1c6 100644
--- a/pkg/cache/clusterqueue.go
+++ b/pkg/cache/clusterqueue.go
@@ -32,6 +32,7 @@ import (
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	"sigs.k8s.io/kueue/pkg/features"
 	"sigs.k8s.io/kueue/pkg/metrics"
+	utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
 	"sigs.k8s.io/kueue/pkg/workload"
 )
 
@@ -62,8 +63,11 @@ type ClusterQueue struct {
 	NamespaceSelector labels.Selector
 	Preemption        kueue.ClusterQueuePreemption
 	FlavorFungibility kueue.FlavorFungibility
-	AdmissionChecks   sets.Set[string]
-	Status            metrics.ClusterQueueStatus
+	// AdmissionChecks aggregates the checks from both .spec.admissionChecks and
+	// .spec.admissionChecksStrategy. Each set holds the ResourceFlavors the
+	// AdmissionCheck applies to; an empty set means it applies to all ResourceFlavors.
+	AdmissionChecks map[string]sets.Set[string]
+	Status          metrics.ClusterQueueStatus
 	// GuaranteedQuota records how much resource quota the ClusterQueue reserved
 	// when feature LendingLimit is enabled and flavor's lendingLimit is not nil.
 	GuaranteedQuota FlavorResourceQuantities
@@ -199,7 +203,7 @@ func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[kueue.
 
 	c.isStopped = ptr.Deref(in.Spec.StopPolicy, kueue.None) != kueue.None
 
-	c.AdmissionChecks = sets.New(in.Spec.AdmissionChecks...)
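+	// utilac.NewAdmissionChecks merges .spec.admissionChecks and
+	// .spec.admissionChecksStrategy into a single check-name -> flavor-set map.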
+ c.AdmissionChecks = utilac.NewAdmissionChecks(in) c.Usage = filterFlavorQuantities(c.Usage, in.Spec.ResourceGroups) c.AdmittedUsage = filterFlavorQuantities(c.AdmittedUsage, in.Spec.ResourceGroups) diff --git a/pkg/cache/clusterqueue_test.go b/pkg/cache/clusterqueue_test.go index 00f5e15198..581dee5a29 100644 --- a/pkg/cache/clusterqueue_test.go +++ b/pkg/cache/clusterqueue_test.go @@ -463,12 +463,20 @@ func TestClusterQueueUpdate(t *testing.T) { } func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) { - cq := utiltesting.MakeClusterQueue("cq"). + cqWithAC := utiltesting.MakeClusterQueue("cq"). AdmissionChecks("check1", "check2", "check3"). Obj() + cqWithACStrategy := utiltesting.MakeClusterQueue("cq2"). + AdmissionCheckStrategy( + *utiltesting.MakeAdmissionCheckStrategyRule("check1").Obj(), + *utiltesting.MakeAdmissionCheckStrategyRule("check2").Obj(), + *utiltesting.MakeAdmissionCheckStrategyRule("check3").Obj()). + Obj() + testcases := []struct { name string + cq *kueue.ClusterQueue cqStatus metrics.ClusterQueueStatus admissionChecks map[string]AdmissionCheck wantStatus metrics.ClusterQueueStatus @@ -476,6 +484,28 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) { }{ { name: "Pending clusterQueue updated valid AC list", + cq: cqWithAC, + cqStatus: pending, + admissionChecks: map[string]AdmissionCheck{ + "check1": { + Active: true, + Controller: "controller1", + }, + "check2": { + Active: true, + Controller: "controller2", + }, + "check3": { + Active: true, + Controller: "controller3", + }, + }, + wantStatus: active, + wantReason: "Ready", + }, + { + name: "Pending clusterQueue with an AC strategy updated valid AC list", + cq: cqWithACStrategy, cqStatus: pending, admissionChecks: map[string]AdmissionCheck{ "check1": { @@ -496,6 +526,24 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) { }, { name: "Active clusterQueue updated with not found AC", + cq: cqWithAC, + cqStatus: active, + admissionChecks: map[string]AdmissionCheck{ + "check1": { + Active: true, + Controller: "controller1", + }, + "check2": { + Active: true, + Controller: "controller2", + }, + }, + wantStatus: pending, + wantReason: "CheckNotFoundOrInactive", + }, + { + name: "Active clusterQueue with an AC strategy updated with not found AC", + cq: cqWithACStrategy, cqStatus: active, admissionChecks: map[string]AdmissionCheck{ "check1": { @@ -512,6 +560,28 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) { }, { name: "Active clusterQueue updated with inactive AC", + cq: cqWithAC, + cqStatus: active, + admissionChecks: map[string]AdmissionCheck{ + "check1": { + Active: true, + Controller: "controller1", + }, + "check2": { + Active: true, + Controller: "controller2", + }, + "check3": { + Active: false, + Controller: "controller3", + }, + }, + wantStatus: pending, + wantReason: "CheckNotFoundOrInactive", + }, + { + name: "Active clusterQueue with an AC strategy updated with inactive AC", + cq: cqWithACStrategy, cqStatus: active, admissionChecks: map[string]AdmissionCheck{ "check1": { @@ -532,6 +602,30 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) { }, { name: "Active clusterQueue updated with duplicate single instance AC Controller", + cq: cqWithAC, + cqStatus: active, + admissionChecks: map[string]AdmissionCheck{ + "check1": { + Active: true, + Controller: "controller1", + SingleInstanceInClusterQueue: true, + }, + "check2": { + Active: true, + Controller: "controller2", + }, + "check3": { + Active: true, + Controller: "controller2", + 
SingleInstanceInClusterQueue: true, + }, + }, + wantStatus: pending, + wantReason: "MultipleSingleInstanceControllerChecks", + }, + { + name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller", + cq: cqWithACStrategy, cqStatus: active, admissionChecks: map[string]AdmissionCheck{ "check1": { @@ -554,6 +648,28 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) { }, { name: "Terminating clusterQueue updated with valid AC list", + cq: cqWithAC, + cqStatus: terminating, + admissionChecks: map[string]AdmissionCheck{ + "check1": { + Active: true, + Controller: "controller1", + }, + "check2": { + Active: true, + Controller: "controller2", + }, + "check3": { + Active: true, + Controller: "controller3", + }, + }, + wantStatus: terminating, + wantReason: "Terminating", + }, + { + name: "Terminating clusterQueue with an AC strategy updated with valid AC list", + cq: cqWithACStrategy, cqStatus: terminating, admissionChecks: map[string]AdmissionCheck{ "check1": { @@ -574,6 +690,24 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) { }, { name: "Terminating clusterQueue updated with not found AC", + cq: cqWithAC, + cqStatus: terminating, + admissionChecks: map[string]AdmissionCheck{ + "check1": { + Active: true, + Controller: "controller1", + }, + "check2": { + Active: true, + Controller: "controller2", + }, + }, + wantStatus: terminating, + wantReason: "Terminating", + }, + { + name: "Terminating clusterQueue with an AC strategy updated with not found AC", + cq: cqWithACStrategy, cqStatus: terminating, admissionChecks: map[string]AdmissionCheck{ "check1": { @@ -593,7 +727,7 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { cache := New(utiltesting.NewFakeClient()) - cq, err := cache.newClusterQueue(cq) + cq, err := cache.newClusterQueue(tc.cq) if err != nil { t.Fatalf("failed to new clusterQueue %v", err) } diff --git a/pkg/cache/snapshot.go b/pkg/cache/snapshot.go index 146003242b..c5cdc3ff9e 100644 --- a/pkg/cache/snapshot.go +++ b/pkg/cache/snapshot.go @@ -143,8 +143,9 @@ func (c *ClusterQueue) snapshot() *ClusterQueue { Preemption: c.Preemption, NamespaceSelector: c.NamespaceSelector, Status: c.Status, - AdmissionChecks: c.AdmissionChecks.Clone(), + AdmissionChecks: utilmaps.DeepCopySets[string](c.AdmissionChecks), } + for fName, rUsage := range c.Usage { cc.Usage[fName] = maps.Clone(rUsage) } diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 1c00783f34..0a964ce7b1 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -17,11 +17,14 @@ limitations under the License. 
 package core
 
 import (
+	"cmp"
 	"context"
 	"fmt"
+	"slices"
 	"time"
 
 	"github.com/go-logr/logr"
+	gocmp "github.com/google/go-cmp/cmp"
 	corev1 "k8s.io/api/core/v1"
 	nodev1 "k8s.io/api/node/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
@@ -49,7 +52,8 @@ import (
 	"sigs.k8s.io/kueue/pkg/constants"
 	"sigs.k8s.io/kueue/pkg/controller/core/indexer"
 	"sigs.k8s.io/kueue/pkg/queue"
-	"sigs.k8s.io/kueue/pkg/util/slices"
+	utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
+	utilslices "sigs.k8s.io/kueue/pkg/util/slices"
 	"sigs.k8s.io/kueue/pkg/workload"
 )
 
@@ -253,17 +257,18 @@ func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl
 }
 
 func (r *WorkloadReconciler) reconcileSyncAdmissionChecks(ctx context.Context, wl *kueue.Workload, cqName string) (bool, error) {
-	// because we need to react to API cluster queue events, the list of checks from a cache can lead to race conditions
-	queue := kueue.ClusterQueue{}
-	if err := r.client.Get(ctx, types.NamespacedName{Name: cqName}, &queue); err != nil {
+	log := ctrl.LoggerFrom(ctx)
+
+	// because we need to react to ClusterQueue API events, the list of checks from a cache can lead to race conditions
+	cq := kueue.ClusterQueue{}
+	if err := r.client.Get(ctx, types.NamespacedName{Name: cqName}, &cq); err != nil {
 		return false, err
 	}
-	queueAdmissionChecks := queue.Spec.AdmissionChecks
-	newChecks, shouldUpdate := syncAdmissionCheckConditions(wl.Status.AdmissionChecks, queueAdmissionChecks)
+	admissionChecks := workload.AdmissionChecksForWorkload(log, wl, utilac.NewAdmissionChecks(&cq))
+	newChecks, shouldUpdate := syncAdmissionCheckConditions(wl.Status.AdmissionChecks, admissionChecks)
 	if shouldUpdate {
-		log := ctrl.LoggerFrom(ctx)
-		log.V(3).Info("The workload needs admission checks updates", "clusterQueue", klog.KRef("", cqName), "admissionChecks", queueAdmissionChecks)
+		log.V(3).Info("The workload needs admission checks updates", "clusterQueue", klog.KRef("", cqName), "admissionChecks", admissionChecks)
 		wl.Status.AdmissionChecks = newChecks
 		err := r.client.Status().Update(ctx, wl)
 		return true, client.IgnoreNotFound(err)
@@ -306,14 +311,14 @@ func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Cont
 	return false, nil
 }
 
-func syncAdmissionCheckConditions(conds []kueue.AdmissionCheckState, queueChecks []string) ([]kueue.AdmissionCheckState, bool) {
-	if len(queueChecks) == 0 {
+func syncAdmissionCheckConditions(conds []kueue.AdmissionCheckState, admissionChecks sets.Set[string]) ([]kueue.AdmissionCheckState, bool) {
+	if len(admissionChecks) == 0 {
 		return nil, len(conds) > 0
 	}
 
 	shouldUpdate := false
-	currentChecks := slices.ToRefMap(conds, func(c *kueue.AdmissionCheckState) string { return c.Name })
-	for _, t := range queueChecks {
+	currentChecks := utilslices.ToRefMap(conds, func(c *kueue.AdmissionCheckState) string { return c.Name })
+	for t := range admissionChecks {
 		if _, found := currentChecks[t]; !found {
 			workload.SetAdmissionCheckState(&conds, kueue.AdmissionCheckState{
 				Name:  t,
@@ -324,18 +329,20 @@ func syncAdmissionCheckConditions(conds []kueue.AdmissionCheckState, queueChecks
 	}
 
 	// if the workload conditions length is bigger, then some cleanup should be done
-	if len(conds) > len(queueChecks) {
-		newConds := make([]kueue.AdmissionCheckState, 0, len(queueChecks))
-		queueChecksSet := sets.New(queueChecks...)
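+	// admissionChecks is already a set, so stale conditions are filtered against
+	// it directly instead of first building a temporary set from a slice.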
+ if len(conds) > len(admissionChecks) { + newConds := make([]kueue.AdmissionCheckState, 0, len(admissionChecks)) shouldUpdate = true for i := range conds { c := &conds[i] - if queueChecksSet.Has(c.Name) { + if admissionChecks.Has(c.Name) { newConds = append(newConds, *c) } } conds = newConds } + slices.SortFunc(conds, func(state1, state2 kueue.AdmissionCheckState) int { + return cmp.Compare(state1.Name, state2.Name) + }) return conds, shouldUpdate } @@ -749,7 +756,8 @@ func (w *workloadCqHandler) Update(ctx context.Context, ev event.UpdateEvent, wq } if !newCq.DeletionTimestamp.IsZero() || - !slices.CmpNoOrder(oldCq.Spec.AdmissionChecks, newCq.Spec.AdmissionChecks) || + !utilslices.CmpNoOrder(oldCq.Spec.AdmissionChecks, newCq.Spec.AdmissionChecks) || + !gocmp.Equal(oldCq.Spec.AdmissionChecksStrategy, newCq.Spec.AdmissionChecksStrategy) || !ptr.Equal(oldCq.Spec.StopPolicy, newCq.Spec.StopPolicy) { w.queueReconcileForWorkloads(ctx, newCq.Name, wq) } diff --git a/pkg/controller/core/workload_controller_test.go b/pkg/controller/core/workload_controller_test.go index 4980e55450..1bd9f421dc 100644 --- a/pkg/controller/core/workload_controller_test.go +++ b/pkg/controller/core/workload_controller_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -285,13 +286,13 @@ func TestSyncCheckStates(t *testing.T) { for name, tc := range cases { t.Run(name, func(t *testing.T) { - gotStates, gotShouldChange := syncAdmissionCheckConditions(tc.states, tc.list) + gotStates, gotShouldChange := syncAdmissionCheckConditions(tc.states, sets.New(tc.list...)) if tc.wantChange != gotShouldChange { t.Errorf("Unexpected should change, want=%v", tc.wantChange) } - opts := []cmp.Option{} + var opts []cmp.Option if tc.ignoreTransitionTime { opts = append(opts, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime")) } @@ -309,6 +310,7 @@ var ( kueue.Workload{}, "TypeMeta", "ObjectMeta.ResourceVersion", "Status.RequeueState.RequeueAt", ), cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"), + cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"), cmpopts.SortSlices(func(a, b metav1.Condition) bool { return a.Type < b.Type }), } ) @@ -317,11 +319,61 @@ func TestReconcile(t *testing.T) { testStartTime := time.Now() cases := map[string]struct { workload *kueue.Workload + cq *kueue.ClusterQueue + lq *kueue.LocalQueue wantWorkload *kueue.Workload wantError error wantEvents []utiltesting.EventRecord reconcilerOpts []Option }{ + "assign Admission Checks from ClusterQueue.spec.AdmissionCheckStrategy": { + workload: utiltesting.MakeWorkload("wl", "ns"). + ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()). + Queue("queue"). + Obj(), + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionCheckStrategy( + *utiltesting.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj(), + *utiltesting.MakeAdmissionCheckStrategyRule("ac2").Obj()). + Obj(), + lq: utiltesting.MakeLocalQueue("queue", "ns").ClusterQueue("cq").Obj(), + wantWorkload: utiltesting.MakeWorkload("wl", "ns"). + ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()). + Queue("queue"). 
+ AdmissionChecks( + kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + }, + kueue.AdmissionCheckState{ + Name: "ac2", + State: kueue.CheckStatePending, + }). + Obj(), + }, + "assign Admission Checks from ClusterQueue.spec.AdmissionChecks": { + workload: utiltesting.MakeWorkload("wl", "ns"). + ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()). + Queue("queue"). + Obj(), + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionChecks("ac1", "ac2"). + Obj(), + lq: utiltesting.MakeLocalQueue("queue", "ns").ClusterQueue("cq").Obj(), + wantWorkload: utiltesting.MakeWorkload("wl", "ns"). + ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()). + Queue("queue"). + AdmissionChecks( + kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + }, + kueue.AdmissionCheckState{ + Name: "ac2", + State: kueue.CheckStatePending, + }). + Obj(), + }, "admit": { workload: utiltesting.MakeWorkload("wl", "ns"). ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). @@ -547,6 +599,18 @@ func TestReconcile(t *testing.T) { ctx, ctxCancel := context.WithCancel(ctxWithLogger) defer ctxCancel() + if tc.cq != nil { + if err := cl.Create(ctx, tc.cq); err != nil { + t.Errorf("couldn't create the cluster queue: %v", err) + } + if err := qManager.AddClusterQueue(ctx, tc.cq); err != nil { + t.Errorf("couldn't add the cluster queue to the cache: %v", err) + } + if err := qManager.AddLocalQueue(ctx, tc.lq); err != nil { + t.Errorf("couldn't add the local queue to the cache: %v", err) + } + } + _, gotError := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(tc.workload)}) if diff := cmp.Diff(tc.wantError, gotError); diff != "" { diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 706273fb4d..0c63db6cb8 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -277,7 +277,7 @@ func (s *Scheduler) schedule(ctx context.Context) { log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition") } e.status = nominated - if err := s.admit(ctx, e, cq.AdmissionChecks); err != nil { + if err := s.admit(ctx, e, cq); err != nil { e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err) } if cq.Cohort != nil { @@ -506,7 +506,7 @@ func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) e // admit sets the admitting clusterQueue and flavors into the workload of // the entry, and asynchronously updates the object in the apiserver after // assuming it in the cache. -func (s *Scheduler) admit(ctx context.Context, e *entry, mustHaveChecks sets.Set[string]) error { +func (s *Scheduler) admit(ctx context.Context, e *entry, cq *cache.ClusterQueue) error { log := ctrl.LoggerFrom(ctx) newWorkload := e.Obj.DeepCopy() admission := &kueue.Admission{ @@ -515,7 +515,7 @@ func (s *Scheduler) admit(ctx context.Context, e *entry, mustHaveChecks sets.Set } workload.SetQuotaReservation(newWorkload, admission) - if workload.HasAllChecks(newWorkload, mustHaveChecks) { + if workload.HasAllChecks(newWorkload, workload.AdmissionChecksForWorkload(log, newWorkload, cq.AdmissionChecks)) { // sync Admitted, ignore the result since an API update is always done. 
_ = workload.SyncAdmittedCondition(newWorkload) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 60a09d2794..95ce1874a9 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -1633,6 +1633,9 @@ func TestSchedule(t *testing.T) { if err := qManager.AddClusterQueue(ctx, &cq); err != nil { t.Fatalf("Inserting clusterQueue %s in manager: %v", cq.Name, err) } + if err := cl.Create(ctx, &cq); err != nil { + t.Errorf("couldn't create the cluster queue: %v", err) + } } scheduler := New(qManager, cqCache, cl, recorder, WithFairSharing(tc.enableFairSharing)) gotScheduled := make(map[string]kueue.Admission) diff --git a/pkg/util/admissioncheck/admissioncheck.go b/pkg/util/admissioncheck/admissioncheck.go index 66eff4da30..52b5f82857 100644 --- a/pkg/util/admissioncheck/admissioncheck.go +++ b/pkg/util/admissioncheck/admissioncheck.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" @@ -154,3 +155,18 @@ func FilterProvReqAnnotations(annotations map[string]string) map[string]string { } return res } + +// NewAdmissionChecks aggregates AdmissionChecks from .spec.AdmissionChecks and .spec.AdmissionChecksStrategy +func NewAdmissionChecks(cq *kueue.ClusterQueue) map[string]sets.Set[string] { + checks := make(map[string]sets.Set[string], len(cq.Spec.AdmissionChecks)+len(cq.Spec.AdmissionChecksStrategy.AdmissionChecks)) + for _, checkName := range cq.Spec.AdmissionChecks { + checks[checkName] = sets.New[string]() + } + for _, check := range cq.Spec.AdmissionChecksStrategy.AdmissionChecks { + checks[check.Name] = sets.New[string]() + for _, flavor := range check.OnFlavors { + checks[check.Name].Insert(string(flavor)) + } + } + return checks +} diff --git a/pkg/util/maps/maps.go b/pkg/util/maps/maps.go index 675d72fa50..5eb5edaca5 100644 --- a/pkg/util/maps/maps.go +++ b/pkg/util/maps/maps.go @@ -21,6 +21,8 @@ package maps import ( "fmt" "maps" + + "k8s.io/apimachinery/pkg/util/sets" ) // Merge merges a and b while resolving the conflicts by calling commonKeyValue @@ -108,3 +110,12 @@ func FilterKeys[K comparable, V any, M ~map[K]V](m M, k []K) M { } return ret } + +// DeepCopySets create a deep copy of map[string]Set which would otherwise be referenced +func DeepCopySets[T comparable](src map[string]sets.Set[T]) map[string]sets.Set[T] { + copy := make(map[string]sets.Set[T], len(src)) + for key, set := range src { + copy[key] = set.Clone() + } + return copy +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index 8653eae92b..0b20098a05 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -565,6 +565,11 @@ func (c *ClusterQueueWrapper) Cohort(cohort string) *ClusterQueueWrapper { return c } +func (c *ClusterQueueWrapper) AdmissionCheckStrategy(acs ...kueue.AdmissionCheckStrategyRule) *ClusterQueueWrapper { + c.Spec.AdmissionChecksStrategy.AdmissionChecks = acs + return c +} + // ResourceGroup adds a ResourceGroup with flavors. 
func (c *ClusterQueueWrapper) ResourceGroup(flavors ...kueue.FlavorQuotas) *ClusterQueueWrapper { rg := kueue.ResourceGroup{ @@ -808,6 +813,31 @@ func MakeAdmissionCheck(name string) *AdmissionCheckWrapper { } } +type AdmissionCheckStrategyRuleWrapper struct { + kueue.AdmissionCheckStrategyRule +} + +func MakeAdmissionCheckStrategyRule(name string, flavors ...kueue.ResourceFlavorReference) *AdmissionCheckStrategyRuleWrapper { + if len(flavors) == 0 { + flavors = make([]kueue.ResourceFlavorReference, 0) + } + return &AdmissionCheckStrategyRuleWrapper{ + AdmissionCheckStrategyRule: kueue.AdmissionCheckStrategyRule{ + Name: name, + OnFlavors: flavors, + }, + } +} + +func (acs *AdmissionCheckStrategyRuleWrapper) OnFlavors(flavors []kueue.ResourceFlavorReference) *AdmissionCheckStrategyRuleWrapper { + acs.AdmissionCheckStrategyRule.OnFlavors = flavors + return acs +} + +func (acs *AdmissionCheckStrategyRuleWrapper) Obj() *kueue.AdmissionCheckStrategyRule { + return &acs.AdmissionCheckStrategyRule +} + func (ac *AdmissionCheckWrapper) Active(status metav1.ConditionStatus) *AdmissionCheckWrapper { apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{ Type: kueue.AdmissionCheckActive, diff --git a/pkg/webhooks/clusterqueue_webhook.go b/pkg/webhooks/clusterqueue_webhook.go index 9d118e8f9b..9be05d9d38 100644 --- a/pkg/webhooks/clusterqueue_webhook.go +++ b/pkg/webhooks/clusterqueue_webhook.go @@ -102,6 +102,10 @@ func ValidateClusterQueue(cq *kueue.ClusterQueue) field.ErrorList { allErrs = append(allErrs, validateResourceGroups(cq.Spec.ResourceGroups, cq.Spec.Cohort, path.Child("resourceGroups"))...) allErrs = append(allErrs, validation.ValidateLabelSelector(cq.Spec.NamespaceSelector, validation.LabelSelectorValidationOptions{}, path.Child("namespaceSelector"))...) + allErrs = append(allErrs, validateCQAdmissionChecks(&cq.Spec, path)...) + if cq.Spec.Preemption != nil { + allErrs = append(allErrs, validatePreemption(cq.Spec.Preemption, path.Child("preemption"))...) 
+	}
 	return allErrs
 }
 
@@ -111,6 +115,25 @@ func ValidateClusterQueueUpdate(newObj, oldObj *kueue.ClusterQueue) field.ErrorL
 	return allErrs
 }
 
+func validatePreemption(preemption *kueue.ClusterQueuePreemption, path *field.Path) field.ErrorList {
+	var allErrs field.ErrorList
+	if preemption.ReclaimWithinCohort == kueue.PreemptionPolicyNever &&
+		preemption.BorrowWithinCohort != nil &&
+		preemption.BorrowWithinCohort.Policy != kueue.BorrowWithinCohortPolicyNever {
+		allErrs = append(allErrs, field.Invalid(path, preemption, "reclaimWithinCohort=Never and borrowWithinCohort.Policy!=Never"))
+	}
+	return allErrs
+}
+
+func validateCQAdmissionChecks(spec *kueue.ClusterQueueSpec, path *field.Path) field.ErrorList {
+	var allErrs field.ErrorList
+	if len(spec.AdmissionChecksStrategy.AdmissionChecks) != 0 && len(spec.AdmissionChecks) != 0 {
+		allErrs = append(allErrs, field.Invalid(path, spec, "Either AdmissionChecks or AdmissionCheckStrategy can be set, but not both"))
+	}
+
+	return allErrs
+}
+
 func validateResourceGroups(resourceGroups []kueue.ResourceGroup, cohort string, path *field.Path) field.ErrorList {
 	var allErrs field.ErrorList
 	seenResources := sets.New[corev1.ResourceName]()
diff --git a/pkg/webhooks/clusterqueue_webhook_test.go b/pkg/webhooks/clusterqueue_webhook_test.go
index a8470951ad..ab6621da02 100644
--- a/pkg/webhooks/clusterqueue_webhook_test.go
+++ b/pkg/webhooks/clusterqueue_webhook_test.go
@@ -56,6 +56,30 @@ func TestValidateClusterQueue(t *testing.T) {
 				field.Invalid(resourceGroupsPath.Index(0).Child("coveredResources").Index(0), "@cpu", ""),
 			},
 		},
+		{
+			name: "admissionChecks defined",
+			clusterQueue: testingutil.MakeClusterQueue("cluster-queue").
+				AdmissionChecks("ac1").
+				Obj(),
+		},
+		{
+			name: "admissionCheckStrategy defined",
+			clusterQueue: testingutil.MakeClusterQueue("cluster-queue").
+				AdmissionCheckStrategy(
+					*testingutil.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj(),
+				).Obj(),
+		},
+		{
+			name: "both admissionChecks and admissionCheckStrategy are defined",
+			clusterQueue: testingutil.MakeClusterQueue("cluster-queue").
+				AdmissionChecks("ac1").
+ AdmissionCheckStrategy( + *testingutil.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj(), + ).Obj(), + wantErr: field.ErrorList{ + field.Invalid(specPath, "spec", "Either AdmissionChecks or AdmissionCheckStrategy can be set, but not both"), + }, + }, { name: "in cohort", clusterQueue: testingutil.MakeClusterQueue("cluster-queue").Cohort("prod").Obj(), diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index f01133eee1..57d5b2e30f 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -22,10 +22,13 @@ import ( "maps" "strings" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -36,6 +39,7 @@ import ( "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/util/api" "sigs.k8s.io/kueue/pkg/util/limitrange" + utilmaps "sigs.k8s.io/kueue/pkg/util/maps" ) var ( @@ -543,3 +547,49 @@ func RemoveFinalizer(ctx context.Context, c client.Client, wl *kueue.Workload) e } return nil } + +// AdmissionChecksForWorkload returns AdmissionChecks that should be assigned to a specific Workload based on +// ClusterQueue configuration and ResourceFlavors +func AdmissionChecksForWorkload(log logr.Logger, wl *kueue.Workload, admissionChecks map[string]sets.Set[string]) sets.Set[string] { + // If all admissionChecks should be run for all flavors we don't need to wait for Workload's Admission to be set. + // This is also the case if admissionChecks are specified with ClusterQueue.Spec.AdmissionChecks instead of + // ClusterQueue.Spec.AdmissionCheckStrategy + allFlavors := true + for _, flavors := range admissionChecks { + if len(flavors) != 0 { + allFlavors = false + } + } + if allFlavors { + return sets.New(utilmaps.Keys(admissionChecks)...) + } + + // Kueue sets AdmissionChecks first based on ClusterQueue configuration and at this point Workload has no + // ResourceFlavors assigned, so we cannot match AdmissionChecks to ResourceFlavor. 
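+	// Illustration (hypothetical names): with admissionChecks = {"prov": {"gpu"}, "budget": {}},
+	// the result depends on which flavors end up assigned, so nothing can be decided yet.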
+ // After Quota is reserved, another reconciliation happens and we can match AdmissionChecks to ResourceFlavors + if wl.Status.Admission == nil { + log.V(2).Info("Workload has no Admission", "Workload", klog.KObj(wl)) + return nil + } + + var assignedFlavors []kueue.ResourceFlavorReference + for _, podSet := range wl.Status.Admission.PodSetAssignments { + for _, flavor := range podSet.Flavors { + assignedFlavors = append(assignedFlavors, flavor) + } + } + + acNames := sets.New[string]() + for acName, flavors := range admissionChecks { + if len(flavors) == 0 { + acNames.Insert(acName) + continue + } + for _, fName := range assignedFlavors { + if flavors.Has(string(fName)) { + acNames.Insert(acName) + } + } + } + return acNames +} diff --git a/pkg/workload/workload_test.go b/pkg/workload/workload_test.go index c3322b4784..78a6bc0f54 100644 --- a/pkg/workload/workload_test.go +++ b/pkg/workload/workload_test.go @@ -26,11 +26,13 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" config "sigs.k8s.io/kueue/apis/config/v1beta1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" ) @@ -647,3 +649,71 @@ func TestResourceUsage(t *testing.T) { }) } } + +func TestAdmissionCheckStrategy(t *testing.T) { + cases := map[string]struct { + cq *kueue.ClusterQueue + wl *kueue.Workload + wantAdmissionChecks sets.Set[string] + }{ + "AdmissionCheckStrategy with a flavor": { + wl: utiltesting.MakeWorkload("wl", "ns"). + ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()). + Obj(), + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionCheckStrategy(*utiltesting.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj()). + Obj(), + wantAdmissionChecks: sets.New("ac1"), + }, + "AdmissionCheckStrategy with an unmatched flavor": { + wl: utiltesting.MakeWorkload("wl", "ns"). + ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()). + Obj(), + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionCheckStrategy(*utiltesting.MakeAdmissionCheckStrategyRule("ac1", "unmatched-flavor").Obj()). + Obj(), + wantAdmissionChecks: nil, + }, + "AdmissionCheckStrategy without a flavor": { + wl: utiltesting.MakeWorkload("wl", "ns"). + ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()). + Obj(), + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionCheckStrategy(*utiltesting.MakeAdmissionCheckStrategyRule("ac1").Obj()). + Obj(), + wantAdmissionChecks: sets.New("ac1"), + }, + "Two AdmissionCheckStrategies, one with flavor, one without flavor": { + wl: utiltesting.MakeWorkload("wl", "ns"). + ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()). + Obj(), + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionCheckStrategy( + *utiltesting.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj(), + *utiltesting.MakeAdmissionCheckStrategyRule("ac2").Obj()). + Obj(), + wantAdmissionChecks: sets.New("ac1", "ac2"), + }, + "Workload has no QuotaReserved": { + wl: utiltesting.MakeWorkload("wl", "ns"). + Obj(), + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionCheckStrategy( + *utiltesting.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj(), + *utiltesting.MakeAdmissionCheckStrategyRule("ac2").Obj()). 
+ Obj(), + wantAdmissionChecks: nil, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + _, log := utiltesting.ContextWithLog(t) + gotAdmissionChecks := AdmissionChecksForWorkload(log, tc.wl, utilac.NewAdmissionChecks(tc.cq)) + + if diff := cmp.Diff(tc.wantAdmissionChecks, gotAdmissionChecks); diff != "" { + t.Errorf("Unexpected AdmissionChecks, (want-/got+):\n%s", diff) + } + }) + + } +} diff --git a/site/static/examples/provisioning/provisioning-setup.yaml b/site/static/examples/provisioning/provisioning-setup.yaml index aa48ceffaa..5dd5d7c878 100644 --- a/site/static/examples/provisioning/provisioning-setup.yaml +++ b/site/static/examples/provisioning/provisioning-setup.yaml @@ -20,8 +20,9 @@ spec: nominalQuota: 36Gi - name: "nvidia.com/gpu" nominalQuota: 9 - admissionChecks: - - sample-prov + admissionChecksStrategy: + admissionChecks: + - name: "sample-prov" --- apiVersion: kueue.x-k8s.io/v1beta1 kind: LocalQueue diff --git a/test/integration/scheduler/workload_controller_test.go b/test/integration/scheduler/workload_controller_test.go index 4fd35ab093..5129e92c73 100644 --- a/test/integration/scheduler/workload_controller_test.go +++ b/test/integration/scheduler/workload_controller_test.go @@ -28,6 +28,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/util/slices" "sigs.k8s.io/kueue/pkg/util/testing" "sigs.k8s.io/kueue/pkg/workload" "sigs.k8s.io/kueue/test/util" @@ -69,6 +70,123 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() { updatedCQ = kueue.ClusterQueue{} }) + ginkgo.When("the queue has admission check strategies", func() { + var ( + flavor1 *kueue.ResourceFlavor + flavor2 *kueue.ResourceFlavor + check1 *kueue.AdmissionCheck + check2 *kueue.AdmissionCheck + check3 *kueue.AdmissionCheck + reservationFlavor string = "reservation" + updatedWl kueue.Workload + flavorOnDemand string = "on-demand" + resourceGPU corev1.ResourceName = "example.com/gpu" + ) + + ginkgo.BeforeEach(func() { + flavor1 = testing.MakeResourceFlavor(flavorOnDemand).Obj() + gomega.Expect(k8sClient.Create(ctx, flavor1)).Should(gomega.Succeed()) + + flavor2 = testing.MakeResourceFlavor(reservationFlavor).Obj() + gomega.Expect(k8sClient.Create(ctx, flavor2)).Should(gomega.Succeed()) + + check1 = testing.MakeAdmissionCheck("check1").ControllerName("ctrl1").Obj() + gomega.Expect(k8sClient.Create(ctx, check1)).Should(gomega.Succeed()) + util.SetAdmissionCheckActive(ctx, k8sClient, check1, metav1.ConditionTrue) + + check2 = testing.MakeAdmissionCheck("check2").ControllerName("ctrl2").Obj() + gomega.Expect(k8sClient.Create(ctx, check2)).Should(gomega.Succeed()) + util.SetAdmissionCheckActive(ctx, k8sClient, check2, metav1.ConditionTrue) + + check3 = testing.MakeAdmissionCheck("check3").ControllerName("ctrl3").Obj() + gomega.Expect(k8sClient.Create(ctx, check3)).Should(gomega.Succeed()) + util.SetAdmissionCheckActive(ctx, k8sClient, check3, metav1.ConditionTrue) + + clusterQueue = testing.MakeClusterQueue("cluster-queue"). + AdmissionCheckStrategy( + *testing.MakeAdmissionCheckStrategyRule("check1", kueue.ResourceFlavorReference(flavorOnDemand)).Obj(), + *testing.MakeAdmissionCheckStrategyRule("check2").Obj(), + *testing.MakeAdmissionCheckStrategyRule("check3", kueue.ResourceFlavorReference(reservationFlavor)).Obj()). 
ResourceGroup(
+					*testing.MakeFlavorQuotas(reservationFlavor).Resource(resourceGPU, "1", "1").Obj(),
+					*testing.MakeFlavorQuotas(flavorOnDemand).Resource(resourceGPU, "5", "5").Obj()).
+				Cohort("cohort").
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed())
+
+			localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed())
+		})
+
+		ginkgo.AfterEach(func() {
+			gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+			gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, localQueue)).To(gomega.Succeed())
+			util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true)
+			util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, check3, true)
+			util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, check2, true)
+			util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, check1, true)
+			util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, flavor1, true)
+			util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, flavor2, true)
+		})
+
+		ginkgo.It("the workload should have appropriate AdmissionChecks added", func() {
+			wl := testing.MakeWorkload("wl", ns.Name).
+				Queue("queue").
+				Request(resourceGPU, "3").
+				Obj()
+			wlKey := client.ObjectKeyFromObject(wl)
+
+			ginkgo.By("creating and waiting for workload to have a quota reservation", func() {
+				gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).Should(gomega.Succeed())
+					g.Expect(workload.HasQuotaReservation(&updatedWl)).Should(gomega.BeTrue(), "should have quota reservation")
+
+					checks := slices.Map(updatedWl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name })
+					g.Expect(checks).Should(gomega.ConsistOf("check1", "check2"))
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+				gomega.Expect(workload.IsAdmitted(&updatedWl)).To(gomega.BeFalse())
+			})
+
+			ginkgo.By("adding an additional admission check to the clusterqueue", func() {
+				createdQueue := kueue.ClusterQueue{}
+				queueKey := client.ObjectKeyFromObject(clusterQueue)
+				gomega.Expect(k8sClient.Get(ctx, queueKey, &createdQueue)).To(gomega.Succeed())
+				createdQueue.Spec.AdmissionChecksStrategy.AdmissionChecks = []kueue.AdmissionCheckStrategyRule{
+					*testing.MakeAdmissionCheckStrategyRule("check1", kueue.ResourceFlavorReference(flavorOnDemand)).Obj(),
+					*testing.MakeAdmissionCheckStrategyRule("check2", kueue.ResourceFlavorReference(reservationFlavor)).Obj(),
+					*testing.MakeAdmissionCheckStrategyRule("check3").Obj()}
+				gomega.Expect(k8sClient.Update(ctx, &createdQueue)).To(gomega.Succeed())
+
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed())
+					checks := slices.Map(updatedWl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name })
+					g.Expect(checks).Should(gomega.ConsistOf("check1", "check3"))
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("marking the checks as passed", func() {
+				gomega.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed())
+				workload.SetAdmissionCheckState(&updatedWl.Status.AdmissionChecks, kueue.AdmissionCheckState{
+					Name:    "check1",
+					State:   kueue.CheckStateReady,
+					Message: "check successfully passed",
+				})
+				workload.SetAdmissionCheckState(&updatedWl.Status.AdmissionChecks, kueue.AdmissionCheckState{
+					Name:  "check3",
+					State: kueue.CheckStateReady,
Message: "check successfully passed", + }) + gomega.Expect(k8sClient.Status().Update(ctx, &updatedWl)).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).Should(gomega.Succeed()) + g.Expect(workload.IsAdmitted(&updatedWl)).Should(gomega.BeTrue(), "should have been admitted") + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + }) + }) + ginkgo.When("Workload with RuntimeClass defined", func() { ginkgo.BeforeEach(func() { gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).To(gomega.Succeed())
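
End to end, the pieces above resolve a ClusterQueue's strategy into the per-Workload
check set. A minimal standalone sketch of that flow using the test wrappers introduced
in this PR; the names are illustrative and logr.Discard() stands in for a real logger:

    package main

    import (
    	"fmt"

    	"github.com/go-logr/logr"

    	utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
    	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
    	"sigs.k8s.io/kueue/pkg/workload"
    )

    func main() {
    	// "ac1" is scoped to flavor1; "ac2" applies to all flavors.
    	cq := utiltesting.MakeClusterQueue("cq").
    		AdmissionCheckStrategy(
    			*utiltesting.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj(),
    			*utiltesting.MakeAdmissionCheckStrategyRule("ac2").Obj()).
    		Obj()

    	// Quota was reserved under flavor1, so both checks match this workload.
    	wl := utiltesting.MakeWorkload("wl", "ns").
    		ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()).
    		Obj()

    	checks := workload.AdmissionChecksForWorkload(logr.Discard(), wl, utilac.NewAdmissionChecks(cq))
    	fmt.Println(checks.UnsortedList()) // [ac1 ac2], in some order
    }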