Skip to content

Commit

Permalink
Improve unit tests, improve integration tests, change ClusterQueue cache
Browse files Browse the repository at this point in the history
  • Loading branch information
PBundyra committed Apr 16, 2024
1 parent 597959a commit 2dfd74d
Show file tree
Hide file tree
Showing 11 changed files with 243 additions and 112 deletions.
7 changes: 1 addition & 6 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,14 +728,9 @@ func (c *Cache) ClusterQueuesUsingAdmissionCheck(ac string) []string {
var cqs []string

for _, cq := range c.clusterQueues {
if cq.AdmissionChecks.Has(ac) {
if _, found := cq.AdmissionChecks[ac]; found {
cqs = append(cqs, cq.Name)
}
for _, check := range cq.AdmissionCheckStrategy.AdmissionChecks {
if check.Name == ac {
cqs = append(cqs, cq.Name)
}
}
}
return cqs
}
Expand Down
23 changes: 16 additions & 7 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,10 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Preemption: defaultPreemption,
AllocatableResourceGeneration: 1,
FlavorFungibility: defaultFlavorFungibility,
AdmissionChecks: sets.New("check1", "check2"),
AdmissionChecks: map[string]sets.Set[string]{
"check1": sets.New[string](),
"check2": sets.New[string](),
},
},
},
wantCohorts: map[string]sets.Set[string]{},
Expand Down Expand Up @@ -992,8 +995,10 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Preemption: defaultPreemption,
AllocatableResourceGeneration: 1,
FlavorFungibility: defaultFlavorFungibility,
AdmissionChecks: sets.New("check1", "check2"),
},
AdmissionChecks: map[string]sets.Set[string]{
"check1": sets.New[string](),
"check2": sets.New[string](),
}},
},
wantCohorts: map[string]sets.Set[string]{},
},
Expand Down Expand Up @@ -1021,8 +1026,10 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Preemption: defaultPreemption,
AllocatableResourceGeneration: 1,
FlavorFungibility: defaultFlavorFungibility,
AdmissionChecks: sets.New("check1", "check2"),
},
AdmissionChecks: map[string]sets.Set[string]{
"check1": sets.New[string](),
"check2": sets.New[string](),
}},
},
wantCohorts: map[string]sets.Set[string]{},
},
Expand Down Expand Up @@ -1050,8 +1057,10 @@ func TestCacheClusterQueueOperations(t *testing.T) {
Preemption: defaultPreemption,
AllocatableResourceGeneration: 1,
FlavorFungibility: defaultFlavorFungibility,
AdmissionChecks: sets.New("check1", "check2"),
},
AdmissionChecks: map[string]sets.Set[string]{
"check1": sets.New[string](),
"check2": sets.New[string](),
}},
},
wantCohorts: map[string]sets.Set[string]{},
},
Expand Down
40 changes: 19 additions & 21 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/util/slices"
utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand All @@ -53,19 +53,21 @@ type ResourceStats map[corev1.ResourceName]*QuotaStats
// ClusterQueue is the internal implementation of kueue.ClusterQueue that
// holds admitted workloads.
type ClusterQueue struct {
Name string
Cohort *Cohort
ResourceGroups []ResourceGroup
RGByResource map[corev1.ResourceName]*ResourceGroup
Usage FlavorResourceQuantities
Workloads map[string]*workload.Info
WorkloadsNotReady sets.Set[string]
NamespaceSelector labels.Selector
Preemption kueue.ClusterQueuePreemption
FlavorFungibility kueue.FlavorFungibility
AdmissionChecks sets.Set[string]
AdmissionCheckStrategy kueue.AdmissionChecksStrategy
Status metrics.ClusterQueueStatus
Name string
Cohort *Cohort
ResourceGroups []ResourceGroup
RGByResource map[corev1.ResourceName]*ResourceGroup
Usage FlavorResourceQuantities
Workloads map[string]*workload.Info
WorkloadsNotReady sets.Set[string]
NamespaceSelector labels.Selector
Preemption kueue.ClusterQueuePreemption
FlavorFungibility kueue.FlavorFungibility
// Aggregates AdmissionChecks from both .spec.AdmissionChecks and .spec.AdmissionCheckStrategy
// Sets hold ResourceFlavors to which an AdmissionCheck should apply.
// In case its empty, it means an AdmissionCheck should apply to all ResourceFlavor
AdmissionChecks map[string]sets.Set[string]
Status metrics.ClusterQueueStatus
// GuaranteedQuota records how much resource quota the ClusterQueue reserved
// when feature LendingLimit is enabled and flavor's lendingLimit is not nil.
GuaranteedQuota FlavorResourceQuantities
Expand Down Expand Up @@ -201,8 +203,7 @@ func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[kueue.

c.isStopped = ptr.Deref(in.Spec.StopPolicy, kueue.None) != kueue.None

c.AdmissionChecks = sets.New(in.Spec.AdmissionChecks...)
c.AdmissionCheckStrategy = in.Spec.AdmissionChecksStrategy
c.AdmissionChecks = utilac.NewAdmissionChecks(in)

c.Usage = filterFlavorQuantities(c.Usage, in.Spec.ResourceGroups)
c.AdmittedUsage = filterFlavorQuantities(c.AdmittedUsage, in.Spec.ResourceGroups)
Expand Down Expand Up @@ -418,11 +419,8 @@ func (c *ClusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference
func (c *ClusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck) {
hasMissing := false
singleInstanceControllers := sets.New[string]()
queueChecks := c.AdmissionChecks.Clone()
queueChecks.Insert(slices.Map(c.AdmissionCheckStrategy.AdmissionChecks,
func(r *kueue.AdmissionCheckStrategyRule) string { return r.Name })...)
checksPerController := make(map[string]int, len(queueChecks))
for acName := range queueChecks {
checksPerController := make(map[string]int, len(c.AdmissionChecks))
for acName, _ := range c.AdmissionChecks {
if ac, found := checks[acName]; !found {
hasMissing = true
} else {
Expand Down
189 changes: 157 additions & 32 deletions pkg/cache/clusterqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,26 +463,49 @@ func TestClusterQueueUpdate(t *testing.T) {
}

func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
cqs := []*kueue.ClusterQueue{utiltesting.MakeClusterQueue("cq").
cqWithAC := utiltesting.MakeClusterQueue("cq").
AdmissionChecks("check1", "check2", "check3").
Obj(),
utiltesting.MakeClusterQueue("cq2").
AdmissionCheckStrategy(
*utiltesting.MakeAdmissionCheckStrategyRule("check1").Obj(),
*utiltesting.MakeAdmissionCheckStrategyRule("check2").Obj(),
*utiltesting.MakeAdmissionCheckStrategyRule("check3").Obj()).
Obj(),
}
Obj()

cqWithACStrategy := utiltesting.MakeClusterQueue("cq2").
AdmissionCheckStrategy(
*utiltesting.MakeAdmissionCheckStrategyRule("check1").Obj(),
*utiltesting.MakeAdmissionCheckStrategyRule("check2").Obj(),
*utiltesting.MakeAdmissionCheckStrategyRule("check3").Obj()).
Obj()

testcases := []struct {
name string
cq *kueue.ClusterQueue
cqStatus metrics.ClusterQueueStatus
admissionChecks map[string]AdmissionCheck
wantStatus metrics.ClusterQueueStatus
wantReason string
}{
{
name: "Pending clusterQueue updated valid AC list",
cq: cqWithAC,
cqStatus: pending,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
},
"check2": {
Active: true,
Controller: "controller2",
},
"check3": {
Active: true,
Controller: "controller3",
},
},
wantStatus: active,
wantReason: "Ready",
},
{
name: "Pending clusterQueue with an AC strategy updated valid AC list",
cq: cqWithACStrategy,
cqStatus: pending,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Expand All @@ -503,6 +526,24 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
},
{
name: "Active clusterQueue updated with not found AC",
cq: cqWithAC,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
},
"check2": {
Active: true,
Controller: "controller2",
},
},
wantStatus: pending,
wantReason: "CheckNotFoundOrInactive",
},
{
name: "Active clusterQueue with an AC strategy updated with not found AC",
cq: cqWithACStrategy,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Expand All @@ -519,6 +560,28 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
},
{
name: "Active clusterQueue updated with inactive AC",
cq: cqWithAC,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
},
"check2": {
Active: true,
Controller: "controller2",
},
"check3": {
Active: false,
Controller: "controller3",
},
},
wantStatus: pending,
wantReason: "CheckNotFoundOrInactive",
},
{
name: "Active clusterQueue with an AC strategy updated with inactive AC",
cq: cqWithACStrategy,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Expand All @@ -539,6 +602,30 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
},
{
name: "Active clusterQueue updated with duplicate single instance AC Controller",
cq: cqWithAC,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
SingleInstanceInClusterQueue: true,
},
"check2": {
Active: true,
Controller: "controller2",
},
"check3": {
Active: true,
Controller: "controller2",
SingleInstanceInClusterQueue: true,
},
},
wantStatus: pending,
wantReason: "MultipleSingleInstanceControllerChecks",
},
{
name: "Active clusterQueue with an AC strategy updated with duplicate single instance AC Controller",
cq: cqWithACStrategy,
cqStatus: active,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Expand All @@ -561,6 +648,28 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
},
{
name: "Terminating clusterQueue updated with valid AC list",
cq: cqWithAC,
cqStatus: terminating,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
},
"check2": {
Active: true,
Controller: "controller2",
},
"check3": {
Active: true,
Controller: "controller3",
},
},
wantStatus: terminating,
wantReason: "Terminating",
},
{
name: "Terminating clusterQueue with an AC strategy updated with valid AC list",
cq: cqWithACStrategy,
cqStatus: terminating,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Expand All @@ -581,6 +690,24 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {
},
{
name: "Terminating clusterQueue updated with not found AC",
cq: cqWithAC,
cqStatus: terminating,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Active: true,
Controller: "controller1",
},
"check2": {
Active: true,
Controller: "controller2",
},
},
wantStatus: terminating,
wantReason: "Terminating",
},
{
name: "Terminating clusterQueue with an AC strategy updated with not found AC",
cq: cqWithACStrategy,
cqStatus: terminating,
admissionChecks: map[string]AdmissionCheck{
"check1": {
Expand All @@ -599,33 +726,31 @@ func TestClusterQueueUpdateWithAdmissionCheck(t *testing.T) {

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
for _, cq := range cqs {
cache := New(utiltesting.NewFakeClient())
cq, err := cache.newClusterQueue(cq)
if err != nil {
t.Fatalf("failed to new clusterQueue %v", err)
}
cache := New(utiltesting.NewFakeClient())
cq, err := cache.newClusterQueue(tc.cq)
if err != nil {
t.Fatalf("failed to new clusterQueue %v", err)
}

cq.Status = tc.cqStatus
cq.Status = tc.cqStatus

// Align the admission check related internals to the desired Status.
if tc.cqStatus == active {
cq.hasMultipleSingleInstanceControllersChecks = false
cq.hasMissingOrInactiveAdmissionChecks = false
} else {
cq.hasMultipleSingleInstanceControllersChecks = true
cq.hasMissingOrInactiveAdmissionChecks = true
}
cq.updateWithAdmissionChecks(tc.admissionChecks)
// Align the admission check related internals to the desired Status.
if tc.cqStatus == active {
cq.hasMultipleSingleInstanceControllersChecks = false
cq.hasMissingOrInactiveAdmissionChecks = false
} else {
cq.hasMultipleSingleInstanceControllersChecks = true
cq.hasMissingOrInactiveAdmissionChecks = true
}
cq.updateWithAdmissionChecks(tc.admissionChecks)

if cq.Status != tc.wantStatus {
t.Errorf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status)
}
if cq.Status != tc.wantStatus {
t.Errorf("got different status, want: %v, got: %v", tc.wantStatus, cq.Status)
}

gotReason, _ := cq.inactiveReason()
if diff := cmp.Diff(tc.wantReason, gotReason); diff != "" {
t.Errorf("Unexpected inactiveReason (-want,+got):\n%s", diff)
}
gotReason, _ := cq.inactiveReason()
if diff := cmp.Diff(tc.wantReason, gotReason); diff != "" {
t.Errorf("Unexpected inactiveReason (-want,+got):\n%s", diff)
}
})
}
Expand Down
Loading

0 comments on commit 2dfd74d

Please sign in to comment.