Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 5 additions & 14 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,16 @@ func (r *Runner) Run(ctx context.Context) error {
}
}

err = r.parseConfiguration(ctx)
err = r.parsePluginsConfiguration(ctx)
if err != nil {
setupLog.Error(err, "Failed to parse the configuration")
setupLog.Error(err, "Failed to parse plugins configuration")
return err
}

// --- Initialize Core EPP Components ---
scheduler := r.initializeScheduler()
scheduler := scheduling.NewSchedulerWithConfig(r.schedulerConfig)

saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)
saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, setupLog)

director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)

Expand Down Expand Up @@ -326,16 +326,7 @@ func (r *Runner) Run(ctx context.Context) error {
return nil
}

func (r *Runner) initializeScheduler() *scheduling.Scheduler {
if r.schedulerConfig != nil {
return scheduling.NewSchedulerWithConfig(r.schedulerConfig)
}

// otherwise, no one configured from outside scheduler config. use existing configuration
return scheduling.NewScheduler()
}

func (r *Runner) parseConfiguration(ctx context.Context) error {
func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
if *configText == "" && *configFile == "" {
return nil // configuring through code, not through file
}
Expand Down
44 changes: 0 additions & 44 deletions pkg/epp/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)
Expand All @@ -38,46 +34,6 @@ type Datastore interface {
PodGetAll() []backendmetrics.PodMetrics
}

// NewScheduler returns a new scheduler with default scheduler plugins configuration.
func NewScheduler() *Scheduler {
// When the scheduler is initialized with NewScheduler function, thw below config will be used as default.
// it's possible to call NewSchedulerWithConfig to pass a different scheduler config.
// For build time plugins changes, it's recommended to call in main.go to NewSchedulerWithConfig.
loraAffinityFilter := filter.NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold)
leastQueueFilter := filter.NewLeastQueueFilter()
leastKvCacheFilter := filter.NewLeastKVCacheFilter()

lowLatencyFilter := &filter.DecisionTreeFilter{
Current: filter.NewLowQueueFilter(config.Conf.QueueingThresholdLoRA),
NextOnSuccess: &filter.DecisionTreeFilter{
Current: loraAffinityFilter,
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
Current: leastQueueFilter,
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
Current: leastKvCacheFilter,
},
},
},
NextOnFailure: &filter.DecisionTreeFilter{
Current: leastQueueFilter,
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
Current: loraAffinityFilter,
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
Current: leastKvCacheFilter,
},
},
},
}

defaultProfile := framework.NewSchedulerProfile().
WithFilters(lowLatencyFilter).
WithPicker(picker.NewRandomPicker(picker.DefaultMaxNumOfEndpoints))

profileHandler := profile.NewSingleProfileHandler()

return NewSchedulerWithConfig(NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile}))
}

// NewSchedulerWithConfig returns a new scheduler with the given scheduler plugins configuration.
func NewSchedulerWithConfig(config *SchedulerConfig) *Scheduler {
return &Scheduler{
Expand Down
41 changes: 40 additions & 1 deletion pkg/epp/scheduling/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,50 @@ import (
k8stypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" // Import config for thresholds
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

// Tests the default scheduler configuration and expected behavior.
func TestSchedule(t *testing.T) {
loraAffinityFilter := filter.NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold)
leastQueueFilter := filter.NewLeastQueueFilter()
leastKvCacheFilter := filter.NewLeastKVCacheFilter()

lowLatencyFilter := &filter.DecisionTreeFilter{
Current: filter.NewLowQueueFilter(config.Conf.QueueingThresholdLoRA),
NextOnSuccess: &filter.DecisionTreeFilter{
Current: loraAffinityFilter,
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
Current: leastQueueFilter,
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
Current: leastKvCacheFilter,
},
},
},
NextOnFailure: &filter.DecisionTreeFilter{
Current: leastQueueFilter,
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
Current: loraAffinityFilter,
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
Current: leastKvCacheFilter,
},
},
},
}

defaultProfile := framework.NewSchedulerProfile().
WithFilters(lowLatencyFilter).
WithPicker(picker.NewRandomPicker(picker.DefaultMaxNumOfEndpoints))

profileHandler := profile.NewSingleProfileHandler()

schedulerConfig := NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile})

tests := []struct {
name string
req *types.LLMRequest
Expand Down Expand Up @@ -120,7 +159,7 @@ func TestSchedule(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scheduler := NewScheduler()
scheduler := NewSchedulerWithConfig(schedulerConfig)
got, err := scheduler.Schedule(context.Background(), test.req, test.input)
if test.err != (err != nil) {
t.Errorf("Unexpected error, got %v, want %v", err, test.err)
Expand Down
48 changes: 44 additions & 4 deletions test/integration/epp/hermetic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/config"
crconfig "sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/envtest"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/yaml"

"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
Expand All @@ -63,11 +65,15 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
epptestutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing"
integrationutils "sigs.k8s.io/gateway-api-inference-extension/test/integration"
"sigs.k8s.io/yaml"
)

const (
Expand Down Expand Up @@ -1018,7 +1024,41 @@ func BeforeSuite() func() {
// Adjust from defaults
serverRunner.PoolNamespacedName = types.NamespacedName{Name: testPoolName, Namespace: testNamespace}
serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf)
scheduler := scheduling.NewScheduler()

loraAffinityFilter := filter.NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold)
leastQueueFilter := filter.NewLeastQueueFilter()
leastKvCacheFilter := filter.NewLeastKVCacheFilter()

lowLatencyFilter := &filter.DecisionTreeFilter{
Current: filter.NewLowQueueFilter(config.Conf.QueueingThresholdLoRA),
NextOnSuccess: &filter.DecisionTreeFilter{
Current: loraAffinityFilter,
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
Current: leastQueueFilter,
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
Current: leastKvCacheFilter,
},
},
},
NextOnFailure: &filter.DecisionTreeFilter{
Current: leastQueueFilter,
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
Current: loraAffinityFilter,
NextOnSuccessOrFailure: &filter.DecisionTreeFilter{
Current: leastKvCacheFilter,
},
},
},
}

defaultProfile := framework.NewSchedulerProfile().
WithFilters(lowLatencyFilter).
WithPicker(picker.NewRandomPicker(picker.DefaultMaxNumOfEndpoints))

profileHandler := profile.NewSingleProfileHandler()

schedulerConfig := scheduling.NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile})
scheduler := scheduling.NewSchedulerWithConfig(schedulerConfig)

sdConfig := &saturationdetector.Config{
QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold,
Expand Down Expand Up @@ -1125,7 +1165,7 @@ func managerTestOptions(namespace, name string, metricsServerOptions metricsserv
},
},
},
Controller: config.Controller{
Controller: crconfig.Controller{
SkipNameValidation: boolPointer(true),
},
Metrics: metricsServerOptions,
Expand Down