[workload]Fix resource consumption computation for partially admitted workloads (#3118)
trasc authored Sep 26, 2024
1 parent 45bc472 commit f24e13b
Showing 4 changed files with 171 additions and 4 deletions.
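
In short: totalRequestsFromAdmission used to rescale a PodSet's recorded consumption whenever the current pod count differed from the admitted count. For a partially admitted workload the current (spec) count is higher than the admitted count, so the admitted consumption was wrongly scaled up. The fix rescales only when the count after reclaim drops below the admitted count, and otherwise preserves the consumption that was actually admitted.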
74 changes: 74 additions & 0 deletions pkg/cache/cache_test.go
@@ -1951,6 +1951,80 @@ func TestClusterQueueUsage(t *testing.T) {
},
wantAdmittedWorkloads: 1,
},
"clusterQueue with cohort; partial admission": {
clusterQueue: cq,
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("partial-one", "").
PodSets(*utiltesting.MakePodSet("main", 5).Request(corev1.ResourceCPU, "2").Obj()).
ReserveQuota(utiltesting.MakeAdmission("foo").Assignment(corev1.ResourceCPU, "default", "4000m").AssignmentPodCount(2).Obj()).
Admitted(true).
Obj(),
*utiltesting.MakeWorkload("partial-two", "").
PodSets(*utiltesting.MakePodSet("main", 5).Request(corev1.ResourceCPU, "2").Obj()).
ReserveQuota(utiltesting.MakeAdmission("foo").Assignment(corev1.ResourceCPU, "default", "4000m").AssignmentPodCount(2).Obj()).
Obj(),
},
wantReservedResources: []kueue.FlavorUsage{
{
Name: "default",
Resources: []kueue.ResourceUsage{{
Name: corev1.ResourceCPU,
Total: resource.MustParse("8"),
}},
},
{
Name: "model_a",
Resources: []kueue.ResourceUsage{{
Name: "example.com/gpu",
}},
},
{
Name: "model_b",
Resources: []kueue.ResourceUsage{{
Name: "example.com/gpu",
}},
},
{
Name: "interconnect_a",
Resources: []kueue.ResourceUsage{
{Name: "example.com/vf-0"},
{Name: "example.com/vf-1"},
{Name: "example.com/vf-2"},
},
},
},
wantReservingWorkloads: 2,
wantUsedResources: []kueue.FlavorUsage{
{
Name: "default",
Resources: []kueue.ResourceUsage{{
Name: corev1.ResourceCPU,
Total: resource.MustParse("4"),
}},
},
{
Name: "model_a",
Resources: []kueue.ResourceUsage{{
Name: "example.com/gpu",
}},
},
{
Name: "model_b",
Resources: []kueue.ResourceUsage{{
Name: "example.com/gpu",
}},
},
{
Name: "interconnect_a",
Resources: []kueue.ResourceUsage{
{Name: "example.com/vf-0"},
{Name: "example.com/vf-1"},
{Name: "example.com/vf-2"},
},
},
},
wantAdmittedWorkloads: 1,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
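The numbers follow from partial admission: each workload asks for 5 pods at 2 CPU, but its admission covers only 2 pods (4000m in total). Both workloads hold a quota reservation, so reserved CPU is 2 × 4 = 8; only partial-one is admitted, so used CPU is 4 and a single workload is counted as admitted.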
11 changes: 7 additions & 4 deletions pkg/workload/workload.go
@@ -318,12 +318,15 @@ func totalRequestsFromAdmission(wl *kueue.Workload) []PodSetResources {
Requests: resources.NewRequests(psa.ResourceUsage),
}

if count := currentCounts[psa.Name]; count != setRes.Count {
// If countAfterReclaim is lower than the admission count, it indicates that
// additional pods are marked as reclaimable, and the consumption should be scaled down.
if countAfterReclaim := currentCounts[psa.Name]; countAfterReclaim < setRes.Count {
scaleDown(setRes.Requests, int64(setRes.Count))
scaleUp(setRes.Requests, int64(count))
setRes.Count = count
scaleUp(setRes.Requests, int64(countAfterReclaim))
setRes.Count = countAfterReclaim
}

// Otherwise, if countAfterReclaim is higher, it means that the podSet was
// partially admitted and the count should be preserved.
res = append(res, setRes)
}
return res
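For readers following the logic, here is a minimal, runnable sketch of the two paths the rewritten condition distinguishes. The Requests type and the scaleDown/scaleUp bodies are illustrative assumptions modeled on the surrounding diff (Kueue's actual type is resources.Requests); only the control flow mirrors the change above.

package main

import "fmt"

// Requests stands in for Kueue's resources.Requests: per-resource totals
// kept as plain integers (an assumption made for this sketch).
type Requests map[string]int64

// scaleDown divides every total by f, turning a PodSet total into a per-pod value.
func scaleDown(r Requests, f int64) {
	for name := range r {
		r[name] /= f
	}
}

// scaleUp multiplies every total by f.
func scaleUp(r Requests, f int64) {
	for name := range r {
		r[name] *= f
	}
}

func main() {
	// The admission recorded 3 pods consuming 30m CPU in total.
	reqs := Requests{"cpu": 30}
	admittedCount := int64(3)

	// Reclaim path: 2 pods remain after reclaim, 2 < 3, so the total is
	// rescaled from the admitted count down to the remaining count.
	countAfterReclaim := int64(2)
	if countAfterReclaim < admittedCount {
		scaleDown(reqs, admittedCount)   // 30 -> 10 per pod
		scaleUp(reqs, countAfterReclaim) // 10 -> 20 for the 2 remaining pods
	}
	fmt.Println(reqs["cpu"]) // 20

	// Partial-admission path: the spec wants 5 pods but only 3 were admitted;
	// 5 < 3 is false, so the admitted consumption and count are preserved,
	// which is exactly what the old `count != setRes.Count` check got wrong.
}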
72 changes: 72 additions & 0 deletions pkg/workload/workload_test.go
@@ -192,6 +192,78 @@ func TestNewInfo(t *testing.T) {
},
},
},
"admitted with reclaim and increased reclaim": {
workload: *utiltesting.MakeWorkload("", "").
PodSets(
*utiltesting.MakePodSet("main", 5).
Request(corev1.ResourceCPU, "10m").
Request(corev1.ResourceMemory, "10Ki").
Obj(),
).
ReserveQuota(
utiltesting.MakeAdmission("").
Assignment(corev1.ResourceCPU, "f1", "30m").
Assignment(corev1.ResourceMemory, "f1", "30Ki").
AssignmentPodCount(3).
Obj(),
).
ReclaimablePods(
kueue.ReclaimablePod{
Name: "main",
Count: 3,
},
).
Obj(),
wantInfo: Info{
TotalRequests: []PodSetResources{
{
Name: "main",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "f1",
corev1.ResourceMemory: "f1",
},
Requests: resources.Requests{
corev1.ResourceCPU: 2 * 10,
corev1.ResourceMemory: 2 * 10 * 1024,
},
Count: 2,
},
},
},
},
"partially admitted": {
workload: *utiltesting.MakeWorkload("", "").
PodSets(
*utiltesting.MakePodSet("main", 5).
Request(corev1.ResourceCPU, "10m").
Request(corev1.ResourceMemory, "10Ki").
Obj(),
).
ReserveQuota(
utiltesting.MakeAdmission("").
Assignment(corev1.ResourceCPU, "f1", "30m").
Assignment(corev1.ResourceMemory, "f1", "30Ki").
AssignmentPodCount(3).
Obj(),
).
Obj(),
wantInfo: Info{
TotalRequests: []PodSetResources{
{
Name: "main",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "f1",
corev1.ResourceMemory: "f1",
},
Requests: resources.Requests{
corev1.ResourceCPU: 3 * 10,
corev1.ResourceMemory: 3 * 10 * 1024,
},
Count: 3,
},
},
},
},
"filterResources": {
workload: *utiltesting.MakeWorkload("", "").
Request(corev1.ResourceCPU, "10m").
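Both expectations exercise the new condition. In the reclaim case the PodSet wants 5 pods and 3 are reclaimable, so the count after reclaim is 2, below the admitted 3: the 30m/30Ki totals are rescaled to 10m/10Ki per pod and multiplied by 2. In the partially admitted case nothing is reclaimable, so the count after reclaim is the spec's 5, above the admitted 3: the admitted totals (3 × 10m, 3 × 10Ki) and count 3 are preserved.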
18 changes: 18 additions & 0 deletions test/integration/controller/jobs/job/job_controller_test.go
@@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
@@ -1673,6 +1674,23 @@ var _ = ginkgo.Describe("Interacting with scheduler", ginkgo.Ordered, ginkgo.Con
gomega.Expect(*wl.Spec.PodSets[0].MinCount).To(gomega.BeEquivalentTo(1))
})

ginkgo.By("checking the clusterqueue usage", func() {
updateCq := &kueue.ClusterQueue{}
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(prodClusterQ), updateCq)).Should(gomega.Succeed())
g.Expect(updateCq.Status.FlavorsUsage).To(gomega.ContainElement(kueue.FlavorUsage{
Name: "on-demand",
Resources: []kueue.ResourceUsage{
{
Name: corev1.ResourceCPU,
Total: resource.MustParse("4"),
Borrowed: resource.MustParse("0"),
},
},
}))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("delete the localQueue to prevent readmission", func() {
gomega.Expect(util.DeleteObject(ctx, k8sClient, prodLocalQ)).Should(gomega.Succeed())
})
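The new check covers the user-visible effect: once the job is partially admitted, the ClusterQueue's reported FlavorsUsage for the on-demand flavor must show a total of 4 CPU with nothing borrowed, i.e. the consumption of the admitted pods rather than the job's full spec.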
