diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go
index 4ee005efb0..8bfab0b3de 100644
--- a/pkg/cache/cache_test.go
+++ b/pkg/cache/cache_test.go
@@ -1951,6 +1951,80 @@ func TestClusterQueueUsage(t *testing.T) {
 			},
 			wantAdmittedWorkloads: 1,
 		},
+		"clusterQueue with cohort; partial admission": {
+			clusterQueue: cq,
+			workloads: []kueue.Workload{
+				*utiltesting.MakeWorkload("partial-one", "").
+					PodSets(*utiltesting.MakePodSet("main", 5).Request(corev1.ResourceCPU, "2").Obj()).
+					ReserveQuota(utiltesting.MakeAdmission("foo").Assignment(corev1.ResourceCPU, "default", "4000m").AssignmentPodCount(2).Obj()).
+					Admitted(true).
+					Obj(),
+				*utiltesting.MakeWorkload("partial-two", "").
+					PodSets(*utiltesting.MakePodSet("main", 5).Request(corev1.ResourceCPU, "2").Obj()).
+					ReserveQuota(utiltesting.MakeAdmission("foo").Assignment(corev1.ResourceCPU, "default", "4000m").AssignmentPodCount(2).Obj()).
+					Obj(),
+			},
+			wantReservedResources: []kueue.FlavorUsage{
+				{
+					Name: "default",
+					Resources: []kueue.ResourceUsage{{
+						Name:  corev1.ResourceCPU,
+						Total: resource.MustParse("8"),
+					}},
+				},
+				{
+					Name: "model_a",
+					Resources: []kueue.ResourceUsage{{
+						Name: "example.com/gpu",
+					}},
+				},
+				{
+					Name: "model_b",
+					Resources: []kueue.ResourceUsage{{
+						Name: "example.com/gpu",
+					}},
+				},
+				{
+					Name: "interconnect_a",
+					Resources: []kueue.ResourceUsage{
+						{Name: "example.com/vf-0"},
+						{Name: "example.com/vf-1"},
+						{Name: "example.com/vf-2"},
+					},
+				},
+			},
+			wantReservingWorkloads: 2,
+			wantUsedResources: []kueue.FlavorUsage{
+				{
+					Name: "default",
+					Resources: []kueue.ResourceUsage{{
+						Name:  corev1.ResourceCPU,
+						Total: resource.MustParse("4"),
+					}},
+				},
+				{
+					Name: "model_a",
+					Resources: []kueue.ResourceUsage{{
+						Name: "example.com/gpu",
+					}},
+				},
+				{
+					Name: "model_b",
+					Resources: []kueue.ResourceUsage{{
+						Name: "example.com/gpu",
+					}},
+				},
+				{
+					Name: "interconnect_a",
+					Resources: []kueue.ResourceUsage{
+						{Name: "example.com/vf-0"},
+						{Name: "example.com/vf-1"},
+						{Name: "example.com/vf-2"},
+					},
+				},
+			},
+			wantAdmittedWorkloads: 1,
+		},
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go
index c0113c2cf3..b775bbd1de 100644
--- a/pkg/workload/workload.go
+++ b/pkg/workload/workload.go
@@ -318,12 +318,15 @@ func totalRequestsFromAdmission(wl *kueue.Workload) []PodSetResources {
 			Requests: resources.NewRequests(psa.ResourceUsage),
 		}
 
-		if count := currentCounts[psa.Name]; count != setRes.Count {
+		// If countAfterReclaim is lower than the admission count, it indicates that
+		// additional pods are marked as reclaimable, and the consumption should be scaled down.
+		if countAfterReclaim := currentCounts[psa.Name]; countAfterReclaim < setRes.Count {
 			scaleDown(setRes.Requests, int64(setRes.Count))
-			scaleUp(setRes.Requests, int64(count))
-			setRes.Count = count
+			scaleUp(setRes.Requests, int64(countAfterReclaim))
+			setRes.Count = countAfterReclaim
 		}
-
+		// Otherwise, if countAfterReclaim is higher, it means that the podSet was partially admitted
+		// and the count should be preserved.
 		res = append(res, setRes)
 	}
 	return res
diff --git a/pkg/workload/workload_test.go b/pkg/workload/workload_test.go
index 0a9de5da28..c278aa8506 100644
--- a/pkg/workload/workload_test.go
+++ b/pkg/workload/workload_test.go
@@ -192,6 +192,78 @@ func TestNewInfo(t *testing.T) {
 				},
 			},
 		},
+		"admitted with reclaim and increased reclaim": {
+			workload: *utiltesting.MakeWorkload("", "").
+				PodSets(
+					*utiltesting.MakePodSet("main", 5).
+						Request(corev1.ResourceCPU, "10m").
+						Request(corev1.ResourceMemory, "10Ki").
+						Obj(),
+				).
+				ReserveQuota(
+					utiltesting.MakeAdmission("").
+						Assignment(corev1.ResourceCPU, "f1", "30m").
+						Assignment(corev1.ResourceMemory, "f1", "30Ki").
+						AssignmentPodCount(3).
+						Obj(),
+				).
+				ReclaimablePods(
+					kueue.ReclaimablePod{
+						Name:  "main",
+						Count: 3,
+					},
+				).
+				Obj(),
+			wantInfo: Info{
+				TotalRequests: []PodSetResources{
+					{
+						Name: "main",
+						Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+							corev1.ResourceCPU:    "f1",
+							corev1.ResourceMemory: "f1",
+						},
+						Requests: resources.Requests{
+							corev1.ResourceCPU:    2 * 10,
+							corev1.ResourceMemory: 2 * 10 * 1024,
+						},
+						Count: 2,
+					},
+				},
+			},
+		},
+		"partially admitted": {
+			workload: *utiltesting.MakeWorkload("", "").
+				PodSets(
+					*utiltesting.MakePodSet("main", 5).
+						Request(corev1.ResourceCPU, "10m").
+						Request(corev1.ResourceMemory, "10Ki").
+						Obj(),
+				).
+				ReserveQuota(
+					utiltesting.MakeAdmission("").
+						Assignment(corev1.ResourceCPU, "f1", "30m").
+						Assignment(corev1.ResourceMemory, "f1", "30Ki").
+						AssignmentPodCount(3).
+						Obj(),
+				).
+				Obj(),
+			wantInfo: Info{
+				TotalRequests: []PodSetResources{
+					{
+						Name: "main",
+						Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
+							corev1.ResourceCPU:    "f1",
+							corev1.ResourceMemory: "f1",
+						},
+						Requests: resources.Requests{
+							corev1.ResourceCPU:    3 * 10,
+							corev1.ResourceMemory: 3 * 10 * 1024,
+						},
+						Count: 3,
+					},
+				},
+			},
+		},
 		"filterResources": {
 			workload: *utiltesting.MakeWorkload("", "").
 				Request(corev1.ResourceCPU, "10m").
diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go
index 4c22a215b3..36d910f6fb 100644
--- a/test/integration/controller/jobs/job/job_controller_test.go
+++ b/test/integration/controller/jobs/job/job_controller_test.go
@@ -27,6 +27,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	apimeta "k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -1673,6 +1674,23 @@ var _ = ginkgo.Describe("Interacting with scheduler", ginkgo.Ordered, ginkgo.Con
 			gomega.Expect(*wl.Spec.PodSets[0].MinCount).To(gomega.BeEquivalentTo(1))
 		})
 
+		ginkgo.By("checking the clusterqueue usage", func() {
+			updateCq := &kueue.ClusterQueue{}
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodClusterQ), updateCq)).Should(gomega.Succeed())
+				g.Expect(updateCq.Status.FlavorsUsage).To(gomega.ContainElement(kueue.FlavorUsage{
+					Name: "on-demand",
+					Resources: []kueue.ResourceUsage{
+						{
+							Name:     corev1.ResourceCPU,
+							Total:    resource.MustParse("4"),
+							Borrowed: resource.MustParse("0"),
+						},
+					},
+				}))
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+		})
+
 		ginkgo.By("delete the localQueue to prevent readmission", func() {
 			gomega.Expect(util.DeleteObject(ctx, k8sClient, prodLocalQ)).Should(gomega.Succeed())
 		})
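
Note on the pkg/workload/workload.go hunk above: the new guard only rescales usage when the pod count after reclaim drops below the admitted count, so a partially admitted podSet (spec count higher than the admitted count) keeps its admitted usage and count untouched. The standalone Go sketch below illustrates that adjustment; it is not Kueue code, and adjustForReclaim plus the plain int64 map are illustrative stand-ins for the real scaleDown/scaleUp over resources.Requests.

// Minimal sketch (assumed names, not part of the patch) of the adjustment made
// in totalRequestsFromAdmission: admitted usage is divided by the admitted pod
// count and re-multiplied by the count remaining after reclaim, but only when
// that count shrank.
package main

import "fmt"

// adjustForReclaim returns the usage and pod count that should be charged to
// the quota. requests holds admitted totals in base units (e.g. milli-CPU).
func adjustForReclaim(requests map[string]int64, admittedCount, countAfterReclaim int64) (map[string]int64, int64) {
	// Partial admission (or no reclaim): the count after reclaim is not lower
	// than the admitted count, so keep the admitted usage and count unchanged.
	if countAfterReclaim >= admittedCount {
		return requests, admittedCount
	}
	scaled := make(map[string]int64, len(requests))
	for name, total := range requests {
		perPod := total / admittedCount           // analogous to scaleDown
		scaled[name] = perPod * countAfterReclaim // analogous to scaleUp
	}
	return scaled, countAfterReclaim
}

func main() {
	// Reclaim: 4000m CPU admitted for 2 pods, 1 pod becomes reclaimable.
	fmt.Println(adjustForReclaim(map[string]int64{"cpu": 4000}, 2, 1)) // map[cpu:2000] 1

	// Partial admission: the podSet asks for 5 pods but only 2 were admitted;
	// the admitted usage and count are preserved.
	fmt.Println(adjustForReclaim(map[string]int64{"cpu": 4000}, 2, 5)) // map[cpu:4000] 2
}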