diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go
index 807cabc6d6..812e31dc19 100644
--- a/apis/kueue/v1beta1/clusterqueue_types.go
+++ b/apis/kueue/v1beta1/clusterqueue_types.go
@@ -98,10 +98,16 @@ type ClusterQueueSpec struct {
 	// +kubebuilder:default={}
 	Preemption *ClusterQueuePreemption `json:"preemption,omitempty"`
 
-	// admissionChecks lists the AdmissionChecks required by this ClusterQueue
+	// admissionChecks lists the AdmissionChecks required by this ClusterQueue.
+	// Cannot be used along with admissionChecksStrategy.
 	// +optional
 	AdmissionChecks []string `json:"admissionChecks,omitempty"`
 
+	// admissionChecksStrategy defines a list of strategies to determine which ResourceFlavors require AdmissionChecks.
+	// This property cannot be used in conjunction with the 'admissionChecks' property.
+	// +optional
+	AdmissionChecksStrategy AdmissionChecksStrategy `json:"admissionChecksStrategy,omitempty"`
+
 	// stopPolicy - if set to a value different from None, the ClusterQueue is considered Inactive, no new reservation being
 	// made.
 	//
@@ -117,6 +123,23 @@ type ClusterQueueSpec struct {
 	StopPolicy *StopPolicy `json:"stopPolicy,omitempty"`
 }
 
+// AdmissionChecksStrategy defines a strategy for an AdmissionCheck.
+type AdmissionChecksStrategy struct {
+	// admissionChecks is a list of strategies for AdmissionChecks.
+	AdmissionChecks []AdmissionCheckStrategyRule `json:"admissionChecks,omitempty"`
+}
+
+// AdmissionCheckStrategyRule defines rules for a single AdmissionCheck.
+type AdmissionCheckStrategyRule struct {
+	// name is an AdmissionCheck's name.
+	Name string `json:"name"`
+
+	// onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for.
+	// If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
+	// +optional
+	OnFlavors []ResourceFlavorReference `json:"onFlavors"`
+}
+
 type QueueingStrategy string
 
 const (
diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go
index 842c5abfd5..80a883d806 100644
--- a/apis/kueue/v1beta1/zz_generated.deepcopy.go
+++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go
@@ -192,6 +192,48 @@ func (in *AdmissionCheckStatus) DeepCopy() *AdmissionCheckStatus {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *AdmissionCheckStrategyRule) DeepCopyInto(out *AdmissionCheckStrategyRule) {
+	*out = *in
+	if in.OnFlavors != nil {
+		in, out := &in.OnFlavors, &out.OnFlavors
+		*out = make([]ResourceFlavorReference, len(*in))
+		copy(*out, *in)
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdmissionCheckStrategyRule.
+func (in *AdmissionCheckStrategyRule) DeepCopy() *AdmissionCheckStrategyRule {
+	if in == nil {
+		return nil
+	}
+	out := new(AdmissionCheckStrategyRule)
+	in.DeepCopyInto(out)
+	return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *AdmissionChecksStrategy) DeepCopyInto(out *AdmissionChecksStrategy) {
+	*out = *in
+	if in.AdmissionChecks != nil {
+		in, out := &in.AdmissionChecks, &out.AdmissionChecks
+		*out = make([]AdmissionCheckStrategyRule, len(*in))
+		for i := range *in {
+			(*in)[i].DeepCopyInto(&(*out)[i])
+		}
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdmissionChecksStrategy.
+func (in *AdmissionChecksStrategy) DeepCopy() *AdmissionChecksStrategy {
+	if in == nil {
+		return nil
+	}
+	out := new(AdmissionChecksStrategy)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *BorrowWithinCohort) DeepCopyInto(out *BorrowWithinCohort) {
 	*out = *in
@@ -357,6 +399,7 @@ func (in *ClusterQueueSpec) DeepCopyInto(out *ClusterQueueSpec) {
 		*out = make([]string, len(*in))
 		copy(*out, *in)
 	}
+	in.AdmissionChecksStrategy.DeepCopyInto(&out.AdmissionChecksStrategy)
 	if in.StopPolicy != nil {
 		in, out := &in.StopPolicy, &out.StopPolicy
 		*out = new(StopPolicy)
diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
index a81c243db6..cdc1cdcbbe 100644
--- a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
+++ b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
@@ -74,11 +74,42 @@ spec:
           description: ClusterQueueSpec defines the desired state of ClusterQueue
           properties:
             admissionChecks:
-              description: admissionChecks lists the AdmissionChecks required by
-                this ClusterQueue
+              description: |-
+                admissionChecks lists the AdmissionChecks required by this ClusterQueue.
+                Cannot be used along with admissionChecksStrategy.
               items:
                 type: string
               type: array
+            admissionChecksStrategy:
+              description: |-
+                admissionChecksStrategy defines a list of strategies to determine which ResourceFlavors require AdmissionChecks.
+                This property cannot be used in conjunction with the 'admissionChecks' property.
+              properties:
+                admissionChecks:
+                  description: admissionChecks is a list of strategies for AdmissionChecks.
+                  items:
+                    description: AdmissionCheckStrategyRule defines rules for a
+                      single AdmissionCheck.
+                    properties:
+                      name:
+                        description: name is an AdmissionCheck's name.
+                        type: string
+                      onFlavors:
+                        description: |-
+                          onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for.
+                          If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
+                        items:
+                          description: ResourceFlavorReference is the name of the
+                            ResourceFlavor.
+                          maxLength: 253
+                          pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$
+                          type: string
+                        type: array
+                    required:
+                    - name
+                    type: object
+                  type: array
+              type: object
             cohort:
               description: |-
                 cohort that this ClusterQueue belongs to. CQs that belong to the
diff --git a/client-go/applyconfiguration/kueue/v1beta1/admissionchecksstrategy.go b/client-go/applyconfiguration/kueue/v1beta1/admissionchecksstrategy.go
new file mode 100644
index 0000000000..fb0c4ce292
--- /dev/null
+++ b/client-go/applyconfiguration/kueue/v1beta1/admissionchecksstrategy.go
@@ -0,0 +1,43 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Code generated by applyconfiguration-gen. DO NOT EDIT.
+
+package v1beta1
+
+// AdmissionChecksStrategyApplyConfiguration represents a declarative configuration of the AdmissionChecksStrategy type for use
+// with apply.
+type AdmissionChecksStrategyApplyConfiguration struct {
+	AdmissionChecks []AdmissionCheckStrategyRuleApplyConfiguration `json:"admissionChecks,omitempty"`
+}
+
+// AdmissionChecksStrategyApplyConfiguration constructs a declarative configuration of the AdmissionChecksStrategy type for use with
+// apply.
+func AdmissionChecksStrategy() *AdmissionChecksStrategyApplyConfiguration {
+	return &AdmissionChecksStrategyApplyConfiguration{}
+}
+
+// WithAdmissionChecks adds the given value to the AdmissionChecks field in the declarative configuration
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, values provided by each call will be appended to the AdmissionChecks field.
+func (b *AdmissionChecksStrategyApplyConfiguration) WithAdmissionChecks(values ...*AdmissionCheckStrategyRuleApplyConfiguration) *AdmissionChecksStrategyApplyConfiguration {
+	for i := range values {
+		if values[i] == nil {
+			panic("nil value passed to WithAdmissionChecks")
+		}
+		b.AdmissionChecks = append(b.AdmissionChecks, *values[i])
+	}
+	return b
+}
diff --git a/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategyrule.go b/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategyrule.go
new file mode 100644
index 0000000000..84a8e6655b
--- /dev/null
+++ b/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategyrule.go
@@ -0,0 +1,53 @@
+/*
+Copyright The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+// Code generated by applyconfiguration-gen. DO NOT EDIT.
+
+package v1beta1
+
+import (
+	v1beta1 "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+)
+
+// AdmissionCheckStrategyRuleApplyConfiguration represents a declarative configuration of the AdmissionCheckStrategyRule type for use
+// with apply.
+type AdmissionCheckStrategyRuleApplyConfiguration struct {
+	Name      *string                           `json:"name,omitempty"`
+	OnFlavors []v1beta1.ResourceFlavorReference `json:"onFlavors,omitempty"`
+}
+
+// AdmissionCheckStrategyRuleApplyConfiguration constructs a declarative configuration of the AdmissionCheckStrategyRule type for use with
+// apply.
+func AdmissionCheckStrategyRule() *AdmissionCheckStrategyRuleApplyConfiguration {
+	return &AdmissionCheckStrategyRuleApplyConfiguration{}
+}
+
+// WithName sets the Name field in the declarative configuration to the given value
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, the Name field is set to the value of the last call.
+func (b *AdmissionCheckStrategyRuleApplyConfiguration) WithName(value string) *AdmissionCheckStrategyRuleApplyConfiguration {
+	b.Name = &value
+	return b
+}
+
+// WithOnFlavors adds the given value to the OnFlavors field in the declarative configuration
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, values provided by each call will be appended to the OnFlavors field.
+func (b *AdmissionCheckStrategyRuleApplyConfiguration) WithOnFlavors(values ...v1beta1.ResourceFlavorReference) *AdmissionCheckStrategyRuleApplyConfiguration {
+	for i := range values {
+		b.OnFlavors = append(b.OnFlavors, values[i])
+	}
+	return b
+}
diff --git a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go
index 564c24ab9a..1ae653aa53 100644
--- a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go
+++ b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go
@@ -25,14 +25,15 @@ import (
 // ClusterQueueSpecApplyConfiguration represents an declarative configuration of the ClusterQueueSpec type for use
 // with apply.
 type ClusterQueueSpecApplyConfiguration struct {
-	ResourceGroups    []ResourceGroupApplyConfiguration         `json:"resourceGroups,omitempty"`
-	Cohort            *string                                   `json:"cohort,omitempty"`
-	QueueingStrategy  *kueuev1beta1.QueueingStrategy            `json:"queueingStrategy,omitempty"`
-	NamespaceSelector *v1.LabelSelector                         `json:"namespaceSelector,omitempty"`
-	FlavorFungibility *FlavorFungibilityApplyConfiguration      `json:"flavorFungibility,omitempty"`
-	Preemption        *ClusterQueuePreemptionApplyConfiguration `json:"preemption,omitempty"`
-	AdmissionChecks   []string                                  `json:"admissionChecks,omitempty"`
-	StopPolicy        *kueuev1beta1.StopPolicy                  `json:"stopPolicy,omitempty"`
+	ResourceGroups          []ResourceGroupApplyConfiguration          `json:"resourceGroups,omitempty"`
+	Cohort                  *string                                    `json:"cohort,omitempty"`
+	QueueingStrategy        *kueuev1beta1.QueueingStrategy             `json:"queueingStrategy,omitempty"`
+	NamespaceSelector       *v1.LabelSelector                          `json:"namespaceSelector,omitempty"`
+	FlavorFungibility       *FlavorFungibilityApplyConfiguration       `json:"flavorFungibility,omitempty"`
+	Preemption              *ClusterQueuePreemptionApplyConfiguration  `json:"preemption,omitempty"`
+	AdmissionChecks         []string                                   `json:"admissionChecks,omitempty"`
+	AdmissionChecksStrategy *AdmissionChecksStrategyApplyConfiguration `json:"admissionChecksStrategy,omitempty"`
+	StopPolicy              *kueuev1beta1.StopPolicy                   `json:"stopPolicy,omitempty"`
 }
 
 // ClusterQueueSpecApplyConfiguration constructs an declarative configuration of the ClusterQueueSpec type for use with
@@ -104,6 +105,14 @@ func (b *ClusterQueueSpecApplyConfiguration) WithAdmissionChecks(values ...strin
 	return b
 }
 
+// WithAdmissionChecksStrategy sets the AdmissionChecksStrategy field in the declarative configuration to the given value
+// and returns the receiver, so that objects can be built by chaining "With" function invocations.
+// If called multiple times, the AdmissionChecksStrategy field is set to the value of the last call.
+func (b *ClusterQueueSpecApplyConfiguration) WithAdmissionChecksStrategy(value *AdmissionChecksStrategyApplyConfiguration) *ClusterQueueSpecApplyConfiguration {
+	b.AdmissionChecksStrategy = value
+	return b
+}
+
 // WithStopPolicy sets the StopPolicy field in the declarative configuration to the given value
 // and returns the receiver, so that objects can be built by chaining "With" function invocations.
 // If called multiple times, the StopPolicy field is set to the value of the last call.
diff --git a/client-go/applyconfiguration/utils.go b/client-go/applyconfiguration/utils.go
index 928d168488..8bd03f6f73 100644
--- a/client-go/applyconfiguration/utils.go
+++ b/client-go/applyconfiguration/utils.go
@@ -54,10 +54,14 @@ func ForKind(kind schema.GroupVersionKind) interface{} {
 		return &kueuev1beta1.AdmissionCheckParametersReferenceApplyConfiguration{}
 	case v1beta1.SchemeGroupVersion.WithKind("AdmissionCheckSpec"):
 		return &kueuev1beta1.AdmissionCheckSpecApplyConfiguration{}
+	case v1beta1.SchemeGroupVersion.WithKind("AdmissionChecksStrategy"):
+		return &kueuev1beta1.AdmissionChecksStrategyApplyConfiguration{}
 	case v1beta1.SchemeGroupVersion.WithKind("AdmissionCheckState"):
 		return &kueuev1beta1.AdmissionCheckStateApplyConfiguration{}
 	case v1beta1.SchemeGroupVersion.WithKind("AdmissionCheckStatus"):
 		return &kueuev1beta1.AdmissionCheckStatusApplyConfiguration{}
+	case v1beta1.SchemeGroupVersion.WithKind("AdmissionCheckStrategyRule"):
+		return &kueuev1beta1.AdmissionCheckStrategyRuleApplyConfiguration{}
 	case v1beta1.SchemeGroupVersion.WithKind("BorrowWithinCohort"):
 		return &kueuev1beta1.BorrowWithinCohortApplyConfiguration{}
 	case v1beta1.SchemeGroupVersion.WithKind("ClusterQueue"):
diff --git a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
index ba32a2fbe8..7b39187ad6 100644
--- a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
+++ b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
@@ -59,11 +59,42 @@ spec:
           description: ClusterQueueSpec defines the desired state of ClusterQueue
           properties:
             admissionChecks:
-              description: admissionChecks lists the AdmissionChecks required by
-                this ClusterQueue
+              description: |-
+                admissionChecks lists the AdmissionChecks required by this ClusterQueue.
+                Cannot be used along with admissionChecksStrategy.
               items:
                 type: string
              type: array
+            admissionChecksStrategy:
+              description: |-
+                admissionChecksStrategy defines a list of strategies to determine which ResourceFlavors require AdmissionChecks.
+                This property cannot be used in conjunction with the 'admissionChecks' property.
+              properties:
+                admissionChecks:
+                  description: admissionChecks is a list of strategies for AdmissionChecks.
+                  items:
+                    description: AdmissionCheckStrategyRule defines rules for a
+                      single AdmissionCheck.
+                    properties:
+                      name:
+                        description: name is an AdmissionCheck's name.
+                        type: string
+                      onFlavors:
+                        description: |-
+                          onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for.
+                          If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
+                        items:
+                          description: ResourceFlavorReference is the name of the
+                            ResourceFlavor.
+                          maxLength: 253
+                          pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$
+                          type: string
+                        type: array
+                    required:
+                    - name
+                    type: object
+                  type: array
+              type: object
             cohort:
               description: |-
                 cohort that this ClusterQueue belongs to.
                CQs that belong to the
diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go
index f4b47314ae..a6ef5912e0 100644
--- a/pkg/cache/cache.go
+++ b/pkg/cache/cache.go
@@ -728,7 +728,7 @@ func (c *Cache) ClusterQueuesUsingAdmissionCheck(ac string) []string {
 	var cqs []string
 	for _, cq := range c.clusterQueues {
-		if cq.AdmissionChecks.Has(ac) {
+		if _, found := cq.AdmissionChecks[ac]; found {
 			cqs = append(cqs, cq.Name)
 		}
 	}
diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go
index 1f881d3dca..5e0a74f642 100644
--- a/pkg/cache/cache_test.go
+++ b/pkg/cache/cache_test.go
@@ -964,7 +964,10 @@ func TestCacheClusterQueueOperations(t *testing.T) {
 					Preemption:                    defaultPreemption,
 					AllocatableResourceGeneration: 1,
 					FlavorFungibility:             defaultFlavorFungibility,
-					AdmissionChecks:               sets.New("check1", "check2"),
+					AdmissionChecks: map[string]sets.Set[string]{
+						"check1": sets.New[string](),
+						"check2": sets.New[string](),
+					},
 				},
 			},
 			wantCohorts: map[string]sets.Set[string]{},
@@ -992,8 +995,10 @@ func TestCacheClusterQueueOperations(t *testing.T) {
 					Preemption:                    defaultPreemption,
 					AllocatableResourceGeneration: 1,
 					FlavorFungibility:             defaultFlavorFungibility,
-					AdmissionChecks:               sets.New("check1", "check2"),
-				},
+					AdmissionChecks: map[string]sets.Set[string]{
+						"check1": sets.New[string](),
+						"check2": sets.New[string](),
+					}},
 			},
 			wantCohorts: map[string]sets.Set[string]{},
 		},
@@ -1021,8 +1026,10 @@ func TestCacheClusterQueueOperations(t *testing.T) {
 					Preemption:                    defaultPreemption,
 					AllocatableResourceGeneration: 1,
 					FlavorFungibility:             defaultFlavorFungibility,
-					AdmissionChecks:               sets.New("check1", "check2"),
-				},
+					AdmissionChecks: map[string]sets.Set[string]{
+						"check1": sets.New[string](),
+						"check2": sets.New[string](),
+					}},
 			},
 			wantCohorts: map[string]sets.Set[string]{},
 		},
@@ -1050,8 +1057,10 @@ func TestCacheClusterQueueOperations(t *testing.T) {
 					Preemption:                    defaultPreemption,
 					AllocatableResourceGeneration: 1,
 					FlavorFungibility:             defaultFlavorFungibility,
-					AdmissionChecks:               sets.New("check1", "check2"),
-				},
+					AdmissionChecks: map[string]sets.Set[string]{
+						"check1": sets.New[string](),
+						"check2": sets.New[string](),
+					}},
 			},
 			wantCohorts: map[string]sets.Set[string]{},
 		},
@@ -3385,45 +3394,61 @@ func messageOrEmpty(err error) string {
 }
 
 func TestClusterQueuesUsingAdmissionChecks(t *testing.T) {
-	admissionCheck1 := utiltesting.MakeAdmissionCheck("ac1").Obj()
-	admissionCheck2 := utiltesting.MakeAdmissionCheck("ac2").Obj()
+	checks := []*kueue.AdmissionCheck{
+		utiltesting.MakeAdmissionCheck("ac1").Obj(),
+		utiltesting.MakeAdmissionCheck("ac2").Obj(),
+		utiltesting.MakeAdmissionCheck("ac3").Obj(),
+	}
+
 	fooCq := utiltesting.MakeClusterQueue("fooCq").
-		AdmissionChecks(admissionCheck1.Name).
+		AdmissionChecks("ac1").
 		Obj()
 	barCq := utiltesting.MakeClusterQueue("barCq").Obj()
 	fizzCq := utiltesting.MakeClusterQueue("fizzCq").
-		AdmissionChecks(admissionCheck1.Name, admissionCheck2.Name).
+		AdmissionChecks("ac1", "ac2").
+		Obj()
+	strategyCq := utiltesting.MakeClusterQueue("strategyCq").
+		AdmissionCheckStrategy(
+			*utiltesting.MakeAdmissionCheckStrategyRule("ac1").Obj(),
+			*utiltesting.MakeAdmissionCheckStrategyRule("ac3").Obj()).
 		Obj()
 
 	cases := map[string]struct {
 		clusterQueues              []*kueue.ClusterQueue
 		wantInUseClusterQueueNames []string
+		check                      string
 	}{
 		"single clusterQueue with check in use": {
-			clusterQueues: []*kueue.ClusterQueue{
-				fooCq,
-			},
+			clusterQueues:              []*kueue.ClusterQueue{fooCq},
 			wantInUseClusterQueueNames: []string{fooCq.Name},
+			check:                      "ac1",
+		},
+		"single clusterQueue with AdmissionCheckStrategy in use": {
+			clusterQueues:              []*kueue.ClusterQueue{strategyCq},
+			wantInUseClusterQueueNames: []string{strategyCq.Name},
+			check:                      "ac3",
 		},
 		"single clusterQueue with no checks": {
-			clusterQueues: []*kueue.ClusterQueue{
-				barCq,
-			},
+			clusterQueues: []*kueue.ClusterQueue{barCq},
+			check:         "ac1",
 		},
 		"multiple clusterQueues with checks in use": {
 			clusterQueues: []*kueue.ClusterQueue{
 				fooCq,
 				barCq,
 				fizzCq,
+				strategyCq,
 			},
-			wantInUseClusterQueueNames: []string{fooCq.Name, fizzCq.Name},
+			wantInUseClusterQueueNames: []string{fooCq.Name, fizzCq.Name, strategyCq.Name},
+			check:                      "ac1",
 		},
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
 			cache := New(utiltesting.NewFakeClient())
-			cache.AddOrUpdateAdmissionCheck(admissionCheck1)
-			cache.AddOrUpdateAdmissionCheck(admissionCheck2)
+			for _, check := range checks {
+				cache.AddOrUpdateAdmissionCheck(check)
+			}
 
 			for _, cq := range tc.clusterQueues {
 				if err := cache.AddClusterQueue(context.Background(), cq); err != nil {
@@ -3431,11 +3456,11 @@ func TestClusterQueuesUsingAdmissionChecks(t *testing.T) {
 				}
 			}
 
-			cqs := cache.ClusterQueuesUsingAdmissionCheck(admissionCheck1.Name)
+			cqs := cache.ClusterQueuesUsingAdmissionCheck(tc.check)
 			if diff := cmp.Diff(tc.wantInUseClusterQueueNames, cqs, cmpopts.SortSlices(func(a, b string) bool {
 				return a < b
 			})); len(diff) != 0 {
-				t.Errorf("Unexpected flavor is in use by clusterQueues (-want,+got):\n%s", diff)
+				t.Errorf("Unexpected AdmissionCheck is in use by clusterQueues (-want,+got):\n%s", diff)
 			}
 		})
 	}
diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go
index 29c857ec8c..2f700dc1c6 100644
--- a/pkg/cache/clusterqueue.go
+++ b/pkg/cache/clusterqueue.go
@@ -32,6 +32,7 @@ import (
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 	"sigs.k8s.io/kueue/pkg/features"
 	"sigs.k8s.io/kueue/pkg/metrics"
+	utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
 	"sigs.k8s.io/kueue/pkg/workload"
 )
 
@@ -62,8 +63,11 @@ type ClusterQueue struct {
 	NamespaceSelector labels.Selector
 	Preemption        kueue.ClusterQueuePreemption
 	FlavorFungibility kueue.FlavorFungibility
-	AdmissionChecks   sets.Set[string]
-	Status            metrics.ClusterQueueStatus
+	// AdmissionChecks aggregates the checks from both .spec.admissionChecks and .spec.admissionChecksStrategy.
+	// Each set holds the ResourceFlavors to which the AdmissionCheck should apply;
+	// an empty set means the AdmissionCheck should apply to all ResourceFlavors.
+	AdmissionChecks map[string]sets.Set[string]
+	Status          metrics.ClusterQueueStatus
 	// GuaranteedQuota records how much resource quota the ClusterQueue reserved
 	// when feature LendingLimit is enabled and flavor's lendingLimit is not nil.
 	GuaranteedQuota FlavorResourceQuantities
@@ -199,7 +203,7 @@ func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[kueue.
 
 	c.isStopped = ptr.Deref(in.Spec.StopPolicy, kueue.None) != kueue.None
 
-	c.AdmissionChecks = sets.New(in.Spec.AdmissionChecks...)
+	c.AdmissionChecks = utilac.NewAdmissionChecks(in)
 
 	c.Usage = filterFlavorQuantities(c.Usage, in.Spec.ResourceGroups)
 	c.AdmittedUsage = filterFlavorQuantities(c.AdmittedUsage, in.Spec.ResourceGroups)
diff --git a/pkg/cache/clusterqueue_test.go b/pkg/cache/clusterqueue_test.go
index 00f5e15198..581dee5a29 100644
--- a/pkg/cache/clusterqueue_test.go
+++ b/pkg/cache/clusterqueue_test.go
@@ -463,12 +463,20 @@ func TestClusterQueueUpdate(t *testing.T) {
 }
 
 func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
-	cq := utiltesting.MakeClusterQueue("cq").
+	cqWithAC := utiltesting.MakeClusterQueue("cq").
 		AdmissionChecks("check1", "check2", "check3").
 		Obj()
 
+	cqWithACStrategy := utiltesting.MakeClusterQueue("cq2").
+		AdmissionCheckStrategy(
+			*utiltesting.MakeAdmissionCheckStrategyRule("check1").Obj(),
+			*utiltesting.MakeAdmissionCheckStrategyRule("check2").Obj(),
+			*utiltesting.MakeAdmissionCheckStrategyRule("check3").Obj()).
+		Obj()
+
 	testcases := []struct {
 		name            string
+		cq              *kueue.ClusterQueue
 		cqStatus        metrics.ClusterQueueStatus
 		admissionChecks map[string]AdmissionCheck
 		wantStatus      metrics.ClusterQueueStatus
 		wantReason      string
 	}{
 		{
 			name:     "Pending clusterQueue updated valid AC list",
+			cq:       cqWithAC,
+			cqStatus: pending,
+			admissionChecks: map[string]AdmissionCheck{
+				"check1": {
+					Active:     true,
+					Controller: "controller1",
+				},
+				"check2": {
+					Active:     true,
+					Controller: "controller2",
+				},
+				"check3": {
+					Active:     true,
+					Controller: "controller3",
+				},
+			},
+			wantStatus: active,
+			wantReason: "Ready",
+		},
+		{
+			name:     "Pending clusterQueue with an AC strategy updated valid AC list",
+			cq:       cqWithACStrategy,
 			cqStatus: pending,
 			admissionChecks: map[string]AdmissionCheck{
 				"check1": {
@@ -496,6 +526,24 @@
 		},
 		{
 			name:     "Active clusterQueue updated with not found AC",
+			cq:       cqWithAC,
+			cqStatus: active,
+			admissionChecks: map[string]AdmissionCheck{
+				"check1": {
+					Active:     true,
+					Controller: "controller1",
+				},
+				"check2": {
+					Active:     true,
+					Controller: "controller2",
+				},
+			},
+			wantStatus: pending,
+			wantReason: "CheckNotFoundOrInactive",
+		},
+		{
+			name:     "Active clusterQueue with an AC strategy updated with not found AC",
+			cq:       cqWithACStrategy,
 			cqStatus: active,
 			admissionChecks: map[string]AdmissionCheck{
 				"check1": {
@@ -512,6 +560,28 @@
 		},
 		{
 			name:     "Active clusterQueue updated with inactive AC",
+			cq:       cqWithAC,
+			cqStatus: active,
+			admissionChecks: map[string]AdmissionCheck{
+				"check1": {
+					Active:     true,
+					Controller: "controller1",
+				},
+				"check2": {
+					Active:     true,
+					Controller: "controller2",
+				},
+				"check3": {
+					Active:     false,
+					Controller: "controller3",
+				},
+			},
+			wantStatus: pending,
+			wantReason: "CheckNotFoundOrInactive",
+		},
+		{
+			name:     "Active clusterQueue with an AC strategy updated with inactive AC",
+			cq:       cqWithACStrategy,
 			cqStatus: active,
 			admissionChecks: map[string]AdmissionCheck{
 				"check1": {
@@ -532,6 +602,30 @@
 		},
 		{
 			name:     "Active clusterQueue updated with duplicate single instance AC Controller",
+			cq:       cqWithAC,
+			cqStatus: active,
+			admissionChecks: map[string]AdmissionCheck{
+				"check1": {
+					Active:                       true,
+					Controller:                   "controller1",
+					SingleInstanceInClusterQueue: true,
+				},
+				"check2": {
+					Active:     true,
+					Controller: "controller2",
+				},
+				"check3": {
+					Active:     true,
+					Controller: "controller2",
+					SingleInstanceInClusterQueue: true,
+				},
+			},
+			wantStatus: pending,
+			wantReason: "MultipleSingleInstanceControllerChecks",
+		},
+		{
+			name:     "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller",
+			cq:       cqWithACStrategy,
 			cqStatus: active,
 			admissionChecks: map[string]AdmissionCheck{
 				"check1": {
@@ -554,6 +648,28 @@
 		},
 		{
 			name:     "Terminating clusterQueue updated with valid AC list",
+			cq:       cqWithAC,
+			cqStatus: terminating,
+			admissionChecks: map[string]AdmissionCheck{
+				"check1": {
+					Active:     true,
+					Controller: "controller1",
+				},
+				"check2": {
+					Active:     true,
+					Controller: "controller2",
+				},
+				"check3": {
+					Active:     true,
+					Controller: "controller3",
+				},
+			},
+			wantStatus: terminating,
+			wantReason: "Terminating",
+		},
+		{
+			name:     "Terminating clusterQueue with an AC strategy updated with valid AC list",
+			cq:       cqWithACStrategy,
 			cqStatus: terminating,
 			admissionChecks: map[string]AdmissionCheck{
 				"check1": {
@@ -574,6 +690,24 @@
 		},
 		{
 			name:     "Terminating clusterQueue updated with not found AC",
+			cq:       cqWithAC,
+			cqStatus: terminating,
+			admissionChecks: map[string]AdmissionCheck{
+				"check1": {
+					Active:     true,
+					Controller: "controller1",
+				},
+				"check2": {
+					Active:     true,
+					Controller: "controller2",
+				},
+			},
+			wantStatus: terminating,
+			wantReason: "Terminating",
+		},
+		{
+			name:     "Terminating clusterQueue with an AC strategy updated with not found AC",
+			cq:       cqWithACStrategy,
 			cqStatus: terminating,
 			admissionChecks: map[string]AdmissionCheck{
 				"check1": {
@@ -593,7 +727,7 @@
 	for _, tc := range testcases {
 		t.Run(tc.name, func(t *testing.T) {
 			cache := New(utiltesting.NewFakeClient())
-			cq, err := cache.newClusterQueue(cq)
+			cq, err := cache.newClusterQueue(tc.cq)
 			if err != nil {
 				t.Fatalf("failed to new clusterQueue %v", err)
 			}
diff --git a/pkg/cache/snapshot.go b/pkg/cache/snapshot.go
index 146003242b..c5cdc3ff9e 100644
--- a/pkg/cache/snapshot.go
+++ b/pkg/cache/snapshot.go
@@ -143,8 +143,9 @@ func (c *ClusterQueue) snapshot() *ClusterQueue {
 		Preemption:        c.Preemption,
 		NamespaceSelector: c.NamespaceSelector,
 		Status:            c.Status,
-		AdmissionChecks:   c.AdmissionChecks.Clone(),
+		AdmissionChecks:   utilmaps.DeepCopySets[string](c.AdmissionChecks),
 	}
+
 	for fName, rUsage := range c.Usage {
 		cc.Usage[fName] = maps.Clone(rUsage)
 	}
diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go
index 1c00783f34..0a964ce7b1 100644
--- a/pkg/controller/core/workload_controller.go
+++ b/pkg/controller/core/workload_controller.go
@@ -17,11 +17,14 @@ limitations under the License.
 
 package core
 
 import (
+	"cmp"
 	"context"
 	"fmt"
+	"slices"
 	"time"
 
 	"github.com/go-logr/logr"
+	gocmp "github.com/google/go-cmp/cmp"
 	corev1 "k8s.io/api/core/v1"
 	nodev1 "k8s.io/api/node/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
@@ -49,7 +52,8 @@ import (
 	"sigs.k8s.io/kueue/pkg/constants"
 	"sigs.k8s.io/kueue/pkg/controller/core/indexer"
 	"sigs.k8s.io/kueue/pkg/queue"
-	"sigs.k8s.io/kueue/pkg/util/slices"
+	utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
+	utilslices "sigs.k8s.io/kueue/pkg/util/slices"
 	"sigs.k8s.io/kueue/pkg/workload"
 )
 
@@ -253,17 +257,18 @@ func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl
 }
 
 func (r *WorkloadReconciler) reconcileSyncAdmissionChecks(ctx context.Context, wl *kueue.Workload, cqName string) (bool, error) {
-	// because we need to react to API cluster queue events, the list of checks from a cache can lead to race conditions
-	queue := kueue.ClusterQueue{}
-	if err := r.client.Get(ctx, types.NamespacedName{Name: cqName}, &queue); err != nil {
+	log := ctrl.LoggerFrom(ctx)
+
+	// because we need to react to API ClusterQueue events, the list of checks from a cache can lead to race conditions
+	cq := kueue.ClusterQueue{}
+	if err := r.client.Get(ctx, types.NamespacedName{Name: cqName}, &cq); err != nil {
 		return false, err
 	}
-	queueAdmissionChecks := queue.Spec.AdmissionChecks
-	newChecks, shouldUpdate := syncAdmissionCheckConditions(wl.Status.AdmissionChecks, queueAdmissionChecks)
+	admissionChecks := workload.AdmissionChecksForWorkload(log, wl, utilac.NewAdmissionChecks(&cq))
+	newChecks, shouldUpdate := syncAdmissionCheckConditions(wl.Status.AdmissionChecks, admissionChecks)
 	if shouldUpdate {
-		log := ctrl.LoggerFrom(ctx)
-		log.V(3).Info("The workload needs admission checks updates", "clusterQueue", klog.KRef("", cqName), "admissionChecks", queueAdmissionChecks)
+		log.V(3).Info("The workload needs admission checks updates", "clusterQueue", klog.KRef("", cqName), "admissionChecks", admissionChecks)
 		wl.Status.AdmissionChecks = newChecks
 		err := r.client.Status().Update(ctx, wl)
 		return true, client.IgnoreNotFound(err)
@@ -306,14 +311,14 @@ func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Cont
 	return false, nil
 }
 
-func syncAdmissionCheckConditions(conds []kueue.AdmissionCheckState, queueChecks []string) ([]kueue.AdmissionCheckState, bool) {
-	if len(queueChecks) == 0 {
+func syncAdmissionCheckConditions(conds []kueue.AdmissionCheckState, admissionChecks sets.Set[string]) ([]kueue.AdmissionCheckState, bool) {
+	if len(admissionChecks) == 0 {
 		return nil, len(conds) > 0
 	}
 
 	shouldUpdate := false
-	currentChecks := slices.ToRefMap(conds, func(c *kueue.AdmissionCheckState) string { return c.Name })
-	for _, t := range queueChecks {
+	currentChecks := utilslices.ToRefMap(conds, func(c *kueue.AdmissionCheckState) string { return c.Name })
+	for t := range admissionChecks {
 		if _, found := currentChecks[t]; !found {
 			workload.SetAdmissionCheckState(&conds, kueue.AdmissionCheckState{
 				Name: t,
@@ -324,18 +329,20 @@
 	}
 
 	// if the workload conditions length is bigger, then some cleanup should be done
-	if len(conds) > len(queueChecks) {
-		newConds := make([]kueue.AdmissionCheckState, 0, len(queueChecks))
-		queueChecksSet := sets.New(queueChecks...)
+	if len(conds) > len(admissionChecks) {
+		newConds := make([]kueue.AdmissionCheckState, 0, len(admissionChecks))
 		shouldUpdate = true
 		for i := range conds {
 			c := &conds[i]
-			if queueChecksSet.Has(c.Name) {
+			if admissionChecks.Has(c.Name) {
 				newConds = append(newConds, *c)
 			}
 		}
 		conds = newConds
 	}
+	slices.SortFunc(conds, func(state1, state2 kueue.AdmissionCheckState) int {
+		return cmp.Compare(state1.Name, state2.Name)
+	})
 	return conds, shouldUpdate
 }
 
@@ -749,7 +756,8 @@ func (w *workloadCqHandler) Update(ctx context.Context, ev event.UpdateEvent, wq
 	}
 
 	if !newCq.DeletionTimestamp.IsZero() ||
-		!slices.CmpNoOrder(oldCq.Spec.AdmissionChecks, newCq.Spec.AdmissionChecks) ||
+		!utilslices.CmpNoOrder(oldCq.Spec.AdmissionChecks, newCq.Spec.AdmissionChecks) ||
+		!gocmp.Equal(oldCq.Spec.AdmissionChecksStrategy, newCq.Spec.AdmissionChecksStrategy) ||
 		!ptr.Equal(oldCq.Spec.StopPolicy, newCq.Spec.StopPolicy) {
 		w.queueReconcileForWorkloads(ctx, newCq.Name, wq)
 	}
diff --git a/pkg/controller/core/workload_controller_test.go b/pkg/controller/core/workload_controller_test.go
index 4980e55450..1bd9f421dc 100644
--- a/pkg/controller/core/workload_controller_test.go
+++ b/pkg/controller/core/workload_controller_test.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	testingclock "k8s.io/utils/clock/testing"
 	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -285,13 +286,13 @@ func TestSyncCheckStates(t *testing.T) {
 
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
-			gotStates, gotShouldChange := syncAdmissionCheckConditions(tc.states, tc.list)
+			gotStates, gotShouldChange := syncAdmissionCheckConditions(tc.states, sets.New(tc.list...))
 
 			if tc.wantChange != gotShouldChange {
 				t.Errorf("Unexpected should change, want=%v", tc.wantChange)
 			}
 
-			opts := []cmp.Option{}
+			var opts []cmp.Option
 			if tc.ignoreTransitionTime {
 				opts = append(opts, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"))
 			}
@@ -309,6 +310,7 @@ var (
 			kueue.Workload{}, "TypeMeta", "ObjectMeta.ResourceVersion", "Status.RequeueState.RequeueAt",
 		),
 		cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
+		cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime"),
 		cmpopts.SortSlices(func(a, b metav1.Condition) bool { return a.Type < b.Type }),
 	}
 )
@@ -317,11 +319,61 @@ func TestReconcile(t *testing.T) {
 	testStartTime := time.Now()
 	cases := map[string]struct {
 		workload       *kueue.Workload
+		cq             *kueue.ClusterQueue
+		lq             *kueue.LocalQueue
 		wantWorkload   *kueue.Workload
 		wantError      error
 		wantEvents     []utiltesting.EventRecord
 		reconcilerOpts []Option
 	}{
+		"assign Admission Checks from ClusterQueue.spec.AdmissionCheckStrategy": {
+			workload: utiltesting.MakeWorkload("wl", "ns").
+				ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()).
+				Queue("queue").
+				Obj(),
+			cq: utiltesting.MakeClusterQueue("cq").
+				AdmissionCheckStrategy(
+					*utiltesting.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj(),
+					*utiltesting.MakeAdmissionCheckStrategyRule("ac2").Obj()).
+				Obj(),
+			lq: utiltesting.MakeLocalQueue("queue", "ns").ClusterQueue("cq").Obj(),
+			wantWorkload: utiltesting.MakeWorkload("wl", "ns").
+				ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()).
+				Queue("queue").
+				AdmissionChecks(
+					kueue.AdmissionCheckState{
+						Name:  "ac1",
+						State: kueue.CheckStatePending,
+					},
+					kueue.AdmissionCheckState{
+						Name:  "ac2",
+						State: kueue.CheckStatePending,
+					}).
+				Obj(),
+		},
+		"assign Admission Checks from ClusterQueue.spec.AdmissionChecks": {
+			workload: utiltesting.MakeWorkload("wl", "ns").
+				ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()).
+				Queue("queue").
+				Obj(),
+			cq: utiltesting.MakeClusterQueue("cq").
+				AdmissionChecks("ac1", "ac2").
+				Obj(),
+			lq: utiltesting.MakeLocalQueue("queue", "ns").ClusterQueue("cq").Obj(),
+			wantWorkload: utiltesting.MakeWorkload("wl", "ns").
+				ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()).
+				Queue("queue").
+				AdmissionChecks(
+					kueue.AdmissionCheckState{
+						Name:  "ac1",
+						State: kueue.CheckStatePending,
+					},
+					kueue.AdmissionCheckState{
+						Name:  "ac2",
+						State: kueue.CheckStatePending,
+					}).
+				Obj(),
+		},
 		"admit": {
 			workload: utiltesting.MakeWorkload("wl", "ns").
 				ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
@@ -547,6 +599,18 @@ func TestReconcile(t *testing.T) {
 			ctx, ctxCancel := context.WithCancel(ctxWithLogger)
 			defer ctxCancel()
 
+			if tc.cq != nil {
+				if err := cl.Create(ctx, tc.cq); err != nil {
+					t.Errorf("couldn't create the cluster queue: %v", err)
+				}
+				if err := qManager.AddClusterQueue(ctx, tc.cq); err != nil {
+					t.Errorf("couldn't add the cluster queue to the cache: %v", err)
+				}
+				if err := qManager.AddLocalQueue(ctx, tc.lq); err != nil {
+					t.Errorf("couldn't add the local queue to the cache: %v", err)
+				}
+			}
+
 			_, gotError := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(tc.workload)})
 
 			if diff := cmp.Diff(tc.wantError, gotError); diff != "" {
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 706273fb4d..0c63db6cb8 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -277,7 +277,7 @@ func (s *Scheduler) schedule(ctx context.Context) {
 			log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition")
 		}
 		e.status = nominated
-		if err := s.admit(ctx, e, cq.AdmissionChecks); err != nil {
+		if err := s.admit(ctx, e, cq); err != nil {
 			e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
 		}
 		if cq.Cohort != nil {
@@ -506,7 +506,7 @@ func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) e
 // admit sets the admitting clusterQueue and flavors into the workload of
 // the entry, and asynchronously updates the object in the apiserver after
 // assuming it in the cache.
-func (s *Scheduler) admit(ctx context.Context, e *entry, mustHaveChecks sets.Set[string]) error {
+func (s *Scheduler) admit(ctx context.Context, e *entry, cq *cache.ClusterQueue) error {
 	log := ctrl.LoggerFrom(ctx)
 	newWorkload := e.Obj.DeepCopy()
 	admission := &kueue.Admission{
@@ -515,7 +515,7 @@
 	}
 
 	workload.SetQuotaReservation(newWorkload, admission)
-	if workload.HasAllChecks(newWorkload, mustHaveChecks) {
+	if workload.HasAllChecks(newWorkload, workload.AdmissionChecksForWorkload(log, newWorkload, cq.AdmissionChecks)) {
 		// sync Admitted, ignore the result since an API update is always done.
 		_ = workload.SyncAdmittedCondition(newWorkload)
 	}
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 60a09d2794..95ce1874a9 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -1633,6 +1633,9 @@ func TestSchedule(t *testing.T) {
 			if err := qManager.AddClusterQueue(ctx, &cq); err != nil {
 				t.Fatalf("Inserting clusterQueue %s in manager: %v", cq.Name, err)
 			}
+			if err := cl.Create(ctx, &cq); err != nil {
+				t.Errorf("couldn't create the cluster queue: %v", err)
+			}
 		}
 		scheduler := New(qManager, cqCache, cl, recorder, WithFairSharing(tc.enableFairSharing))
 		gotScheduled := make(map[string]kueue.Admission)
diff --git a/pkg/util/admissioncheck/admissioncheck.go b/pkg/util/admissioncheck/admissioncheck.go
index 66eff4da30..52b5f82857 100644
--- a/pkg/util/admissioncheck/admissioncheck.go
+++ b/pkg/util/admissioncheck/admissioncheck.go
@@ -24,6 +24,7 @@ import (
 
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
 
@@ -154,3 +155,18 @@ func FilterProvReqAnnotations(annotations map[string]string) map[string]string {
 	}
 	return res
 }
+
+// NewAdmissionChecks aggregates AdmissionChecks from .spec.admissionChecks and .spec.admissionChecksStrategy.
+func NewAdmissionChecks(cq *kueue.ClusterQueue) map[string]sets.Set[string] {
+	checks := make(map[string]sets.Set[string], len(cq.Spec.AdmissionChecks)+len(cq.Spec.AdmissionChecksStrategy.AdmissionChecks))
+	for _, checkName := range cq.Spec.AdmissionChecks {
+		checks[checkName] = sets.New[string]()
+	}
+	for _, check := range cq.Spec.AdmissionChecksStrategy.AdmissionChecks {
+		checks[check.Name] = sets.New[string]()
+		for _, flavor := range check.OnFlavors {
+			checks[check.Name].Insert(string(flavor))
+		}
+	}
+	return checks
+}
diff --git a/pkg/util/maps/maps.go b/pkg/util/maps/maps.go
index 675d72fa50..5eb5edaca5 100644
--- a/pkg/util/maps/maps.go
+++ b/pkg/util/maps/maps.go
@@ -21,6 +21,8 @@ package maps
 import (
 	"fmt"
 	"maps"
+
+	"k8s.io/apimachinery/pkg/util/sets"
 )
 
 // Merge merges a and b while resolving the conflicts by calling commonKeyValue
@@ -108,3 +110,12 @@ func FilterKeys[K comparable, V any, M ~map[K]V](m M, k []K) M {
 	}
 	return ret
 }
+
+// DeepCopySets creates a deep copy of a map of sets; the returned map does not
+// share the underlying sets with the source map.
+func DeepCopySets[T comparable](src map[string]sets.Set[T]) map[string]sets.Set[T] {
+	dst := make(map[string]sets.Set[T], len(src))
+	for key, set := range src {
+		dst[key] = set.Clone()
+	}
+	return dst
+}
diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go
index 8653eae92b..0b20098a05 100644
--- a/pkg/util/testing/wrappers.go
+++ b/pkg/util/testing/wrappers.go
@@ -565,6 +565,11 @@ func (c *ClusterQueueWrapper) Cohort(cohort string) *ClusterQueueWrapper {
 	return c
 }
 
+func (c *ClusterQueueWrapper) AdmissionCheckStrategy(acs ...kueue.AdmissionCheckStrategyRule) *ClusterQueueWrapper {
+	c.Spec.AdmissionChecksStrategy.AdmissionChecks = acs
+	return c
+}
+
 // ResourceGroup adds a ResourceGroup with flavors.
 func (c *ClusterQueueWrapper) ResourceGroup(flavors ...kueue.FlavorQuotas) *ClusterQueueWrapper {
 	rg := kueue.ResourceGroup{
@@ -808,6 +813,31 @@ func MakeAdmissionCheck(name string) *AdmissionCheckWrapper {
 	}
 }
 
+type AdmissionCheckStrategyRuleWrapper struct {
+	kueue.AdmissionCheckStrategyRule
+}
+
+func MakeAdmissionCheckStrategyRule(name string, flavors ...kueue.ResourceFlavorReference) *AdmissionCheckStrategyRuleWrapper {
+	if len(flavors) == 0 {
+		flavors = make([]kueue.ResourceFlavorReference, 0)
+	}
+	return &AdmissionCheckStrategyRuleWrapper{
+		AdmissionCheckStrategyRule: kueue.AdmissionCheckStrategyRule{
+			Name:      name,
+			OnFlavors: flavors,
+		},
+	}
+}
+
+func (acs *AdmissionCheckStrategyRuleWrapper) OnFlavors(flavors []kueue.ResourceFlavorReference) *AdmissionCheckStrategyRuleWrapper {
+	acs.AdmissionCheckStrategyRule.OnFlavors = flavors
+	return acs
+}
+
+func (acs *AdmissionCheckStrategyRuleWrapper) Obj() *kueue.AdmissionCheckStrategyRule {
+	return &acs.AdmissionCheckStrategyRule
+}
+
 func (ac *AdmissionCheckWrapper) Active(status metav1.ConditionStatus) *AdmissionCheckWrapper {
 	apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{
 		Type: kueue.AdmissionCheckActive,
diff --git a/pkg/webhooks/clusterqueue_webhook.go b/pkg/webhooks/clusterqueue_webhook.go
index 9d118e8f9b..9be05d9d38 100644
--- a/pkg/webhooks/clusterqueue_webhook.go
+++ b/pkg/webhooks/clusterqueue_webhook.go
@@ -102,6 +102,10 @@ func ValidateClusterQueue(cq *kueue.ClusterQueue) field.ErrorList {
 	allErrs = append(allErrs, validateResourceGroups(cq.Spec.ResourceGroups, cq.Spec.Cohort, path.Child("resourceGroups"))...)
 	allErrs = append(allErrs, validation.ValidateLabelSelector(cq.Spec.NamespaceSelector, validation.LabelSelectorValidationOptions{}, path.Child("namespaceSelector"))...)
+	allErrs = append(allErrs, validateCQAdmissionChecks(&cq.Spec, path)...)
+	if cq.Spec.Preemption != nil {
+		allErrs = append(allErrs, validatePreemption(cq.Spec.Preemption, path.Child("preemption"))...)
+	}
 	return allErrs
 }
 
@@ -111,6 +115,25 @@ func ValidateClusterQueueUpdate(newObj, oldObj *kueue.ClusterQueue) field.ErrorL
 	return allErrs
 }
 
+func validatePreemption(preemption *kueue.ClusterQueuePreemption, path *field.Path) field.ErrorList {
+	var allErrs field.ErrorList
+	if preemption.ReclaimWithinCohort == kueue.PreemptionPolicyNever &&
+		preemption.BorrowWithinCohort != nil &&
+		preemption.BorrowWithinCohort.Policy != kueue.BorrowWithinCohortPolicyNever {
+		allErrs = append(allErrs, field.Invalid(path, preemption, "reclaimWithinCohort=Never and borrowWithinCohort.Policy!=Never"))
+	}
+	return allErrs
+}
+
+func validateCQAdmissionChecks(spec *kueue.ClusterQueueSpec, path *field.Path) field.ErrorList {
+	var allErrs field.ErrorList
+	if len(spec.AdmissionChecksStrategy.AdmissionChecks) != 0 && len(spec.AdmissionChecks) != 0 {
+		allErrs = append(allErrs, field.Invalid(path, spec, "Either AdmissionChecks or AdmissionCheckStrategy can be set, but not both"))
+	}
+
+	return allErrs
+}
+
 func validateResourceGroups(resourceGroups []kueue.ResourceGroup, cohort string, path *field.Path) field.ErrorList {
 	var allErrs field.ErrorList
 	seenResources := sets.New[corev1.ResourceName]()
diff --git a/pkg/webhooks/clusterqueue_webhook_test.go b/pkg/webhooks/clusterqueue_webhook_test.go
index a8470951ad..ab6621da02 100644
--- a/pkg/webhooks/clusterqueue_webhook_test.go
+++ b/pkg/webhooks/clusterqueue_webhook_test.go
@@ -56,6 +56,30 @@ func TestValidateClusterQueue(t *testing.T) {
 				field.Invalid(resourceGroupsPath.Index(0).Child("coveredResources").Index(0), "@cpu", ""),
 			},
 		},
+		{
+			name: "admissionChecks defined",
+			clusterQueue: testingutil.MakeClusterQueue("cluster-queue").
+				AdmissionChecks("ac1").
+				Obj(),
+		},
+		{
+			name: "admissionCheckStrategy defined",
+			clusterQueue: testingutil.MakeClusterQueue("cluster-queue").
+				AdmissionCheckStrategy(
+					*testingutil.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj(),
+				).Obj(),
+		},
+		{
+			name: "both admissionChecks and admissionCheckStrategy are defined",
+			clusterQueue: testingutil.MakeClusterQueue("cluster-queue").
+				AdmissionChecks("ac1").
+				AdmissionCheckStrategy(
+					*testingutil.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj(),
+				).Obj(),
+			wantErr: field.ErrorList{
+				field.Invalid(specPath, "spec", "Either AdmissionChecks or AdmissionCheckStrategy can be set, but not both"),
+			},
+		},
 		{
 			name:         "in cohort",
 			clusterQueue: testingutil.MakeClusterQueue("cluster-queue").Cohort("prod").Obj(),
diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go
index f01133eee1..57d5b2e30f 100644
--- a/pkg/workload/workload.go
+++ b/pkg/workload/workload.go
@@ -22,10 +22,13 @@ import (
 	"maps"
 	"strings"
 
+	"github.com/go-logr/logr"
 	corev1 "k8s.io/api/core/v1"
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/klog/v2"
 	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@@ -36,6 +39,7 @@ import (
 	"sigs.k8s.io/kueue/pkg/features"
 	"sigs.k8s.io/kueue/pkg/util/api"
 	"sigs.k8s.io/kueue/pkg/util/limitrange"
+	utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
 )
 
 var (
@@ -543,3 +547,49 @@ func RemoveFinalizer(ctx context.Context, c client.Client, wl *kueue.Workload) e
 	}
 	return nil
 }
+
+// AdmissionChecksForWorkload returns the AdmissionChecks that should be assigned to a specific Workload based on
+// the ClusterQueue configuration and the assigned ResourceFlavors.
+func AdmissionChecksForWorkload(log logr.Logger, wl *kueue.Workload, admissionChecks map[string]sets.Set[string]) sets.Set[string] {
+	// If all admissionChecks should be run for all flavors, we don't need to wait for the Workload's Admission to be set.
+	// This is also the case if admissionChecks are specified with ClusterQueue.Spec.AdmissionChecks instead of
+	// ClusterQueue.Spec.AdmissionChecksStrategy.
+	allFlavors := true
+	for _, flavors := range admissionChecks {
+		if len(flavors) != 0 {
+			allFlavors = false
+		}
+	}
+	if allFlavors {
+		return sets.New(utilmaps.Keys(admissionChecks)...)
+	}
+
+	// Kueue sets AdmissionChecks first based on the ClusterQueue configuration; at this point the Workload has no
+	// ResourceFlavors assigned, so we cannot yet match AdmissionChecks to ResourceFlavors.
+	// After quota is reserved, another reconciliation happens and we can match AdmissionChecks to ResourceFlavors.
+	if wl.Status.Admission == nil {
+		log.V(2).Info("Workload has no Admission", "Workload", klog.KObj(wl))
+		return nil
+	}
+
+	var assignedFlavors []kueue.ResourceFlavorReference
+	for _, podSet := range wl.Status.Admission.PodSetAssignments {
+		for _, flavor := range podSet.Flavors {
+			assignedFlavors = append(assignedFlavors, flavor)
+		}
+	}
+
+	acNames := sets.New[string]()
+	for acName, flavors := range admissionChecks {
+		if len(flavors) == 0 {
+			acNames.Insert(acName)
+			continue
+		}
+		for _, fName := range assignedFlavors {
+			if flavors.Has(string(fName)) {
+				acNames.Insert(acName)
+			}
+		}
+	}
+	return acNames
+}
diff --git a/pkg/workload/workload_test.go b/pkg/workload/workload_test.go
index c3322b4784..78a6bc0f54 100644
--- a/pkg/workload/workload_test.go
+++ b/pkg/workload/workload_test.go
@@ -26,11 +26,13 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/utils/ptr"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	config "sigs.k8s.io/kueue/apis/config/v1beta1"
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
 	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
 )
 
@@ -647,3 +649,71 @@ func TestResourceUsage(t *testing.T) {
 		})
 	}
 }
+
+func TestAdmissionCheckStrategy(t *testing.T) {
+	cases := map[string]struct {
+		cq                  *kueue.ClusterQueue
+		wl                  *kueue.Workload
+		wantAdmissionChecks sets.Set[string]
+	}{
+		"AdmissionCheckStrategy with a flavor": {
+			wl: utiltesting.MakeWorkload("wl", "ns").
+				ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()).
+				Obj(),
+			cq: utiltesting.MakeClusterQueue("cq").
+				AdmissionCheckStrategy(*utiltesting.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj()).
+				Obj(),
+			wantAdmissionChecks: sets.New("ac1"),
+		},
+		"AdmissionCheckStrategy with an unmatched flavor": {
+			wl: utiltesting.MakeWorkload("wl", "ns").
+				ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()).
+				Obj(),
+			cq: utiltesting.MakeClusterQueue("cq").
+				AdmissionCheckStrategy(*utiltesting.MakeAdmissionCheckStrategyRule("ac1", "unmatched-flavor").Obj()).
+				Obj(),
+			wantAdmissionChecks: nil,
+		},
+		"AdmissionCheckStrategy without a flavor": {
+			wl: utiltesting.MakeWorkload("wl", "ns").
+				ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()).
+				Obj(),
+			cq: utiltesting.MakeClusterQueue("cq").
+				AdmissionCheckStrategy(*utiltesting.MakeAdmissionCheckStrategyRule("ac1").Obj()).
+				Obj(),
+			wantAdmissionChecks: sets.New("ac1"),
+		},
+		"Two AdmissionCheckStrategies, one with flavor, one without flavor": {
+			wl: utiltesting.MakeWorkload("wl", "ns").
+				ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()).
+				Obj(),
+			cq: utiltesting.MakeClusterQueue("cq").
+				AdmissionCheckStrategy(
+					*utiltesting.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj(),
+					*utiltesting.MakeAdmissionCheckStrategyRule("ac2").Obj()).
+				Obj(),
+			wantAdmissionChecks: sets.New("ac1", "ac2"),
+		},
+		"Workload has no QuotaReserved": {
+			wl: utiltesting.MakeWorkload("wl", "ns").
+				Obj(),
+			cq: utiltesting.MakeClusterQueue("cq").
+				AdmissionCheckStrategy(
+					*utiltesting.MakeAdmissionCheckStrategyRule("ac1", "flavor1").Obj(),
+					*utiltesting.MakeAdmissionCheckStrategyRule("ac2").Obj()).
+				Obj(),
+			wantAdmissionChecks: nil,
+		},
+	}
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			_, log := utiltesting.ContextWithLog(t)
+			gotAdmissionChecks := AdmissionChecksForWorkload(log, tc.wl, utilac.NewAdmissionChecks(tc.cq))
+
+			if diff := cmp.Diff(tc.wantAdmissionChecks, gotAdmissionChecks); diff != "" {
+				t.Errorf("Unexpected AdmissionChecks, (want-/got+):\n%s", diff)
+			}
+		})
+	}
+}
diff --git a/site/static/examples/provisioning/provisioning-setup.yaml b/site/static/examples/provisioning/provisioning-setup.yaml
index aa48ceffaa..5dd5d7c878 100644
--- a/site/static/examples/provisioning/provisioning-setup.yaml
+++ b/site/static/examples/provisioning/provisioning-setup.yaml
@@ -20,8 +20,9 @@ spec:
       nominalQuota: 36Gi
     - name: "nvidia.com/gpu"
       nominalQuota: 9
-  admissionChecks:
-  - sample-prov
+  admissionChecksStrategy:
+    admissionChecks:
+    - name: "sample-prov"
 ---
 apiVersion: kueue.x-k8s.io/v1beta1
 kind: LocalQueue
diff --git a/test/integration/scheduler/workload_controller_test.go b/test/integration/scheduler/workload_controller_test.go
index 4fd35ab093..5129e92c73 100644
--- a/test/integration/scheduler/workload_controller_test.go
+++ b/test/integration/scheduler/workload_controller_test.go
@@ -28,6 +28,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
+	"sigs.k8s.io/kueue/pkg/util/slices"
 	"sigs.k8s.io/kueue/pkg/util/testing"
 	"sigs.k8s.io/kueue/pkg/workload"
 	"sigs.k8s.io/kueue/test/util"
@@ -69,6 +70,123 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() {
 		updatedCQ = kueue.ClusterQueue{}
 	})
 
+	ginkgo.When("the queue has admission check strategies", func() {
+		var (
+			flavor1           *kueue.ResourceFlavor
+			flavor2           *kueue.ResourceFlavor
+			check1            *kueue.AdmissionCheck
+			check2            *kueue.AdmissionCheck
+			check3            *kueue.AdmissionCheck
+			reservationFlavor string = "reservation"
+			updatedWl         kueue.Workload
+			flavorOnDemand    string              = "on-demand"
+			resourceGPU       corev1.ResourceName = "example.com/gpu"
+		)
+
+		ginkgo.BeforeEach(func() {
+			flavor1 = testing.MakeResourceFlavor(flavorOnDemand).Obj()
+			gomega.Expect(k8sClient.Create(ctx, flavor1)).Should(gomega.Succeed())
+
+			flavor2 = testing.MakeResourceFlavor(reservationFlavor).Obj()
+			gomega.Expect(k8sClient.Create(ctx, flavor2)).Should(gomega.Succeed())
+
+			check1 = testing.MakeAdmissionCheck("check1").ControllerName("ctrl1").Obj()
+			gomega.Expect(k8sClient.Create(ctx, check1)).Should(gomega.Succeed())
+			util.SetAdmissionCheckActive(ctx, k8sClient, check1, metav1.ConditionTrue)
+
+			check2 = testing.MakeAdmissionCheck("check2").ControllerName("ctrl2").Obj()
+			gomega.Expect(k8sClient.Create(ctx, check2)).Should(gomega.Succeed())
+			util.SetAdmissionCheckActive(ctx, k8sClient, check2, metav1.ConditionTrue)
+
+			check3 = testing.MakeAdmissionCheck("check3").ControllerName("ctrl3").Obj()
+			gomega.Expect(k8sClient.Create(ctx, check3)).Should(gomega.Succeed())
+			util.SetAdmissionCheckActive(ctx, k8sClient, check3, metav1.ConditionTrue)
+
+			clusterQueue = testing.MakeClusterQueue("cluster-queue").
+				AdmissionCheckStrategy(
+					*testing.MakeAdmissionCheckStrategyRule("check1", kueue.ResourceFlavorReference(flavorOnDemand)).Obj(),
+					*testing.MakeAdmissionCheckStrategyRule("check2").Obj(),
+					*testing.MakeAdmissionCheckStrategyRule("check3", kueue.ResourceFlavorReference(reservationFlavor)).Obj()).
+				ResourceGroup(
+					*testing.MakeFlavorQuotas(reservationFlavor).Resource(resourceGPU, "1", "1").Obj(),
+					*testing.MakeFlavorQuotas(flavorOnDemand).Resource(resourceGPU, "5", "5").Obj()).
+				Cohort("cohort").
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed())
+
+			localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed())
+		})
+
+		ginkgo.AfterEach(func() {
+			gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+			gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, localQueue)).To(gomega.Succeed())
+			util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true)
+			util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, check3, true)
+			util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, check2, true)
+			util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, check1, true)
+			util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, flavor1, true)
+			util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, flavor2, true)
+		})
+
+		ginkgo.It("the workload should have the appropriate AdmissionChecks added", func() {
+			wl := testing.MakeWorkload("wl", ns.Name).
+				Queue("queue").
+				Request(resourceGPU, "3").
+				Obj()
+			wlKey := client.ObjectKeyFromObject(wl)
+
+			ginkgo.By("creating and waiting for workload to have a quota reservation", func() {
+				gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).Should(gomega.Succeed())
+					g.Expect(workload.HasQuotaReservation(&updatedWl)).Should(gomega.BeTrue(), "should have quota reservation")
+
+					checks := slices.Map(updatedWl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name })
+					g.Expect(checks).Should(gomega.ConsistOf("check1", "check2"))
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+				gomega.Expect(workload.IsAdmitted(&updatedWl)).To(gomega.BeFalse())
+			})
+
+			ginkgo.By("adding an additional admission check to the clusterqueue", func() {
+				createdQueue := kueue.ClusterQueue{}
+				queueKey := client.ObjectKeyFromObject(clusterQueue)
+				gomega.Expect(k8sClient.Get(ctx, queueKey, &createdQueue)).To(gomega.Succeed())
+				createdQueue.Spec.AdmissionChecksStrategy.AdmissionChecks = []kueue.AdmissionCheckStrategyRule{
+					*testing.MakeAdmissionCheckStrategyRule("check1", kueue.ResourceFlavorReference(flavorOnDemand)).Obj(),
+					*testing.MakeAdmissionCheckStrategyRule("check2", kueue.ResourceFlavorReference(reservationFlavor)).Obj(),
+					*testing.MakeAdmissionCheckStrategyRule("check3").Obj()}
+				gomega.Expect(k8sClient.Update(ctx, &createdQueue)).To(gomega.Succeed())
+
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed())
+					checks := slices.Map(updatedWl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name })
+					g.Expect(checks).Should(gomega.ConsistOf("check1", "check3"))
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("marking the checks as passed", func() {
+				gomega.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed())
+				workload.SetAdmissionCheckState(&updatedWl.Status.AdmissionChecks, kueue.AdmissionCheckState{
+					Name:    "check1",
+					State:   kueue.CheckStateReady,
+					Message: "check successfully passed",
+				})
+				workload.SetAdmissionCheckState(&updatedWl.Status.AdmissionChecks, kueue.AdmissionCheckState{
+					Name:  "check3",
+					State: kueue.CheckStateReady,
+					Message: "check successfully passed",
+				})
+				gomega.Expect(k8sClient.Status().Update(ctx, &updatedWl)).Should(gomega.Succeed())
+
+				gomega.Eventually(func(g gomega.Gomega) {
+					g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).Should(gomega.Succeed())
+					g.Expect(workload.IsAdmitted(&updatedWl)).Should(gomega.BeTrue(), "should have been admitted")
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+		})
+	})
+
 	ginkgo.When("Workload with RuntimeClass defined", func() {
 		ginkgo.BeforeEach(func() {
 			gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).To(gomega.Succeed())
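
For reference, here is a minimal sketch (not part of the patch) of how the new API surfaces in the helper code above: `utilac.NewAdmissionChecks` flattens a ClusterQueue's `admissionChecksStrategy` into the per-check flavor sets consumed by the cache, the scheduler, and `workload.AdmissionChecksForWorkload`. The check and flavor names below are illustrative only.

```go
package main

import (
	"fmt"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
)

func main() {
	cq := &kueue.ClusterQueue{
		Spec: kueue.ClusterQueueSpec{
			AdmissionChecksStrategy: kueue.AdmissionChecksStrategy{
				AdmissionChecks: []kueue.AdmissionCheckStrategyRule{
					// Scoped rule (hypothetical names): runs only for workloads
					// assigned the "on-demand" ResourceFlavor.
					{Name: "prov-check", OnFlavors: []kueue.ResourceFlavorReference{"on-demand"}},
					// Unscoped rule: an empty onFlavors list means the check
					// runs for every workload submitted to the ClusterQueue.
					{Name: "budget-check"},
				},
			},
		},
	}
	// Yields a map of check name -> set of flavor names, e.g.
	// "prov-check" -> {"on-demand"}, "budget-check" -> {} (empty = all flavors).
	fmt.Println(utilac.NewAdmissionChecks(cq))
}
```

Because flavor-scoped rules can only be matched once `.status.admission.podSetAssignments` is populated, `AdmissionChecksForWorkload` returns nil before quota reservation unless every rule is unscoped — which is why the integration test above only observes `check1` and `check2` on the workload after it obtains a quota reservation.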