From 18dd2fa364ddc5f4796884354b685ef6819a3294 Mon Sep 17 00:00:00 2001
From: Patryk Bundyra
Date: Fri, 12 Apr 2024 13:55:56 +0000
Subject: [PATCH] Discard an API request in the scheduler

Add AdmissionCheckStrategy to the ClusterQueue's cache
Improve integration tests
---
 apis/kueue/v1beta1/clusterqueue_types.go      |  3 +-
 apis/kueue/v1beta1/zz_generated.deepcopy.go   |  2 +-
 .../crd/kueue.x-k8s.io_clusterqueues.yaml     |  3 +-
 .../v1beta1/admissioncheckstrategyrule.go     | 10 ++-
 .../bases/kueue.x-k8s.io_clusterqueues.yaml   |  3 +-
 pkg/cache/cache.go                            |  5 ++
 pkg/cache/cache_test.go                       | 46 +++++++++-----
 pkg/cache/clusterqueue.go                     | 34 +++++-----
 pkg/cache/clusterqueue_test.go                | 55 +++++++++-------
 pkg/cache/snapshot.go                         |  1 +
 pkg/controller/core/workload_controller.go    |  8 +--
 .../core/workload_controller_test.go          |  6 +-
 pkg/scheduler/scheduler.go                    | 10 +--
 pkg/scheduler/scheduler_test.go               |  2 +-
 pkg/util/testing/wrappers.go                  |  6 +-
 pkg/workload/workload.go                      | 14 ++---
 pkg/workload/workload_test.go                 | 11 +++-
 .../provisioning/provisioning-setup.yaml      |  5 +-
 .../scheduler/workload_controller_test.go     | 63 +++++++++----------
 19 files changed, 167 insertions(+), 120 deletions(-)

diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go
index ff2893f672..a5be5373c4 100644
--- a/apis/kueue/v1beta1/clusterqueue_types.go
+++ b/apis/kueue/v1beta1/clusterqueue_types.go
@@ -131,7 +131,8 @@ type AdmissionCheckStrategyRule struct {

 	// onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for.
 	// If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
-	OnFlavors []string `json:"onFlavors"`
+	// +optional
+	OnFlavors []ResourceFlavorReference `json:"onFlavors"`
 }

 type QueueingStrategy string
diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go
index 70d6ad87f3..80a883d806 100644
--- a/apis/kueue/v1beta1/zz_generated.deepcopy.go
+++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go
@@ -197,7 +197,7 @@ func (in *AdmissionCheckStrategyRule) DeepCopyInto(out *AdmissionCheckStrategyRu
 	*out = *in
 	if in.OnFlavors != nil {
 		in, out := &in.OnFlavors, &out.OnFlavors
-		*out = make([]string, len(*in))
+		*out = make([]ResourceFlavorReference, len(*in))
 		copy(*out, *in)
 	}
 }
diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
index e8ee8a9994..1b578880ac 100644
--- a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
+++ b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
@@ -99,11 +99,12 @@ spec:
                         onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for.
                         If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
                       items:
+                        description: ResourceFlavorReference is the name of the
+                          ResourceFlavor.
                         type: string
                       type: array
                     required:
                     - name
-                    - onFlavors
                     type: object
                   type: array
               type: object
diff --git a/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategyrule.go b/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategyrule.go
index b188aa0356..84a8e6655b 100644
--- a/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategyrule.go
+++ b/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategyrule.go
@@ -17,11 +17,15 @@ limitations under the License.
package v1beta1 +import ( + v1beta1 "sigs.k8s.io/kueue/apis/kueue/v1beta1" +) + // AdmissionCheckStrategyRuleApplyConfiguration represents an declarative configuration of the AdmissionCheckStrategyRule type for use // with apply. type AdmissionCheckStrategyRuleApplyConfiguration struct { - Name *string `json:"name,omitempty"` - OnFlavors []string `json:"onFlavors,omitempty"` + Name *string `json:"name,omitempty"` + OnFlavors []v1beta1.ResourceFlavorReference `json:"onFlavors,omitempty"` } // AdmissionCheckStrategyRuleApplyConfiguration constructs an declarative configuration of the AdmissionCheckStrategyRule type for use with @@ -41,7 +45,7 @@ func (b *AdmissionCheckStrategyRuleApplyConfiguration) WithName(value string) *A // WithOnFlavors adds the given value to the OnFlavors field in the declarative configuration // and returns the receiver, so that objects can be build by chaining "With" function invocations. // If called multiple times, values provided by each call will be appended to the OnFlavors field. -func (b *AdmissionCheckStrategyRuleApplyConfiguration) WithOnFlavors(values ...string) *AdmissionCheckStrategyRuleApplyConfiguration { +func (b *AdmissionCheckStrategyRuleApplyConfiguration) WithOnFlavors(values ...v1beta1.ResourceFlavorReference) *AdmissionCheckStrategyRuleApplyConfiguration { for i := range values { b.OnFlavors = append(b.OnFlavors, values[i]) } diff --git a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml index 7b707772f3..dd83f915d8 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml @@ -84,11 +84,12 @@ spec: onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for. If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue. items: + description: ResourceFlavorReference is the name of the + ResourceFlavor. type: string type: array required: - name - - onFlavors type: object type: array type: object diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index f4b47314ae..64f25ffae5 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -731,6 +731,11 @@ func (c *Cache) ClusterQueuesUsingAdmissionCheck(ac string) []string { if cq.AdmissionChecks.Has(ac) { cqs = append(cqs, cq.Name) } + for _, check := range cq.AdmissionCheckStrategy.AdmissionChecks { + if check.Name == ac { + cqs = append(cqs, cq.Name) + } + } } return cqs } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 1f881d3dca..bddbd54af9 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -3385,45 +3385,61 @@ func messageOrEmpty(err error) string { } func TestClusterQueuesUsingAdmissionChecks(t *testing.T) { - admissionCheck1 := utiltesting.MakeAdmissionCheck("ac1").Obj() - admissionCheck2 := utiltesting.MakeAdmissionCheck("ac2").Obj() + checks := []*kueue.AdmissionCheck{ + utiltesting.MakeAdmissionCheck("ac1").Obj(), + utiltesting.MakeAdmissionCheck("ac2").Obj(), + utiltesting.MakeAdmissionCheck("ac3").Obj(), + } + fooCq := utiltesting.MakeClusterQueue("fooCq"). - AdmissionChecks(admissionCheck1.Name). + AdmissionChecks("ac1"). Obj() barCq := utiltesting.MakeClusterQueue("barCq").Obj() fizzCq := utiltesting.MakeClusterQueue("fizzCq"). - AdmissionChecks(admissionCheck1.Name, admissionCheck2.Name). + AdmissionChecks("ac1", "ac2"). + Obj() + strategyCq := utiltesting.MakeClusterQueue("strategyCq"). 
+ AdmissionCheckStrategy( + *utiltesting.MakeAdmissionCheckStrategyRule("ac1").Obj(), + *utiltesting.MakeAdmissionCheckStrategyRule("ac3").Obj()). Obj() cases := map[string]struct { clusterQueues []*kueue.ClusterQueue wantInUseClusterQueueNames []string + check string }{ "single clusterQueue with check in use": { - clusterQueues: []*kueue.ClusterQueue{ - fooCq, - }, + clusterQueues: []*kueue.ClusterQueue{fooCq}, wantInUseClusterQueueNames: []string{fooCq.Name}, + check: "ac1", + }, + "single clusterQueue with AdmissionCheckStrategy in use": { + clusterQueues: []*kueue.ClusterQueue{strategyCq}, + wantInUseClusterQueueNames: []string{strategyCq.Name}, + check: "ac3", }, "single clusterQueue with no checks": { - clusterQueues: []*kueue.ClusterQueue{ - barCq, - }, + clusterQueues: []*kueue.ClusterQueue{barCq}, + check: "ac1", }, "multiple clusterQueues with checks in use": { clusterQueues: []*kueue.ClusterQueue{ fooCq, barCq, fizzCq, + strategyCq, }, - wantInUseClusterQueueNames: []string{fooCq.Name, fizzCq.Name}, + wantInUseClusterQueueNames: []string{fooCq.Name, fizzCq.Name, strategyCq.Name}, + check: "ac1", }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { cache := New(utiltesting.NewFakeClient()) - cache.AddOrUpdateAdmissionCheck(admissionCheck1) - cache.AddOrUpdateAdmissionCheck(admissionCheck2) + for _, check := range checks { + cache.AddOrUpdateAdmissionCheck(check) + } for _, cq := range tc.clusterQueues { if err := cache.AddClusterQueue(context.Background(), cq); err != nil { @@ -3431,11 +3447,11 @@ func TestClusterQueuesUsingAdmissionChecks(t *testing.T) { } } - cqs := cache.ClusterQueuesUsingAdmissionCheck(admissionCheck1.Name) + cqs := cache.ClusterQueuesUsingAdmissionCheck(tc.check) if diff := cmp.Diff(tc.wantInUseClusterQueueNames, cqs, cmpopts.SortSlices(func(a, b string) bool { return a < b })); len(diff) != 0 { - t.Errorf("Unexpected flavor is in use by clusterQueues (-want,+got):\n%s", diff) + t.Errorf("Unexpected AdmissionCheck is in use by clusterQueues (-want,+got):\n%s", diff) } }) } diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index 29c857ec8c..a817830a24 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -32,6 +32,7 @@ import ( kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/metrics" + "sigs.k8s.io/kueue/pkg/util/slices" "sigs.k8s.io/kueue/pkg/workload" ) @@ -52,18 +53,19 @@ type ResourceStats map[corev1.ResourceName]*QuotaStats // ClusterQueue is the internal implementation of kueue.ClusterQueue that // holds admitted workloads. 
type ClusterQueue struct { - Name string - Cohort *Cohort - ResourceGroups []ResourceGroup - RGByResource map[corev1.ResourceName]*ResourceGroup - Usage FlavorResourceQuantities - Workloads map[string]*workload.Info - WorkloadsNotReady sets.Set[string] - NamespaceSelector labels.Selector - Preemption kueue.ClusterQueuePreemption - FlavorFungibility kueue.FlavorFungibility - AdmissionChecks sets.Set[string] - Status metrics.ClusterQueueStatus + Name string + Cohort *Cohort + ResourceGroups []ResourceGroup + RGByResource map[corev1.ResourceName]*ResourceGroup + Usage FlavorResourceQuantities + Workloads map[string]*workload.Info + WorkloadsNotReady sets.Set[string] + NamespaceSelector labels.Selector + Preemption kueue.ClusterQueuePreemption + FlavorFungibility kueue.FlavorFungibility + AdmissionChecks sets.Set[string] + AdmissionCheckStrategy kueue.AdmissionChecksStrategy + Status metrics.ClusterQueueStatus // GuaranteedQuota records how much resource quota the ClusterQueue reserved // when feature LendingLimit is enabled and flavor's lendingLimit is not nil. GuaranteedQuota FlavorResourceQuantities @@ -200,6 +202,7 @@ func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[kueue. c.isStopped = ptr.Deref(in.Spec.StopPolicy, kueue.None) != kueue.None c.AdmissionChecks = sets.New(in.Spec.AdmissionChecks...) + c.AdmissionCheckStrategy = in.Spec.AdmissionChecksStrategy c.Usage = filterFlavorQuantities(c.Usage, in.Spec.ResourceGroups) c.AdmittedUsage = filterFlavorQuantities(c.AdmittedUsage, in.Spec.ResourceGroups) @@ -414,9 +417,12 @@ func (c *ClusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference // updateWithAdmissionChecks updates a ClusterQueue based on the passed AdmissionChecks set. func (c *ClusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck) { hasMissing := false - checksPerController := make(map[string]int, len(c.AdmissionChecks)) singleInstanceControllers := sets.New[string]() - for acName := range c.AdmissionChecks { + queueChecks := c.AdmissionChecks.Clone() + queueChecks.Insert(slices.Map(c.AdmissionCheckStrategy.AdmissionChecks, + func(r *kueue.AdmissionCheckStrategyRule) string { return r.Name })...) + checksPerController := make(map[string]int, len(queueChecks)) + for acName := range queueChecks { if ac, found := checks[acName]; !found { hasMissing = true } else { diff --git a/pkg/cache/clusterqueue_test.go b/pkg/cache/clusterqueue_test.go index 00f5e15198..98e48f6af2 100644 --- a/pkg/cache/clusterqueue_test.go +++ b/pkg/cache/clusterqueue_test.go @@ -463,9 +463,16 @@ func TestClusterQueueUpdate(t *testing.T) { } func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) { - cq := utiltesting.MakeClusterQueue("cq"). + cqs := []*kueue.ClusterQueue{utiltesting.MakeClusterQueue("cq"). AdmissionChecks("check1", "check2", "check3"). - Obj() + Obj(), + utiltesting.MakeClusterQueue("cq2"). + AdmissionCheckStrategy( + *utiltesting.MakeAdmissionCheckStrategyRule("check1").Obj(), + *utiltesting.MakeAdmissionCheckStrategyRule("check2").Obj(), + *utiltesting.MakeAdmissionCheckStrategyRule("check3").Obj()). 
+		Obj(),
+	}

 	testcases := []struct {
 		name                         string
@@ -592,31 +599,33 @@

 	for _, tc := range testcases {
 		t.Run(tc.name, func(t *testing.T) {
-			cache := New(utiltesting.NewFakeClient())
-			cq, err := cache.newClusterQueue(cq)
-			if err != nil {
-				t.Fatalf("failed to new clusterQueue %v", err)
-			}
+			for _, cq := range cqs {
+				cache := New(utiltesting.NewFakeClient())
+				cq, err := cache.newClusterQueue(cq)
+				if err != nil {
+					t.Fatalf("failed to new clusterQueue %v", err)
+				}

-			cq.Status = tc.cqStatus
+				cq.Status = tc.cqStatus

-			// Align the admission check related internals to the desired Status.
-			if tc.cqStatus == active {
-				cq.hasMultipleSingleInstanceControllersChecks = false
-				cq.hasMissingOrInactiveAdmissionChecks = false
-			} else {
-				cq.hasMultipleSingleInstanceControllersChecks = true
-				cq.hasMissingOrInactiveAdmissionChecks = true
-			}
-			cq.updateWithAdmissionChecks(tc.admissionChecks)
+				// Align the admission check related internals to the desired Status.
+				if tc.cqStatus == active {
+					cq.hasMultipleSingleInstanceControllersChecks = false
+					cq.hasMissingOrInactiveAdmissionChecks = false
+				} else {
+					cq.hasMultipleSingleInstanceControllersChecks = true
+					cq.hasMissingOrInactiveAdmissionChecks = true
+				}
+				cq.updateWithAdmissionChecks(tc.admissionChecks)

-			if cq.Status != tc.wantStatus {
-				t.Errorf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status)
-			}
+				if cq.Status != tc.wantStatus {
+					t.Errorf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status)
+				}

-			gotReason, _ := cq.inactiveReason()
-			if diff := cmp.Diff(tc.wantReason, gotReason); diff != "" {
-				t.Errorf("Unexpected inactiveReason (-want,+got):\n%s", diff)
+				gotReason, _ := cq.inactiveReason()
+				if diff := cmp.Diff(tc.wantReason, gotReason); diff != "" {
+					t.Errorf("Unexpected inactiveReason (-want,+got):\n%s", diff)
+				}
 			}
 		})
 	}
diff --git a/pkg/cache/snapshot.go b/pkg/cache/snapshot.go
index 146003242b..b33ce82d17 100644
--- a/pkg/cache/snapshot.go
+++ b/pkg/cache/snapshot.go
@@ -144,6 +144,7 @@ func (c *ClusterQueue) snapshot() *ClusterQueue {
 		NamespaceSelector: c.NamespaceSelector,
 		Status:            c.Status,
 		AdmissionChecks:   c.AdmissionChecks.Clone(),
+		AdmissionCheckStrategy: c.AdmissionCheckStrategy,
 	}
 	for fName, rUsage := range c.Usage {
 		cc.Usage[fName] = maps.Clone(rUsage)
diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go
index 7044db5027..10bb92f1f1 100644
--- a/pkg/controller/core/workload_controller.go
+++ b/pkg/controller/core/workload_controller.go
@@ -256,13 +256,13 @@ func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl
 func (r *WorkloadReconciler) reconcileSyncAdmissionChecks(ctx context.Context, wl *kueue.Workload, cqName string) (bool, error) {
 	log := ctrl.LoggerFrom(ctx)

-	// because we need to react to API cluster queue events, the list of checks from a cache can lead to race conditions
-	queue := kueue.ClusterQueue{}
-	if err := r.client.Get(ctx, types.NamespacedName{Name: cqName}, &queue); err != nil {
+	// because we need to react to API ClusterQueue events, using the list of checks from the cache could lead to race conditions
+	cq := kueue.ClusterQueue{}
+	if err := r.client.Get(ctx, types.NamespacedName{Name: cqName}, &cq); err != nil {
 		return false, err
 	}

-	admissionChecks := workload.AdmissionChecksForWorkload(&log, wl, &queue)
+	admissionChecks := workload.AdmissionChecksForWorkload(log, wl, sets.New(cq.Spec.AdmissionChecks...),
&cq.Spec.AdmissionChecksStrategy) newChecks, shouldUpdate := syncAdmissionCheckConditions(wl.Status.AdmissionChecks, admissionChecks) if shouldUpdate { log.V(3).Info("The workload needs admission checks updates", "clusterQueue", klog.KRef("", cqName), "admissionChecks", admissionChecks) diff --git a/pkg/controller/core/workload_controller_test.go b/pkg/controller/core/workload_controller_test.go index de06693da5..118d489d97 100644 --- a/pkg/controller/core/workload_controller_test.go +++ b/pkg/controller/core/workload_controller_test.go @@ -601,13 +601,13 @@ func TestReconcile(t *testing.T) { if tc.cq != nil { if err := cl.Create(ctx, tc.cq); err != nil { - t.Error("couldn't create the cluster queue", err) + t.Errorf("couldn't create the cluster queue: %v", err) } if err := qManager.AddClusterQueue(ctx, tc.cq); err != nil { - t.Error("couldn't add the cluster queue to the cache", err) + t.Errorf("couldn't add the cluster queue to the cache: %v", err) } if err := qManager.AddLocalQueue(ctx, tc.lq); err != nil { - t.Error("couldn't add the local queue to the cache", err) + t.Errorf("couldn't add the local queue to the cache: %v", err) } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 332b70d5f9..223657d424 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -277,7 +277,7 @@ func (s *Scheduler) schedule(ctx context.Context) { log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition") } e.status = nominated - if err := s.admit(ctx, e, cq.Name); err != nil { + if err := s.admit(ctx, e, cq); err != nil { e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err) } if cq.Cohort != nil { @@ -506,7 +506,7 @@ func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) e // admit sets the admitting clusterQueue and flavors into the workload of // the entry, and asynchronously updates the object in the apiserver after // assuming it in the cache. -func (s *Scheduler) admit(ctx context.Context, e *entry, cqName string) error { +func (s *Scheduler) admit(ctx context.Context, e *entry, cq *cache.ClusterQueue) error { log := ctrl.LoggerFrom(ctx) newWorkload := e.Obj.DeepCopy() admission := &kueue.Admission{ @@ -515,11 +515,7 @@ func (s *Scheduler) admit(ctx context.Context, e *entry, cqName string) error { } workload.SetQuotaReservation(newWorkload, admission) - queue := &kueue.ClusterQueue{} - if err := s.client.Get(ctx, types.NamespacedName{Name: cqName}, queue); err != nil { - return err - } - if workload.HasAllChecks(newWorkload, workload.AdmissionChecksForWorkload(&log, newWorkload, queue)) { + if workload.HasAllChecks(newWorkload, workload.AdmissionChecksForWorkload(log, newWorkload, cq.AdmissionChecks, &cq.AdmissionCheckStrategy)) { // sync Admitted, ignore the result since an API update is always done. 
_ = workload.SyncAdmittedCondition(newWorkload) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 377cd31a16..5cd37226af 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -1448,7 +1448,7 @@ func TestSchedule(t *testing.T) { t.Fatalf("Inserting clusterQueue %s in manager: %v", cq.Name, err) } if err := cl.Create(ctx, &cq); err != nil { - t.Error("couldn't create the cluster queue", err) + t.Errorf("couldn't create the cluster queue: %v", err) } } scheduler := New(qManager, cqCache, cl, recorder, WithFairSharing(tc.enableFairSharing)) diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index d8f871177b..9a46cc0233 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -775,9 +775,9 @@ type AdmissionCheckStrategyRuleWrapper struct { kueue.AdmissionCheckStrategyRule } -func MakeAdmissionCheckStrategyRule(name string, flavors ...string) *AdmissionCheckStrategyRuleWrapper { +func MakeAdmissionCheckStrategyRule(name string, flavors ...kueue.ResourceFlavorReference) *AdmissionCheckStrategyRuleWrapper { if len(flavors) == 0 { - flavors = make([]string, 0) + flavors = make([]kueue.ResourceFlavorReference, 0) } return &AdmissionCheckStrategyRuleWrapper{ AdmissionCheckStrategyRule: kueue.AdmissionCheckStrategyRule{ @@ -787,7 +787,7 @@ func MakeAdmissionCheckStrategyRule(name string, flavors ...string) *AdmissionCh } } -func (acs *AdmissionCheckStrategyRuleWrapper) OnFlavors(flavors []string) *AdmissionCheckStrategyRuleWrapper { +func (acs *AdmissionCheckStrategyRuleWrapper) OnFlavors(flavors []kueue.ResourceFlavorReference) *AdmissionCheckStrategyRuleWrapper { acs.AdmissionCheckStrategyRule.OnFlavors = flavors return acs } diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index 45b0bb2c49..a9136a1ee2 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -527,9 +527,9 @@ func RemoveFinalizer(ctx context.Context, c client.Client, wl *kueue.Workload) e // AdmissionChecksForWorkload returns AdmissionChecks that should be assigned to a specific Workload based on // ClusterQueue configuration and ResourceFlavors -func AdmissionChecksForWorkload(log *logr.Logger, wl *kueue.Workload, cq *kueue.ClusterQueue) sets.Set[string] { - if len(cq.Spec.AdmissionChecks) != 0 { - return sets.New(cq.Spec.AdmissionChecks...) +func AdmissionChecksForWorkload(log logr.Logger, wl *kueue.Workload, admissionChecks sets.Set[string], admissionCheckStrategy *kueue.AdmissionChecksStrategy) sets.Set[string] { + if len(admissionChecks) != 0 { + return admissionChecks } // Kueue sets AdmissionChecks first based on ClusterQueue configuration and at this point Workload has no @@ -540,20 +540,20 @@ func AdmissionChecksForWorkload(log *logr.Logger, wl *kueue.Workload, cq *kueue. 
return nil } - assignedFlavors := sets.New[string]() + var assignedFlavors []kueue.ResourceFlavorReference for _, podSet := range wl.Status.Admission.PodSetAssignments { for _, flavor := range podSet.Flavors { - assignedFlavors.Insert(string(flavor)) + assignedFlavors = append(assignedFlavors, flavor) } } acNames := sets.New[string]() - for _, acStrategy := range cq.Spec.AdmissionChecksStrategy.AdmissionChecks { + for _, acStrategy := range admissionCheckStrategy.AdmissionChecks { if len(acStrategy.OnFlavors) == 0 { acNames.Insert(acStrategy.Name) continue } - for fName := range assignedFlavors { + for _, fName := range assignedFlavors { if slices.Contains(acStrategy.OnFlavors, fName) { acNames.Insert(acStrategy.Name) } diff --git a/pkg/workload/workload_test.go b/pkg/workload/workload_test.go index 5baa944506..afca5967c6 100644 --- a/pkg/workload/workload_test.go +++ b/pkg/workload/workload_test.go @@ -660,6 +660,15 @@ func TestAdmissionCheckStrategy(t *testing.T) { Obj(), wantAdmissionChecks: sets.New("ac1"), }, + "AdmissionCheckStrategy with an unmatched flavor": { + wl: utiltesting.MakeWorkload("wl", "ns"). + ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()). + Obj(), + cq: utiltesting.MakeClusterQueue("cq"). + AdmissionCheckStrategy(*utiltesting.MakeAdmissionCheckStrategyRule("ac1", "unmatched-flavor").Obj()). + Obj(), + wantAdmissionChecks: nil, + }, "AdmissionCheckStrategy without a flavor": { wl: utiltesting.MakeWorkload("wl", "ns"). ReserveQuota(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()). @@ -694,7 +703,7 @@ func TestAdmissionCheckStrategy(t *testing.T) { for name, tc := range cases { t.Run(name, func(t *testing.T) { _, log := utiltesting.ContextWithLog(t) - gotAdmissionChecks := AdmissionChecksForWorkload(&log, tc.wl, tc.cq) + gotAdmissionChecks := AdmissionChecksForWorkload(log, tc.wl, sets.New(tc.cq.Spec.AdmissionChecks...), &tc.cq.Spec.AdmissionChecksStrategy) if diff := cmp.Diff(tc.wantAdmissionChecks, gotAdmissionChecks); diff != "" { t.Errorf("Unexpected AdmissionChecks, (want-/got+):\n%s", diff) diff --git a/site/static/examples/provisioning/provisioning-setup.yaml b/site/static/examples/provisioning/provisioning-setup.yaml index aa48ceffaa..5dd5d7c878 100644 --- a/site/static/examples/provisioning/provisioning-setup.yaml +++ b/site/static/examples/provisioning/provisioning-setup.yaml @@ -20,8 +20,9 @@ spec: nominalQuota: 36Gi - name: "nvidia.com/gpu" nominalQuota: 9 - admissionChecks: - - sample-prov + admissionChecksStrategy: + admissionChecks: + - name: "sample-prov" --- apiVersion: kueue.x-k8s.io/v1beta1 kind: LocalQueue diff --git a/test/integration/scheduler/workload_controller_test.go b/test/integration/scheduler/workload_controller_test.go index a8b69f36ec..af6e4a94ea 100644 --- a/test/integration/scheduler/workload_controller_test.go +++ b/test/integration/scheduler/workload_controller_test.go @@ -104,9 +104,9 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() { clusterQueue = testing.MakeClusterQueue("cluster-queue"). AdmissionCheckStrategy( - *testing.MakeAdmissionCheckStrategyRule("check1", flavorOnDemand).Obj(), + *testing.MakeAdmissionCheckStrategyRule("check1", kueue.ResourceFlavorReference(flavorOnDemand)).Obj(), *testing.MakeAdmissionCheckStrategyRule("check2").Obj(), - *testing.MakeAdmissionCheckStrategyRule("check3", reservationFlavor).Obj()). + *testing.MakeAdmissionCheckStrategyRule("check3", kueue.ResourceFlavorReference(reservationFlavor)).Obj()). 
ResourceGroup( *testing.MakeFlavorQuotas(reservationFlavor).Resource(resourceGPU, "1", "1").Obj(), *testing.MakeFlavorQuotas(flavorOnDemand).Resource(resourceGPU, "5", "5").Obj()). @@ -120,6 +120,7 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() { ginkgo.AfterEach(func() { gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed()) + gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, localQueue)).To(gomega.Succeed()) util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true) util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, check3, true) util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, check2, true) @@ -137,45 +138,45 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() { ginkgo.By("creating and reserving quota the workload", func() { gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed()) - gomega.Eventually(func() bool { - if err := k8sClient.Get(ctx, wlKey, &updatedWl); err != nil { - return false - } - return workload.HasQuotaReservation(&updatedWl) - }, util.Timeout, util.Interval).Should(gomega.BeTrue(), "should have quota reservation") + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).Should(gomega.Succeed()) + g.Expect(workload.HasQuotaReservation(&updatedWl)).Should(gomega.BeTrue(), "should have quota reservation") + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + gomega.Expect(workload.IsAdmitted(&updatedWl)).To(gomega.BeFalse()) }) ginkgo.By("checking workload's admission checks", func() { - gomega.Eventually(func() []string { - gomega.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed()) - return slices.Map(updatedWl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name }) - }, util.Timeout, util.Interval).Should(gomega.ConsistOf("check1", "check2")) + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed()) + checks := slices.Map(updatedWl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name }) + g.Expect(checks).Should(gomega.ConsistOf("check1", "check2")) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) ginkgo.By("updating admission check strategies", func() { createdQueue := kueue.ClusterQueue{} queueKey := client.ObjectKeyFromObject(clusterQueue) - gomega.Eventually(func() error { - gomega.Expect(k8sClient.Get(ctx, queueKey, &createdQueue)).To(gomega.Succeed()) - + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, queueKey, &createdQueue)).To(gomega.Succeed()) createdQueue.Spec.AdmissionChecksStrategy.AdmissionChecks = []kueue.AdmissionCheckStrategyRule{ - *testing.MakeAdmissionCheckStrategyRule("check1", flavorOnDemand).Obj(), - *testing.MakeAdmissionCheckStrategyRule("check2", reservationFlavor).Obj(), + *testing.MakeAdmissionCheckStrategyRule("check1", kueue.ResourceFlavorReference(flavorOnDemand)).Obj(), + *testing.MakeAdmissionCheckStrategyRule("check2", kueue.ResourceFlavorReference(reservationFlavor)).Obj(), *testing.MakeAdmissionCheckStrategyRule("check3").Obj()} - return k8sClient.Update(ctx, &createdQueue) + g.Expect(k8sClient.Update(ctx, &createdQueue)).To(gomega.Succeed()) }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) ginkgo.By("checking workload's admission checks", func() { - gomega.Eventually(func() []string { - gomega.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed()) - return slices.Map(updatedWl.Status.AdmissionChecks, func(c 
*kueue.AdmissionCheckState) string { return c.Name }) - }, util.Timeout, util.Interval).Should(gomega.ConsistOf("check1", "check3")) + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed()) + checks := slices.Map(updatedWl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name }) + g.Expect(checks).Should(gomega.ConsistOf("check1", "check3")) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) ginkgo.By("marking the checks as passed", func() { - gomega.Eventually(func() error { - gomega.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).To(gomega.Succeed()) workload.SetAdmissionCheckState(&updatedWl.Status.AdmissionChecks, kueue.AdmissionCheckState{ Name: "check1", State: kueue.CheckStateReady, @@ -186,17 +187,13 @@ var _ = ginkgo.Describe("Workload controller with scheduler", func() { State: kueue.CheckStateReady, Message: "check successfully passed", }) - return k8sClient.Status().Update(ctx, &updatedWl) + g.Expect(k8sClient.Status().Update(ctx, &updatedWl)).Should(gomega.Succeed()) }, util.Timeout, util.Interval).Should(gomega.Succeed()) - }) - ginkgo.By("admitting the workload", func() { - gomega.Eventually(func() bool { - if err := k8sClient.Get(ctx, wlKey, &updatedWl); err != nil { - return false - } - return workload.IsAdmitted(&updatedWl) - }, util.Timeout, util.Interval).Should(gomega.BeTrue(), "should have been admitted") + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, wlKey, &updatedWl)).Should(gomega.Succeed()) + g.Expect(workload.IsAdmitted(&updatedWl)).Should(gomega.BeTrue(), "should have been admitted") + }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) }) })
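
---
For reference, the ClusterQueue API surface this patch changes ends up looking
like the snippet below; the queue, flavor, and check names here are
illustrative, not taken from this patch. A rule that sets onFlavors runs its
check only for workloads assigned one of the listed flavors, while a rule
without onFlavors applies to every workload admitted through the queue, per
the onFlavors field description above:

    apiVersion: kueue.x-k8s.io/v1beta1
    kind: ClusterQueue
    metadata:
      name: cluster-queue
    spec:
      namespaceSelector: {}
      resourceGroups:
      - coveredResources: ["cpu"]
        flavors:
        - name: "on-demand"
          resources:
          - name: "cpu"
            nominalQuota: 9
      admissionChecksStrategy:
        admissionChecks:
        - name: "sample-prov"
          onFlavors: ["on-demand"]  # runs only for workloads assigned this flavor
        - name: "sample-check"      # no onFlavors: runs for every workload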
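The scheduler side is where the discarded API request comes from: admit() now
receives the cached *cache.ClusterQueue instead of fetching the ClusterQueue
from the API server, while the workload controller keeps the live read because
it must react to ClusterQueue API events. The selection rule implemented by
workload.AdmissionChecksForWorkload can be summarized with the following
standalone sketch; the types are simplified stand-ins for the kueue API types,
not the real ones:

    package main

    import "fmt"

    // FlavorRef and StrategyRule are simplified stand-ins for
    // kueue.ResourceFlavorReference and kueue.AdmissionCheckStrategyRule.
    type FlavorRef string

    type StrategyRule struct {
        Name      string
        OnFlavors []FlavorRef // empty means "run for every flavor"
    }

    // checksFor mirrors the logic of AdmissionChecksForWorkload: a non-empty
    // plain admissionChecks list wins; otherwise each strategy rule is
    // matched against the flavors assigned to the workload.
    func checksFor(plain []string, rules []StrategyRule, assigned []FlavorRef) map[string]bool {
        names := map[string]bool{}
        if len(plain) != 0 {
            for _, n := range plain {
                names[n] = true
            }
            return names
        }
        for _, r := range rules {
            if len(r.OnFlavors) == 0 {
                names[r.Name] = true // no flavor filter: always runs
                continue
            }
            for _, f := range assigned {
                for _, want := range r.OnFlavors {
                    if f == want {
                        names[r.Name] = true
                    }
                }
            }
        }
        return names
    }

    func main() {
        rules := []StrategyRule{
            {Name: "check1", OnFlavors: []FlavorRef{"on-demand"}},
            {Name: "check2"},
        }
        fmt.Println(checksFor(nil, rules, []FlavorRef{"on-demand"})) // map[check1:true check2:true]
        fmt.Println(checksFor(nil, rules, []FlavorRef{"spot"}))      // map[check2:true]
    }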
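In tests, the wrappers extended by this patch compose the same configuration.
A minimal sketch, assuming the sigs.k8s.io/kueue module is on the import path;
the queue, check, and flavor names are made up for illustration:

    package example_test

    import (
        "testing"

        utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
    )

    // Builds a ClusterQueue whose "check1" runs only for the "on-demand"
    // flavor while "check2" runs for every workload.
    func TestBuildClusterQueueWithStrategy(t *testing.T) {
        cq := utiltesting.MakeClusterQueue("cq").
            AdmissionCheckStrategy(
                *utiltesting.MakeAdmissionCheckStrategyRule("check1", "on-demand").Obj(),
                *utiltesting.MakeAdmissionCheckStrategyRule("check2").Obj()).
            Obj()
        if got := len(cq.Spec.AdmissionChecksStrategy.AdmissionChecks); got != 2 {
            t.Errorf("want 2 admission check strategy rules, got %d", got)
        }
    }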