Skip to content

Commit

Permalink
Introduce AdmissionCheckStrategy API, change assigning AdmissionCheck…
Browse files Browse the repository at this point in the history
…s to a Workload
  • Loading branch information
PBundyra committed Apr 9, 2024
1 parent b9c346a commit c755404
Show file tree
Hide file tree
Showing 14 changed files with 471 additions and 21 deletions.
11 changes: 11 additions & 0 deletions apis/kueue/v1beta1/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ type ClusterQueueSpec struct {
// +optional
AdmissionChecks []string `json:"admissionChecks,omitempty"`

// admissionCheckStrategy is the list of AdmissionChecks that should run when a particular ResourceFlavor is used.
AdmissionCheckStrategy []AdmissionCheckStrategy `json:"admissionCheckStrategy,omitempty"`

// stopPolicy - if set to a value different from None, the ClusterQueue is considered Inactive, no new reservation being
// made.
//
Expand All @@ -112,6 +115,14 @@ type ClusterQueueSpec struct {
StopPolicy *StopPolicy `json:"stopPolicy,omitempty"`
}

// AdmissionCheckStrategy associates a single AdmissionCheck with the
// ResourceFlavors for which that check should run.
type AdmissionCheckStrategy struct {
	// name is an AdmissionCheck's metav1.Name
	Name string `json:"name"`
	// onFlavors is a list of ResourceFlavors' metav1.Names that this AdmissionCheck should run for.
	// If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
	OnFlavors []string `json:"onFlavors"`
}

type QueueingStrategy string

const (
Expand Down
27 changes: 27 additions & 0 deletions apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 22 additions & 8 deletions client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions client-go/applyconfiguration/utils.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ spec:
spec:
description: ClusterQueueSpec defines the desired state of ClusterQueue
properties:
admissionCheckStrategy:
description: admissionCheckStrategy is the list of AdmissionChecks
that should run when a particular ResourceFlavor is used.
items:
properties:
name:
description: name is an AdmissionCheck's metav1.Name
type: string
onFlavors:
description: |-
onFlavors is a list of ResourceFlavors' metav1.Names that this AdmissionCheck should run for.
if empty the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
items:
type: string
type: array
required:
- name
- onFlavors
type: object
type: array
admissionChecks:
description: admissionChecks lists the AdmissionChecks required by
this ClusterQueue
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/ray-project/kuberay/ray-operator v1.1.0
github.com/spf13/cobra v1.8.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
k8s.io/api v0.29.3
k8s.io/apimachinery v0.29.3
k8s.io/apiserver v0.29.3
Expand Down Expand Up @@ -102,7 +103,6 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/oauth2 v0.12.0 // indirect
Expand Down
64 changes: 52 additions & 12 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package core
import (
"context"
"fmt"
"slices"
"time"

"github.com/go-logr/logr"
"golang.org/x/exp/maps"
corev1 "k8s.io/api/core/v1"
nodev1 "k8s.io/api/node/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -49,7 +51,7 @@ import (
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/slices"
utilslices "sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -221,6 +223,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
case !r.queues.QueueForWorkloadExists(&wl):
log.V(3).Info("Workload is inadmissible because of missing LocalQueue", "localQueue", klog.KRef(wl.Namespace, wl.Spec.QueueName))
if workload.UnsetQuotaReservationWithCondition(&wl, "Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName)) {

err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
Expand Down Expand Up @@ -259,18 +262,55 @@ func (r *WorkloadReconciler) reconcileSyncAdmissionChecks(ctx context.Context, w
return false, err
}

queueAdmissionChecks := queue.Spec.AdmissionChecks
newChecks, shouldUpdate := syncAdmissionCheckConditions(wl.Status.AdmissionChecks, queueAdmissionChecks)
var admissionChecks []string
if len(queue.Spec.AdmissionChecks) == 0 {
admissionChecks = getChecksFromACStrategies(ctx, wl, &queue)
} else {
admissionChecks = queue.Spec.AdmissionChecks
}

newChecks, shouldUpdate := syncAdmissionCheckConditions(wl.Status.AdmissionChecks, admissionChecks)
if shouldUpdate {
log := ctrl.LoggerFrom(ctx)
log.V(3).Info("The workload needs admission checks updates", "clusterQueue", klog.KRef("", cqName), "admissionChecks", queueAdmissionChecks)
log.V(3).Info("The workload needs admission checks updates", "clusterQueue", klog.KRef("", cqName), "admissionChecks", admissionChecks)
wl.Status.AdmissionChecks = newChecks
err := r.client.Status().Update(ctx, wl)
return true, client.IgnoreNotFound(err)
}
return false, nil
}

// getChecksFromACStrategies returns the names of the AdmissionChecks, taken from
// the ClusterQueue's AdmissionCheckStrategy list, that apply to the
// ResourceFlavors assigned to the Workload.
//
// A strategy with an empty OnFlavors list applies to every workload that has an
// Admission; otherwise it applies when at least one flavor assigned to the
// workload is listed in OnFlavors. Each AdmissionCheck name is returned at most
// once, in the order the strategies are declared, so the length of the result
// can be compared with the number of check states kept on the workload.
//
// The result is empty (never nil) when the workload has no Admission yet.
func getChecksFromACStrategies(ctx context.Context, wl *kueue.Workload, queue *kueue.ClusterQueue) []string {
	log := ctrl.LoggerFrom(ctx)

	// Kueue first sets AdmissionChecks based on the ClusterQueue configuration; at that point the Workload has no
	// ResourceFlavors assigned, so AdmissionChecks cannot be matched to ResourceFlavors.
	// After quota is reserved, another reconciliation happens and the AdmissionChecks can be matched to ResourceFlavors.
	if wl.Status.Admission == nil {
		log.V(2).Info("Workload has no Admission", "Workload", wl)
		return []string{}
	}

	// Collect every flavor assigned to any pod set. Different resources may share
	// a flavor; duplicates are harmless here because each strategy is evaluated
	// with an "any flavor matches" test below.
	var wlFlavors []kueue.ResourceFlavorReference
	for _, podSet := range wl.Status.Admission.PodSetAssignments {
		wlFlavors = append(wlFlavors, maps.Values(podSet.Flavors)...)
	}

	strategies := queue.Spec.AdmissionCheckStrategy
	seen := make(map[string]struct{}, len(strategies))
	admissionCheckNames := make([]string, 0, len(strategies))
	for _, acStrategy := range strategies {
		// Guard against the same AdmissionCheck being listed in several strategies.
		if _, dup := seen[acStrategy.Name]; dup {
			continue
		}
		applies := len(acStrategy.OnFlavors) == 0 ||
			slices.ContainsFunc(wlFlavors, func(flavor kueue.ResourceFlavorReference) bool {
				return slices.Contains(acStrategy.OnFlavors, string(flavor))
			})
		if !applies {
			continue
		}
		// Record the name once, no matter how many assigned flavors matched.
		seen[acStrategy.Name] = struct{}{}
		admissionCheckNames = append(admissionCheckNames, acStrategy.Name)
	}
	return admissionCheckNames
}

func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Context, wl *kueue.Workload, cqName string) (bool, error) {
queue := kueue.ClusterQueue{}
err := r.client.Get(ctx, types.NamespacedName{Name: cqName}, &queue)
Expand Down Expand Up @@ -306,14 +346,14 @@ func (r *WorkloadReconciler) reconcileOnClusterQueueActiveState(ctx context.Cont
return false, nil
}

func syncAdmissionCheckConditions(conds []kueue.AdmissionCheckState, queueChecks []string) ([]kueue.AdmissionCheckState, bool) {
if len(queueChecks) == 0 {
func syncAdmissionCheckConditions(conds []kueue.AdmissionCheckState, admissionChecks []string) ([]kueue.AdmissionCheckState, bool) {
if len(admissionChecks) == 0 {
return nil, len(conds) > 0
}

shouldUpdate := false
currentChecks := slices.ToRefMap(conds, func(c *kueue.AdmissionCheckState) string { return c.Name })
for _, t := range queueChecks {
currentChecks := utilslices.ToRefMap(conds, func(c *kueue.AdmissionCheckState) string { return c.Name })
for _, t := range admissionChecks {
if _, found := currentChecks[t]; !found {
workload.SetAdmissionCheckState(&conds, kueue.AdmissionCheckState{
Name: t,
Expand All @@ -324,9 +364,9 @@ func syncAdmissionCheckConditions(conds []kueue.AdmissionCheckState, queueChecks
}

// if the workload conditions length is bigger, then some cleanup should be done
if len(conds) > len(queueChecks) {
newConds := make([]kueue.AdmissionCheckState, 0, len(queueChecks))
queueChecksSet := sets.New(queueChecks...)
if len(conds) > len(admissionChecks) {
newConds := make([]kueue.AdmissionCheckState, 0, len(admissionChecks))
queueChecksSet := sets.New(admissionChecks...)
shouldUpdate = true
for i := range conds {
c := &conds[i]
Expand Down Expand Up @@ -749,7 +789,7 @@ func (w *workloadCqHandler) Update(ctx context.Context, ev event.UpdateEvent, wq
}

if !newCq.DeletionTimestamp.IsZero() ||
!slices.CmpNoOrder(oldCq.Spec.AdmissionChecks, newCq.Spec.AdmissionChecks) ||
!utilslices.CmpNoOrder(oldCq.Spec.AdmissionChecks, newCq.Spec.AdmissionChecks) ||
!ptr.Equal(oldCq.Spec.StopPolicy, newCq.Spec.StopPolicy) {
w.queueReconcileForWorkloads(ctx, newCq.Name, wq)
}
Expand Down
Loading

0 comments on commit c755404

Please sign in to comment.