Skip to content

Commit ca45167

Browse files
committed
fix: add safety check for LRUCapacityPerServer
Signed-off-by: Kfir Toledo <kfir.toledo@ibm.com>
1 parent e7255c8 commit ca45167

File tree

4 files changed

+28
-22
lines changed

4 files changed

+28
-22
lines changed

pkg/epp/scheduling/framework/plugins/multi/prefix/indexer.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package prefix
1818

1919
import (
2020
"context"
21-
"fmt"
2221
"sync"
2322
"time"
2423

@@ -44,24 +43,22 @@ func newIndexer(maxLRUSize int) *indexer {
4443
podToLRU: make(map[ServerID]*lru.Cache[BlockHash, struct{}]),
4544
maxLRUSize: maxLRUSize,
4645
}
46+
4747
go ix.ReportLRUSize(time.Second)
4848
return ix
4949
}
5050

5151
// Add adds a list of prefix hashes to the cache, tied to the server.
52-
func (i *indexer) Add(hashes []BlockHash, pod ServerID) error {
52+
func (i *indexer) Add(hashes []BlockHash, pod ServerID) {
5353
i.mu.Lock()
5454
// Check if the LRU pod exist
5555
lruForPod, exists := i.podToLRU[pod]
5656
if !exists {
57-
newLRU, err := lru.NewWithEvict[BlockHash, struct{}](i.maxLRUSize, i.makeEvictionFn(pod))
58-
if err != nil {
59-
i.mu.Unlock()
60-
return fmt.Errorf("failed to create LRU for pod %s: %w", pod, err)
61-
}
57+
newLRU, _ := lru.NewWithEvict[BlockHash, struct{}](i.maxLRUSize, i.makeEvictionFn(pod))
6258
i.podToLRU[pod] = newLRU
6359
lruForPod = newLRU
6460
}
61+
6562
i.mu.Unlock()
6663

6764
// Add to LRU (may evict)
@@ -79,8 +76,8 @@ func (i *indexer) Add(hashes []BlockHash, pod ServerID) error {
7976
pods[pod] = struct{}{}
8077
i.hashToPods[hash] = pods
8178
}
79+
8280
i.mu.Unlock()
83-
return nil
8481
}
8582

8683
// Get returns a set of servers that have the given prefix hash cached.

pkg/epp/scheduling/framework/plugins/multi/prefix/indexer_test.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,18 @@ func TestIndexer_AddAndGet(t *testing.T) {
2727
hash1 := BlockHash(1)
2828
server := ServerID{Namespace: "default", Name: "server1"}
2929
// Add an entry to the cache
30-
err := i.Add([]BlockHash{hash1}, server)
31-
assert.NoError(t, err)
30+
i.Add([]BlockHash{hash1}, server)
3231

3332
// Retrieve the entry
3433
assert.Equal(t, 1, i.podToLRU[server].Len(), "Cache size should be 1 after adding an entry")
3534
servers := i.Get(hash1)
3635
assert.Contains(t, servers, server, "Cache should contain the added server")
3736

3837
// Add another entry to the cache, the cache size should be incremented to 2.
39-
err = i.Add([]BlockHash{BlockHash(2)}, server)
40-
assert.NoError(t, err)
38+
i.Add([]BlockHash{BlockHash(2)}, server)
4139
assert.Equal(t, 2, i.podToLRU[server].Len(), "Cache size should be 2 after adding an entry")
4240

4341
// Add another entry to the cache, which should evict the first one due to max size.
44-
err = i.Add([]BlockHash{BlockHash(3)}, server)
45-
assert.NoError(t, err)
42+
i.Add([]BlockHash{BlockHash(3)}, server)
4643
assert.Equal(t, 2, i.podToLRU[server].Len(), "Cache size should still be 2 after adding an entry")
4744
}

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ type podSet map[ServerID]struct{}
7272

7373
type Indexer interface {
7474
Get(hash BlockHash) podSet
75-
Add(hashes []BlockHash, server ServerID) error
75+
Add(hashes []BlockHash, server ServerID)
7676
}
7777

7878
// BlockHash is a hash of the block of request body.
@@ -115,9 +115,18 @@ var _ framework.PostCycle = &Plugin{}
115115

116116
// New initializes a new prefix Plugin and returns its pointer.
117117
func New(config Config) *Plugin {
118+
capacity := config.LRUCapacityPerServer
119+
if capacity <= 0 {
120+
capacity = DefaultLRUCapacityPerServer
121+
log.FromContext(context.TODO()).V(logutil.DEFAULT).Info(
122+
"LRUCapacityPerServer is not positive, using default value",
123+
"defaultCapacity", DefaultLRUCapacityPerServer,
124+
)
125+
}
126+
118127
m := &Plugin{
119128
Config: config,
120-
indexer: newIndexer(config.LRUCapacityPerServer),
129+
indexer: newIndexer(capacity),
121130
}
122131
return m
123132
}
@@ -165,11 +174,9 @@ func (m *Plugin) PostCycle(ctx context.Context, cycleState *types.CycleState, re
165174
log.FromContext(ctx).Error(err, "failed to read prefix plugin cycle state")
166175
return
167176
}
168-
err = m.indexer.Add(state.PrefixHashes, ServerID(targetPod.NamespacedName))
169-
if err != nil {
170-
log.FromContext(ctx).Error(err, "failed to add prefix hashes to indexer for target pod", "pod", targetPod.NamespacedName)
171-
return
172-
}
177+
178+
m.indexer.Add(state.PrefixHashes, ServerID(targetPod.NamespacedName))
179+
173180
total := len(state.PrefixHashes)
174181
matchLen := state.PrefixCacheServers[ServerID(targetPod.NamespacedName)]
175182
metrics.RecordPrefixCacheMatch(matchLen*m.HashBlockSize, total*m.HashBlockSize)

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,12 @@ func BenchmarkPrefixPluginStress(b *testing.B) {
154154

155155
plugin := New(config)
156156
types.NewCycleState()
157-
promptLen := []int{10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000}
157+
var promptLen []int
158+
for i := 1; i <= 1024; i++ {
159+
promptLen = append(promptLen, i)
160+
}
161+
promptLen = append(promptLen, 2048, 4096, 8192, 10000, 20000, 50000)
162+
158163
for _, i := range promptLen {
159164
// Generate increasing-length random prompts
160165
prompt := randomPrompt(4 + i)

0 commit comments

Comments
 (0)