TAS: Compute the topology assignments (#3256)
* TAS: Compute the topology assignments

* review comments

* Cleanup the extra Snapshot function
mimowo authored Oct 22, 2024
1 parent 2701822 commit 0afb766
Showing 34 changed files with 2,679 additions and 54 deletions.
2 changes: 2 additions & 0 deletions charts/kueue/templates/rbac/role.yaml
@@ -19,6 +19,7 @@ rules:
resources:
- limitranges
- namespaces
- nodes
verbs:
- get
- list
@@ -247,6 +248,7 @@ rules:
- multikueueclusters
- multikueueconfigs
- provisioningrequestconfigs
- topologies
- workloadpriorityclasses
verbs:
- get
2 changes: 2 additions & 0 deletions config/components/rbac/role.yaml
@@ -18,6 +18,7 @@ rules:
resources:
- limitranges
- namespaces
- nodes
verbs:
- get
- list
@@ -246,6 +247,7 @@ rules:
- multikueueclusters
- multikueueconfigs
- provisioningrequestconfigs
- topologies
- workloadpriorityclasses
verbs:
- get
20 changes: 20 additions & 0 deletions pkg/cache/cache.go
@@ -109,6 +109,8 @@ type Cache struct {
fairSharingEnabled bool

hm hierarchy.Manager[*clusterQueue, *cohort]

tasCache TASCache
}

func New(client client.Client, opts ...Option) *Cache {
@@ -125,6 +127,7 @@ func New(client client.Client, opts ...Option) *Cache {
workloadInfoOptions: options.workloadInfoOptions,
fairSharingEnabled: options.fairSharingEnabled,
hm: hierarchy.NewManager[*clusterQueue, *cohort](newCohort),
tasCache: NewTASCache(client),
}
c.podsReadyCond.L = &c.RWMutex
return c
@@ -140,6 +143,7 @@ func (c *Cache) newClusterQueue(cq *kueue.ClusterQueue) (*clusterQueue, error) {
workloadInfoOptions: c.workloadInfoOptions,
AdmittedUsage: make(resources.FlavorResourceQuantities),
resourceNode: NewResourceNode(),
tasCache: &c.tasCache,
}
c.hm.AddClusterQueue(cqImpl)
c.hm.UpdateClusterQueueEdge(cq.Name, cq.Spec.Cohort)
@@ -225,6 +229,22 @@ func (c *Cache) updateClusterQueues() sets.Set[string] {
return cqs
}

func (c *Cache) ActiveClusterQueues() sets.Set[string] {
c.RLock()
defer c.RUnlock()
cqs := sets.New[string]()
for _, cq := range c.hm.ClusterQueues {
if cq.Status == active {
cqs.Insert(cq.Name)
}
}
return cqs
}

func (c *Cache) TASCache() *TASCache {
return &c.tasCache
}

func (c *Cache) AddOrUpdateResourceFlavor(rf *kueue.ResourceFlavor) sets.Set[string] {
c.Lock()
defer c.Unlock()
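A minimal sketch of how the two new `Cache` accessors might be consumed; `exampleAccessors` is a hypothetical helper used only for illustration and is not part of this commit.

```go
package cache

// exampleAccessors is a hypothetical helper (not in this commit) showing how
// callers might use the new accessors: ActiveClusterQueues returns the set of
// names of active ClusterQueues, and TASCache exposes the shared TAS cache.
func exampleAccessors(c *Cache) {
	for name := range c.ActiveClusterQueues() {
		_ = name // e.g., enqueue pending workloads for this ClusterQueue
	}
	// Clone returns a shallow copy of flavor name -> *TASFlavorCache.
	for flavor, flvCache := range c.TASCache().Clone() {
		_, _ = flavor, flvCache
	}
}
```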
21 changes: 21 additions & 0 deletions pkg/cache/clusterqueue.go
@@ -34,6 +34,7 @@ import (
"k8s.io/utils/ptr"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/hierarchy"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
@@ -83,6 +84,8 @@ type clusterQueue struct {

resourceNode ResourceNode
hierarchy.ClusterQueue[*cohort]

tasCache *TASCache
}

func (c *clusterQueue) GetName() string {
@@ -426,12 +429,20 @@ func (c *clusterQueue) reportActiveWorkloads() {
func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
admitted := workload.IsAdmitted(wi.Obj)
frUsage := wi.FlavorResourceUsage()
tasUsage := wi.TASUsage()
for fr, q := range frUsage {
tasFlvCache := c.tasFlavorCache(fr.Flavor)
if m == 1 {
addUsage(c, fr, q)
if tasFlvCache != nil {
tasFlvCache.addUsage(tasUsage)
}
}
if m == -1 {
removeUsage(c, fr, q)
if tasFlvCache != nil {
tasFlvCache.removeUsage(tasUsage)
}
}
}
if admitted {
@@ -449,6 +460,16 @@ func (c *clusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) {
}
}

func (c *clusterQueue) tasFlavorCache(flvName kueue.ResourceFlavorReference) *TASFlavorCache {
if !features.Enabled(features.TopologyAwareScheduling) {
return nil
}
if c.tasCache == nil {
return nil
}
return c.tasCache.Get(flvName)
}

func updateFlavorUsage(newUsage resources.FlavorResourceQuantities, oldUsage resources.FlavorResourceQuantities, m int64) {
for fr, q := range newUsage {
oldUsage[fr] += q * m
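A hedged sketch of the usage-tracking pattern introduced in `updateWorkloadUsage`: TAS usage is only touched when the TopologyAwareScheduling gate is enabled and the flavor has a registered TAS cache. `trackTASUsage` is a hypothetical helper, not part of this commit.

```go
package cache

import (
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
	"sigs.k8s.io/kueue/pkg/workload"
)

// trackTASUsage is a hypothetical helper (not in this commit) mirroring the
// pattern above: tasFlavorCache returns nil when the feature gate is off or
// the flavor has no TAS cache, so the add/remove calls are safely skipped.
func trackTASUsage(cq *clusterQueue, wi *workload.Info, flv kueue.ResourceFlavorReference, add bool) {
	flvCache := cq.tasFlavorCache(flv)
	if flvCache == nil {
		return
	}
	if add {
		flvCache.addUsage(wi.TASUsage())
	} else {
		flvCache.removeUsage(wi.TASUsage())
	}
}
```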
2 changes: 2 additions & 0 deletions pkg/cache/clusterqueue_snapshot.go
@@ -49,6 +49,8 @@ type ClusterQueueSnapshot struct {

ResourceNode ResourceNode
hierarchy.ClusterQueue[*CohortSnapshot]

TASFlavors map[kueue.ResourceFlavorReference]*TASFlavorSnapshot
}

// RGByResource returns the ResourceGroup which contains capacity
8 changes: 5 additions & 3 deletions pkg/cache/clusterqueue_test.go
@@ -369,6 +369,7 @@ func TestFitInCohort(t *testing.T) {

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, _ := utiltesting.ContextWithLog(t)
if tc.disableLendingLimit {
features.SetFeatureGateDuringTest(t, features.LendingLimit, false)
}
@@ -381,7 +382,7 @@
_ = cache.AddClusterQueue(context.Background(), cq)
}

snapshot := cache.Snapshot()
snapshot := cache.Snapshot(ctx)
cq := snapshot.ClusterQueues["CQ"]
cq.AddUsage(tc.usage)

@@ -467,7 +468,7 @@ func TestClusterQueueUpdate(t *testing.T) {
if err := cqCache.UpdateClusterQueue(tc.newcq); err != nil {
t.Fatalf("Updating clusterQueue %s in cache: %v", tc.newcq.Name, err)
}
snapshot := cqCache.Snapshot()
snapshot := cqCache.Snapshot(ctx)
if diff := cmp.Diff(
tc.wantLastAssignmentGeneration,
snapshot.ClusterQueues["eng-alpha"].AllocatableResourceGeneration); diff != "" {
@@ -1072,6 +1073,7 @@ func TestDominantResourceShare(t *testing.T) {
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, _ := utiltesting.ContextWithLog(t)
cache := New(utiltesting.NewFakeClient())
cache.AddOrUpdateResourceFlavor(utiltesting.MakeResourceFlavor("default").Obj())
cache.AddOrUpdateResourceFlavor(utiltesting.MakeResourceFlavor("on-demand").Obj())
@@ -1084,7 +1086,7 @@
_ = cache.AddClusterQueue(context.Background(), tc.lendingClusterQueue)
}

snapshot := cache.Snapshot()
snapshot := cache.Snapshot(ctx)

i := 0
for fr, v := range tc.usage {
2 changes: 1 addition & 1 deletion pkg/cache/resource_test.go
@@ -372,7 +372,7 @@ func TestAvailable(t *testing.T) {
_ = cache.AddOrUpdateCohort(&cohort)
}

snapshot := cache.Snapshot()
snapshot := cache.Snapshot(ctx)

// before adding usage
{
21 changes: 19 additions & 2 deletions pkg/cache/snapshot.go
@@ -17,13 +17,15 @@ limitations under the License.
package cache

import (
"context"
"maps"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/hierarchy"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
"sigs.k8s.io/kueue/pkg/workload"
@@ -75,7 +77,7 @@
}
}

func (c *Cache) Snapshot() Snapshot {
func (c *Cache) Snapshot(ctx context.Context) Snapshot {
c.RLock()
defer c.RUnlock()

@@ -94,15 +96,29 @@ func (c *Cache) Snapshot() Snapshot {
snap.UpdateCohortEdge(cohort.Name, cohort.Parent().Name)
}
}
tasSnapshots := make(map[kueue.ResourceFlavorReference]*TASFlavorSnapshot)
if features.Enabled(features.TopologyAwareScheduling) {
for key, cache := range c.tasCache.Clone() {
tasSnapshots[key] = cache.snapshot(ctx)
}
}
for _, cq := range c.hm.ClusterQueues {
if !cq.Active() || (cq.HasParent() && c.hm.CycleChecker.HasCycle(cq.Parent())) {
snap.InactiveClusterQueueSets.Insert(cq.Name)
continue
}
snap.AddClusterQueue(snapshotClusterQueue(cq))
cqSnapshot := snapshotClusterQueue(cq)
snap.AddClusterQueue(cqSnapshot)
if cq.HasParent() {
snap.UpdateClusterQueueEdge(cq.Name, cq.Parent().Name)
}
if features.Enabled(features.TopologyAwareScheduling) {
for tasFlv, s := range tasSnapshots {
if cq.flavorInUse(string(tasFlv)) {
cqSnapshot.TASFlavors[tasFlv] = s
}
}
}
}
for name, rf := range c.resourceFlavors {
// Shallow copy is enough
@@ -126,6 +142,7 @@ func snapshotClusterQueue(c *clusterQueue) *ClusterQueueSnapshot {
Status: c.Status,
AdmissionChecks: utilmaps.DeepCopySets[kueue.ResourceFlavorReference](c.AdmissionChecks),
ResourceNode: c.resourceNode.Clone(),
TASFlavors: make(map[kueue.ResourceFlavorReference]*TASFlavorSnapshot),
}
for i, rg := range c.ResourceGroups {
cc.ResourceGroups[i] = rg.Clone()
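A minimal sketch of how a caller might reach the per-ClusterQueue TAS snapshots after taking a cache snapshot with the new context-aware signature; `lookupTASFlavorSnapshot` is a hypothetical helper, not part of this commit.

```go
package cache

import (
	"context"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// lookupTASFlavorSnapshot is a hypothetical helper (not in this commit)
// showing that Snapshot now takes a context and that each ClusterQueue
// snapshot carries the TAS snapshots of the TAS flavors it uses.
func lookupTASFlavorSnapshot(ctx context.Context, c *Cache, cqName string, flv kueue.ResourceFlavorReference) *TASFlavorSnapshot {
	snapshot := c.Snapshot(ctx)
	cq, ok := snapshot.ClusterQueues[cqName]
	if !ok {
		return nil
	}
	return cq.TASFlavors[flv]
}
```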
11 changes: 6 additions & 5 deletions pkg/cache/snapshot_test.go
@@ -878,6 +878,7 @@ func TestSnapshot(t *testing.T) {

for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
ctx, _ := utiltesting.ContextWithLog(t)
if tc.disableLendingLimit {
features.SetFeatureGateDuringTest(t, features.LendingLimit, false)
}
@@ -898,7 +899,7 @@
for _, wl := range tc.wls {
cache.AddOrUpdateWorkload(wl)
}
snapshot := cache.Snapshot()
snapshot := cache.Snapshot(ctx)
if diff := cmp.Diff(tc.wantSnapshot, snapshot, snapCmpOpts...); len(diff) != 0 {
t.Errorf("Unexpected Snapshot (-want,+got):\n%s", diff)
}
@@ -981,7 +982,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) {
wlInfos[workload.Key(wl.Obj)] = wl
}
}
initialSnapshot := cqCache.Snapshot()
initialSnapshot := cqCache.Snapshot(ctx)
initialCohortResources := initialSnapshot.ClusterQueues["c1"].Parent().ResourceNode.SubtreeQuota
cases := map[string]struct {
remove []string
@@ -1192,7 +1193,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) {
cmpopts.IgnoreTypes(&workload.Info{}))
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
snap := cqCache.Snapshot()
snap := cqCache.Snapshot(ctx)
for _, name := range tc.remove {
snap.RemoveWorkload(wlInfos[name])
}
@@ -1269,7 +1270,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
wlInfos[workload.Key(wl.Obj)] = wl
}
}
initialSnapshot := cqCache.Snapshot()
initialSnapshot := cqCache.Snapshot(ctx)
initialCohortResources := initialSnapshot.ClusterQueues["lend-a"].Parent().ResourceNode.SubtreeQuota
cases := map[string]struct {
remove []string
@@ -1668,7 +1669,7 @@ func TestSnapshotAddRemoveWorkloadWithLendingLimit(t *testing.T) {
cmpopts.IgnoreTypes(&workload.Info{}))
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
snap := cqCache.Snapshot()
snap := cqCache.Snapshot(ctx)
for _, name := range tc.remove {
snap.RemoveWorkload(wlInfos[name])
}
64 changes: 64 additions & 0 deletions pkg/cache/tas_cache.go
@@ -0,0 +1,64 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"maps"
"sync"

"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

type TASCache struct {
sync.RWMutex
client client.Client
flavors map[kueue.ResourceFlavorReference]*TASFlavorCache
}

func NewTASCache(client client.Client) TASCache {
return TASCache{
client: client,
flavors: make(map[kueue.ResourceFlavorReference]*TASFlavorCache),
}
}

func (t *TASCache) Get(name kueue.ResourceFlavorReference) *TASFlavorCache {
t.RLock()
defer t.RUnlock()
return t.flavors[name]
}

// Clone returns a shallow copy of the map
func (t *TASCache) Clone() map[kueue.ResourceFlavorReference]*TASFlavorCache {
t.RLock()
defer t.RUnlock()
return maps.Clone(t.flavors)
}

func (t *TASCache) Set(name kueue.ResourceFlavorReference, info *TASFlavorCache) {
t.Lock()
defer t.Unlock()
t.flavors[name] = info
}

func (t *TASCache) Delete(name kueue.ResourceFlavorReference) {
t.Lock()
defer t.Unlock()
delete(t.flavors, name)
}
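A short usage sketch of the TASCache API defined above; `exampleTASCacheUsage` is hypothetical (not in this commit), and `flvCache` stands in for a TASFlavorCache constructed elsewhere.

```go
package cache

import (
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// exampleTASCacheUsage is a hypothetical illustration (not in this commit) of
// the thread-safe per-flavor registry defined above.
func exampleTASCacheUsage(tasCache *TASCache, flv kueue.ResourceFlavorReference, flvCache *TASFlavorCache) {
	tasCache.Set(flv, flvCache) // register the per-flavor TAS cache
	if got := tasCache.Get(flv); got != nil {
		_ = got // e.g., build a TASFlavorSnapshot from it
	}
	tasCache.Delete(flv) // drop the entry when the flavor is deleted
}
```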