Skip to content

Commit 1895f0b

Browse files
kaushikmitrBenjaminBraunDev
authored andcommitted
seperate servers for training and prediction
Add APIs for the instantiated plugins to the EPP Handle (kubernetes-sigs#1039) * Added plugin instance APIs to plugins.Handle Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * An implementation of the new plugins.Handle APIs Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * Moved all configuration loading code to new package Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * Updates due to new and moved APIs Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * Cleanup of old configuration loading code Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> --------- Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> chore(deps): bump the kubernetes group with 6 updates (kubernetes-sigs#1050) Bumps the kubernetes group with 6 updates: | Package | From | To | | --- | --- | --- | | [k8s.io/api](https://github.com/kubernetes/api) | `0.33.1` | `0.33.2` | | [k8s.io/apiextensions-apiserver](https://github.com/kubernetes/apiextensions-apiserver) | `0.33.1` | `0.33.2` | | [k8s.io/apimachinery](https://github.com/kubernetes/apimachinery) | `0.33.1` | `0.33.2` | | [k8s.io/client-go](https://github.com/kubernetes/client-go) | `0.33.1` | `0.33.2` | | [k8s.io/code-generator](https://github.com/kubernetes/code-generator) | `0.33.1` | `0.33.2` | | [k8s.io/component-base](https://github.com/kubernetes/component-base) | `0.33.1` | `0.33.2` | Updates `k8s.io/api` from 0.33.1 to 0.33.2 - [Commits](kubernetes/api@v0.33.1...v0.33.2) Updates `k8s.io/apiextensions-apiserver` from 0.33.1 to 0.33.2 - [Release notes](https://github.com/kubernetes/apiextensions-apiserver/releases) - [Commits](kubernetes/apiextensions-apiserver@v0.33.1...v0.33.2) Updates `k8s.io/apimachinery` from 0.33.1 to 0.33.2 - [Commits](kubernetes/apimachinery@v0.33.1...v0.33.2) Updates `k8s.io/client-go` from 0.33.1 to 0.33.2 - [Changelog](https://github.com/kubernetes/client-go/blob/master/CHANGELOG.md) - [Commits](kubernetes/client-go@v0.33.1...v0.33.2) Updates `k8s.io/code-generator` from 0.33.1 to 0.33.2 - [Commits](kubernetes/code-generator@v0.33.1...v0.33.2) Updates `k8s.io/component-base` from 0.33.1 to 0.33.2 - [Commits](kubernetes/component-base@v0.33.1...v0.33.2) --- updated-dependencies: - dependency-name: k8s.io/api dependency-version: 0.33.2 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: kubernetes - dependency-name: k8s.io/apiextensions-apiserver dependency-version: 0.33.2 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: kubernetes - dependency-name: k8s.io/apimachinery dependency-version: 0.33.2 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: kubernetes - dependency-name: k8s.io/client-go dependency-version: 0.33.2 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: kubernetes - dependency-name: k8s.io/code-generator dependency-version: 0.33.2 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: kubernetes - dependency-name: k8s.io/component-base dependency-version: 0.33.2 dependency-type: direct:production update-type: version-update:semver-patch dependency-group: kubernetes ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> remove datastore dependency from the scheduler (kubernetes-sigs#1049) * remove datastore dependency from the scheduler Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * added back comments on snapshotting pods from datastore before calling schedule Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * removed fake datastore from conformance scheduler test Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> --------- Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> Add subsetting logic for epp (kubernetes-sigs#981) feat: Added a factory function for the DecisionTree filter (kubernetes-sigs#1053) * Added a factory function for the DecisionTreeFilter Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * Added tests of the factory function of the DecisionTreeFilter Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * Registered the factory function of the DecisionTreeFilter Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * Refactored the configuration loading Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> --------- Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> Adding pprof endpoints to metrics port (kubernetes-sigs#1069) feat: Add a context.Context to the plugins.HAndle interface (kubernetes-sigs#1076) * Added a context.Context to the plugins.Handle interface Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * Changes due to changes in internal APIs Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> * Changes to tests due to changes in internal APIs Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> --------- Signed-off-by: Shmuel Kallner <kallner@il.ibm.com> convert subset filter from a plugin to logic in director (kubernetes-sigs#1088) * convert subset filter from a plugin to logic in director Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * replace interface{} with any Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * make linter happy Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * address code review comments Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> --------- Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> chore(deps): bump golang.org/x/sync from 0.14.0 to 0.15.0 (kubernetes-sigs#1096) Bumps [golang.org/x/sync](https://github.com/golang/sync) from 0.14.0 to 0.15.0. - [Commits](golang/sync@v0.14.0...v0.15.0) --- updated-dependencies: - dependency-name: golang.org/x/sync dependency-version: 0.15.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Introduce plugins.TypedName to be used for Plugin base implementation (kubernetes-sigs#1086) * introduce TypedName to reduce boilerplate, modify plugins Signed-off-by: Etai Lev Ran <elevran@gmail.com> * implement GetTypedName() Signed-off-by: Etai Lev Ran <elevran@gmail.com> * Remove Type() and Name() from Plugin interface Signed-off-by: Etai Lev Ran <elevran@gmail.com> * use TypedName as private field, not embedded Signed-off-by: Etai Lev Ran <elevran@gmail.com> --------- Signed-off-by: Etai Lev Ran <elevran@gmail.com> move the conversion from pod metrics to scheduler pod representation one level up (kubernetes-sigs#1104) * move the converstion from pod metrics to scheduler pod representation one level up Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * minor change in helper func Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> --------- Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> handle picking multiple destinations in scheduling layer (kubernetes-sigs#1059) * implement multiple destination as the output of the scheduler Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * updated max score picker unit tests to cover multiple pods Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * imports Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * unit-test fix Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> --------- Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> refactor: 🔨 use the more explicit singular form (kubernetes-sigs#1129)
1 parent f32d873 commit 1895f0b

38 files changed

+6758
-187
lines changed

cmd/epp/runner/register.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package runner
18+
19+
import (
20+
"context"
21+
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
25+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
28+
)
29+
30+
// RegisterAllPlugins registers the factory functions of all known plugins
31+
func RegisterAllPlugins() {
32+
plugins.Register(filter.DecisionTreeFilterType, filter.DecisionTreeFilterFactory)
33+
plugins.Register(filter.LeastKVCacheFilterType, filter.LeastKVCacheFilterFactory)
34+
plugins.Register(filter.LeastQueueFilterType, filter.LeastQueueFilterFactory)
35+
plugins.Register(filter.LoraAffinityFilterType, filter.LoraAffinityFilterFactory)
36+
plugins.Register(filter.LowQueueFilterType, filter.LowQueueFilterFactory)
37+
plugins.Register(prefix.PrefixCachePluginType, prefix.PrefixCachePluginFactory)
38+
plugins.Register(picker.MaxScorePickerType, picker.MaxScorePickerFactory)
39+
plugins.Register(picker.RandomPickerType, picker.RandomPickerFactory)
40+
plugins.Register(profile.SingleProfileHandlerType, profile.SingleProfileHandlerFactory)
41+
plugins.Register(scorer.KvCacheScorerType, scorer.KvCacheScorerFactory)
42+
plugins.Register(scorer.QueueScorerType, scorer.QueueScorerFactory)
43+
}
44+
45+
// eppHandle is an implementation of the interface plugins.Handle
46+
type eppHandle struct {
47+
ctx context.Context
48+
plugins plugins.HandlePlugins
49+
}
50+
51+
// Context returns a context the plugins can use, if they need one
52+
func (h *eppHandle) Context() context.Context {
53+
return h.ctx
54+
}
55+
56+
// Plugins returns the sub-handle for working with instantiated plugins
57+
func (h *eppHandle) Plugins() plugins.HandlePlugins {
58+
return h.plugins
59+
}
60+
61+
// eppHandlePlugins implements the set of APIs to work with instantiated plugins
62+
type eppHandlePlugins struct {
63+
thePlugins map[string]plugins.Plugin
64+
}
65+
66+
// Plugin returns the named plugin instance
67+
func (h *eppHandlePlugins) Plugin(name string) plugins.Plugin {
68+
return h.thePlugins[name]
69+
}
70+
71+
// AddPlugin adds a plugin to the set of known plugin instances
72+
func (h *eppHandlePlugins) AddPlugin(name string, plugin plugins.Plugin) {
73+
h.thePlugins[name] = plugin
74+
}
75+
76+
// GetAllPlugins returns all of the known plugins
77+
func (h *eppHandlePlugins) GetAllPlugins() []plugins.Plugin {
78+
result := make([]plugins.Plugin, 0)
79+
for _, plugin := range h.thePlugins {
80+
result = append(result, plugin)
81+
}
82+
return result
83+
}
84+
85+
// GetAllPluginsWithNames returns al of the known plugins with their names
86+
func (h *eppHandlePlugins) GetAllPluginsWithNames() map[string]plugins.Plugin {
87+
return h.thePlugins
88+
}
89+
90+
func newEppHandle(ctx context.Context) *eppHandle {
91+
return &eppHandle{
92+
ctx: ctx,
93+
plugins: &eppHandlePlugins{
94+
thePlugins: map[string]plugins.Plugin{},
95+
},
96+
}
97+
}

cmd/epp/runner/runner.go

Lines changed: 73 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import (
4646
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4747
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
4848
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
49-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader"
49+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config/loader"
5050
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
5151
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
5252
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
@@ -215,6 +215,11 @@ func (r *Runner) Run(ctx context.Context) error {
215215
setupLog.Error(err, "Failed to create controller manager")
216216
return err
217217
}
218+
err = setupPprofHandlers(mgr)
219+
if err != nil {
220+
setupLog.Error(err, "Failed to setup pprof handlers")
221+
return err
222+
}
218223

219224
// ===================================================================
220225
// == Latency Predictor Integration
@@ -260,37 +265,40 @@ func (r *Runner) Run(ctx context.Context) error {
260265
runtime.SetBlockProfileRate(1)
261266
}
262267

268+
// START DIFF
269+
// below is what was incomming
270+
err = r.parseConfiguration(ctx)
271+
if err != nil {
272+
setupLog.Error(err, "Failed to parse the configuration")
273+
return err
274+
}
275+
276+
// below is what was current
263277
if len(*configText) != 0 || len(*configFile) != 0 {
264-
theConfig, err := config.LoadConfig([]byte(*configText), *configFile)
278+
theConfig, err := loader.LoadConfig([]byte(*configText), *configFile)
265279
if err != nil {
266280
setupLog.Error(err, "Failed to load the configuration")
267281
return err
268282
}
269283

270-
epp := eppHandle{}
271-
instantiatedPlugins, err := config.LoadPluginReferences(theConfig.Plugins, epp)
284+
epp := newEppHandle()
285+
286+
err = loader.LoadPluginReferences(theConfig.Plugins, epp)
272287
if err != nil {
273288
setupLog.Error(err, "Failed to instantiate the plugins")
274289
return err
275290
}
276-
}
277291

278-
r.schedulerConfig, err = scheduling.LoadSchedulerConfig(theConfig.SchedulingProfiles, instantiatedPlugins)
279-
if err != nil {
280-
setupLog.Error(err, "Failed to create Scheduler configuration")
281-
return err
282-
}
283-
284-
err = r.parsePluginsConfiguration(ctx)
285-
if err != nil {
286-
setupLog.Error(err, "Failed to parse plugins configuration")
287-
return err
288-
}
292+
r.schedulerConfig, err = loader.LoadSchedulerConfig(theConfig.SchedulingProfiles, epp)
293+
if err != nil {
294+
setupLog.Error(err, "Failed to create Scheduler configuration")
295+
return err
296+
}
289297

290-
// Add requestcontrol plugins
291-
if instantiatedPlugins != nil {
292-
r.requestControlConfig = requestcontrol.LoadRequestControlConfig(instantiatedPlugins)
298+
// Add requestControl plugins
299+
r.requestControlConfig.AddPlugins(epp.Plugins().GetAllPlugins()...)
293300
}
301+
// END DIFF
294302

295303
// --- Initialize Core EPP Components ---
296304
if r.schedulerConfig == nil {
@@ -474,6 +482,31 @@ func setupDatalayer() (datalayer.EndpointFactory, error) {
474482
return factory, nil
475483
}
476484

485+
func (r *Runner) parseConfiguration(ctx context.Context) error {
486+
if len(*configText) != 0 || len(*configFile) != 0 {
487+
theConfig, err := loader.LoadConfig([]byte(*configText), *configFile)
488+
if err != nil {
489+
return fmt.Errorf("failed to load the configuration - %w", err)
490+
}
491+
492+
epp := newEppHandle(ctx)
493+
494+
err = loader.LoadPluginReferences(theConfig.Plugins, epp)
495+
if err != nil {
496+
return fmt.Errorf("failed to instantiate the plugins - %w", err)
497+
}
498+
499+
r.schedulerConfig, err = loader.LoadSchedulerConfig(theConfig.SchedulingProfiles, epp)
500+
if err != nil {
501+
return fmt.Errorf("failed to create Scheduler configuration - %w", err)
502+
}
503+
504+
// Add requestControl plugins
505+
r.requestControlConfig.AddPlugins(epp.Plugins().GetAllPlugins()...)
506+
}
507+
return nil
508+
}
509+
477510
func initLogging(opts *zap.Options) {
478511
// Unless -zap-log-level is explicitly set, use -v
479512
useV := true
@@ -587,3 +620,24 @@ func (p *predictorRunnable) Start(ctx context.Context) error {
587620
p.predictor.Stop()
588621
return nil
589622
}
623+
624+
// setupPprofHandlers only implements the pre-defined profiles:
625+
// https://cs.opensource.google/go/go/+/refs/tags/go1.24.4:src/runtime/pprof/pprof.go;l=108
626+
func setupPprofHandlers(mgr ctrl.Manager) error {
627+
var err error
628+
profiles := []string{
629+
"heap",
630+
"goroutine",
631+
"allocs",
632+
"threadcreate",
633+
"block",
634+
"mutex",
635+
}
636+
for _, p := range profiles {
637+
err = mgr.AddMetricsServerExtraHandler("/debug/pprof/"+p, pprof.Handler(p))
638+
if err != nil {
639+
return err
640+
}
641+
}
642+
return nil
643+
}

0 commit comments

Comments
 (0)