119 changes: 88 additions & 31 deletions cmd/epp/runner/runner.go
@@ -46,6 +46,8 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/config/loader"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
@@ -59,10 +61,17 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
testfilter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/test/filter"
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
"sigs.k8s.io/gateway-api-inference-extension/version"
)

const (
// enableExperimentalDatalayerV2 defines the environment variable
// used as a feature flag for the pluggable data layer.
enableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2"
Collaborator:

Doesn't have to be done in this PR, but I'm wondering if we extend the config to include a field for features to be enabled. We will soon have Flow Control, SLO prediction, and now the pluggable data layer as experimental, opt-in features. So rather than having various env vars, we keep all feature gating in one place.

Contributor @nirrozenbaum (Aug 20, 2025):

The intention is to have everything configured through the config file eventually. Do you suggest adding a free form of key-value pairs as variables (kinda similar to env vars) for the transitional state when a feature is experimental? I was proposing this change recently, which I think aligns with your intention - #1288

Collaborator:

Maybe not everything, but it would be good to discuss the cutline. Definitely out of scope for this PR.

Contributor:

Right, maybe not everything. I meant the extension points of those parts :) Anyway, the parameters section that was suggested in #1288 can be leveraged as a solution for experimental features, so instead of having env vars we can just configure a parameter, e.g.:

- name: enable-experimental-datalayer
  value: true

Collaborator:

Yes, I definitely agree there.

Contributor:

And we document those features similar to K8s feature gates. We should also drop any "experimental" references and simply call them by feature name under featureGates. The associated docs will detail the feature level (e.g., Alpha) and give a short description of the feature. For example:

apiVersion: inference.networking.x-k8s.io/v1alpha1
kind: EndpointPickerConfig
featureGates:
  dataLayer: true # Defaults to false
  flowControl: true # Defaults to false until the feature graduates to beta
  ...

Contributor:

cc @shmuelk: another useful use case for adding a parameters section (key-value pairs) to the config API. To answer the above, we should have those parameters consumable not only through the plugins but also by runner.go (as an example) to enable/disable features. Generally speaking, it could replace the usage of env vars (e.g., today we have env vars in the saturation detector).

Contributor @shmuelk (Aug 21, 2025):

I like @danehans' suggestion of a featureGates section. From a config point of view it is simply a map[string]bool.

The code that consumes the feature gates should validate the set and apply the values.

Overloading a shared parameters section with so-called well-known parameter names is a bad idea and will lead to conflicts and confusion.
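
A hypothetical sketch of what consuming such a featureGates section could look like (all identifiers below are illustrative; none of them exist in the codebase yet):

package config

import "fmt"

// FeatureGates mirrors the proposed featureGates config section: a plain
// map from gate name to enabled/disabled, defaulting to false when absent.
type FeatureGates map[string]bool

// knownGates is the validation set; consumers reject unknown names so that
// typos fail loudly instead of silently defaulting to false.
var knownGates = map[string]bool{
	"dataLayer":   true,
	"flowControl": true,
}

// Validate returns an error for any unrecognized gate name.
func (fg FeatureGates) Validate() error {
	for name := range fg {
		if !knownGates[name] {
			return fmt.Errorf("unknown feature gate %q", name)
		}
	}
	return nil
}

// Enabled reports whether a gate is turned on; missing entries are false.
func (fg FeatureGates) Enabled(name string) bool {
	return fg[name]
}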

)

var (
grpcPort = flag.Int(
"grpc-port",
@@ -245,40 +254,12 @@ func (r *Runner) Run(ctx context.Context) error {
}

// --- Setup Datastore ---
mapping, err := backendmetrics.NewMetricMapping(
*totalQueuedRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric,
)
useDatalayerV2 := env.GetEnvBool(enableExperimentalDatalayerV2, false, setupLog)
epf, err := r.setupMetricsCollection(setupLog, useDatalayerV2)
if err != nil {
setupLog.Error(err, "Failed to create metric mapping from flags.")
return err
}
verifyMetricMapping(*mapping, setupLog)

var metricsHttpClient *http.Client
if *modelServerMetricsScheme == "https" {
metricsHttpClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: *modelServerMetricsHttpsInsecureSkipVerify,
},
},
}
} else {
metricsHttpClient = http.DefaultClient
}

pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{
MetricMapping: mapping,
ModelServerMetricsPort: int32(*modelServerMetricsPort),
ModelServerMetricsPath: *modelServerMetricsPath,
ModelServerMetricsScheme: *modelServerMetricsScheme,
Client: metricsHttpClient,
},
*refreshMetricsInterval)

datastore := datastore.NewDatastore(ctx, pmf)
datastore := datastore.NewDatastore(ctx, epf)

// --- Setup Metrics Server ---
customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)}
@@ -371,6 +352,7 @@ func (r *Runner) Run(ctx context.Context) error {
MetricsStalenessThreshold: *metricsStalenessThreshold,
Director: director,
SaturationDetector: saturationDetector,
UseExperimentalDatalayerV2: useDatalayerV2, // pluggable data layer feature flag
}
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "Failed to setup EPP controllers")
@@ -446,6 +428,81 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context) error {
return nil
}

func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDatalayer bool) (datalayer.EndpointFactory, error) {
if useExperimentalDatalayer {
return setupDatalayer()
}

if len(datalayer.GetSources()) != 0 {
setupLog.Info("data sources registered but pluggable datalayer is disabled")
}
return setupMetricsV1(setupLog)
}

func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
mapping, err := backendmetrics.NewMetricMapping(
*totalQueuedRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric,
)
if err != nil {
setupLog.Error(err, "Failed to create metric mapping from flags.")
return nil, err
}
verifyMetricMapping(*mapping, setupLog)

var metricsHttpClient *http.Client
if *modelServerMetricsScheme == "https" {
metricsHttpClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: *modelServerMetricsHttpsInsecureSkipVerify,
},
},
}
} else {
metricsHttpClient = http.DefaultClient
}

pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.PodMetricsClientImpl{
MetricMapping: mapping,
ModelServerMetricsPort: int32(*modelServerMetricsPort),
ModelServerMetricsPath: *modelServerMetricsPath,
ModelServerMetricsScheme: *modelServerMetricsScheme,
Client: metricsHttpClient,
},
*refreshMetricsInterval)
return pmf, nil
}

func setupDatalayer() (datalayer.EndpointFactory, error) {
// create and register a metrics data source and extractor. In the future,
// data sources and extractors might be configured via a file. Once done,
// this (and registering the sources with the endpoint factory) should
// be moved accordingly.
source := dlmetrics.NewDataSource(*modelServerMetricsScheme,
int32(*modelServerMetricsPort), // start with (optional) command line port value
*modelServerMetricsPath,
*modelServerMetricsHttpsInsecureSkipVerify,
nil)
extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric)

if err != nil {
return nil, err
}
if err := source.AddExtractor(extractor); err != nil {
return nil, err
}
if err := datalayer.RegisterSource(source); err != nil {
return nil, err
}

factory := datalayer.NewEndpointFactory(datalayer.GetSources(), *refreshMetricsInterval)
return factory, nil
}

func initLogging(opts *zap.Options) {
// Unless -zap-log-level is explicitly set, use -v
useV := true
4 changes: 3 additions & 1 deletion pkg/epp/datalayer/collector.go
@@ -82,17 +82,18 @@ func NewCollector() *Collector {
}

// Start initiates data source collection for the endpoint.
// TODO: pass PoolInfo for backward compatibility
func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sources []DataSource) error {
var ready chan struct{}
started := false

c.startOnce.Do(func() {
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
c.ctx, c.cancel = context.WithCancel(ctx)
started = true
ready = make(chan struct{})

go func(endpoint Endpoint, sources []DataSource) {
logger := log.FromContext(ctx).WithValues("endpoint", ep.GetPod().GetIPAddress())
logger.V(logging.DEFAULT).Info("starting collection")

defer func() {
@@ -107,6 +108,7 @@ func (c *Collector) Start(ctx context.Context, ticker Ticker, ep Endpoint, sources []DataSource) error {
case <-c.ctx.Done(): // per endpoint context cancelled
return
case <-ticker.Channel():
// TODO: do not collect if there's no pool specified?
for _, src := range sources {
ctx, cancel := context.WithTimeout(c.ctx, defaultCollectionTimeout)
_ = src.Collect(ctx, endpoint) // TODO: track errors per collector?
17 changes: 15 additions & 2 deletions pkg/epp/datalayer/datasource.go
@@ -92,14 +92,27 @@ func (dsr *DataSourceRegistry) GetSources() []DataSource {

// --- default registry accessors ---

// RegisterSource adds a new data source to the default registry.
func RegisterSource(src DataSource) error {
return defaultDataSources.Register(src)
}

func GetNamedSource(name string) (DataSource, bool) {
return defaultDataSources.GetNamedSource(name)
// GetNamedSource returns a typed data source from the default registry.
func GetNamedSource[T DataSource](name string) (T, bool) {
v, ok := defaultDataSources.GetNamedSource(name)
if !ok {
var zero T
return zero, false
}
src, ok := v.(T)
if !ok {
var zero T
return zero, false
}
return src, true
}

// GetSources returns the list of data sources registered in the default registry.
func GetSources() []DataSource {
return defaultDataSources.GetSources()
}
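
A minimal usage sketch of the new generic accessor (the registered name "model-server-metrics" and the concrete type *dlmetrics.DataSource are assumptions for illustration; any registered DataSource implementation works):

package main

import (
	"fmt"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
	dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
)

func main() {
	// The type argument pins the concrete source type, so the call site needs
	// no type assertion; ok is false when the name is unknown or the stored
	// source has a different type.
	if src, ok := datalayer.GetNamedSource[*dlmetrics.DataSource]("model-server-metrics"); ok {
		fmt.Printf("found typed source: %T\n", src)
	}
}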
75 changes: 75 additions & 0 deletions pkg/epp/datalayer/factory.go
@@ -18,8 +18,12 @@ package datalayer

import (
"context"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log"

v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
)
@@ -44,3 +48,74 @@ type EndpointFactory interface {
NewEndpoint(parent context.Context, inpod *corev1.Pod, poolinfo PoolInfo) Endpoint
ReleaseEndpoint(ep Endpoint)
}

// EndpointLifecycle manages the life cycle (creation and termination) of
// endpoints.
type EndpointLifecycle struct {
sources []DataSource // data sources for collectors
collectors sync.Map // collectors map. key: Pod namespaced name, value: *Collector
refreshInterval time.Duration // metrics refresh interval
}

// NewEndpointFactory returns a new endpoint factory that manages collectors for
// its endpoints. The function assumes the sources slice is not modified afterwards.
func NewEndpointFactory(sources []DataSource, refreshMetricsInterval time.Duration) *EndpointLifecycle {
return &EndpointLifecycle{
sources: sources,
collectors: sync.Map{},
refreshInterval: refreshMetricsInterval,
}
}

// NewEndpoint implements EndpointFactory.NewEndpoint.
// Creates a new endpoint and starts its associated collector with its own ticker.
// Guards against multiple concurrent calls for the same endpoint.
func (lc *EndpointLifecycle) NewEndpoint(parent context.Context, inpod *corev1.Pod, _ PoolInfo) Endpoint {
key := types.NamespacedName{Namespace: inpod.Namespace, Name: inpod.Name}
logger := log.FromContext(parent).WithValues("pod", key)

if _, ok := lc.collectors.Load(key); ok {
logger.Info("collector already running for endpoint", "endpoint", key)
return nil
}

endpoint := NewEndpoint()
endpoint.UpdatePod(inpod)
collector := NewCollector() // for full backward compatibility, set the logger and poolinfo

if _, loaded := lc.collectors.LoadOrStore(key, collector); loaded {
// another goroutine already created and stored a collector for this endpoint.
// No need to start the new collector.
logger.Info("collector already running for endpoint", "endpoint", key)
return nil
}

ticker := NewTimeTicker(lc.refreshInterval)
if err := collector.Start(parent, ticker, endpoint, lc.sources); err != nil {
logger.Error(err, "failed to start collector for endpoint", "endpoint", key)
lc.collectors.Delete(key)
}

return endpoint
}

// ReleaseEndpoint implements EndpointFactory.ReleaseEndpoint.
// It stops the collector and cleans up resources for the endpoint.
func (lc *EndpointLifecycle) ReleaseEndpoint(ep Endpoint) {
key := ep.GetPod().GetNamespacedName()

if value, ok := lc.collectors.LoadAndDelete(key); ok {
collector := value.(*Collector)
_ = collector.Stop()
}
}

// Shutdown gracefully stops all collectors and cleans up all resources.
func (lc *EndpointLifecycle) Shutdown() {
lc.collectors.Range(func(key, value any) bool {
collector := value.(*Collector)
_ = collector.Stop()
lc.collectors.Delete(key)
return true
})
}
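
For orientation, a usage sketch of the factory's life cycle (ctx, pod, and pool are assumed to be supplied by the caller, e.g. the datastore; note that the current implementation discards the PoolInfo argument):

package main

import (
	"context"
	"time"

	corev1 "k8s.io/api/core/v1"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
)

// runEndpoint wires one pod into the pluggable data layer and tears it down.
func runEndpoint(ctx context.Context, pod *corev1.Pod, pool datalayer.PoolInfo) {
	epf := datalayer.NewEndpointFactory(datalayer.GetSources(), 50*time.Millisecond)
	defer epf.Shutdown() // stop any remaining collectors on teardown

	ep := epf.NewEndpoint(ctx, pod, pool) // starts a per-endpoint collector
	if ep == nil {
		return // a collector for this pod is already running
	}
	// ... read endpoint state while collection runs ...
	epf.ReleaseEndpoint(ep) // stops and removes this endpoint's collector
}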
27 changes: 18 additions & 9 deletions pkg/epp/datalayer/metrics/client.go
@@ -33,22 +33,31 @@ type Client interface {
Get(ctx context.Context, target *url.URL, ep datalayer.Addressable) (PrometheusMetricMap, error)
}

// -- package implementations --
const (
// the maximum idle connection count is shared by all endpoints. The value is
// set high to ensure the number of idle connections stays above the expected
// endpoint count. Setting it too low would cause thrashing of the idle connection
// pool and incur higher overhead for every GET (e.g., socket initiation, certificate
// exchange, connections in TIME_WAIT state, etc.).
maxIdleConnections = 5000
maxIdleTime = 10 * time.Second
timeout = 10 * time.Second
maxIdleTime = 10 * time.Second // once an endpoint goes down, allow closing its idle connections.
timeout = 10 * time.Second // mostly guards against unresponsive endpoints.
// allow some grace when connections are not made idle immediately (e.g., parsing
// and updating might take some time). This allows maintaining up to two idle connections
// per endpoint (defined as scheme://host:port).
maxIdleConnsPerHost = 2
)

var (
baseTransport = &http.Transport{
MaxIdleConns: maxIdleConnections,
MaxIdleConnsPerHost: maxIdleConnsPerHost,
// TODO: set additional timeouts, transport options, etc.
}
defaultClient = &client{
Client: http.Client{
Timeout: timeout,
Transport: &http.Transport{
MaxIdleConns: maxIdleConnections,
MaxIdleConnsPerHost: 4, // host is defined as scheme://host:port
},
// TODO: set additional timeouts, transport options, etc.
Timeout: timeout,
Transport: baseTransport,
},
}
)