
Commit fd75fd8

nirrozenbaum authored and BenjaminBraunDev committed
remove datastore dependency from the scheduler (kubernetes-sigs#1049)
* remove datastore dependency from the scheduler
* added back comments on snapshotting pods from datastore before calling schedule
* removed fake datastore from conformance scheduler test

Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com>
1 parent f8c9507 commit fd75fd8

9 files changed: 61 additions, 90 deletions

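For orientation, here is a minimal sketch of the call pattern after this change, pieced together from the diffs below: the scheduler is constructed without a datastore, and the caller snapshots the candidate pods once per request and passes them to Schedule. This is illustrative only, not code from the commit; the helper name dispatch is hypothetical, the import path of the types package is inferred from the repository layout, and it assumes datastore.Datastore exposes PodGetAll as used in director.go.

// Illustrative sketch, not part of the commit.
package example

import (
	"context"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
	schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

// dispatch (hypothetical) shows the new flow: the caller, not the scheduler, owns the datastore.
func dispatch(ctx context.Context, ds datastore.Datastore, req *schedulingtypes.LLMRequest) (*schedulingtypes.SchedulingResult, error) {
	// Construct the scheduler without a datastore argument.
	scheduler := scheduling.NewScheduler()

	// Snapshot pod metrics once per request, as the Director now does, and pass
	// the snapshot explicitly into the scheduling call.
	candidatePods := schedulingtypes.ToSchedulerPodMetrics(ds.PodGetAll())
	return scheduler.Schedule(ctx, req, candidatePods)
}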

cmd/epp/runner/runner.go

Lines changed: 22 additions & 23 deletions
@@ -40,6 +40,7 @@ import (
 	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config/loader"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
+
 	// Import the latency predictor package
 	latencypredictor "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/latencypredictorasync"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
@@ -93,11 +94,11 @@ var (
 		"refreshPrometheusMetricsInterval",
 		runserver.DefaultRefreshPrometheusMetricsInterval,
 		"interval to flush prometheus metrics")
-	logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
+	logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
 	secureServing = flag.Bool(
 		"secureServing", runserver.DefaultSecureServing, "Enables secure serving. Defaults to true.")
 	healthChecking = flag.Bool("healthChecking", runserver.DefaultHealthChecking, "Enables health checking")
-	certPath = flag.String(
+	certPath = flag.String(
 		"certPath", "", "The path to the certificate for secure serving. The certificate and private key files "+
 		"are assumed to be named tls.crt and tls.key, respectively. If not set, and secureServing is enabled, "+
 		"then a self-signed certificate is used.")
@@ -222,11 +223,11 @@ func (r *Runner) Run(ctx context.Context) error {
 	// ===================================================================
 	// == Latency Predictor Integration
 	// ===================================================================
-	var predictor latencypredictor.PredictorInterface // Use the interface type
+	var predictor latencypredictor.PredictorInterface // Use the interface type
 	if *enableLatencyPredictor {
 		setupLog.Info("Latency predictor is enabled. Initializing...")
 		predictor = latencypredictor.New(latencypredictor.ConfigFromEnv(), ctrl.Log.WithName("latency-predictor"))
-
+
 		// For the runnable, you'll need to type assert back to the concrete type
 		concretePredictor := predictor.(*latencypredictor.Predictor)
 		if err := mgr.Add(runnable.NoLeaderElection(&predictorRunnable{predictor: concretePredictor})); err != nil {
@@ -235,10 +236,9 @@ func (r *Runner) Run(ctx context.Context) error {
 		}
 	} else {
 		setupLog.Info("Latency predictor is disabled.")
-		predictor = nil // This will be a true nil interface
+		predictor = nil // This will be a true nil interface
 	}
 
-
 	// ===================================================================
 
 	if len(*configText) != 0 || len(*configFile) != 0 {
@@ -281,18 +281,18 @@ func (r *Runner) Run(ctx context.Context) error {
 
 	// --- Setup ExtProc Server Runner ---
 	serverRunner := &runserver.ExtProcServerRunner{
-		GrpcPort: *grpcPort,
+		GrpcPort: *grpcPort,
 		DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
-		DestinationEndpointHintKey: *destinationEndpointHintKey,
-		PoolNamespacedName: poolNamespacedName,
-		Datastore: datastore,
-		SecureServing: *secureServing,
-		HealthChecking: *healthChecking,
-		CertPath: *certPath,
-		RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
-		Director: director,
-		SaturationDetector: saturationDetector,
-		LatencyPredictor: predictor,
+		DestinationEndpointHintKey: *destinationEndpointHintKey,
+		PoolNamespacedName: poolNamespacedName,
+		Datastore: datastore,
+		SecureServing: *secureServing,
+		HealthChecking: *healthChecking,
+		CertPath: *certPath,
+		RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
+		Director: director,
+		SaturationDetector: saturationDetector,
+		LatencyPredictor: predictor,
 	}
 	if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
 		setupLog.Error(err, "Failed to setup EPP controllers")
@@ -321,13 +321,13 @@ func (r *Runner) Run(ctx context.Context) error {
 	return nil
 }
 
-func (r *Runner) initializeScheduler(datastore datastore.Datastore,) (*scheduling.Scheduler, error) {
+func (r *Runner) initializeScheduler(datastore datastore.Datastore) (*scheduling.Scheduler, error) {
 	if r.schedulerConfig != nil {
-		return scheduling.NewSchedulerWithConfig(datastore, r.schedulerConfig), nil
+		return scheduling.NewSchedulerWithConfig(r.schedulerConfig), nil
 	}
 
 	// otherwise, no one configured from outside scheduler config. use existing configuration
-	scheduler := scheduling.NewScheduler(datastore)
+	scheduler := scheduling.NewScheduler()
 	if schedulerV2 {
 		queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
 		kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
@@ -337,7 +337,6 @@ func (r *Runner) initializeScheduler(datastore datastore.Datastore,) (*schedulin
 			framework.NewWeightedScorer(scorer.NewKVCacheScorer(), kvCacheScorerWeight)).
 			WithPicker(picker.NewMaxScorePicker())
 
-
 		if prefixCacheScheduling {
 			prefixScorerWeight := envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", prefix.DefaultScorerWeight, setupLog)
 			if err := schedulerProfile.AddPlugins(framework.NewWeightedScorer(prefix.New(loadPrefixCacheConfig()), prefixScorerWeight)); err != nil {
@@ -346,11 +345,11 @@ func (r *Runner) initializeScheduler(datastore datastore.Datastore,) (*schedulin
 		}
 
 		schedulerConfig := scheduling.NewSchedulerConfig(profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"schedulerv2": schedulerProfile})
-		scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
+		scheduler = scheduling.NewSchedulerWithConfig(schedulerConfig)
	}
 
 	if reqHeaderBasedSchedulerForTesting {
-		scheduler = conformance_epp.NewReqHeaderBasedScheduler(datastore)
+		scheduler = conformance_epp.NewReqHeaderBasedScheduler()
	}
 
 	return scheduler, nil

conformance/testing-epp/scheduler.go

Lines changed: 2 additions & 2 deletions
@@ -27,11 +27,11 @@ import (
 // NewReqHeaderBasedScheduler creates a scheduler for conformance tests that selects
 // an endpoint based on the "test-epp-endpoint-selection" request header. If the
 // header is missing or the specified endpoint doesn't exist, no endpoint is returned.
-func NewReqHeaderBasedScheduler(datastore scheduling.Datastore) *scheduling.Scheduler {
+func NewReqHeaderBasedScheduler() *scheduling.Scheduler {
 	predicatableSchedulerProfile := framework.NewSchedulerProfile().
 		WithFilters(filter.NewHeaderBasedTestingFilter()).
 		WithPicker(picker.NewMaxScorePicker())
 
-	return scheduling.NewSchedulerWithConfig(datastore, scheduling.NewSchedulerConfig(
+	return scheduling.NewSchedulerWithConfig(scheduling.NewSchedulerConfig(
 		profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"req-header-based-profile": predicatableSchedulerProfile}))
 }

conformance/testing-epp/sheduler_test.go

Lines changed: 13 additions & 25 deletions
@@ -31,13 +31,13 @@ import (
 func TestSchedule(t *testing.T) {
 	tests := []struct {
 		name    string
-		input   []*backendmetrics.FakePodMetrics
+		input   []backendmetrics.PodMetrics
 		req     *types.LLMRequest
 		wantRes *types.SchedulingResult
 		err     bool
 	}{
 		{
-			name: "no pods in datastore and req header is set",
+			name: "no candidate pods and req header is set",
 			req: &types.LLMRequest{
 				Headers:   map[string]string{"test-epp-endpoint-selection": "random-endpoint"},
 				RequestId: uuid.NewString(),
@@ -47,8 +47,8 @@ func TestSchedule(t *testing.T) {
 		},
 		{
 			name: "req header not set",
-			input: []*backendmetrics.FakePodMetrics{
-				{Pod: &backend.Pod{Address: "random-endpoint"}},
+			input: []backendmetrics.PodMetrics{
+				&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "random-endpoint"}},
 			},
 			req: &types.LLMRequest{
 				Headers: map[string]string{}, // Deliberately set an empty header.
@@ -58,9 +58,9 @@ func TestSchedule(t *testing.T) {
 			err: true,
 		},
 		{
-			name: "no pods address in datastore matches req header address",
-			input: []*backendmetrics.FakePodMetrics{
-				{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
+			name: "no pods address from the candidate pods matches req header address",
+			input: []backendmetrics.PodMetrics{
+				&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
 			},
 			req: &types.LLMRequest{
 				Headers: map[string]string{"test-epp-endpoint-selection": "matched-endpoint"},
@@ -70,10 +70,10 @@ func TestSchedule(t *testing.T) {
 			err: true,
 		},
 		{
-			name: "one pod address in datastore matches req header address",
-			input: []*backendmetrics.FakePodMetrics{
-				{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
-				{Pod: &backend.Pod{Address: "matched-endpoint"}},
+			name: "one pod address from the candidate pods matches req header address",
+			input: []backendmetrics.PodMetrics{
+				&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
+				&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "matched-endpoint"}},
 			},
 			req: &types.LLMRequest{
 				Headers: map[string]string{"test-epp-endpoint-selection": "matched-endpoint"},
@@ -99,8 +99,8 @@ func TestSchedule(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			scheduler := NewReqHeaderBasedScheduler(&fakeDataStore{pods: test.input})
-			got, err := scheduler.Schedule(context.Background(), test.req)
+			scheduler := NewReqHeaderBasedScheduler()
+			got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input))
 			if test.err != (err != nil) {
 				t.Errorf("Unexpected error, got %v, want %v", err, test.err)
 			}
@@ -111,15 +111,3 @@ func TestSchedule(t *testing.T) {
 		})
 	}
 }
-
-type fakeDataStore struct {
-	pods []*backendmetrics.FakePodMetrics
-}
-
-func (fds *fakeDataStore) PodGetAll() []backendmetrics.PodMetrics {
-	pm := make([]backendmetrics.PodMetrics, 0, len(fds.pods))
-	for _, pod := range fds.pods {
-		pm = append(pm, pod)
-	}
-	return pm
-}

pkg/epp/requestcontrol/director.go

Lines changed: 6 additions & 2 deletions
@@ -95,7 +95,7 @@ func calculateRunningAverage(currentAvg float64, newValue float64, count int) fl
 
 // Scheduler defines the interface required by the Director for scheduling.
 type Scheduler interface {
-	Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result *schedulingtypes.SchedulingResult, err error)
+	Schedule(ctx context.Context, request *schedulingtypes.LLMRequest, candidatePods []schedulingtypes.Pod) (result *schedulingtypes.SchedulingResult, err error)
 }
 
 // SaturationDetector provides a signal indicating whether the backends are considered saturated.
@@ -190,7 +190,11 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
 	}
 
 	// --- 3. Call Scheduler ---
-	results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest)
+	// Snapshot pod metrics from the datastore to:
+	// 1. Reduce concurrent access to the datastore.
+	// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
+	candidatePods := schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll())
+	results, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, candidatePods)
 	if err != nil {
 		return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
 	}
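Because the candidate pods are now an explicit argument, anything that satisfies the Director's Scheduler interface can be composed without ever touching the datastore. As a hypothetical illustration (not part of this commit, and assuming it lives in the same package as the interface above), a decorator could pre-filter the snapshot before delegating to any inner scheduler:

// filteringScheduler is a hypothetical decorator over the Scheduler interface
// defined above; it narrows the candidate pods before delegating.
type filteringScheduler struct {
	inner Scheduler
	keep  func(schedulingtypes.Pod) bool
}

func (f *filteringScheduler) Schedule(ctx context.Context, request *schedulingtypes.LLMRequest, candidatePods []schedulingtypes.Pod) (*schedulingtypes.SchedulingResult, error) {
	// Keep only the pods accepted by the predicate.
	filtered := make([]schedulingtypes.Pod, 0, len(candidatePods))
	for _, pod := range candidatePods {
		if f.keep(pod) {
			filtered = append(filtered, pod)
		}
	}
	// Delegate to the wrapped scheduler with the reduced snapshot.
	return f.inner.Schedule(ctx, request, filtered)
}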

pkg/epp/requestcontrol/director_test.go

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ type mockScheduler struct {
 	scheduleErr error
 }
 
-func (m *mockScheduler) Schedule(ctx context.Context, req *schedulingtypes.LLMRequest) (*schedulingtypes.SchedulingResult, error) {
+func (m *mockScheduler) Schedule(_ context.Context, _ *schedulingtypes.LLMRequest, _ []schedulingtypes.Pod) (*schedulingtypes.SchedulingResult, error) {
 	return m.scheduleResults, m.scheduleErr
 }
 

pkg/epp/scheduling/framework/scheduler_profile.go

Lines changed: 2 additions & 2 deletions
@@ -106,8 +106,8 @@ func (p *SchedulerProfile) AddPlugins(pluginObjects ...plugins.Plugin) error {
 
 // RunCycle runs a SchedulerProfile cycle. In other words, it invokes all the SchedulerProfile plugins in this
 // order - Filters, Scorers, Picker, PostCyclePlugins. After completing all, it returns the result.
-func (p *SchedulerProfile) Run(ctx context.Context, request *types.LLMRequest, cycleState *types.CycleState, podsSnapshot []types.Pod) (*types.ProfileRunResult, error) {
-	pods := p.runFilterPlugins(ctx, request, cycleState, podsSnapshot)
+func (p *SchedulerProfile) Run(ctx context.Context, request *types.LLMRequest, cycleState *types.CycleState, candidatePods []types.Pod) (*types.ProfileRunResult, error) {
+	pods := p.runFilterPlugins(ctx, request, cycleState, candidatePods)
 	if len(pods) == 0 {
 		return nil, errutil.Error{Code: errutil.Internal, Msg: "no pods available for the given request"}
 	}

pkg/epp/scheduling/scheduler.go

Lines changed: 5 additions & 13 deletions
@@ -39,7 +39,7 @@ type Datastore interface {
 }
 
 // NewScheduler returns a new scheduler with default scheduler plugins configuration.
-func NewScheduler(datastore Datastore) *Scheduler {
+func NewScheduler() *Scheduler {
 	// When the scheduler is initialized with NewScheduler function, thw below config will be used as default.
 	// it's possible to call NewSchedulerWithConfig to pass a different scheduler config.
 	// For build time plugins changes, it's recommended to call in main.go to NewSchedulerWithConfig.
@@ -75,26 +75,24 @@ func NewScheduler(datastore Datastore) *Scheduler {
 
 	profileHandler := profile.NewSingleProfileHandler()
 
-	return NewSchedulerWithConfig(datastore, NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile}))
+	return NewSchedulerWithConfig(NewSchedulerConfig(profileHandler, map[string]*framework.SchedulerProfile{"default": defaultProfile}))
 }
 
 // NewSchedulerWithConfig returns a new scheduler with the given scheduler plugins configuration.
-func NewSchedulerWithConfig(datastore Datastore, config *SchedulerConfig) *Scheduler {
+func NewSchedulerWithConfig(config *SchedulerConfig) *Scheduler {
 	return &Scheduler{
-		datastore:      datastore,
 		profileHandler: config.profileHandler,
 		profiles:       config.profiles,
 	}
 }
 
 type Scheduler struct {
-	datastore      Datastore
 	profileHandler framework.ProfileHandler
 	profiles       map[string]*framework.SchedulerProfile
 }
 
 // Schedule finds the target pod based on metrics and the requested lora adapter.
-func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest) (*types.SchedulingResult, error) {
+func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest, candidatePods []types.Pod) (*types.SchedulingResult, error) {
 	logger := log.FromContext(ctx).WithValues("request", request)
 	loggerDebug := logger.V(logutil.DEBUG)
 
@@ -103,12 +101,6 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest) (*t
 		metrics.RecordSchedulerE2ELatency(time.Since(scheduleStart))
 	}()
 
-	// Snapshot pod metrics from the datastore to:
-	// 1. Reduce concurrent access to the datastore.
-	// 2. Ensure consistent data during the scheduling operation of a request between all scheduling cycles.
-	podsSnapshot := types.ToSchedulerPodMetrics(s.datastore.PodGetAll())
-	loggerDebug.Info(fmt.Sprintf("Scheduling a request, Metrics: %+v", podsSnapshot))
-
 	profileRunResults := map[string]*types.ProfileRunResult{}
 	cycleState := types.NewCycleState()
 
@@ -122,7 +114,7 @@ func (s *Scheduler) Schedule(ctx context.Context, request *types.LLMRequest) (*t
 
 	for name, profile := range profiles {
 		// run the selected profiles and collect results (current code runs all profiles)
-		profileRunResult, err := profile.Run(ctx, request, cycleState, podsSnapshot)
+		profileRunResult, err := profile.Run(ctx, request, cycleState, candidatePods)
 		if err != nil {
 			loggerDebug.Info("failed to run scheduler profile", "profile", name, "error", err.Error())
 		}
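Custom configurations lose the datastore parameter as well: NewSchedulerWithConfig now takes only the SchedulerConfig. A sketch under the assumption that the framework, picker, profile, and scheduling packages are imported under the same aliases as in cmd/epp/runner/runner.go (and that a picker-only profile is acceptable) could look like this:

// newCustomScheduler is a hypothetical helper showing the datastore-free constructor.
func newCustomScheduler() *scheduling.Scheduler {
	customProfile := framework.NewSchedulerProfile().
		WithPicker(picker.NewMaxScorePicker())

	config := scheduling.NewSchedulerConfig(
		profile.NewSingleProfileHandler(),
		map[string]*framework.SchedulerProfile{"custom": customProfile})

	// The returned scheduler is driven entirely by whatever []types.Pod snapshot
	// the caller passes to Schedule.
	return scheduling.NewSchedulerWithConfig(config)
}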

pkg/epp/scheduling/scheduler_test.go

Lines changed: 9 additions & 21 deletions
@@ -33,17 +33,17 @@ func TestSchedule(t *testing.T) {
 	tests := []struct {
 		name    string
 		req     *types.LLMRequest
-		input   []*backendmetrics.FakePodMetrics
+		input   []backendmetrics.PodMetrics
 		wantRes *types.SchedulingResult
 		err     bool
 	}{
 		{
-			name: "no pods in datastore",
+			name: "no candidate pods",
 			req: &types.LLMRequest{
 				TargetModel: "any-model",
 				RequestId:   uuid.NewString(),
 			},
-			input:   []*backendmetrics.FakePodMetrics{},
+			input:   []backendmetrics.PodMetrics{},
 			wantRes: nil,
 			err:     true,
 		},
@@ -55,8 +55,8 @@ func TestSchedule(t *testing.T) {
 			},
 			// pod2 will be picked because it has relatively low queue size, with the requested
 			// model being active, and has low KV cache.
-			input: []*backendmetrics.FakePodMetrics{
-				{
+			input: []backendmetrics.PodMetrics{
+				&backendmetrics.FakePodMetrics{
 					Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}},
 					Metrics: &backendmetrics.MetricsState{
 						WaitingQueueSize: 0,
@@ -68,7 +68,7 @@ func TestSchedule(t *testing.T) {
 						},
 					},
 				},
-				{
+				&backendmetrics.FakePodMetrics{
 					Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}},
 					Metrics: &backendmetrics.MetricsState{
 						WaitingQueueSize: 3,
@@ -80,7 +80,7 @@ func TestSchedule(t *testing.T) {
 						},
 					},
 				},
-				{
+				&backendmetrics.FakePodMetrics{
 					Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}},
 					Metrics: &backendmetrics.MetricsState{
 						WaitingQueueSize: 10,
@@ -119,8 +119,8 @@ func TestSchedule(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			scheduler := NewScheduler(&fakeDataStore{pods: test.input})
-			got, err := scheduler.Schedule(context.Background(), test.req)
+			scheduler := NewScheduler()
+			got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input))
 			if test.err != (err != nil) {
 				t.Errorf("Unexpected error, got %v, want %v", err, test.err)
 			}
@@ -131,15 +131,3 @@ func TestSchedule(t *testing.T) {
 		})
 	}
 }
-
-type fakeDataStore struct {
-	pods []*backendmetrics.FakePodMetrics
-}
-
-func (fds *fakeDataStore) PodGetAll() []backendmetrics.PodMetrics {
-	pm := make([]backendmetrics.PodMetrics, 0, len(fds.pods))
-	for _, pod := range fds.pods {
-		pm = append(pm, pod)
-	}
-	return pm
-}
