Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (

func main() {
// Register all known plugin factories
runner.RegisterAllPlgugins()
runner.RegisterAllPlugins()
// For adding out-of-tree plugins to the plugins registry, use the following:
// plugins.Register(my-out-of-tree-plugin-name, my-out-of-tree-plugin-factory-function)

if err := runner.NewRunner().Run(ctrl.SetupSignalHandler()); err != nil {
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions cmd/epp/runner/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
)

// RegisterAllPlgugins registers the factory functions of all known plugins
func RegisterAllPlgugins() {
// RegisterAllPlugins registers the factory functions of all known plugins
func RegisterAllPlugins() {
plugins.Register(filter.LeastKVCacheFilterName, filter.LeastKVCacheFilterFactory)
plugins.Register(filter.LeastQueueFilterName, filter.LeastQueueFilterFactory)
plugins.Register(filter.LoraAffinityFilterName, filter.LoraAffinityFilterFactory)
Expand Down
20 changes: 8 additions & 12 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ import (
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

"sigs.k8s.io/gateway-api-inference-extension/api/config/v1alpha1"
conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp"
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
Expand Down Expand Up @@ -217,28 +215,30 @@ func (r *Runner) Run(ctx context.Context) error {
return err
}

var theConfig *v1alpha1.EndpointPickerConfig
var instantiatedPlugins map[string]plugins.Plugin

if len(*configText) != 0 || len(*configFile) != 0 {
theConfig, err = config.LoadConfig([]byte(*configText), *configFile)
theConfig, err := config.LoadConfig([]byte(*configText), *configFile)
if err != nil {
setupLog.Error(err, "Failed to load the configuration")
return err
}

epp := eppHandle{}
instantiatedPlugins, err = config.LoadPluginReferences(theConfig.Plugins, epp)
instantiatedPlugins, err := config.LoadPluginReferences(theConfig.Plugins, epp)
if err != nil {
setupLog.Error(err, "Failed to instantiate the plugins")
return err
}

r.schedulerConfig, err = scheduling.LoadSchedulerConfig(theConfig.SchedulingProfiles, instantiatedPlugins, setupLog)
r.schedulerConfig, err = scheduling.LoadSchedulerConfig(theConfig.SchedulingProfiles, instantiatedPlugins)
if err != nil {
setupLog.Error(err, "Failed to create Scheduler configuration")
return err
}

// Add requestcontrol plugins
if instantiatedPlugins != nil {
r.requestControlConfig = requestcontrol.LoadRequestControlConfig(instantiatedPlugins)
}
}

// --- Initialize Core EPP Components ---
Expand All @@ -250,10 +250,6 @@ func (r *Runner) Run(ctx context.Context) error {

saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)

// Add requestControl plugins
if instantiatedPlugins != nil {
r.requestControlConfig.AddPlugins(instantiatedPlugins)
}
director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)

// --- Setup ExtProc Server Runner ---
Expand Down
35 changes: 0 additions & 35 deletions pkg/epp/registry/registry.go

This file was deleted.

24 changes: 19 additions & 5 deletions pkg/epp/requestcontrol/request_control_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ limitations under the License.

package requestcontrol

import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
)

// NewConfig creates a new Config object and returns its pointer.
func NewConfig() *Config {
Expand Down Expand Up @@ -46,10 +48,22 @@ func (c *Config) WithPostResponsePlugins(plugins ...PostResponse) *Config {
return c
}

func (c *Config) AddPlugins(instances map[string]plugins.Plugin) {
for _, plugin := range instances {
if postResponse, ok := plugin.(PostResponse); ok {
c.postResponsePlugins = append(c.postResponsePlugins, postResponse)
func (c *Config) AddPlugins(pluginObjects ...plugins.Plugin) {
for _, plugin := range pluginObjects {
if preRequestPlugin, ok := plugin.(PreRequest); ok {
c.preRequestPlugins = append(c.preRequestPlugins, preRequestPlugin)
}
if postResponsePlugin, ok := plugin.(PostResponse); ok {
c.postResponsePlugins = append(c.postResponsePlugins, postResponsePlugin)
}
}
}

func LoadRequestControlConfig(instantiatedPlugins map[string]plugins.Plugin) *Config {
config := NewConfig()
for _, plugin := range instantiatedPlugins {
config.AddPlugins(plugin)
}

return config
}
2 changes: 1 addition & 1 deletion pkg/epp/scheduling/framework/plugins/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestLoRASoftAffinityDistribution(t *testing.T) {
expectedAvailabilityPercent := 100 - expectedAffinityPercent

// initialize LoraAffinityFilter
LoraAffinityFilter := NewLoraAffinityFilter()
LoraAffinityFilter := NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold)

for range numIterations {
result := LoraAffinityFilter.Filter(context.Background(), req, types.NewCycleState(), pods)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const LeastKVCacheFilterName = "least-KV-cache"
const (
LeastKVCacheFilterName = "least-KV-cache"
)

// compile-time type validation
var _ framework.Filter = &LeastKVCacheFilter{}

// LeastKVCacheFilterFactory is the plugin factory function for the Least KV Cache filter
// LeastKVCacheFilterFactory defines the factory function for LeastKVCacheFilter.
func LeastKVCacheFilterFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewLeastKVCacheFilter(), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const LeastQueueFilterName = "least-queue"
const (
LeastQueueFilterName = "least-queue"
)

// compile-time type validation
var _ framework.Filter = &LeastQueueFilter{}

// LeastQueueFilterFactory is the plugin factory function for the Least Queue filter
// LeastQueueFilterFactory defines the factory function for LeastQueueFilter.
func LeastQueueFilterFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewLeastQueueFilter(), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const LoraAffinityFilterName = "lora-affinity"
const (
LoraAffinityFilterName = "lora-affinity"
)

type loraAffinityFilterParameters struct {
Threshold float64 `json:"threshold"`
Expand All @@ -38,19 +40,19 @@ type loraAffinityFilterParameters struct {
// compile-time type validation
var _ framework.Filter = &LoraAffinityFilter{}

// LoraAffinityFilterFactory is the factory function for the LoraAffinity filter
// LoraAffinityFilterFactory defines the factory function for LoraAffinityFilter.
func LoraAffinityFilterFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
parameters := loraAffinityFilterParameters{Threshold: config.DefaultLoraAffinityThreshold}
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
return nil, fmt.Errorf("failed to parse the parameters of the %s filter. Error: %s", LoraAffinityFilterName, err)
return nil, fmt.Errorf("failed to parse the parameters of the '%s' filter - %w", LoraAffinityFilterName, err)
}
return &LoraAffinityFilter{loraAffinityThreshold: parameters.Threshold}, nil
return NewLoraAffinityFilter(parameters.Threshold), nil
}

// NewLoraAffinityFilter initializes a new LoraAffinityFilter and returns its pointer.
func NewLoraAffinityFilter() *LoraAffinityFilter {
func NewLoraAffinityFilter(threshold float64) *LoraAffinityFilter {
return &LoraAffinityFilter{
loraAffinityThreshold: config.Conf.LoraAffinityThreshold,
loraAffinityThreshold: threshold,
}
}

Expand Down
14 changes: 8 additions & 6 deletions pkg/epp/scheduling/framework/plugins/filter/low_queue_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const LowQueueFilterName = "low-queue"
const (
LowQueueFilterName = "low-queue"
)

type lowQueueFilterParameters struct {
Threshold int `json:"threshold"`
Expand All @@ -37,20 +39,20 @@ type lowQueueFilterParameters struct {
// compile-time type validation
var _ framework.Filter = &LowQueueFilter{}

// LowQueueFilterFactory is the factory function for the LowQueue filter
// LowQueueFilterFactory defines the factory function for LowQueueFilter.
func LowQueueFilterFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
parameters := lowQueueFilterParameters{Threshold: config.DefaultQueueingThresholdLoRA}
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
return nil, fmt.Errorf("failed to parse the parameters of the %s filter. Error: %s", LowQueueFilterName, err)
return nil, fmt.Errorf("failed to parse the parameters of the '%s' filter - %w", LowQueueFilterName, err)
}

return &LowQueueFilter{queueingThresholdLoRA: parameters.Threshold}, nil
return NewLowQueueFilter(parameters.Threshold), nil
}

// NewLowQueueFilter initializes a new LowQueueFilter and returns its pointer.
func NewLowQueueFilter() *LowQueueFilter {
func NewLowQueueFilter(threshold int) *LowQueueFilter {
return &LowQueueFilter{
queueingThresholdLoRA: config.Conf.QueueingThresholdLoRA,
queueingThresholdLoRA: threshold,
}
}

Expand Down
34 changes: 17 additions & 17 deletions pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,6 @@ func (s ServerID) String() string {
return k8stypes.NamespacedName(s).String()
}

// PrefixCachePluginFactory is the factory for the PrefixCache plugin
func PrefixCachePluginFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
parameters := Config{
HashBlockSize: DefaultHashBlockSize,
MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
LRUCapacityPerServer: DefaultLRUCapacityPerServer,
}
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
return nil, fmt.Errorf("failed to parse the parameters of the %s plugin. Error: %s", PrefixCachePluginName, err)
}

return &Plugin{
Config: parameters,
indexer: newIndexer(parameters.LRUCapacityPerServer),
}, nil
}

// compile-time type validation
var _ types.StateData = &schedulingContextState{}

Expand Down Expand Up @@ -134,6 +117,23 @@ func (s *schedulingContextState) Clone() types.StateData {
var _ framework.Scorer = &Plugin{}
var _ framework.PostCycle = &Plugin{}

// PrefixCachePluginFactory defines the factory function for Prefix plugin.
func PrefixCachePluginFactory(name string, rawParameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
parameters := Config{
HashBlockSize: DefaultHashBlockSize,
MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks,
LRUCapacityPerServer: DefaultLRUCapacityPerServer,
}
if err := json.Unmarshal(rawParameters, &parameters); err != nil {
return nil, fmt.Errorf("failed to parse the parameters of the %s plugin. Error: %s", PrefixCachePluginName, err)
}

return &Plugin{
Config: parameters,
indexer: newIndexer(parameters.LRUCapacityPerServer),
}, nil
}

// New initializes a new prefix Plugin and returns its pointer.
func New(config Config) *Plugin {
capacity := config.LRUCapacityPerServer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ import (
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

const MaxScorePickerName = "max-score"
const (
MaxScorePickerName = "max-score"
)

// compile-time type validation
var _ framework.Picker = &MaxScorePicker{}

// MaxScorePickerFactory is the factory for the MaxScore picker
// MaxScorePickerFactory defines the factory function for MaxScorePicker.
func MaxScorePickerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return &MaxScorePicker{random: NewRandomPicker()}, nil
return NewMaxScorePicker(), nil
}

// NewMaxScorePicker initializes a new MaxScorePicker and returns its pointer.
Expand Down
6 changes: 4 additions & 2 deletions pkg/epp/scheduling/framework/plugins/picker/random_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import (
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

const RandomPickerName = "random"
const (
RandomPickerName = "random"
)

// compile-time type validation
var _ framework.Picker = &RandomPicker{}

// RandomPickerFactory is the factory for the Random picker
// RandomPickerFactory defines the factory function for RandomPicker.
func RandomPickerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewRandomPicker(), nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const SingleProfileHandlerName = "single-profile"
const (
SingleProfileHandlerName = "single-profile"
)

// compile-time type assertion
var _ framework.ProfileHandler = &SingleProfileHandler{}

// SingleProfileHandlerFactory defines the factory function for SingleProfileHandler.
func SingleProfileHandlerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return NewSingleProfileHandler(), nil
}
Expand All @@ -42,7 +45,7 @@ func NewSingleProfileHandler() *SingleProfileHandler {
// SingleProfileHandler handles a single profile which is always the primary profile.
type SingleProfileHandler struct{}

// Name returns the name of the Profiles Picker.
// Name returns the name of the Profile Handler.
func (h *SingleProfileHandler) Name() string {
return SingleProfileHandlerName
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/scheduling/framework/plugins/scorer/kvcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ const (
// compile-time type assertion
var _ framework.Scorer = &KVCacheScorer{}

// KvCacheScorerFactory is the factory for the KV-Cache scorer
// KvCacheScorerFactory defines the factory function for KVCacheScorer.
func KvCacheScorerFactory(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return &KVCacheScorer{}, nil
return NewKVCacheScorer(), nil
}

// NewKVCacheScorer initializes a new KVCacheScorer and returns its pointer.
Expand Down
Loading