Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mimowo committed Oct 21, 2024
1 parent d71a544 commit d5ca699
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 153 deletions.
4 changes: 2 additions & 2 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (c *Cache) updateClusterQueues() sets.Set[string] {
return cqs
}

func (c *Cache) GetActiveClusterQueues() sets.Set[string] {
func (c *Cache) ActiveClusterQueues() sets.Set[string] {
c.Lock()
defer c.Unlock()
cqs := sets.New[string]()
Expand All @@ -241,7 +241,7 @@ func (c *Cache) GetActiveClusterQueues() sets.Set[string] {
return cqs
}

func (c *Cache) GetTASCache() *TASCache {
func (c *Cache) TASCache() *TASCache {
return &c.tasCache
}

Expand Down
17 changes: 2 additions & 15 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ type clusterQueue struct {
resourceNode ResourceNode
hierarchy.ClusterQueue[*cohort]

tasFlavors []kueue.ResourceFlavorReference
tasCache *TASCache
tasCache *TASCache
}

func (c *clusterQueue) GetName() string {
Expand Down Expand Up @@ -295,15 +294,6 @@ func (c *clusterQueue) inactiveReason() (string, string) {
func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
c.updateLabelKeys(flavors)
c.updateQueueStatus()
if features.Enabled(features.TopologyAwareScheduling) {
for flavorName, rf := range flavors {
if rf.Spec.TopologyName != nil {
if !slices.Contains(c.tasFlavors, flavorName) {
c.tasFlavors = append(c.tasFlavors, flavorName)
}
}
}
}
}

func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
Expand Down Expand Up @@ -474,10 +464,7 @@ func (c *clusterQueue) tasFlavorCache(flvName kueue.ResourceFlavorReference) *TA
if !features.Enabled(features.TopologyAwareScheduling) {
return nil
}
if c.tasFlavors == nil || c.tasCache == nil {
return nil
}
if !slices.Contains(c.tasFlavors, flvName) {
if c.tasCache == nil {
return nil
}
return c.tasCache.Get(flvName)
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/clusterqueue_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type ClusterQueueSnapshot struct {
ResourceNode ResourceNode
hierarchy.ClusterQueue[*CohortSnapshot]

TASFlavorSnapshots map[kueue.ResourceFlavorReference]*TASFlavorSnapshot
TASFlavors map[kueue.ResourceFlavorReference]*TASFlavorSnapshot
}

// RGByResource returns the ResourceGroup which contains capacity
Expand Down
20 changes: 10 additions & 10 deletions pkg/cache/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ func (c *Cache) SnapshotWithCtx(ctx context.Context) Snapshot {
snap.UpdateCohortEdge(cohort.Name, cohort.Parent().Name)
}
}
tasSnapshotsMap := make(map[kueue.ResourceFlavorReference]*TASFlavorSnapshot)
if features.Enabled(features.TopologyAwareScheduling) {
for key, cache := range c.tasCache.MapClone() {
tasSnapshotsMap[key] = cache.snapshot(ctx)
}
}
for _, cq := range c.hm.ClusterQueues {
if !cq.Active() || (cq.HasParent() && c.hm.CycleChecker.HasCycle(cq.Parent())) {
snap.InactiveClusterQueueSets.Insert(cq.Name)
Expand All @@ -111,15 +117,9 @@ func (c *Cache) SnapshotWithCtx(ctx context.Context) Snapshot {
snap.UpdateClusterQueueEdge(cq.Name, cq.Parent().Name)
}
if features.Enabled(features.TopologyAwareScheduling) {
tasSnapshotsMap := make(map[kueue.ResourceFlavorReference]*TASFlavorSnapshot)
for _, tasFlv := range c.tasCache.GetKeys() {
if tasCacheRF := c.tasCache.Get(tasFlv); tasCacheRF != nil {
tasSnapshotsMap[tasFlv] = tasCacheRF.snapshot(ctx)
}
}
for _, tasFlv := range cq.tasFlavors {
if s := tasSnapshotsMap[tasFlv]; s != nil {
cqSnapshot.TASFlavorSnapshots[tasFlv] = s
for tasFlv, s := range tasSnapshotsMap {
if cq.flavorInUse(string(tasFlv)) {
cqSnapshot.TASFlavors[tasFlv] = s
}
}
}
Expand All @@ -146,7 +146,7 @@ func snapshotClusterQueue(c *clusterQueue) *ClusterQueueSnapshot {
Status: c.Status,
AdmissionChecks: utilmaps.DeepCopySets[kueue.ResourceFlavorReference](c.AdmissionChecks),
ResourceNode: c.resourceNode.Clone(),
TASFlavorSnapshots: make(map[kueue.ResourceFlavorReference]*TASFlavorSnapshot),
TASFlavors: make(map[kueue.ResourceFlavorReference]*TASFlavorSnapshot),
}
for i, rg := range c.ResourceGroups {
cc.ResourceGroups[i] = rg.Clone()
Expand Down
19 changes: 9 additions & 10 deletions pkg/cache/tas_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,19 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/resources"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
utiltas "sigs.k8s.io/kueue/pkg/util/tas"
)

type TASCache struct {
sync.RWMutex
client client.Client
Map map[kueue.ResourceFlavorReference]*TASFlavorCache
client client.Client
flvsMap map[kueue.ResourceFlavorReference]*TASFlavorCache
}

func NewTASCache(client client.Client) TASCache {
return TASCache{
client: client,
Map: make(map[kueue.ResourceFlavorReference]*TASFlavorCache),
client: client,
flvsMap: make(map[kueue.ResourceFlavorReference]*TASFlavorCache),
}
}

Expand All @@ -54,23 +53,23 @@ func (t *TASCache) NewFlavorCache(labels []string, nodeLabels map[string]string)
func (t *TASCache) Get(name kueue.ResourceFlavorReference) *TASFlavorCache {
t.RLock()
defer t.RUnlock()
return t.Map[name]
return t.flvsMap[name]
}

func (t *TASCache) GetKeys() []kueue.ResourceFlavorReference {
func (t *TASCache) MapClone() map[kueue.ResourceFlavorReference]*TASFlavorCache {
t.RLock()
defer t.RUnlock()
return utilmaps.Keys(t.Map)
return maps.Clone(t.flvsMap)
}

func (t *TASCache) Set(name kueue.ResourceFlavorReference, info *TASFlavorCache) {
t.Lock()
defer t.Unlock()
t.Map[name] = info
t.flvsMap[name] = info
}

func (t *TASCache) Delete(name kueue.ResourceFlavorReference) {
t.Lock()
defer t.Unlock()
delete(t.Map, name)
delete(t.flvsMap, name)
}
12 changes: 9 additions & 3 deletions pkg/cache/tas_flavor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,16 @@ import (
type TASFlavorCache struct {
sync.RWMutex

client client.Client
client client.Client

// nodeLabels is a map of nodeLabels defined in the ResourceFlavor object.
NodeLabels map[string]string
Levels []string
usageMap map[utiltas.TopologyDomainID]resources.Requests
// levels is a list of levels defined in the Topology object referenced
// by the flavor corresponding to the cache.
Levels []string

// usageMap maintains the usage per topology domain
usageMap map[utiltas.TopologyDomainID]resources.Requests
}

func (c *TASFlavorCache) snapshot(ctx context.Context) *TASFlavorSnapshot {
Expand Down
Loading

0 comments on commit d5ca699

Please sign in to comment.