Skip to content

Prefix aware scorer initialization #143

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 18, 2025
9 changes: 7 additions & 2 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ These components are maintained in the `llm-d-inference-scheduler` repository an
| Scorer | Description | Env Vars |
|------------------|--------------------------------------------|----------|
| Session-aware | Prefers pods from same session | `ENABLE_SESSION_AWARE_SCORER`, `SESSION_AWARE_SCORER_WEIGHT`, `PREFILL_ENABLE_SESSION_AWARE_SCORER`, `PREFILL_SESSION_AWARE_SCORER_WEIGHT` |
| Prefix-aware | Scores based on prompt prefix history;<br>lightweight but may not reflect actual KV-cache state | `ENABLE_PREFIX_AWARE_SCORER`, `PREFIX_AWARE_SCORER_WEIGHT`, `PREFILL_ENABLE_PREFIX_AWARE_SCORER`, `PREFILL_PREFIX_AWARE_SCORER_WEIGHT`, `PREFIX_SCORER_BLOCK_SIZE`|
| KVCache-aware | Scores based on real KV-cache state on vLLM;<br>more accurate but requires extra computation and cycles to track the current cache state | `ENABLE_KVCACHE_AWARE_SCORER`, `KVCACHE_INDEXER_REDIS_ADDR`, `PREFILL_ENABLE_KVCACHE_AWARE_SCORER`, `PREFILL_KVCACHE_INDEXER_REDIS_ADDR`, `HF_TOKEN`, `KVCACHE_INDEXER_REDIS_ADDR` |
| Prefix-aware | Scores based on prompt prefix history;<br>lightweight but may not reflect actual KV-cache state | `ENABLE_PREFIX_AWARE_SCORER`, `PREFIX_AWARE_SCORER_WEIGHT`, `PREFILL_ENABLE_PREFIX_AWARE_SCORER`, `PREFILL_PREFIX_AWARE_SCORER_WEIGHT`, `PREFIX_SCORER_CACHE_CAPACITY`, `PREFIX_SCORER_CACHE_BLOCK_SIZE`|
| KVCache-aware | Scores based on real KV-cache state on vLLM;<br>more accurate but requires extra computation and cycles to track the current cache state | `ENABLE_KVCACHE_AWARE_SCORER`, `KVCACHE_INDEXER_REDIS_ADDR`, `PREFILL_ENABLE_KVCACHE_AWARE_SCORER`, `PREFILL_KVCACHE_INDEXER_REDIS_ADDR`, `HF_TOKEN`, `KVCACHE_INDEXER_REDIS_ADDR` |
| Load-aware | Avoids busy pods | `ENABLE_LOAD_AWARE_SCORER`, `LOAD_AWARE_SCORER_WEIGHT`, `PREFILL_ENABLE_LOAD_AWARE_SCORER`, `PREFILL_LOAD_AWARE_SCORER_WEIGHT` |

### Prefill / Decode Configuration
Expand All @@ -92,6 +92,11 @@ In case Disaggrigated Prefill is enabled, you should also define the following e
- Toggle P/D mode: `PD_ENABLED=true`
- Threshold: `PD_PROMPT_LEN_THRESHOLD=<value>`

### Prefix Aware Scorer Configuration

- `PREFIX_SCORER_CACHE_CAPACITY` - the cache capacity sets the maximum number of blocks the LRU cache can store. A block maps from a chunk of a prompt to a set of pods that are estimated to have the prefix of the prompt that ends at the keyed chunk.
- `PREFIX_SCORER_CACHE_BLOCK_SIZE` - the cache block size defines the length of the prompt chunk that a block is keyed by.

#### Prefill Scorers:
```bash
export PREFILL_ENABLE_SESSION_AWARE_SCORER=true
Expand Down
15 changes: 11 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ const (
pdPromptLenThresholdEnvKey = "PD_PROMPT_LEN_THRESHOLD"
pdPromptLenThresholdDefault = 100

prefixScorerBlockSizeEnvKey = "PREFIX_SCORER_BLOCK_SIZE"
prefixScorerBlockSizeDefault = 256
prefixCacheCapacityEnvKey = "PREFIX_SCORER_CACHE_CAPACITY"
// DefaultPrefixCacheCapacity defines the default value for maximum number of blocks the LRU cache can store.
DefaultPrefixCacheCapacity = 500000

prefixScorerCacheBlockSizeEnvKey = "PREFIX_SCORER_CACHE_BLOCK_SIZE"
// DefaultPrefixCacheBlockSize defines the default value of how many runes each block contains in the prefix cache.
DefaultPrefixCacheBlockSize = 256
)

// Config contains scheduler configuration, currently configuration is loaded from environment variables
Expand All @@ -60,7 +65,8 @@ type Config struct {
PrefillSchedulerPlugins map[string]int
PDEnabled bool
PDThreshold int
PrefixBlockSize int
PrefixCacheBlockSize int
PrefixCacheCapacity int
}

// LoadConfig loads configuration from environment variables and returns a new instance of Config
Expand All @@ -77,7 +83,8 @@ func LoadConfig(logger logr.Logger) *Config {
PrefillSchedulerPlugins: loadPluginInfo(logger, true, pluginNames),
PDEnabled: env.GetEnvString(pdEnabledEnvKey, "false", logger) == "true",
PDThreshold: env.GetEnvInt(pdPromptLenThresholdEnvKey, pdPromptLenThresholdDefault, logger),
PrefixBlockSize: env.GetEnvInt(prefixScorerBlockSizeEnvKey, prefixScorerBlockSizeDefault, logger),
PrefixCacheBlockSize: env.GetEnvInt(prefixScorerCacheBlockSizeEnvKey, DefaultPrefixCacheBlockSize, logger),
PrefixCacheCapacity: env.GetEnvInt(prefixCacheCapacityEnvKey, DefaultPrefixCacheCapacity, logger),
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduling/pd/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type Datastore interface {
// provided configuration.
func NewScheduler(ctx context.Context, schedulerConfig *config.Config, ds Datastore) (*Scheduler, error) {
prefixConfig := scorer.DefaultPrefixStoreConfig()
prefixConfig.BlockSize = schedulerConfig.PrefixBlockSize
prefixConfig.CacheBlockSize = schedulerConfig.PrefixCacheBlockSize
prefixConfig.CacheCapacity = schedulerConfig.PrefixCacheCapacity

scheduler := &Scheduler{
threshold: schedulerConfig.PDThreshold,
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduling/pd/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func TestPDSchedule(t *testing.T) {
PrefillSchedulerPlugins: map[string]int{},
PDEnabled: true,
PDThreshold: 5,
PrefixBlockSize: 256,
PrefixCacheBlockSize: 256,
PrefixCacheCapacity: 50000,
}

for _, test := range tests {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduling/plugins/scorer/prefix_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *PrefixAwareScorer) GetCachedPercentage(pod, prompt string) float64 {
}

intVal, _ := rawVal.(int)
return float64(intVal*s.prefixStore.blockSize) / float64(len(prompt))
return float64(intVal*s.prefixStore.cacheBlockSize) / float64(len(prompt))
}

// cleanup Cleans up hits map
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduling/plugins/scorer/prefix_aware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestPrefixAwareScorer(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
// Reset prefix store for each test
config := scorer.DefaultPrefixStoreConfig()
config.BlockSize = 5 // set small chunking for testing
config.CacheBlockSize = 5 // set small chunking for testing

s := scorer.NewPrefixAwareScorer(ctx, config)

Expand Down Expand Up @@ -157,13 +157,13 @@ func TestPrefixAwareScorerProfiling(t *testing.T) {

name2Pod := createPods(nPodsTotal)
config := scorer.DefaultPrefixStoreConfig()
text := generateNonRepeatingText(config.BlockSize * nPodsInStore)
text := generateNonRepeatingText(config.CacheBlockSize * nPodsInStore)
t.Run(testName, func(t *testing.T) {
start := time.Now() // record start time
config := scorer.DefaultPrefixStoreConfig()
s := scorer.NewPrefixAwareScorer(ctx, config)
for i := range nPodsInStore {
prompt := text[0 : (i+1)*config.BlockSize-1]
prompt := text[0 : (i+1)*config.CacheBlockSize-1]
err := s.GetPrefixStore().AddEntry(modelName, prompt, &name2Pod["pod"+strconv.Itoa(i)].NamespacedName)
if err != nil {
t.Errorf("Failed to add entry to prefix store: %v", err)
Expand Down
58 changes: 29 additions & 29 deletions pkg/scheduling/plugins/scorer/prefix_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,34 @@ import (

"github.com/cespare/xxhash/v2"
lru "github.com/hashicorp/golang-lru/v2"

"github.com/llm-d/llm-d-inference-scheduler/pkg/config"
)

const (
// defaultMaxCacheSize sets the maximum number of blocks the LRU cache can store.
defaultMaxCacheSize = 500000
// defaultBlockSize defines how many runes each block contains in the prefix cache.
defaultBlockSize = 256
// defaultMaxBlockCacheSize sets the maximum number of pods a block can store.
defaultMaxBlockCacheSize = 100
// defaultMaxBlockPods defined the default maximum number of pods a block can store. Currently this value cannot be changed by configuration
defaultMaxBlockPods = 100
)

// PrefixStoreConfig contains initialization configuration for PrefixStore.
type PrefixStoreConfig struct {
// CacheSize sets the maximum number of blocks the LRU cache can store.
CacheSize int
// BlockSize defines how many runes each block contains in the prefix cache.
BlockSize int
// BlockCacheSize sets the maximum number of pods a block can store.
BlockCacheSize int
// CacheCapacity sets the maximum number of blocks the LRU cache can store.
// A block maps from a chunk of a prompt to a set of pods that are estimated to have
// the prefix of the prompt that ends at the keyed chunk.
CacheCapacity int
// CacheBlockSize defines the length of the prompt chunk that a block is keyed by.
CacheBlockSize int
// MaxBlockPods sets the maximum number of pods a block can store.
MaxBlockPods int
}

// DefaultPrefixStoreConfig returns an PrefixStoreConfig instance with default
// configuration.
func DefaultPrefixStoreConfig() *PrefixStoreConfig {
return &PrefixStoreConfig{
CacheSize: defaultMaxCacheSize,
BlockSize: defaultBlockSize,
BlockCacheSize: defaultMaxBlockCacheSize,
CacheCapacity: config.DefaultPrefixCacheCapacity,
CacheBlockSize: config.DefaultPrefixCacheBlockSize,
MaxBlockPods: defaultMaxBlockPods,
}
}

Expand All @@ -51,9 +51,9 @@ type block struct {
type PrefixStore struct {
sync.RWMutex

cacheSize int
blockSize int
blockCacheSize int
cacheCapacity int
cacheBlockSize int
maxBlockPods int

store map[string]*lru.Cache[uint64, *block]
}
Expand All @@ -66,16 +66,16 @@ func NewPrefixStore(config *PrefixStoreConfig) *PrefixStore {
}

return &PrefixStore{
cacheSize: config.CacheSize,
blockSize: config.BlockSize,
blockCacheSize: config.BlockCacheSize,
cacheCapacity: config.CacheCapacity,
cacheBlockSize: config.CacheBlockSize,
maxBlockPods: config.MaxBlockPods,
store: make(map[string]*lru.Cache[uint64, *block]),
}
}

// AddEntry adds a new entry to the prefix store.
func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.NamespacedName) error {
if prompt == "" || pod == nil || len(prompt) < s.blockSize /* skip if prompt is too short */ {
if prompt == "" || pod == nil || len(prompt) < s.cacheBlockSize /* skip if prompt is too short */ {
return nil
}

Expand All @@ -84,7 +84,7 @@ func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.Names
cache, ok := s.store[modelName]
if !ok {
var err error
cache, err = lru.New[uint64, *block](s.cacheSize)
cache, err = lru.New[uint64, *block](s.cacheCapacity)
if err != nil {
return fmt.Errorf("failed to create LRU cache for model %s: %w", modelName, err)
}
Expand All @@ -98,8 +98,8 @@ func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.Names
digest := xxhash.New()

// Chunk the text into blocks and populate the cache
for start := 0; start < len(promptBytes); start += s.blockSize {
end := start + s.blockSize
for start := 0; start < len(promptBytes); start += s.cacheBlockSize {
end := start + s.cacheBlockSize
if end > len(promptBytes) {
break // skip partial blocks
}
Expand All @@ -118,7 +118,7 @@ func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.Names

b, ok := cache.Get(blockHash)
if !ok {
pods, err := lru.New[types.NamespacedName, time.Time](s.blockCacheSize)
pods, err := lru.New[types.NamespacedName, time.Time](s.maxBlockPods)
if err != nil {
return fmt.Errorf("failed to create LRU cache for block: %w", err)
}
Expand All @@ -136,7 +136,7 @@ func (s *PrefixStore) AddEntry(modelName string, prompt string, pod *types.Names
// FindMatchingPods finds all pods that match the given prompt and model name.
// It returns a map of pods and the number of blocks they match.
func (s *PrefixStore) FindMatchingPods(prompt, modelName string) map[string]int {
if prompt == "" || modelName == "" || len(prompt) < s.blockSize /* skip if prompt is too short */ {
if prompt == "" || modelName == "" || len(prompt) < s.cacheBlockSize /* skip if prompt is too short */ {
return nil
}

Expand All @@ -153,8 +153,8 @@ func (s *PrefixStore) FindMatchingPods(prompt, modelName string) map[string]int
digest := xxhash.New()

matchedPods := make(map[string]int)
for start := 0; start < len(promptBytes); start += s.blockSize {
end := start + s.blockSize
for start := 0; start < len(promptBytes); start += s.cacheBlockSize {
end := start + s.cacheBlockSize
if end > len(promptBytes) {
break // skip partial blocks
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduling/plugins/scorer/prefix_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestBasicPrefixOperations(t *testing.T) {
_ = log.IntoContext(ctx, logr.New(log.NullLogSink{}))

config := scorer.DefaultPrefixStoreConfig()
config.BlockSize = 5 // set small chunking for testing
config.CacheBlockSize = 5 // set small chunking for testing
store := scorer.NewPrefixStore(config)

podName := k8stypes.NamespacedName{
Expand Down
Loading