Skip to content

Commit 92f05d7

Browse files
shmuelkBenjaminBraunDev
authored andcommitted
Add APIs for the instantiated plugins to the EPP Handle (kubernetes-sigs#1039)
* Added plugin instance APIs to plugins.Handle Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * An implementation of the new plugins.Handle APIs Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * Moved all configuration loading code to new package Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * Updates due to new and moved APIs Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * Cleanup of old configuration loading code Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> --------- Signed-off-by: Shmuel Kallner <kallner@il.ibm.com>
1 parent d889ab4 commit 92f05d7

File tree

24 files changed

+1780
-79
lines changed

24 files changed

+1780
-79
lines changed

cmd/epp/runner/register.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package runner
18+
19+
import (
20+
"context"
21+
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
28+
)
29+
30+
// RegisterAllPlugins registers the factory functions of all known plugins
31+
func RegisterAllPlugins() {
32+
plugins.Register(filter.DecisionTreeFilterType, filter.DecisionTreeFilterFactory)
33+
plugins.Register(filter.LeastKVCacheFilterType, filter.LeastKVCacheFilterFactory)
34+
plugins.Register(filter.LeastQueueFilterType, filter.LeastQueueFilterFactory)
35+
plugins.Register(filter.LoraAffinityFilterType, filter.LoraAffinityFilterFactory)
36+
plugins.Register(filter.LowQueueFilterType, filter.LowQueueFilterFactory)
37+
plugins.Register(prefix.PrefixCachePluginType, prefix.PrefixCachePluginFactory)
38+
plugins.Register(picker.MaxScorePickerType, picker.MaxScorePickerFactory)
39+
plugins.Register(picker.RandomPickerType, picker.RandomPickerFactory)
40+
plugins.Register(profile.SingleProfileHandlerType, profile.SingleProfileHandlerFactory)
41+
plugins.Register(scorer.KvCacheScorerType, scorer.KvCacheScorerFactory)
42+
plugins.Register(scorer.QueueScorerType, scorer.QueueScorerFactory)
43+
}
44+
45+
// eppHandle is an implementation of the interface plugins.Handle
46+
type eppHandle struct {
47+
ctx context.Context
48+
plugins plugins.HandlePlugins
49+
}
50+
51+
// Context returns a context the plugins can use, if they need one
52+
func (h *eppHandle) Context() context.Context {
53+
return h.ctx
54+
}
55+
56+
// Plugins returns the sub-handle for working with instantiated plugins
57+
func (h *eppHandle) Plugins() plugins.HandlePlugins {
58+
return h.plugins
59+
}
60+
61+
// eppHandlePlugins implements the set of APIs to work with instantiated plugins
62+
type eppHandlePlugins struct {
63+
thePlugins map[string]plugins.Plugin
64+
}
65+
66+
// Plugin returns the named plugin instance
67+
func (h *eppHandlePlugins) Plugin(name string) plugins.Plugin {
68+
return h.thePlugins[name]
69+
}
70+
71+
// AddPlugin adds a plugin to the set of known plugin instances
72+
func (h *eppHandlePlugins) AddPlugin(name string, plugin plugins.Plugin) {
73+
h.thePlugins[name] = plugin
74+
}
75+
76+
// GetAllPlugins returns all of the known plugins
77+
func (h *eppHandlePlugins) GetAllPlugins() []plugins.Plugin {
78+
result := make([]plugins.Plugin, 0)
79+
for _, plugin := range h.thePlugins {
80+
result = append(result, plugin)
81+
}
82+
return result
83+
}
84+
85+
// GetAllPluginsWithNames returns al of the known plugins with their names
86+
func (h *eppHandlePlugins) GetAllPluginsWithNames() map[string]plugins.Plugin {
87+
return h.thePlugins
88+
}
89+
90+
func newEppHandle(ctx context.Context) *eppHandle {
91+
return &eppHandle{
92+
ctx: ctx,
93+
plugins: &eppHandlePlugins{
94+
thePlugins: map[string]plugins.Plugin{},
95+
},
96+
}
97+
}

cmd/epp/runner/runner.go

Lines changed: 92 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ import (
4545
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4646
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
4747
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
48-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader"
48+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config/loader"
4949
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
5050

5151
// Import the latency predictor package
@@ -325,6 +325,11 @@ func (r *Runner) Run(ctx context.Context) error {
325325
setupLog.Error(err, "Failed to create controller manager")
326326
return err
327327
}
328+
err = setupPprofHandlers(mgr)
329+
if err != nil {
330+
setupLog.Error(err, "Failed to setup pprof handlers")
331+
return err
332+
}
328333

329334
// ===================================================================
330335
// == Latency Predictor Integration
@@ -368,37 +373,40 @@ func (r *Runner) Run(ctx context.Context) error {
368373
}
369374
}
370375

376+
// START DIFF
377+
// below is what was incomming
378+
err = r.parseConfiguration(ctx)
379+
if err != nil {
380+
setupLog.Error(err, "Failed to parse the configuration")
381+
return err
382+
}
383+
384+
// below is what was current
371385
if len(*configText) != 0 || len(*configFile) != 0 {
372-
theConfig, err := config.LoadConfig([]byte(*configText), *configFile)
386+
theConfig, err := loader.LoadConfig([]byte(*configText), *configFile)
373387
if err != nil {
374388
setupLog.Error(err, "Failed to load the configuration")
375389
return err
376390
}
377391

378-
epp := eppHandle{}
379-
instantiatedPlugins, err := config.LoadPluginReferences(theConfig.Plugins, epp)
392+
epp := newEppHandle()
393+
394+
err = loader.LoadPluginReferences(theConfig.Plugins, epp)
380395
if err != nil {
381396
setupLog.Error(err, "Failed to instantiate the plugins")
382397
return err
383398
}
384-
}
385399

386-
r.schedulerConfig, err = scheduling.LoadSchedulerConfig(theConfig.SchedulingProfiles, instantiatedPlugins)
387-
if err != nil {
388-
setupLog.Error(err, "Failed to create Scheduler configuration")
389-
return err
390-
}
391-
392-
err = r.parsePluginsConfiguration(ctx)
393-
if err != nil {
394-
setupLog.Error(err, "Failed to parse plugins configuration")
395-
return err
396-
}
400+
r.schedulerConfig, err = loader.LoadSchedulerConfig(theConfig.SchedulingProfiles, epp)
401+
if err != nil {
402+
setupLog.Error(err, "Failed to create Scheduler configuration")
403+
return err
404+
}
397405

398-
// Add requestcontrol plugins
399-
if instantiatedPlugins != nil {
400-
r.requestControlConfig = requestcontrol.LoadRequestControlConfig(instantiatedPlugins)
406+
// Add requestControl plugins
407+
r.requestControlConfig.AddPlugins(epp.Plugins().GetAllPlugins()...)
401408
}
409+
// END DIFF
402410

403411
// --- Initialize Core EPP Components ---
404412
if r.schedulerConfig == nil {
@@ -418,18 +426,20 @@ func (r *Runner) Run(ctx context.Context) error {
418426

419427
// --- Setup ExtProc Server Runner ---
420428
serverRunner := &runserver.ExtProcServerRunner{
421-
GrpcPort: *grpcPort,
422-
PoolNamespacedName: poolNamespacedName,
423-
PoolGKNN: poolGKNN,
424-
Datastore: datastore,
425-
SecureServing: *secureServing,
426-
HealthChecking: *healthChecking,
427-
CertPath: *certPath,
428-
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
429-
MetricsStalenessThreshold: *metricsStalenessThreshold,
430-
Director: director,
431-
SaturationDetector: saturationDetector,
432-
LatencyPredictor: predictor,
429+
GrpcPort: *grpcPort,
430+
PoolNamespacedName: poolNamespacedName,
431+
PoolGKNN: poolGKNN,
432+
Datastore: datastore,
433+
SecureServing: *secureServing,
434+
HealthChecking: *healthChecking,
435+
CertPath: *certPath,
436+
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
437+
MetricsStalenessThreshold: *metricsStalenessThreshold,
438+
Director: director,
439+
SaturationDetector: saturationDetector,
440+
LatencyPredictor: predictor,
441+
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
442+
DestinationEndpointHintKey: *destinationEndpointHintKey,
433443
}
434444
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
435445
setupLog.Error(err, "Failed to setup EPP controllers")
@@ -507,19 +517,19 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
507517

508518
func (r *Runner) initializeScheduler(datastore datastore.Datastore, predictor *latencypredictor.Predictor) (*scheduling.Scheduler, error) {
509519
if r.schedulerConfig != nil {
510-
return scheduling.NewSchedulerWithConfig(datastore, r.schedulerConfig), nil
520+
return scheduling.NewSchedulerWithConfig(r.schedulerConfig), nil
511521
}
512522

513523
// otherwise, no one configured from outside scheduler config. use existing configuration
514-
scheduler := scheduling.NewScheduler(datastore)
524+
scheduler := scheduling.NewScheduler()
515525
if schedulerV2 {
516526
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
517527
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
518528

519529
schedulerProfile := framework.NewSchedulerProfile().
520530
WithScorers(framework.NewWeightedScorer(scorer.NewQueueScorer(), queueScorerWeight),
521531
framework.NewWeightedScorer(scorer.NewKVCacheScorer(), kvCacheScorerWeight)).
522-
WithPicker(picker.NewMaxScorePicker())
532+
WithPicker(picker.NewMaxScorePicker(picker.DefaultMaxNumOfEndpoints))
523533

524534
if prefixCacheScheduling {
525535
prefixScorerWeight := envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", prefix.DefaultScorerWeight, setupLog)
@@ -529,16 +539,41 @@ func (r *Runner) initializeScheduler(datastore datastore.Datastore, predictor *l
529539
}
530540

531541
schedulerConfig := scheduling.NewSchedulerConfig(profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"schedulerv2": schedulerProfile})
532-
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
542+
scheduler = scheduling.NewSchedulerWithConfig(schedulerConfig)
533543
}
534544

535545
if reqHeaderBasedSchedulerForTesting {
536-
scheduler = conformance_epp.NewReqHeaderBasedScheduler(datastore)
546+
scheduler = conformance_epp.NewReqHeaderBasedScheduler()
537547
}
538548

539549
return scheduler, nil
540550
}
541551

552+
func (r *Runner) parseConfiguration(ctx context.Context) error {
553+
if len(*configText) != 0 || len(*configFile) != 0 {
554+
theConfig, err := loader.LoadConfig([]byte(*configText), *configFile)
555+
if err != nil {
556+
return fmt.Errorf("failed to load the configuration - %w", err)
557+
}
558+
559+
epp := newEppHandle(ctx)
560+
561+
err = loader.LoadPluginReferences(theConfig.Plugins, epp)
562+
if err != nil {
563+
return fmt.Errorf("failed to instantiate the plugins - %w", err)
564+
}
565+
566+
r.schedulerConfig, err = loader.LoadSchedulerConfig(theConfig.SchedulingProfiles, epp)
567+
if err != nil {
568+
return fmt.Errorf("failed to create Scheduler configuration - %w", err)
569+
}
570+
571+
// Add requestControl plugins
572+
r.requestControlConfig.AddPlugins(epp.Plugins().GetAllPlugins()...)
573+
}
574+
return nil
575+
}
576+
542577
func initLogging(opts *zap.Options) {
543578
// Unless -zap-log-level is explicitly set, use -v
544579
useV := true
@@ -652,3 +687,24 @@ func (p *predictorRunnable) Start(ctx context.Context) error {
652687
p.predictor.Stop()
653688
return nil
654689
}
690+
691+
// setupPprofHandlers only implements the pre-defined profiles:
692+
// https://cs.opensource.google/go/go/+/refs/tags/go1.24.4:src/runtime/pprof/pprof.go;l=108
693+
func setupPprofHandlers(mgr ctrl.Manager) error {
694+
var err error
695+
profiles := []string{
696+
"heap",
697+
"goroutine",
698+
"allocs",
699+
"threadcreate",
700+
"block",
701+
"mutex",
702+
}
703+
for _, p := range profiles {
704+
err = mgr.AddMetricsServerExtraHandler("/debug/pprof/"+p, pprof.Handler(p))
705+
if err != nil {
706+
return err
707+
}
708+
}
709+
return nil
710+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package scheduling
18+
19+
import (
20+
"sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp/plugins/filter"
21+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
25+
)
26+
27+
// NewReqHeaderBasedScheduler creates a scheduler for conformance tests that selects
28+
// an endpoint based on the "test-epp-endpoint-selection" request header. If the
29+
// header is missing or the specified endpoint doesn't exist, no endpoint is returned.
30+
func NewReqHeaderBasedScheduler() *scheduling.Scheduler {
31+
predicatableSchedulerProfile := framework.NewSchedulerProfile().
32+
WithFilters(filter.NewHeaderBasedTestingFilter()).
33+
WithPicker(picker.NewMaxScorePicker(picker.DefaultMaxNumOfEndpoints))
34+
35+
return scheduling.NewSchedulerWithConfig(scheduling.NewSchedulerConfig(
36+
profile.NewSingleProfileHandler(), map[string]*framework.SchedulerProfile{"req-header-based-profile": predicatableSchedulerProfile}))
37+
}

0 commit comments

Comments
 (0)