Skip to content

Commit

Permalink
Discard an API request in the scheduler
Browse files Browse the repository at this point in the history
    Add AdmissionCheckStrategy to the ClusterQueue's cache

    Improve integration tests
  • Loading branch information
PBundyra committed Apr 12, 2024
1 parent d98771c commit 18dd2fa
Show file tree
Hide file tree
Showing 19 changed files with 167 additions and 120 deletions.
3 changes: 2 additions & 1 deletion apis/kueue/v1beta1/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ type AdmissionCheckStrategyRule struct {

// onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for.
// If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
OnFlavors []string `json:"onFlavors"`
// +optional
OnFlavors []ResourceFlavorReference `json:"onFlavors"`
}

type QueueingStrategy string
Expand Down
2 changes: 1 addition & 1 deletion apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ spec:
onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for.
If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
items:
description: ResourceFlavorReference is the name of the
ResourceFlavor.
type: string
type: array
required:
- name
- onFlavors
type: object
type: array
type: object
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ spec:
onFlavors is a list of ResourceFlavors' names that this AdmissionCheck should run for.
If empty, the AdmissionCheck will run for all workloads submitted to the ClusterQueue.
items:
description: ResourceFlavorReference is the name of the
ResourceFlavor.
type: string
type: array
required:
- name
- onFlavors
type: object
type: array
type: object
Expand Down
5 changes: 5 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,11 @@ func (c *Cache) ClusterQueuesUsingAdmissionCheck(ac string) []string {
if cq.AdmissionChecks.Has(ac) {
cqs = append(cqs, cq.Name)
}
for _, check := range cq.AdmissionCheckStrategy.AdmissionChecks {
if check.Name == ac {
cqs = append(cqs, cq.Name)
}
}
}
return cqs
}
Expand Down
46 changes: 31 additions & 15 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3385,57 +3385,73 @@ func messageOrEmpty(err error) string {
}

func TestClusterQueuesUsingAdmissionChecks(t *testing.T) {
admissionCheck1 := utiltesting.MakeAdmissionCheck("ac1").Obj()
admissionCheck2 := utiltesting.MakeAdmissionCheck("ac2").Obj()
checks := []*kueue.AdmissionCheck{
utiltesting.MakeAdmissionCheck("ac1").Obj(),
utiltesting.MakeAdmissionCheck("ac2").Obj(),
utiltesting.MakeAdmissionCheck("ac3").Obj(),
}

fooCq := utiltesting.MakeClusterQueue("fooCq").
AdmissionChecks(admissionCheck1.Name).
AdmissionChecks("ac1").
Obj()
barCq := utiltesting.MakeClusterQueue("barCq").Obj()
fizzCq := utiltesting.MakeClusterQueue("fizzCq").
AdmissionChecks(admissionCheck1.Name, admissionCheck2.Name).
AdmissionChecks("ac1", "ac2").
Obj()
strategyCq := utiltesting.MakeClusterQueue("strategyCq").
AdmissionCheckStrategy(
*utiltesting.MakeAdmissionCheckStrategyRule("ac1").Obj(),
*utiltesting.MakeAdmissionCheckStrategyRule("ac3").Obj()).
Obj()

cases := map[string]struct {
clusterQueues []*kueue.ClusterQueue
wantInUseClusterQueueNames []string
check string
}{
"single clusterQueue with check in use": {
clusterQueues: []*kueue.ClusterQueue{
fooCq,
},
clusterQueues: []*kueue.ClusterQueue{fooCq},
wantInUseClusterQueueNames: []string{fooCq.Name},
check: "ac1",
},
"single clusterQueue with AdmissionCheckStrategy in use": {
clusterQueues: []*kueue.ClusterQueue{strategyCq},
wantInUseClusterQueueNames: []string{strategyCq.Name},
check: "ac3",
},
"single clusterQueue with no checks": {
clusterQueues: []*kueue.ClusterQueue{
barCq,
},
clusterQueues: []*kueue.ClusterQueue{barCq},
check: "ac1",
},
"multiple clusterQueues with checks in use": {
clusterQueues: []*kueue.ClusterQueue{
fooCq,
barCq,
fizzCq,
strategyCq,
},
wantInUseClusterQueueNames: []string{fooCq.Name, fizzCq.Name},
wantInUseClusterQueueNames: []string{fooCq.Name, fizzCq.Name, strategyCq.Name},
check: "ac1",
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
cache := New(utiltesting.NewFakeClient())
cache.AddOrUpdateAdmissionCheck(admissionCheck1)
cache.AddOrUpdateAdmissionCheck(admissionCheck2)
for _, check := range checks {
cache.AddOrUpdateAdmissionCheck(check)
}

for _, cq := range tc.clusterQueues {
if err := cache.AddClusterQueue(context.Background(), cq); err != nil {
t.Errorf("failed to add clusterQueue %s", cq.Name)
}
}

cqs := cache.ClusterQueuesUsingAdmissionCheck(admissionCheck1.Name)
cqs := cache.ClusterQueuesUsingAdmissionCheck(tc.check)
if diff := cmp.Diff(tc.wantInUseClusterQueueNames, cqs, cmpopts.SortSlices(func(a, b string) bool {
return a < b
})); len(diff) != 0 {
t.Errorf("Unexpected flavor is in use by clusterQueues (-want,+got):\n%s", diff)
t.Errorf("Unexpected AdmissionCheck is in use by clusterQueues (-want,+got):\n%s", diff)
}
})
}
Expand Down
34 changes: 20 additions & 14 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand All @@ -52,18 +53,19 @@ type ResourceStats map[corev1.ResourceName]*QuotaStats
// ClusterQueue is the internal implementation of kueue.ClusterQueue that
// holds admitted workloads.
type ClusterQueue struct {
Name string
Cohort *Cohort
ResourceGroups []ResourceGroup
RGByResource map[corev1.ResourceName]*ResourceGroup
Usage FlavorResourceQuantities
Workloads map[string]*workload.Info
WorkloadsNotReady sets.Set[string]
NamespaceSelector labels.Selector
Preemption kueue.ClusterQueuePreemption
FlavorFungibility kueue.FlavorFungibility
AdmissionChecks sets.Set[string]
Status metrics.ClusterQueueStatus
Name string
Cohort *Cohort
ResourceGroups []ResourceGroup
RGByResource map[corev1.ResourceName]*ResourceGroup
Usage FlavorResourceQuantities
Workloads map[string]*workload.Info
WorkloadsNotReady sets.Set[string]
NamespaceSelector labels.Selector
Preemption kueue.ClusterQueuePreemption
FlavorFungibility kueue.FlavorFungibility
AdmissionChecks sets.Set[string]
AdmissionCheckStrategy kueue.AdmissionChecksStrategy
Status metrics.ClusterQueueStatus
// GuaranteedQuota records how much resource quota the ClusterQueue reserved
// when feature LendingLimit is enabled and flavor's lendingLimit is not nil.
GuaranteedQuota FlavorResourceQuantities
Expand Down Expand Up @@ -200,6 +202,7 @@ func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[kueue.
c.isStopped = ptr.Deref(in.Spec.StopPolicy, kueue.None) != kueue.None

c.AdmissionChecks = sets.New(in.Spec.AdmissionChecks...)
c.AdmissionCheckStrategy = in.Spec.AdmissionChecksStrategy

c.Usage = filterFlavorQuantities(c.Usage, in.Spec.ResourceGroups)
c.AdmittedUsage = filterFlavorQuantities(c.AdmittedUsage, in.Spec.ResourceGroups)
Expand Down Expand Up @@ -414,9 +417,12 @@ func (c *ClusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference
// updateWithAdmissionChecks updates a ClusterQueue based on the passed AdmissionChecks set.
func (c *ClusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck) {
hasMissing := false
checksPerController := make(map[string]int, len(c.AdmissionChecks))
singleInstanceControllers := sets.New[string]()
for acName := range c.AdmissionChecks {
queueChecks := c.AdmissionChecks.Clone()
queueChecks.Insert(slices.Map(c.AdmissionCheckStrategy.AdmissionChecks,
func(r *kueue.AdmissionCheckStrategyRule) string { return r.Name })...)
checksPerController := make(map[string]int, len(queueChecks))
for acName := range queueChecks {
if ac, found := checks[acName]; !found {
hasMissing = true
} else {
Expand Down
55 changes: 32 additions & 23 deletions pkg/cache/clusterqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,16 @@ func TestClusterQueueUpdate(t *testing.T) {
}

func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
cq := utiltesting.MakeClusterQueue("cq").
cqs := []*kueue.ClusterQueue{utiltesting.MakeClusterQueue("cq").
AdmissionChecks("check1", "check2", "check3").
Obj()
Obj(),
utiltesting.MakeClusterQueue("cq2").
AdmissionCheckStrategy(
*utiltesting.MakeAdmissionCheckStrategyRule("check1").Obj(),
*utiltesting.MakeAdmissionCheckStrategyRule("check2").Obj(),
*utiltesting.MakeAdmissionCheckStrategyRule("check3").Obj()).
Obj(),
}

testcases := []struct {
name string
Expand Down Expand Up @@ -592,31 +599,33 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
cache := New(utiltesting.NewFakeClient())
cq, err := cache.newClusterQueue(cq)
if err != nil {
t.Fatalf("failed to new clusterQueue %v", err)
}
for _, cq := range cqs {
cache := New(utiltesting.NewFakeClient())
cq, err := cache.newClusterQueue(cq)
if err != nil {
t.Fatalf("failed to new clusterQueue %v", err)
}

cq.Status = tc.cqStatus
cq.Status = tc.cqStatus

// Align the admission check related internals to the desired Status.
if tc.cqStatus == active {
cq.hasMultipleSingleInstanceControllersChecks = false
cq.hasMissingOrInactiveAdmissionChecks = false
} else {
cq.hasMultipleSingleInstanceControllersChecks = true
cq.hasMissingOrInactiveAdmissionChecks = true
}
cq.updateWithAdmissionChecks(tc.admissionChecks)
// Align the admission check related internals to the desired Status.
if tc.cqStatus == active {
cq.hasMultipleSingleInstanceControllersChecks = false
cq.hasMissingOrInactiveAdmissionChecks = false
} else {
cq.hasMultipleSingleInstanceControllersChecks = true
cq.hasMissingOrInactiveAdmissionChecks = true
}
cq.updateWithAdmissionChecks(tc.admissionChecks)

if cq.Status != tc.wantStatus {
t.Errorf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status)
}
if cq.Status != tc.wantStatus {
t.Errorf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status)
}

gotReason, _ := cq.inactiveReason()
if diff := cmp.Diff(tc.wantReason, gotReason); diff != "" {
t.Errorf("Unexpected inactiveReason (-want,+got):\n%s", diff)
gotReason, _ := cq.inactiveReason()
if diff := cmp.Diff(tc.wantReason, gotReason); diff != "" {
t.Errorf("Unexpected inactiveReason (-want,+got):\n%s", diff)
}
}
})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cache/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (c *ClusterQueue) snapshot() *ClusterQueue {
NamespaceSelector: c.NamespaceSelector,
Status: c.Status,
AdmissionChecks: c.AdmissionChecks.Clone(),
AdmissionCheckStrategy: c.AdmissionCheckStrategy,
}
for fName, rUsage := range c.Usage {
cc.Usage[fName] = maps.Clone(rUsage)
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,13 @@ func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl
func (r *WorkloadReconciler) reconcileSyncAdmissionChecks(ctx context.Context, wl *kueue.Workload, cqName string) (bool, error) {
log := ctrl.LoggerFrom(ctx)

// because we need to react to API cluster queue events, the list of checks from a cache can lead to race conditions
queue := kueue.ClusterQueue{}
if err := r.client.Get(ctx, types.NamespacedName{Name: cqName}, &queue); err != nil {
// because we need to react to API ClusterQueue events, the list of checks from a cache can lead to race conditions
cq := kueue.ClusterQueue{}
if err := r.client.Get(ctx, types.NamespacedName{Name: cqName}, &cq); err != nil {
return false, err
}

admissionChecks := workload.AdmissionChecksForWorkload(&log, wl, &queue)
admissionChecks := workload.AdmissionChecksForWorkload(log, wl, sets.New(cq.Spec.AdmissionChecks...), &cq.Spec.AdmissionChecksStrategy)
newChecks, shouldUpdate := syncAdmissionCheckConditions(wl.Status.AdmissionChecks, admissionChecks)
if shouldUpdate {
log.V(3).Info("The workload needs admission checks updates", "clusterQueue", klog.KRef("", cqName), "admissionChecks", admissionChecks)
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/core/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,13 +601,13 @@ func TestReconcile(t *testing.T) {

if tc.cq != nil {
if err := cl.Create(ctx, tc.cq); err != nil {
t.Error("couldn't create the cluster queue", err)
t.Errorf("couldn't create the cluster queue: %v", err)
}
if err := qManager.AddClusterQueue(ctx, tc.cq); err != nil {
t.Error("couldn't add the cluster queue to the cache", err)
t.Errorf("couldn't add the cluster queue to the cache: %v", err)
}
if err := qManager.AddLocalQueue(ctx, tc.lq); err != nil {
t.Error("couldn't add the local queue to the cache", err)
t.Errorf("couldn't add the local queue to the cache: %v", err)
}
}

Expand Down
10 changes: 3 additions & 7 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (s *Scheduler) schedule(ctx context.Context) {
log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition")
}
e.status = nominated
if err := s.admit(ctx, e, cq.Name); err != nil {
if err := s.admit(ctx, e, cq); err != nil {
e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
}
if cq.Cohort != nil {
Expand Down Expand Up @@ -506,7 +506,7 @@ func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) e
// admit sets the admitting clusterQueue and flavors into the workload of
// the entry, and asynchronously updates the object in the apiserver after
// assuming it in the cache.
func (s *Scheduler) admit(ctx context.Context, e *entry, cqName string) error {
func (s *Scheduler) admit(ctx context.Context, e *entry, cq *cache.ClusterQueue) error {
log := ctrl.LoggerFrom(ctx)
newWorkload := e.Obj.DeepCopy()
admission := &kueue.Admission{
Expand All @@ -515,11 +515,7 @@ func (s *Scheduler) admit(ctx context.Context, e *entry, cqName string) error {
}

workload.SetQuotaReservation(newWorkload, admission)
queue := &kueue.ClusterQueue{}
if err := s.client.Get(ctx, types.NamespacedName{Name: cqName}, queue); err != nil {
return err
}
if workload.HasAllChecks(newWorkload, workload.AdmissionChecksForWorkload(&log, newWorkload, queue)) {
if workload.HasAllChecks(newWorkload, workload.AdmissionChecksForWorkload(log, newWorkload, cq.AdmissionChecks, &cq.AdmissionCheckStrategy)) {
// sync Admitted, ignore the result since an API update is always done.
_ = workload.SyncAdmittedCondition(newWorkload)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1448,7 +1448,7 @@ func TestSchedule(t *testing.T) {
t.Fatalf("Inserting clusterQueue %s in manager: %v", cq.Name, err)
}
if err := cl.Create(ctx, &cq); err != nil {
t.Error("couldn't create the cluster queue", err)
t.Errorf("couldn't create the cluster queue: %v", err)
}
}
scheduler := New(qManager, cqCache, cl, recorder, WithFairSharing(tc.enableFairSharing))
Expand Down
Loading

0 comments on commit 18dd2fa

Please sign in to comment.