Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

func main() {
// Register all known plugin factories
runner.RegisterAllPlgugins()
runner.RegisterAllPlugins()

if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil {
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions cmd/epp/runner/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
)

// RegisterAllPlgugins registers the factory functions of all known plugins
func RegisterAllPlgugins() {
// RegisterAllPlugins registers the factory functions of all known plugins
func RegisterAllPlugins() {
plugins.Register(filter.LeastKVCacheFilterName, filter.LeastKVCacheFilterFactory)
plugins.Register(filter.LeastQueueFilterName, filter.LeastQueueFilterFactory)
plugins.Register(filter.LoraAffinityFilterName, filter.LoraAffinityFilterFactory)
Expand Down
20 changes: 8 additions & 12 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"sigs.k8s.io/gateway-api-inference-extension/api/config/v1alpha1"
conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp"
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
Expand Down Expand Up @@ -217,28 +215,30 @@ func (r *Runner) Run(ctx context.Context) error {
return err
}

var theConfig *v1alpha1.EndpointPickerConfig
var instantiatedPlugins map[string]plugins.Plugin

if len(*configText) != 0 || len(*configFile) != 0 {
theConfig, err = config.LoadConfig([]byte(*configText), *configFile)
theConfig, err := config.LoadConfig([]byte(*configText), *configFile)
if err != nil {
setupLog.Error(err, "Failed to load the configuration")
return err
}

epp := eppHandle{}
instantiatedPlugins, err = config.LoadPluginReferences(theConfig.Plugins, epp)
instantiatedPlugins, err := config.LoadPluginReferences(theConfig.Plugins, epp)
if err != nil {
setupLog.Error(err, "Failed to instantiate the plugins")
return err
}

r.schedulerConfig, err = scheduling.LoadSchedulerConfig(theConfig.SchedulingProfiles, instantiatedPlugins, setupLog)
r.schedulerConfig, err = scheduling.LoadSchedulerConfig(theConfig.SchedulingProfiles, instantiatedPlugins)
if err != nil {
setupLog.Error(err, "Failed to create Scheduler configuration")
return err
}

// Add requestcontrol plugins
if instantiatedPlugins != nil {
r.requestControlConfig = requestcontrol.LoadRequestControlConfig(instantiatedPlugins)
}
}

// --- Initialize Core EPP Components ---
Expand All @@ -250,10 +250,6 @@ func (r *Runner) Run(ctx context.Context) error {

saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)

// Add requestControl plugins
if instantiatedPlugins != nil {
r.requestControlConfig.AddPlugins(instantiatedPlugins)
}
director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)

// --- Setup ExtProc Server Runner ---
Expand Down
35 changes: 0 additions & 35 deletions pkg/epp/registry/registry.go

This file was deleted.

24 changes: 19 additions & 5 deletions pkg/epp/requestcontrol/request_control_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ limitations under the License.

package requestcontrol

import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
)

// NewConfig creates a new Config object and returns its pointer.
func NewConfig() *Config {
Expand Down Expand Up @@ -46,10 +48,22 @@ func (c *Config) WithPostResponsePlugins(plugins ...PostResponse) *Config {
return c
}

func (c *Config) AddPlugins(instances map[string]plugins.Plugin) {
for _, plugin := range instances {
if postResponse, ok := plugin.(PostResponse); ok {
c.postResponsePlugins = append(c.postResponsePlugins, postResponse)
func (c *Config) AddPlugins(pluginObjects ...plugins.Plugin) {
for _, plugin := range pluginObjects {
if preRequestPlugin, ok := plugin.(PreRequest); ok {
c.preRequestPlugins = append(c.preRequestPlugins, preRequestPlugin)
}
if postResponsePlugin, ok := plugin.(PostResponse); ok {
c.postResponsePlugins = append(c.postResponsePlugins, postResponsePlugin)
}
}
}

func LoadRequestControlConfig(instantiatedPlugins map[string]plugins.Plugin) *Config {
config := NewConfig()
for _, plugin := range instantiatedPlugins {
config.AddPlugins(plugin)
}

return config
}
2 changes: 1 addition & 1 deletion pkg/epp/scheduling/framework/plugins/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestLoRASoftAffinityDistribution(t *testing.T) {
expectedAvailabilityPercent := 100 - expectedAffinityPercent

// initialize LoraAffinityFilter
LoraAffinityFilter := NewLoraAffinityFilter()
LoraAffinityFilter := NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold)

for range numIterations {
result := LoraAffinityFilter.Filter(context.Background(), req, types.NewCycleState(), pods)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const LeastKVCacheFilterName = "least-KV-cache"
const (
LeastKVCacheFilterName = "least-KV-cache"
)

// compile-time type validation
var _ framework.Filter = &LeastKVCacheFilter{}

// LeastKVCacheFilterFactory is the plugin factory function for the Least KV Cache filter
// LeastKVCacheFilterFactory defines the factory function for LeastKVCacheFilter.
func LeastKVCacheFilterFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewLeastKVCacheFilter(), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const LeastQueueFilterName = "least-queue"
const (
LeastQueueFilterName = "least-queue"
)

// compile-time type validation
var _ framework.Filter = &LeastQueueFilter{}

// LeastQueueFilterFactory is the plugin factory function for the Least Queue filter
// LeastQueueFilterFactory defines the factory function for LeastQueueFilter.
func LeastQueueFilterFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewLeastQueueFilter(), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const LoraAffinityFilterName = "lora-affinity"
const (
LoraAffinityFilterName = "lora-affinity"
)

type loraAffinityFilterParameters struct {
Threshold float64 `json:"threshold"`
Expand All @@ -38,19 +40,19 @@ type loraAffinityFilterParameters struct {
// compile-time type validation
var _ framework.Filter = &LoraAffinityFilter{}

// LoraAffinityFilterFactory is the factory function for the LoraAffinity filter
// LoraAffinityFilterFactory defines the factory function for LoraAffinityFilter.
func LoraAffinityFilterFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
parameters := loraAffinityFilterParameters{Threshold: config.DefaultLoraAffinityThreshold}
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
return nil, fmt.Errorf("failed to parse the parameters of the %s filter. Error: %s", LoraAffinityFilterName, err)
return nil, fmt.Errorf("failed to parse the parameters of the '%s' filter - %w", LoraAffinityFilterName, err)
}
return &LoraAffinityFilter{loraAffinityThreshold: parameters.Threshold}, nil
return NewLoraAffinityFilter(parameters.Threshold), nil
}

// NewLoraAffinityFilter initializes a new LoraAffinityFilter and returns its pointer.
func NewLoraAffinityFilter() *LoraAffinityFilter {
func NewLoraAffinityFilter(threshold float64) *LoraAffinityFilter {
return &LoraAffinityFilter{
loraAffinityThreshold: config.Conf.LoraAffinityThreshold,
loraAffinityThreshold: threshold,
}
}

Expand Down
14 changes: 8 additions & 6 deletions pkg/epp/scheduling/framework/plugins/filter/low_queue_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const LowQueueFilterName = "low-queue"
const (
LowQueueFilterName = "low-queue"
)

type lowQueueFilterParameters struct {
Threshold int `json:"threshold"`
Expand All @@ -37,20 +39,20 @@ type lowQueueFilterParameters struct {
// compile-time type validation
var _ framework.Filter = &LowQueueFilter{}

// LowQueueFilterFactory is the factory function for the LowQueue filter
// LowQueueFilterFactory defines the factory function for LowQueueFilter.
func LowQueueFilterFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
parameters := lowQueueFilterParameters{Threshold: config.DefaultQueueingThresholdLoRA}
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
return nil, fmt.Errorf("failed to parse the parameters of the %s filter. Error: %s", LowQueueFilterName, err)
return nil, fmt.Errorf("failed to parse the parameters of the '%s' filter - %w", LowQueueFilterName, err)
}

return &LowQueueFilter{queueingThresholdLoRA: parameters.Threshold}, nil
return NewLowQueueFilter(parameters.Threshold), nil
}

// NewLowQueueFilter initializes a new LowQueueFilter and returns its pointer.
func NewLowQueueFilter() *LowQueueFilter {
func NewLowQueueFilter(threshold int) *LowQueueFilter {
return &LowQueueFilter{
queueingThresholdLoRA: config.Conf.QueueingThresholdLoRA,
queueingThresholdLoRA: threshold,
}
}

Expand Down
34 changes: 17 additions & 17 deletions pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,6 @@ func (s ServerID) String() string {
return k8stypes.NamespacedName(s).String()
}

// PrefixCachePluginFactory is the factory for the PrefixCache plugin
func PrefixCachePluginFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
parameters := Config{
HashBlockSize: DefaultHashBlockSize,
MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
LRUCapacityPerServer: DefaultLRUCapacityPerServer,
}
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
return nil, fmt.Errorf("failed to parse the parameters of the %s plugin. Error: %s", PrefixCachePluginName, err)
}

return &Plugin{
Config: parameters,
indexer: newIndexer(parameters.LRUCapacityPerServer),
}, nil
}

// compile-time type validation
var _ types.StateData = &schedulingContextState{}

Expand Down Expand Up @@ -134,6 +117,23 @@ func (s *schedulingContextState) Clone() types.StateData {
var _ framework.Scorer = &Plugin{}
var _ framework.PostCycle = &Plugin{}

// PrefixCachePluginFactory defines the factory function for Prefix plugin.
func PrefixCachePluginFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
parameters := Config{
HashBlockSize: DefaultHashBlockSize,
MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
LRUCapacityPerServer: DefaultLRUCapacityPerServer,
}
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
return nil, fmt.Errorf("failed to parse the parameters of the %s plugin. Error: %s", PrefixCachePluginName, err)
}

return &Plugin{
Config: parameters,
indexer: newIndexer(parameters.LRUCapacityPerServer),
}, nil
}

// New initializes a new prefix Plugin and returns its pointer.
func New(config Config) *Plugin {
capacity := config.LRUCapacityPerServer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ import (
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

const MaxScorePickerName = "max-score"
const (
MaxScorePickerName = "max-score"
)

// compile-time type validation
var _ framework.Picker = &MaxScorePicker{}

// MaxScorePickerFactory is the factory for the MaxScore picker
// MaxScorePickerFactory defines the factory function for MaxScorePicker.
func MaxScorePickerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return &MaxScorePicker{random: NewRandomPicker()}, nil
return NewMaxScorePicker(), nil
}

// NewMaxScorePicker initializes a new MaxScorePicker and returns its pointer.
Expand Down
6 changes: 4 additions & 2 deletions pkg/epp/scheduling/framework/plugins/picker/random_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import (
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

const RandomPickerName = "random"
const (
RandomPickerName = "random"
)

// compile-time type validation
var _ framework.Picker = &RandomPicker{}

// RandomPickerFactory is the factory for the Random picker
// RandomPickerFactory defines the factory function for RandomPicker.
func RandomPickerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewRandomPicker(), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const SingleProfileHandlerName = "single-profile"
const (
SingleProfileHandlerName = "single-profile"
)

// compile-time type assertion
var _ framework.ProfileHandler = &SingleProfileHandler{}

// SingleProfileHandlerFactory defines the factory function for SingleProfileHandler.
func SingleProfileHandlerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewSingleProfileHandler(), nil
}
Expand All @@ -42,7 +45,7 @@ func NewSingleProfileHandler() *SingleProfileHandler {
// SingleProfileHandler handles a single profile which is always the primary profile.
type SingleProfileHandler struct{}

// Name returns the name of the Profiles Picker.
// Name returns the name of the Profile Handler.
func (h *SingleProfileHandler) Name() string {
return SingleProfileHandlerName
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/scheduling/framework/plugins/scorer/kvcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ const (
// compile-time type assertion
var _ framework.Scorer = &KVCacheScorer{}

// KvCacheScorerFactory is the factory for the KV-Cache scorer
// KvCacheScorerFactory defines the factory function for KVCacheScorer.
func KvCacheScorerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return &KVCacheScorer{}, nil
return NewKVCacheScorer(), nil
}

// NewKVCacheScorer initializes a new KVCacheScorer and returns its pointer.
Expand Down
Loading