
Commit 74b1f53

fix: Fix typos and error handling
Signed-off-by: Kfir Toledo <kfir.toledo@ibm.com>
1 parent 0192528 commit 74b1f53

3 files changed (+31, −33 lines)

pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go

Lines changed: 16 additions & 19 deletions
@@ -32,34 +32,37 @@ import (
 // prefix cached.
 type indexer struct {
     mu         sync.RWMutex
-    hashToPods map[BlockHash]podSet                       // the lookup data structure to find pods that have the BlockHash cached
-    podToLRU   map[string]*lru.Cache[BlockHash, struct{}] // key is pod namespacedName, value is an LRU cache
+    hashToPods map[BlockHash]podSet                         // the lookup data structure to find pods that have the BlockHash cached
+    podToLRU   map[ServerID]*lru.Cache[BlockHash, struct{}] // key is pod namespacedName, value is an LRU cache
     maxLRUSize int
 }

 // newIndexer initializes an indexer with size limits and starts cache size reporting.
 func newIndexer(maxLRUSize int) *indexer {
     ix := &indexer{
         hashToPods: make(map[BlockHash]podSet),
-        podToLRU:   make(map[string]*lru.Cache[BlockHash, struct{}]),
+        podToLRU:   make(map[ServerID]*lru.Cache[BlockHash, struct{}]),
         maxLRUSize: maxLRUSize,
     }
     go ix.ReportLRUSize(time.Second)
     return ix
 }

 // Add adds a list of prefix hashes to the cache, tied to the server.
-func (i *indexer) Add(hashes []BlockHash, pod ServerID) {
+func (i *indexer) Add(hashes []BlockHash, pod ServerID) error {
     if pod.Name == "" {
-        return
+        return fmt.Errorf("pod name is empty")
     }
     i.mu.Lock()
     // Check if the LRU pod exist
-    podName := pod.String()
-    lruForPod, exists := i.podToLRU[podName]
+    lruForPod, exists := i.podToLRU[pod]
     if !exists {
-        newLRU, _ := lru.NewWithEvict[BlockHash, struct{}](i.maxLRUSize, i.makeEvictionFn(pod))
-        i.podToLRU[podName] = newLRU
+        newLRU, err := lru.NewWithEvict[BlockHash, struct{}](i.maxLRUSize, i.makeEvictionFn(pod))
+        if err != nil {
+            i.mu.Unlock()
+            return fmt.Errorf("failed to create LRU for pod %s: %w", pod, err)
+        }
+        i.podToLRU[pod] = newLRU
         lruForPod = newLRU
     }
     i.mu.Unlock()
@@ -80,7 +83,7 @@ func (i *indexer) Add(hashes []BlockHash, pod ServerID) {
         i.hashToPods[hash] = pods
     }
     i.mu.Unlock()
-
+    return nil
 }

 // Get returns a set of servers that have the given prefix hash cached.
@@ -100,21 +103,15 @@ func (i *indexer) Get(hash BlockHash) podSet {
 // makeEvictionFn returns a per-pod LRU eviction callback that removes the pod from hashToPods on eviction.
 func (i *indexer) makeEvictionFn(pod ServerID) func(BlockHash, struct{}) {
     return func(hash BlockHash, _ struct{}) {
-        fmt.Printf("Evicted hash %v from pod %s\n", hash, pod)
-
         i.mu.Lock()
         defer i.mu.Unlock()
-        print("enter eviction")
         // Remove the pod from the hash→pods map
         if podSet, ok := i.hashToPods[hash]; ok {
             delete(podSet, pod)
             if len(podSet) == 0 {
                 delete(i.hashToPods, hash)
-            } else {
-                i.hashToPods[hash] = podSet
             }
         }
-        print("After eviction")
     }
 }

@@ -126,14 +123,14 @@ func (i *indexer) ReportLRUSize(interval time.Duration) {
         i.mu.RLock()
         totalEntries := 0
         maxPodEntries := 0
-        maxPodName := ""
+        maxPodName := ServerID{}

-        for podName, lruCache := range i.podToLRU {
+        for pod, lruCache := range i.podToLRU {
             size := lruCache.Len()
             totalEntries += size
             if size > maxPodEntries {
                 maxPodEntries = size
-                maxPodName = podName
+                maxPodName = pod
             }
         }
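The change above hinges on two details: a struct whose fields are all comparable (ServerID) can be used directly as a Go map key, and lru.NewWithEvict can fail (only for a non-positive size), which is why its error is now propagated instead of being discarded. Below is a minimal standalone sketch of both, not part of this commit; ServerID and BlockHash here are local stand-ins for the package types.

package main

import (
    "fmt"

    lru "github.com/hashicorp/golang-lru/v2"
)

// Local stand-ins mirroring the package types used in the diff above.
type ServerID struct{ Namespace, Name string }
type BlockHash uint64

func main() {
    // ServerID has only comparable fields, so it is a valid map key; the
    // indexer can therefore key podToLRU by ServerID instead of pod.String().
    podToLRU := map[ServerID]*lru.Cache[BlockHash, struct{}]{}

    pod := ServerID{Namespace: "default", Name: "server1"}
    cache, err := lru.NewWithEvict[BlockHash, struct{}](2, func(h BlockHash, _ struct{}) {
        fmt.Printf("evicted hash %v for pod %s/%s\n", h, pod.Namespace, pod.Name)
    })
    if err != nil {
        // NewWithEvict only fails for a non-positive size; the commit now
        // surfaces this error instead of dropping it with `_`.
        panic(err)
    }
    podToLRU[pod] = cache

    for h := BlockHash(1); h <= 3; h++ {
        cache.Add(h, struct{}{}) // the third Add evicts BlockHash(1), firing the callback
    }
    fmt.Println(podToLRU[pod].Len()) // 2
}

Keying by ServerID also keeps podToLRU and hashToPods on the same identity, so the eviction callback can delete the pod from hashToPods without converting between string and struct forms.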

pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go

Lines changed: 3 additions & 6 deletions
@@ -26,21 +26,18 @@ func TestIndexer_AddAndGet(t *testing.T) {

     hash1 := BlockHash(1)
     server := ServerID{Namespace: "default", Name: "server1"}
-    serverName := server.String()
     // Add an entry to the cache
     i.Add([]BlockHash{hash1}, server)
     // Retrieve the entry
-    assert.Equal(t, 1, i.podToLRU[serverName].Len(), "Cache size should be 1 after adding an entry")
+    assert.Equal(t, 1, i.podToLRU[server].Len(), "Cache size should be 1 after adding an entry")
     servers := i.Get(hash1)
     assert.Contains(t, servers, server, "Cache should contain the added server")

     // Add another entry to the cache, the cache size should be incremented to 2.
     i.Add([]BlockHash{BlockHash(2)}, server)
-    assert.Equal(t, 2, i.podToLRU[serverName].Len(), "Cache size should be 2 after adding an entry")
+    assert.Equal(t, 2, i.podToLRU[server].Len(), "Cache size should be 2 after adding an entry")

     // Add another entry to the cache, which should evict the first one due to max size.
-    print("before Add")
     i.Add([]BlockHash{BlockHash(3)}, server)
-    print("after ADD")
-    assert.Equal(t, 2, i.podToLRU[serverName].Len(), "Cache size should still be 2 after adding an entry")
+    assert.Equal(t, 2, i.podToLRU[server].Len(), "Cache size should still be 2 after adding an entry")
 }
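A possible companion test for the new error path, not part of this commit; it assumes the indexer under test is built with newIndexer as in the existing test and that testify's assert is already imported.

func TestIndexer_AddEmptyPodName(t *testing.T) {
    i := newIndexer(2) // assumed to mirror the setup of the existing test

    // Add now reports an error instead of silently returning when the pod name is empty.
    err := i.Add([]BlockHash{BlockHash(1)}, ServerID{Namespace: "default"})
    assert.Error(t, err, "Add should fail when the pod name is empty")
    assert.Empty(t, i.Get(BlockHash(1)), "nothing should be cached for a rejected pod")
}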

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 12 additions & 8 deletions
@@ -44,11 +44,11 @@ const (
     // A small capacity ensures a high accuracy of cache hit on the model server, but it will
     // increase the chance of false negatives. A high capacity does the opposite.
     // To properly size this, consider the sum of the total number of cache entries on all model
-    // servers. Consider the llama3 8B model on 8 H100 80GB GPUs. The size of the model weight is
-    // about 16GB. Assume 50% of the remaining HBM is used for caching prefixes, we have 32GB. Each
-    // token is about 128KB in size, so we can cache 250K tokens. Using the default block size of 16
-    // in vLLM, we will have 250K / 16 = 15.6K blocks.
-    DefaultLRUCapacityPerServer = 15000
+    // servers. Consider the llama3 8B model on a single H100 80GB GPU. The size of the model weight is
+    // about 16GB. The remaining HBM used for caching prefixes is 64GB. Each
+    // token is about 128KB in size, so we can cache 500K tokens. Using the default block size of 16
+    // in vLLM, we will have 500K / 16 = 31.25K blocks.
+    DefaultLRUCapacityPerServer = 31250
 )

 type Config struct {
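A quick back-of-the-envelope check of the sizing comment above, as a sketch; the 80GB HBM, 16GB weights, and ~128KB-per-token figures are the assumptions stated in that comment (decimal units), not measured values.

package main

import "fmt"

func main() {
    const (
        hbmGB          = 80  // H100 HBM per GPU (assumption from the comment above)
        weightsGB      = 16  // llama3 8B model weights
        kbPerToken     = 128 // ~128KB of KV cache per token (assumption from the comment)
        tokensPerBlock = 16  // vLLM default block size
    )
    freeKB := (hbmGB - weightsGB) * 1_000_000 // 64GB expressed in KB, decimal units as the comment uses
    tokens := freeKB / kbPerToken             // 500,000 tokens
    fmt.Println(tokens / tokensPerBlock)      // 31250 blocks, matching DefaultLRUCapacityPerServer
}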
@@ -58,7 +58,7 @@ type Config struct {
     // MaxPrefixBlocksToMatch is the maximum number of prefix blocks to match. Input beyond this limit will
     // be ignored.
     MaxPrefixBlocksToMatch int
-    // Max (approximate) size of the LRU indexer in number of entries per server (pod).
+    // Max capacity of the LRU indexer in number of entries per server (pod).
     LRUCapacityPerServer int
 }
@@ -72,7 +72,7 @@ type podSet map[ServerID]struct{}

 type Indexer interface {
     Get(hash BlockHash) podSet
-    Add(hashes []BlockHash, server ServerID)
+    Add(hashes []BlockHash, server ServerID) error
 }

 // BlockHash is a hash of the block of request body.
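Since Add now returns an error, anything implementing the Indexer interface (including test doubles) has to adopt the new signature. A hypothetical minimal fake, not part of this commit, written against the ServerID, BlockHash, and podSet types defined in this package.

// fakeIndexer is a hypothetical test double for the updated Indexer interface.
type fakeIndexer struct {
    added map[ServerID][]BlockHash
    err   error // error to return from Add, if any
}

func (f *fakeIndexer) Get(hash BlockHash) podSet { return podSet{} }

func (f *fakeIndexer) Add(hashes []BlockHash, server ServerID) error {
    if f.err != nil {
        return f.err // simulate an indexer failure, e.g. to exercise PostCycle's error path
    }
    if f.added == nil {
        f.added = make(map[ServerID][]BlockHash)
    }
    f.added[server] = append(f.added[server], hashes...)
    return nil
}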
@@ -165,7 +165,11 @@ func (m *Plugin) PostCycle(ctx context.Context, cycleState *types.CycleState, re
         log.FromContext(ctx).Error(err, "failed to read prefix plugin cycle state")
         return
     }
-    m.indexer.Add(state.PrefixHashes, ServerID(targetPod.NamespacedName))
+    err = m.indexer.Add(state.PrefixHashes, ServerID(targetPod.NamespacedName))
+    if err != nil {
+        log.FromContext(ctx).Error(err, "failed to add prefix hashes to indexer for target pod", "pod", targetPod.NamespacedName)
+        return
+    }
     total := len(state.PrefixHashes)
     matchLen := state.PrefixCacheServers[ServerID(targetPod.NamespacedName)]
     metrics.RecordPrefixCacheMatch(matchLen*m.HashBlockSize, total*m.HashBlockSize)
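The log call above goes through logr (which controller-runtime's log.FromContext returns), whose Error method takes the error, a constant message, and alternating key/value pairs. A standalone sketch of that convention; stdr is used here only to obtain a concrete logr.Logger, and the values are made up for illustration.

package main

import (
    "errors"
    stdlog "log"
    "os"

    "github.com/go-logr/stdr"
)

func main() {
    // stdr.New wraps a standard *log.Logger into a logr.Logger.
    logger := stdr.New(stdlog.New(os.Stderr, "", stdlog.LstdFlags))

    // Error(err, msg, key, value, ...): keep the message constant and attach
    // variable data as key/value pairs rather than concatenating it into the message.
    logger.Error(errors.New("pod name is empty"),
        "failed to add prefix hashes to indexer", "pod", "default/server1")
}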
