From fecc0835fc10d9a8d4ac89d81eda3c61d21e218a Mon Sep 17 00:00:00 2001
From: Patryk Bundyra
Date: Thu, 28 Mar 2024 12:39:08 +0000
Subject: [PATCH] Introduce the AdmissionCheckStrategy API and change how
 AdmissionChecks are assigned to a Workload

---
 apis/kueue/v1beta1/clusterqueue_types.go      | 12 ++
 apis/kueue/v1beta1/zz_generated.deepcopy.go   | 27 +++++
 .../kueue/v1beta1/admissioncheckstrategy.go   | 49 +++++++++
 .../kueue/v1beta1/clusterqueuespec.go         | 30 ++++--
 client-go/applyconfiguration/utils.go         |  2 +
 .../bases/kueue.x-k8s.io_clusterqueues.yaml   | 20 ++++
 go.mod                                        |  2 +-
 pkg/controller/core/workload_controller.go    | 66 ++++++++---
 .../core/workload_controller_test.go          | 59 ++++++++++
 pkg/util/testing/wrappers.go                  | 28 +++++
 site/static/examples/jobs/jobset-sample.yaml  | 50 +++++++++
 .../examples/provisioning/plain-prov-req.yaml | 38 +++++++
 .../integration/controller/core/suite_test.go |  6 ++
 .../core/workload_controller_test.go          | 101 ++++++++++++++++++
 14 files changed, 469 insertions(+), 21 deletions(-)
 create mode 100644 client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategy.go
 create mode 100644 site/static/examples/jobs/jobset-sample.yaml
 create mode 100644 site/static/examples/provisioning/plain-prov-req.yaml

diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go
index 830099391b..f730018733 100644
--- a/apis/kueue/v1beta1/clusterqueue_types.go
+++ b/apis/kueue/v1beta1/clusterqueue_types.go
@@ -97,6 +97,10 @@ type ClusterQueueSpec struct {
 	// +optional
 	AdmissionChecks []string `json:"admissionChecks,omitempty"`
 
+	// admissionCheckStrategy is the list of AdmissionChecks that should run when a particular ResourceFlavor is used.
+	// +optional
+	AdmissionCheckStrategy []AdmissionCheckStrategy `json:"admissionCheckStrategy,omitempty"`
+
 	// stopPolicy - if set to a value different from None, the ClusterQueue is considered Inactive, no new reservation being
 	// made.
 	//
@@ -112,6 +116,14 @@ type ClusterQueueSpec struct {
 	StopPolicy *StopPolicy `json:"stopPolicy,omitempty"`
 }
 
+type AdmissionCheckStrategy struct {
+	// name is an AdmissionCheck's metav1.Name
+	Name string `json:"name"`
+	// onFlavors is a list of ResourceFlavors' metav1.Names that this AdmissionCheck should run for.
+	// If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
+	OnFlavors []string `json:"onFlavors"`
+}
+
 type QueueingStrategy string
 
 const (
diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go
index 842c5abfd5..4d3e461097 100644
--- a/apis/kueue/v1beta1/zz_generated.deepcopy.go
+++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go
@@ -192,6 +192,26 @@ func (in *AdmissionCheckStatus) DeepCopy() *AdmissionCheckStatus {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *AdmissionCheckStrategy) DeepCopyInto(out *AdmissionCheckStrategy) {
+	*out = *in
+	if in.OnFlavors != nil {
+		in, out := &in.OnFlavors, &out.OnFlavors
+		*out = make([]string, len(*in))
+		copy(*out, *in)
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AdmissionCheckStrategy.
+func (in *AdmissionCheckStrategy) DeepCopy() *AdmissionCheckStrategy {
+	if in == nil {
+		return nil
+	}
+	out := new(AdmissionCheckStrategy)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *BorrowWithinCohort) DeepCopyInto(out *BorrowWithinCohort) { *out = *in @@ -357,6 +377,13 @@ func (in *ClusterQueueSpec) DeepCopyInto(out *ClusterQueueSpec) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.AdmissionCheckStrategy != nil { + in, out := &in.AdmissionCheckStrategy, &out.AdmissionCheckStrategy + *out = make([]AdmissionCheckStrategy, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.StopPolicy != nil { in, out := &in.StopPolicy, &out.StopPolicy *out = new(StopPolicy) diff --git a/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategy.go b/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategy.go new file mode 100644 index 0000000000..c0094362d3 --- /dev/null +++ b/client-go/applyconfiguration/kueue/v1beta1/admissioncheckstrategy.go @@ -0,0 +1,49 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1beta1 + +// AdmissionCheckStrategyApplyConfiguration represents an declarative configuration of the AdmissionCheckStrategy type for use +// with apply. +type AdmissionCheckStrategyApplyConfiguration struct { + Name *string `json:"name,omitempty"` + OnFlavors []string `json:"onFlavors,omitempty"` +} + +// AdmissionCheckStrategyApplyConfiguration constructs an declarative configuration of the AdmissionCheckStrategy type for use with +// apply. +func AdmissionCheckStrategy() *AdmissionCheckStrategyApplyConfiguration { + return &AdmissionCheckStrategyApplyConfiguration{} +} + +// WithName sets the Name field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Name field is set to the value of the last call. +func (b *AdmissionCheckStrategyApplyConfiguration) WithName(value string) *AdmissionCheckStrategyApplyConfiguration { + b.Name = &value + return b +} + +// WithOnFlavors adds the given value to the OnFlavors field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the OnFlavors field. +func (b *AdmissionCheckStrategyApplyConfiguration) WithOnFlavors(values ...string) *AdmissionCheckStrategyApplyConfiguration { + for i := range values { + b.OnFlavors = append(b.OnFlavors, values[i]) + } + return b +} diff --git a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go index 564c24ab9a..14c59ec60d 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go +++ b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go @@ -25,14 +25,15 @@ import ( // ClusterQueueSpecApplyConfiguration represents an declarative configuration of the ClusterQueueSpec type for use // with apply. 
type ClusterQueueSpecApplyConfiguration struct { - ResourceGroups []ResourceGroupApplyConfiguration `json:"resourceGroups,omitempty"` - Cohort *string `json:"cohort,omitempty"` - QueueingStrategy *kueuev1beta1.QueueingStrategy `json:"queueingStrategy,omitempty"` - NamespaceSelector *v1.LabelSelector `json:"namespaceSelector,omitempty"` - FlavorFungibility *FlavorFungibilityApplyConfiguration `json:"flavorFungibility,omitempty"` - Preemption *ClusterQueuePreemptionApplyConfiguration `json:"preemption,omitempty"` - AdmissionChecks []string `json:"admissionChecks,omitempty"` - StopPolicy *kueuev1beta1.StopPolicy `json:"stopPolicy,omitempty"` + ResourceGroups []ResourceGroupApplyConfiguration `json:"resourceGroups,omitempty"` + Cohort *string `json:"cohort,omitempty"` + QueueingStrategy *kueuev1beta1.QueueingStrategy `json:"queueingStrategy,omitempty"` + NamespaceSelector *v1.LabelSelector `json:"namespaceSelector,omitempty"` + FlavorFungibility *FlavorFungibilityApplyConfiguration `json:"flavorFungibility,omitempty"` + Preemption *ClusterQueuePreemptionApplyConfiguration `json:"preemption,omitempty"` + AdmissionChecks []string `json:"admissionChecks,omitempty"` + AdmissionCheckStrategy []AdmissionCheckStrategyApplyConfiguration `json:"admissionCheckStrategy,omitempty"` + StopPolicy *kueuev1beta1.StopPolicy `json:"stopPolicy,omitempty"` } // ClusterQueueSpecApplyConfiguration constructs an declarative configuration of the ClusterQueueSpec type for use with @@ -104,6 +105,19 @@ func (b *ClusterQueueSpecApplyConfiguration) WithAdmissionChecks(values ...strin return b } +// WithAdmissionCheckStrategy adds the given value to the AdmissionCheckStrategy field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, values provided by each call will be appended to the AdmissionCheckStrategy field. +func (b *ClusterQueueSpecApplyConfiguration) WithAdmissionCheckStrategy(values ...*AdmissionCheckStrategyApplyConfiguration) *ClusterQueueSpecApplyConfiguration { + for i := range values { + if values[i] == nil { + panic("nil value passed to WithAdmissionCheckStrategy") + } + b.AdmissionCheckStrategy = append(b.AdmissionCheckStrategy, *values[i]) + } + return b +} + // WithStopPolicy sets the StopPolicy field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the StopPolicy field is set to the value of the last call. 
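[Illustrative sketch, not part of the patch] A minimal example of how the generated builders above could be chained from client code. It assumes the file's pre-existing ClusterQueueSpec() constructor and an import such as kueuev1beta1 "sigs.k8s.io/kueue/client-go/applyconfiguration/kueue/v1beta1"; the check name "prov-check" and flavor name "on-demand" are hypothetical:

	spec := kueuev1beta1.ClusterQueueSpec().
		WithAdmissionCheckStrategy(
			kueuev1beta1.AdmissionCheckStrategy().
				WithName("prov-check").     // AdmissionCheck to run
				WithOnFlavors("on-demand")) // only when this ResourceFlavor is assigned

Because WithOnFlavors appends, calling it several times accumulates flavors; omitting it leaves onFlavors empty, i.e. the check applies to every workload in the ClusterQueue.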
diff --git a/client-go/applyconfiguration/utils.go b/client-go/applyconfiguration/utils.go
index 928d168488..9e6fc3ab2c 100644
--- a/client-go/applyconfiguration/utils.go
+++ b/client-go/applyconfiguration/utils.go
@@ -58,6 +58,8 @@ func ForKind(kind schema.GroupVersionKind) interface{} {
 		return &kueuev1beta1.AdmissionCheckStateApplyConfiguration{}
 	case v1beta1.SchemeGroupVersion.WithKind("AdmissionCheckStatus"):
 		return &kueuev1beta1.AdmissionCheckStatusApplyConfiguration{}
+	case v1beta1.SchemeGroupVersion.WithKind("AdmissionCheckStrategy"):
+		return &kueuev1beta1.AdmissionCheckStrategyApplyConfiguration{}
 	case v1beta1.SchemeGroupVersion.WithKind("BorrowWithinCohort"):
 		return &kueuev1beta1.BorrowWithinCohortApplyConfiguration{}
 	case v1beta1.SchemeGroupVersion.WithKind("ClusterQueue"):
diff --git a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
index b98091c84f..da9db9b3e8 100644
--- a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
+++ b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
@@ -58,6 +58,26 @@ spec:
           spec:
             description: ClusterQueueSpec defines the desired state of ClusterQueue
             properties:
+              admissionCheckStrategy:
+                description: admissionCheckStrategy is the list of AdmissionChecks
+                  that should run when a particular ResourceFlavor is used.
+                items:
+                  properties:
+                    name:
+                      description: name is an AdmissionCheck's metav1.Name
+                      type: string
+                    onFlavors:
+                      description: |-
+                        onFlavors is a list of ResourceFlavors' metav1.Names that this AdmissionCheck should run for.
+                        If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
+                      items:
+                        type: string
+                      type: array
+                  required:
+                  - name
+                  - onFlavors
+                  type: object
+                type: array
               admissionChecks:
                 description: admissionChecks lists the AdmissionChecks required by
                   this ClusterQueue
diff --git a/go.mod b/go.mod
index a8124d98cb..95d367f3c7 100644
--- a/go.mod
+++ b/go.mod
@@ -16,6 +16,7 @@ require (
 	github.com/ray-project/kuberay/ray-operator v1.1.0
 	github.com/spf13/cobra v1.8.0
 	go.uber.org/zap v1.27.0
+	golang.org/x/exp v0.0.0-20230905200255-921286631fa9
 	k8s.io/api v0.29.3
 	k8s.io/apimachinery v0.29.3
 	k8s.io/apiserver v0.29.3
@@ -102,7 +103,6 @@ require (
 	go.uber.org/atomic v1.11.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/crypto v0.18.0 // indirect
-	golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
 	golang.org/x/mod v0.14.0 // indirect
 	golang.org/x/net v0.20.0 // indirect
 	golang.org/x/oauth2 v0.12.0 // indirect
diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go
index 1c00783f34..93761c3177 100644
--- a/pkg/controller/core/workload_controller.go
+++ b/pkg/controller/core/workload_controller.go
@@ -19,9 +19,11 @@ package core
 import (
 	"context"
 	"fmt"
+	"slices"
 	"time"
 
 	"github.com/go-logr/logr"
+	"golang.org/x/exp/maps"
 	corev1 "k8s.io/api/core/v1"
 	nodev1 "k8s.io/api/node/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
@@ -49,7 +51,7 @@ import (
 	"sigs.k8s.io/kueue/pkg/constants"
 	"sigs.k8s.io/kueue/pkg/controller/core/indexer"
 	"sigs.k8s.io/kueue/pkg/queue"
-	"sigs.k8s.io/kueue/pkg/util/slices"
+	utilslices "sigs.k8s.io/kueue/pkg/util/slices"
 	"sigs.k8s.io/kueue/pkg/workload"
 )
 
@@ -221,6 +223,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 	case !r.queues.QueueForWorkloadExists(&wl):
 		log.V(3).Info("Workload is inadmissible because of missing LocalQueue", "localQueue", klog.KRef(wl.Namespace,
 			wl.Spec.QueueName))
 		if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName)) {
+			err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
 			return ctrl.Result{}, client.IgnoreNotFound(err)
 		}
 
@@ -259,11 +262,17 @@ func (r *WorkloadReconciler) reconcileSyncAdmissionChecks(ctx context.Context, w
 		return false, err
 	}
 
-	queueAdmissionChecks := queue.Spec.AdmissionChecks
-	newChecks, shouldUpdate := syncAdmissionCheckConditions(wl.Status.AdmissionChecks, queueAdmissionChecks)
+	var admissionChecks []string
+	if len(queue.Spec.AdmissionChecks) == 0 {
+		admissionChecks = getChecksFromACStrategies(ctx, wl, &queue)
+	} else {
+		admissionChecks = queue.Spec.AdmissionChecks
+	}
+
+	newChecks, shouldUpdate := syncAdmissionCheckConditions(wl.Status.AdmissionChecks, admissionChecks)
 	if shouldUpdate {
 		log := ctrl.LoggerFrom(ctx)
-		log.V(3).Info("The workload needs admission checks updates", "clusterQueue", klog.KRef("", cqName), "admissionChecks", queueAdmissionChecks)
+		log.V(3).Info("The workload needs admission checks updates", "clusterQueue", klog.KRef("", cqName), "admissionChecks", admissionChecks)
 		wl.Status.AdmissionChecks = newChecks
 		err := r.client.Status().Update(ctx, wl)
 		return true, client.IgnoreNotFound(err)
@@ -271,6 +280,38 @@ func (r *WorkloadReconciler) reconcileSyncAdmissionChecks(ctx context.Context, w
 	return false, nil
 }
 
+// getChecksFromACStrategies returns the AdmissionChecks assigned to the ResourceFlavors used by the Workload.
+func getChecksFromACStrategies(ctx context.Context, wl *kueue.Workload, queue *kueue.ClusterQueue) []string {
+	log := ctrl.LoggerFrom(ctx)
+
+	// Kueue first sets AdmissionChecks based on the ClusterQueue configuration; at that point the Workload has no
+	// ResourceFlavors assigned, so AdmissionChecks cannot be matched to ResourceFlavors yet.
+	// After quota is reserved, another reconciliation happens and the AdmissionChecks can be matched to ResourceFlavors.
+	if wl.Status.Admission == nil {
+		log.V(2).Info("Workload has no Admission", "workload", klog.KObj(wl))
+		return []string{}
+	}
+
+	wlFlavors := make([]kueue.ResourceFlavorReference, 0)
+	for _, podSet := range wl.Status.Admission.PodSetAssignments {
+		wlFlavors = append(wlFlavors, maps.Values(podSet.Flavors)...)
+	}
+	// Since different resources may have the same ResourceFlavor, we need to remove duplicates
+	slices.Sort(wlFlavors)
+	wlFlavors = slices.Compact(wlFlavors)
+
+	admissionCheckNames := make([]string, 0)
+	for _, acStrategy := range queue.Spec.AdmissionCheckStrategy {
+		for _, flavor := range wlFlavors {
+			if len(acStrategy.OnFlavors) == 0 || slices.Contains(acStrategy.OnFlavors, string(flavor)) {
+				admissionCheckNames = append(admissionCheckNames, acStrategy.Name)
+				break // the strategy already matched; don't append its name once per flavor
+			}
+		}
+	}
+	return admissionCheckNames
+}
+
 func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Context, wl *kueue.Workload, cqName string) (bool, error) {
 	queue := kueue.ClusterQueue{}
 	err := r.client.Get(ctx, types.NamespacedName{Name: cqName}, &queue)
@@ -306,14 +347,14 @@ func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Cont
 	return false, nil
 }
 
-func syncAdmissionCheckConditions(conds []kueue.AdmissionCheckState, queueChecks []string) ([]kueue.AdmissionCheckState, bool) {
-	if len(queueChecks) == 0 {
+func syncAdmissionCheckConditions(conds []kueue.AdmissionCheckState, admissionChecks []string) ([]kueue.AdmissionCheckState, bool) {
+	if len(admissionChecks) == 0 {
 		return nil, len(conds) > 0
 	}
 
 	shouldUpdate := false
-	currentChecks := slices.ToRefMap(conds, func(c *kueue.AdmissionCheckState) string { return c.Name })
-	for _, t := range queueChecks {
+	currentChecks := utilslices.ToRefMap(conds, func(c *kueue.AdmissionCheckState) string { return c.Name })
+	for _, t := range admissionChecks {
 		if _, found := currentChecks[t]; !found {
 			workload.SetAdmissionCheckState(&conds, kueue.AdmissionCheckState{
 				Name: t,
@@ -324,9 +365,9 @@ func syncAdmissionCheckConditions(conds []kueue.AdmissionCheckState, queueChecks
 	}
 
 	// if the workload conditions length is bigger, then some cleanup should be done
-	if len(conds) > len(queueChecks) {
-		newConds := make([]kueue.AdmissionCheckState, 0, len(queueChecks))
-		queueChecksSet := sets.New(queueChecks...)
+	if len(conds) > len(admissionChecks) {
+		newConds := make([]kueue.AdmissionCheckState, 0, len(admissionChecks))
+		queueChecksSet := sets.New(admissionChecks...)
 		shouldUpdate = true
 		for i := range conds {
 			c := &conds[i]
@@ -749,7 +790,8 @@ func (w *workloadCqHandler) Update(ctx context.Context, ev event.UpdateEvent, wq
 	}
 
 	if !newCq.DeletionTimestamp.IsZero() ||
-		!slices.CmpNoOrder(oldCq.Spec.AdmissionChecks, newCq.Spec.AdmissionChecks) ||
+		!utilslices.CmpNoOrder(oldCq.Spec.AdmissionChecks, newCq.Spec.AdmissionChecks) ||
+		!equality.Semantic.DeepEqual(oldCq.Spec.AdmissionCheckStrategy, newCq.Spec.AdmissionCheckStrategy) ||
 		!ptr.Equal(oldCq.Spec.StopPolicy, newCq.Spec.StopPolicy) {
 		w.queueReconcileForWorkloads(ctx, newCq.Name, wq)
 	}
diff --git a/pkg/controller/core/workload_controller_test.go b/pkg/controller/core/workload_controller_test.go
index fb85a9633b..a905201764 100644
--- a/pkg/controller/core/workload_controller_test.go
+++ b/pkg/controller/core/workload_controller_test.go
@@ -585,3 +585,62 @@
 		})
 	}
 }
+
+func TestAdmissionCheckStrategy(t *testing.T) {
+	cases := map[string]struct {
+		cq                  *kueue.ClusterQueue
+		wl                  *kueue.Workload
+		wantAdmissionChecks []string
+	}{
+		"AdmissionCheckStrategy with a flavor": {
+			wl: utiltesting.MakeWorkload("wl", "ns").
+				Admission(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()).
+				Obj(),
+			cq: utiltesting.MakeClusterQueue("cq").
+				AdmissionCheckStrategy(*utiltesting.MakeAdmissionCheckStrategy("ac1", "flavor1").Obj()).
+				Obj(),
+			wantAdmissionChecks: []string{"ac1"},
+		},
+		"AdmissionCheckStrategy without a flavor": {
+			wl: utiltesting.MakeWorkload("wl", "ns").
+				Admission(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()).
+				Obj(),
+			cq: utiltesting.MakeClusterQueue("cq").
+				AdmissionCheckStrategy(*utiltesting.MakeAdmissionCheckStrategy("ac1").Obj()).
+				Obj(),
+			wantAdmissionChecks: []string{"ac1"},
+		},
+		"Two AdmissionCheckStrategies, one with flavor, one without flavor": {
+			wl: utiltesting.MakeWorkload("wl", "ns").
+				Admission(utiltesting.MakeAdmission("cq").Assignment("cpu", "flavor1", "1").Obj()).
+				Obj(),
+			cq: utiltesting.MakeClusterQueue("cq").
+				AdmissionCheckStrategy(
+					*utiltesting.MakeAdmissionCheckStrategy("ac1", "flavor1").Obj(),
+					*utiltesting.MakeAdmissionCheckStrategy("ac2").Obj()).
+				Obj(),
+			wantAdmissionChecks: []string{"ac1", "ac2"},
+		},
+		"Workload has no Admission": {
+			wl: utiltesting.MakeWorkload("wl", "ns").
+				Obj(),
+			cq: utiltesting.MakeClusterQueue("cq").
+				AdmissionCheckStrategy(
+					*utiltesting.MakeAdmissionCheckStrategy("ac1", "flavor1").Obj(),
+					*utiltesting.MakeAdmissionCheckStrategy("ac2").Obj()).
+				Obj(),
+			wantAdmissionChecks: []string{},
+		},
+	}
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			ctx := context.Background()
+			gotAdmissionChecks := getChecksFromACStrategies(ctx, tc.wl, tc.cq)
+
+			if diff := cmp.Diff(tc.wantAdmissionChecks, gotAdmissionChecks); diff != "" {
+				t.Errorf("Unexpected AdmissionChecks (-want,+got):\n%s", diff)
+			}
+		})
+	}
+}
diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go
index 84e672f198..83037c677e 100644
--- a/pkg/util/testing/wrappers.go
+++ b/pkg/util/testing/wrappers.go
@@ -531,6 +531,11 @@ func (c *ClusterQueueWrapper) Cohort(cohort string) *ClusterQueueWrapper {
 	return c
 }
 
+func (c *ClusterQueueWrapper) AdmissionCheckStrategy(acs ...kueue.AdmissionCheckStrategy) *ClusterQueueWrapper {
+	c.Spec.AdmissionCheckStrategy = acs
+	return c
+}
+
 // ResourceGroup adds a ResourceGroup with flavors.
func (c *ClusterQueueWrapper) ResourceGroup(flavors ...kueue.FlavorQuotas) *ClusterQueueWrapper { rg := kueue.ResourceGroup{ @@ -766,6 +771,29 @@ func MakeAdmissionCheck(name string) *AdmissionCheckWrapper { } } +type AdmissionCheckStrategyWrapper struct{ kueue.AdmissionCheckStrategy } + +func MakeAdmissionCheckStrategy(name string, flavors ...string) *AdmissionCheckStrategyWrapper { + if len(flavors) == 0 { + flavors = make([]string, 0) + } + return &AdmissionCheckStrategyWrapper{ + AdmissionCheckStrategy: kueue.AdmissionCheckStrategy{ + Name: name, + OnFlavors: flavors, + }, + } +} + +func (acs *AdmissionCheckStrategyWrapper) OnFlavors(flavors []string) *AdmissionCheckStrategyWrapper { + acs.AdmissionCheckStrategy.OnFlavors = flavors + return acs +} + +func (acs *AdmissionCheckStrategyWrapper) Obj() *kueue.AdmissionCheckStrategy { + return &acs.AdmissionCheckStrategy +} + func (ac *AdmissionCheckWrapper) Active(status metav1.ConditionStatus) *AdmissionCheckWrapper { apimeta.SetStatusCondition(&ac.Status.Conditions, metav1.Condition{ Type: kueue.AdmissionCheckActive, diff --git a/site/static/examples/jobs/jobset-sample.yaml b/site/static/examples/jobs/jobset-sample.yaml new file mode 100644 index 0000000000..e652b4721c --- /dev/null +++ b/site/static/examples/jobs/jobset-sample.yaml @@ -0,0 +1,50 @@ +apiVersion: jobset.x-k8s.io/v1alpha2 +kind: JobSet +metadata: + generateName: sleep-job- + labels: + kueue.x-k8s.io/queue-name: user-queue +spec: + network: + enableDNSHostnames: false + subdomain: some-subdomain + replicatedJobs: + - name: workers + replicas: 1 + template: + spec: + parallelism: 2 + completions: 2 + backoffLimit: 0 + template: + spec: + containers: + - name: sleep + image: busybox + resources: + requests: + cpu: 1 + memory: "200Mi" + command: + - sleep + args: + - 100s + - name: driver + template: + spec: + parallelism: 1 + completions: 1 + backoffLimit: 0 + template: + spec: + containers: + - name: sleep + image: busybox + resources: + requests: + cpu: 2 + memory: "200Mi" + command: + - sleep + args: + - 100s \ No newline at end of file diff --git a/site/static/examples/provisioning/plain-prov-req.yaml b/site/static/examples/provisioning/plain-prov-req.yaml new file mode 100644 index 0000000000..f975776db1 --- /dev/null +++ b/site/static/examples/provisioning/plain-prov-req.yaml @@ -0,0 +1,38 @@ +apiVersion: v1 +kind: PodTemplate +metadata: + name: pod-template-mrd-gpu-1 + namespace: default +template: + spec: + tolerations: + - key: "nvidia.com/gpu" + operator: "Exists" + effect: "NoSchedule" + containers: + - name: pi + image: perl + command: ["/bin/sh"] + args: ["-c", "sleep 3600 && perl -Mbignum=bpi -wle \"print bpi(1000)\""] + resources: + limits: + cpu: "700m" + nvidia.com/gpu: 1 + requests: + cpu: "700m" + nvidia.com/gpu: 1 + restartPolicy: Never +--- +apiVersion: autoscaling.x-k8s.io/v1beta1 +kind: ProvisioningRequest +metadata: + generateName: provreq-mrd-gpu- + namespace: default +spec: + provisioningClassName: queued-provisioning.gke.io + parameters: + maxRunDurationSeconds: "2" + podSets: + - count: 2 + podTemplateRef: + name: pod-template-mrd-gpu-1 \ No newline at end of file diff --git a/test/integration/controller/core/suite_test.go b/test/integration/controller/core/suite_test.go index 8b14e3dc3b..8017f1e51f 100644 --- a/test/integration/controller/core/suite_test.go +++ b/test/integration/controller/core/suite_test.go @@ -29,9 +29,11 @@ import ( config "sigs.k8s.io/kueue/apis/config/v1beta1" "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/constants" 
"sigs.k8s.io/kueue/pkg/controller/core" "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/queue" + "sigs.k8s.io/kueue/pkg/scheduler" "sigs.k8s.io/kueue/pkg/webhooks" "sigs.k8s.io/kueue/test/integration/framework" // +kubebuilder:scaffold:imports @@ -77,4 +79,8 @@ func managerSetup(mgr manager.Manager, ctx context.Context) { failedCtrl, err := core.SetupControllers(mgr, queues, cCache, controllersCfg) gomega.Expect(err).ToNot(gomega.HaveOccurred(), "controller", failedCtrl) + + sched := scheduler.New(queues, cCache, mgr.GetClient(), mgr.GetEventRecorderFor(constants.AdmissionName)) + err = sched.Start(ctx) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) } diff --git a/test/integration/controller/core/workload_controller_test.go b/test/integration/controller/core/workload_controller_test.go index b881ddef32..05acf6418a 100644 --- a/test/integration/controller/core/workload_controller_test.go +++ b/test/integration/controller/core/workload_controller_test.go @@ -375,6 +375,107 @@ var _ = ginkgo.Describe("Workload controller", ginkgo.Ordered, ginkgo.ContinueOn }) }) + ginkgo.When("the queue has admission check strategies", func() { + var ( + flavor1 *kueue.ResourceFlavor + flavor2 *kueue.ResourceFlavor + check1 *kueue.AdmissionCheck + check2 *kueue.AdmissionCheck + check3 *kueue.AdmissionCheck + reservationFlavor string = "reservation" + ) + + ginkgo.BeforeEach(func() { + flavor1 = testing.MakeResourceFlavor(flavorOnDemand).Obj() + gomega.Expect(k8sClient.Create(ctx, flavor1)).Should(gomega.Succeed()) + + flavor2 = testing.MakeResourceFlavor(reservationFlavor).Obj() + gomega.Expect(k8sClient.Create(ctx, flavor2)).Should(gomega.Succeed()) + + check1 = testing.MakeAdmissionCheck("check1").ControllerName("ctrl1").Obj() + gomega.Expect(k8sClient.Create(ctx, check1)).Should(gomega.Succeed()) + util.SetAdmissionCheckActive(ctx, k8sClient, check1, metav1.ConditionTrue) + + check2 = testing.MakeAdmissionCheck("check2").ControllerName("ctrl2").Obj() + gomega.Expect(k8sClient.Create(ctx, check2)).Should(gomega.Succeed()) + util.SetAdmissionCheckActive(ctx, k8sClient, check2, metav1.ConditionTrue) + + check3 = testing.MakeAdmissionCheck("check3").ControllerName("ctrl3").Obj() + gomega.Expect(k8sClient.Create(ctx, check3)).Should(gomega.Succeed()) + util.SetAdmissionCheckActive(ctx, k8sClient, check3, metav1.ConditionTrue) + + clusterQueue = testing.MakeClusterQueue("cluster-queue"). + AdmissionCheckStrategy( + *testing.MakeAdmissionCheckStrategy("check1", flavorOnDemand).Obj(), + *testing.MakeAdmissionCheckStrategy("check2").Obj(), + *testing.MakeAdmissionCheckStrategy("check3", reservationFlavor).Obj()). + ResourceGroup( + *testing.MakeFlavorQuotas(reservationFlavor).Resource(resourceGPU, "1", "1").Obj(), + *testing.MakeFlavorQuotas(flavorOnDemand).Resource(resourceGPU, "5", "5").Obj()). + Cohort("cohort"). 
+				Obj()
+			gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed())
+
+			localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed())
+		})
+
+		ginkgo.AfterEach(func() {
+			gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
+			util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueue, true)
+			util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, check3, true)
+			util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, check2, true)
+			util.ExpectAdmissionCheckToBeDeleted(ctx, k8sClient, check1, true)
+			util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, flavor1, true)
+			util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, flavor2, true)
+		})
+
+		ginkgo.It("the workload should have the appropriate AdmissionChecks added", func() {
+			wl := testing.MakeWorkload("wl", ns.Name).
+				Queue("queue").
+				Request(resourceGPU, "3").
+				Obj()
+			wlKey := client.ObjectKeyFromObject(wl)
+
+			ginkgo.By("creating and admitting the workload", func() {
+				gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
+				gomega.Eventually(func() bool {
+					if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedQueueWorkload); err != nil {
+						return false
+					}
+					return workload.HasQuotaReservation(&updatedQueueWorkload)
+				}, util.Timeout, util.Interval).Should(gomega.BeTrue())
+			})
+
+			ginkgo.By("checking workload's admission checks", func() {
+				gomega.Eventually(func() []string {
+					gomega.Expect(k8sClient.Get(ctx, wlKey, &updatedQueueWorkload)).To(gomega.Succeed())
+					return slices.Map(updatedQueueWorkload.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name })
+				}, util.Timeout, util.Interval).Should(gomega.ConsistOf("check1", "check2"))
+			})
+
+			ginkgo.By("updating admission check strategies", func() {
+				createdQueue := kueue.ClusterQueue{}
+				queueKey := client.ObjectKeyFromObject(clusterQueue)
+				gomega.Eventually(func() error {
+					gomega.Expect(k8sClient.Get(ctx, queueKey, &createdQueue)).To(gomega.Succeed())
+					createdQueue.Spec.AdmissionCheckStrategy = []kueue.AdmissionCheckStrategy{
+						*testing.MakeAdmissionCheckStrategy("check1", flavorOnDemand).Obj(),
+						*testing.MakeAdmissionCheckStrategy("check2", reservationFlavor).Obj(),
+						*testing.MakeAdmissionCheckStrategy("check3").Obj()}
+					return k8sClient.Update(ctx, &createdQueue)
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("checking workload's admission checks after the update", func() {
+				gomega.Eventually(func() []string {
+					gomega.Expect(k8sClient.Get(ctx, wlKey, &updatedQueueWorkload)).To(gomega.Succeed())
+					return slices.Map(updatedQueueWorkload.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name })
+				}, util.Timeout, util.Interval).Should(gomega.ConsistOf("check1", "check3"))
+			})
+		})
+	})
+
 	ginkgo.When("changing the priority value of PriorityClass doesn't affect the priority of the workload", func() {
 		ginkgo.BeforeEach(func() {
 			workloadPriorityClass = testing.MakeWorkloadPriorityClass("workload-priority-class").PriorityValue(200).Obj()
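[Illustrative sketch, not part of the patch] A minimal example of the new field on the typed API, mirroring the matching semantics of getChecksFromACStrategies above; the check and flavor names are hypothetical:

	cq := kueue.ClusterQueue{
		ObjectMeta: metav1.ObjectMeta{Name: "cluster-queue"},
		Spec: kueue.ClusterQueueSpec{
			AdmissionCheckStrategy: []kueue.AdmissionCheckStrategy{
				// Runs only for workloads admitted with the "on-demand" flavor.
				{Name: "prov-check", OnFlavors: []string{"on-demand"}},
				// Empty onFlavors: the check runs for every workload admitted through the ClusterQueue.
				{Name: "budget-check", OnFlavors: []string{}},
			},
		},
	}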