Merged
12 changes: 6 additions & 6 deletions cmd/epp/runner/runner.go
@@ -242,7 +242,7 @@ func (r *Runner) Run(ctx context.Context) error {
}

// --- Initialize Core EPP Components ---
scheduler, err := r.initializeScheduler(datastore)
scheduler, err := r.initializeScheduler()
if err != nil {
setupLog.Error(err, "Failed to create scheduler")
return err
@@ -293,13 +293,13 @@ func (r *Runner) Run(ctx context.Context) error {
return nil
}

func (r *Runner) initializeScheduler(datastore datastore.Datastore) (*scheduling.Scheduler, error) {
func (r *Runner) initializeScheduler() (*scheduling.Scheduler, error) {
if r.schedulerConfig != nil {
return scheduling.NewSchedulerWithConfig(datastore, r.schedulerConfig), nil
return scheduling.NewSchedulerWithConfig(r.schedulerConfig), nil
}

// otherwise, no one configured from outside scheduler config. use existing configuration
scheduler := scheduling.NewScheduler(datastore)
scheduler := scheduling.NewScheduler()
if schedulerV2 {
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
@@ -317,11 +317,11 @@ func (r *Runner) initializeScheduler(datastore datastore.Datastore) (*scheduling
}

schedulerConfig := scheduling.NewSchedulerConfig(profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"schedulerv2": schedulerProfile})
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
scheduler = scheduling.NewSchedulerWithConfig(schedulerConfig)
}

if reqHeaderBasedSchedulerForTesting {
scheduler = conformance_epp.NewReqHeaderBasedScheduler(datastore)
scheduler = conformance_epp.NewReqHeaderBasedScheduler()
}

return scheduler, nil
4 changes: 2 additions & 2 deletions conformance/testing-epp/scheduler.go
@@ -27,11 +27,11 @@ import (
// NewReqHeaderBasedScheduler creates a scheduler for conformance tests that selects
// an endpoint based on the "test-epp-endpoint-selection" request header. If the
// header is missing or the specified endpoint doesn't exist, no endpoint is returned.
func NewReqHeaderBasedScheduler(datastore scheduling.Datastore) *scheduling.Scheduler {
func NewReqHeaderBasedScheduler() *scheduling.Scheduler {
predicatableSchedulerProfile := framework.NewSchedulerProfile().
WithFilters(filter.NewHeaderBasedTestingFilter()).
WithPicker(picker.NewMaxScorePicker())

return scheduling.NewSchedulerWithConfig(datastore, scheduling.NewSchedulerConfig(
return scheduling.NewSchedulerWithConfig(scheduling.NewSchedulerConfig(
profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"req-header-based-profile": predicatableSchedulerProfile}))
}
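
To illustrate the refactored constructor, here is a minimal sketch of a caller: the conformance scheduler is built without a datastore, and candidate pods are handed to Schedule on each call. The helper function is hypothetical; the package aliases (backend, backendmetrics, types) and uuid follow the ones used elsewhere in this diff.

// Sketch only: hypothetical helper exercising the new signature.
func scheduleByHeader(ctx context.Context) (*types.SchedulingResult, error) {
	// Candidate pods are supplied by the caller instead of being read from a datastore.
	candidatePods := types.ToSchedulerPodMetrics([]backendmetrics.PodMetrics{
		&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "matched-endpoint"}},
	})
	req := &types.LLMRequest{
		Headers:   map[string]string{"test-epp-endpoint-selection": "matched-endpoint"},
		RequestId: uuid.NewString(),
	}
	scheduler := NewReqHeaderBasedScheduler() // no datastore argument anymore
	return scheduler.Schedule(ctx, req, candidatePods)
}
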
38 changes: 13 additions & 25 deletions conformance/testing-epp/sheduler_test.go
@@ -31,13 +31,13 @@ import (
func TestSchedule(t *testing.T) {
tests := []struct {
name string
input []*backendmetrics.FakePodMetrics
input []backendmetrics.PodMetrics
req *types.LLMRequest
wantRes *types.SchedulingResult
err bool
}{
{
name: "no pods in datastore and req header is set",
name: "no candidate pods and req header is set",
req: &types.LLMRequest{
Headers: map[string]string{"test-epp-endpoint-selection": "random-endpoint"},
RequestId: uuid.NewString(),
@@ -47,8 +47,8 @@ func TestSchedule(t *testing.T) {
},
{
name: "req header not set",
input: []*backendmetrics.FakePodMetrics{
{Pod: &backend.Pod{Address: "random-endpoint"}},
input: []backendmetrics.PodMetrics{
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "random-endpoint"}},
},
req: &types.LLMRequest{
Headers: map[string]string{}, // Deliberately set an empty header.
@@ -58,9 +58,9 @@ func TestSchedule(t *testing.T) {
err: true,
},
{
name: "no pods address in datastore matches req header address",
input: []*backendmetrics.FakePodMetrics{
{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
name: "no pods address from the candidate pods matches req header address",
input: []backendmetrics.PodMetrics{
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
},
req: &types.LLMRequest{
Headers: map[string]string{"test-epp-endpoint-selection": "matched-endpoint"},
@@ -70,10 +70,10 @@ func TestSchedule(t *testing.T) {
err: true,
},
{
name: "one pod address in datastore matches req header address",
input: []*backendmetrics.FakePodMetrics{
{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
{Pod: &backend.Pod{Address: "matched-endpoint"}},
name: "one pod address from the candidate pods matches req header address",
input: []backendmetrics.PodMetrics{
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "matched-endpoint"}},
},
req: &types.LLMRequest{
Headers: map[string]string{"test-epp-endpoint-selection": "matched-endpoint"},
@@ -99,8 +99,8 @@ func TestSchedule(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scheduler := NewReqHeaderBasedScheduler(&fakeDataStore{pods: test.input})
got, err := scheduler.Schedule(context.Background(), test.req)
scheduler := NewReqHeaderBasedScheduler()
got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input))
if test.err != (err != nil) {
t.Errorf("Unexpected error, got %v, want %v", err, test.err)
}
@@ -111,15 +111,3 @@ func TestSchedule(t *testing.T) {
})
}
}

type fakeDataStore struct {
pods []*backendmetrics.FakePodMetrics
}

func (fds *fakeDataStore) PodGetAll() []backendmetrics.PodMetrics {
pm := make([]backendmetrics.PodMetrics, 0, len(fds.pods))
for _, pod := range fds.pods {
pm = append(pm, pod)
}
return pm
}
8 changes: 6 additions & 2 deletions pkg/epp/requestcontrol/director.go
@@ -41,7 +41,7 @@ import (

// Scheduler defines the interface required by the Director for scheduling.
type Scheduler interface {
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result *schedulingtypes.SchedulingResult, err error)
Schedule(ctx context.Context, request *schedulingtypes.LLMRequest, candidatePods []schedulingtypes.Pod) (result *schedulingtypes.SchedulingResult, err error)
}

// SaturationDetector provides a signal indicating whether the backends are considered saturated.
@@ -135,7 +135,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
}

// --- 3. Call Scheduler ---
results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest)
// Snapshot pod metrics from the datastore to:
// 1. Reduce concurrent access to the datastore.
// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
candidatePods := schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll())
results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods)
if err != nil {
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
}
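
The comment added above records why the snapshot now happens in the Director rather than inside the scheduler. A minimal sketch of that flow, assuming the field names shown in this diff (the helper method itself is hypothetical):

// Sketch only: one datastore read per request; every scheduling cycle of the
// request then works on the same frozen view of the candidate pods.
func (d *Director) scheduleOnSnapshot(ctx context.Context, req *schedulingtypes.LLMRequest) (*schedulingtypes.SchedulingResult, error) {
	candidatePods := schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll())
	return d.scheduler.Schedule(ctx, req, candidatePods)
}
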
2 changes: 1 addition & 1 deletion pkg/epp/requestcontrol/director_test.go
@@ -58,7 +58,7 @@ type mockScheduler struct {
scheduleErr error
}

func (m *mockScheduler) Schedule(ctx context.Context, req *schedulingtypes.LLMRequest) (*schedulingtypes.SchedulingResult, error) {
func (m *mockScheduler) Schedule(_ context.Context, _ *schedulingtypes.LLMRequest, _ []schedulingtypes.Pod) (*schedulingtypes.SchedulingResult, error) {
return m.scheduleResults, m.scheduleErr
}

4 changes: 2 additions & 2 deletions pkg/epp/scheduling/framework/scheduler_profile.go
@@ -106,8 +106,8 @@ func (p *SchedulerProfile) AddPlugins(pluginObjects ...plugins.Plugin) error {

// RunCycle runs a SchedulerProfile cycle. In other words, it invokes all the SchedulerProfile plugins in this
// order - Filters, Scorers, Picker, PostCyclePlugins. After completing all, it returns the result.
func (p *SchedulerProfile) Run(ctx context.Context, request *types.LLMRequest, cycleState *types.CycleState, podsSnapshot []types.Pod) (*types.ProfileRunResult, error) {
pods := p.runFilterPlugins(ctx, request, cycleState, podsSnapshot)
func (p *SchedulerProfile) Run(ctx context.Context, request *types.LLMRequest, cycleState *types.CycleState, candidatePods []types.Pod) (*types.ProfileRunResult, error) {
pods := p.runFilterPlugins(ctx, request, cycleState, candidatePods)
if len(pods) == 0 {
return nil, errutil.Error{Code: errutil.Internal, Msg: "no pods available for the given request"}
}
18 changes: 5 additions & 13 deletions pkg/epp/scheduling/scheduler.go
@@ -39,7 +39,7 @@ type Datastore interface {
}

// NewScheduler returns a new scheduler with default scheduler plugins configuration.
func NewScheduler(datastore Datastore) *Scheduler {
func NewScheduler() *Scheduler {
// When the scheduler is initialized with NewScheduler function, the below config will be used as default.
// it's possible to call NewSchedulerWithConfig to pass a different scheduler config.
// For build time plugins changes, it's recommended to call in main.go to NewSchedulerWithConfig.
@@ -75,26 +75,24 @@ func NewScheduler(datastore Datastore) *Scheduler {

profileHandler := profile.NewSingleProfileHandler()

return NewSchedulerWithConfig(datastore, NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile}))
return NewSchedulerWithConfig(NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile}))
}

// NewSchedulerWithConfig returns a new scheduler with the given scheduler plugins configuration.
func NewSchedulerWithConfig(datastore Datastore, config *SchedulerConfig) *Scheduler {
func NewSchedulerWithConfig(config *SchedulerConfig) *Scheduler {
return &Scheduler{
datastore: datastore,
profileHandler: config.profileHandler,
profiles: config.profiles,
}
}

type Scheduler struct {
datastore Datastore
profileHandler framework.ProfileHandler
profiles map[string]*framework.SchedulerProfile
}

// Schedule finds the target pod based on metrics and the requested lora adapter.
func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest) (*types.SchedulingResult, error) {
func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, candidatePods []types.Pod) (*types.SchedulingResult, error) {
logger := log.FromContext(ctx).WithValues("request", request)
loggerDebug := logger.V(logutil.DEBUG)

@@ -103,12 +101,6 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest) (*t
metrics.RecordSchedulerE2ELatency(time.Since(scheduleStart))
}()

// Snapshot pod metrics from the datastore to:
Collaborator: nit: can we preserve this comment in the director file?
Contributor Author: sure, done


// 1. Reduce concurrent access to the datastore.
// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
podsSnapshot := types.ToSchedulerPodMetrics(s.datastore.PodGetAll())
loggerDebug.Info(fmt.Sprintf("Scheduling a request, Metrics: %+v", podsSnapshot))

profileRunResults := map[string]*types.ProfileRunResult{}
cycleState := types.NewCycleState()

@@ -122,7 +114,7 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest) (*t

for name, profile := range profiles {
// run the selected profiles and collect results (current code runs all profiles)
profileRunResult, err := profile.Run(ctx, request, cycleState, podsSnapshot)
profileRunResult, err := profile.Run(ctx, request, cycleState, candidatePods)
if err != nil {
loggerDebug.Info("failed to run scheduler profile", "profile", name, "error", err.Error())
}
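
With the datastore parameter removed from NewScheduler and NewSchedulerWithConfig, wiring a scheduler from a custom config looks roughly like the sketch below. Imports are elided; the package aliases (framework, picker, profile, scheduling, types, backendmetrics) follow this diff, and the helper plus its minimal single-profile config are illustrative assumptions, not the project's recommended setup.

// Sketch only: hypothetical helper showing the datastore-free constructors.
func buildAndSchedule(ctx context.Context, request *types.LLMRequest, podMetrics []backendmetrics.PodMetrics) (*types.SchedulingResult, error) {
	// Minimal single-profile config for illustration; real profiles would add filters and scorers.
	customProfile := framework.NewSchedulerProfile().
		WithPicker(picker.NewMaxScorePicker())
	config := scheduling.NewSchedulerConfig(
		profile.NewSingleProfileHandler(),
		map[string]*framework.SchedulerProfile{"default": customProfile},
	)
	scheduler := scheduling.NewSchedulerWithConfig(config) // no datastore argument

	// Candidate pods come from the caller on every Schedule call.
	return scheduler.Schedule(ctx, request, types.ToSchedulerPodMetrics(podMetrics))
}
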
30 changes: 9 additions & 21 deletions pkg/epp/scheduling/scheduler_test.go
@@ -33,17 +33,17 @@ func TestSchedule(t *testing.T) {
tests := []struct {
name string
req *types.LLMRequest
input []*backendmetrics.FakePodMetrics
input []backendmetrics.PodMetrics
wantRes *types.SchedulingResult
err bool
}{
{
name: "no pods in datastore",
name: "no candidate pods",
req: &types.LLMRequest{
TargetModel: "any-model",
RequestId: uuid.NewString(),
},
input: []*backendmetrics.FakePodMetrics{},
input: []backendmetrics.PodMetrics{},
wantRes: nil,
err: true,
},
@@ -55,8 +55,8 @@
},
// pod2 will be picked because it has relatively low queue size, with the requested
// model being active, and has low KV cache.
input: []*backendmetrics.FakePodMetrics{
{
input: []backendmetrics.PodMetrics{
&backendmetrics.FakePodMetrics{
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}},
Metrics: &backendmetrics.MetricsState{
WaitingQueueSize: 0,
Expand All @@ -68,7 +68,7 @@ func TestSchedule(t *testing.T) {
},
},
},
{
&backendmetrics.FakePodMetrics{
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}},
Metrics: &backendmetrics.MetricsState{
WaitingQueueSize: 3,
Expand All @@ -80,7 +80,7 @@ func TestSchedule(t *testing.T) {
},
},
},
{
&backendmetrics.FakePodMetrics{
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}},
Metrics: &backendmetrics.MetricsState{
WaitingQueueSize: 10,
@@ -119,8 +119,8 @@

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scheduler := NewScheduler(&fakeDataStore{pods: test.input})
got, err := scheduler.Schedule(context.Background(), test.req)
scheduler := NewScheduler()
got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input))
if test.err != (err != nil) {
t.Errorf("Unexpected error, got %v, want %v", err, test.err)
}
@@ -131,15 +131,3 @@
})
}
}

type fakeDataStore struct {
pods []*backendmetrics.FakePodMetrics
}

func (fds *fakeDataStore) PodGetAll() []backendmetrics.PodMetrics {
pm := make([]backendmetrics.PodMetrics, 0, len(fds.pods))
for _, pod := range fds.pods {
pm = append(pm, pod)
}
return pm
}
2 changes: 1 addition & 1 deletion test/integration/epp/hermetic_test.go
@@ -925,7 +925,7 @@ func BeforeSuite() func() {
// Adjust from defaults
serverRunner.PoolNamespacedName = types.NamespacedName{Name: testPoolName, Namespace: testNamespace}
serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf)
scheduler := scheduling.NewScheduler(serverRunner.Datastore)
scheduler := scheduling.NewScheduler()

sdConfig := &saturationdetector.Config{
QueueDepthThreshold: saturationdetector.DefaultQueueDepthThreshold,