Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"flag"
"fmt"
"os"
"time"

commonconfig "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"

"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -35,7 +38,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp"
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
Expand Down Expand Up @@ -112,11 +114,51 @@ var (
setupLog = ctrl.Log.WithName("setup")

// Environment variables
schedulerV2 = envutil.GetEnvBool("EXPERIMENTAL_USE_SCHEDULER_V2", false, setupLog)
prefixCacheScheduling = envutil.GetEnvBool("ENABLE_PREFIX_CACHE_SCHEDULING", false, setupLog)
reqHeaderBasedSchedulerForTesting = envutil.GetEnvBool("ENABLE_REQ_HEADER_BASED_SCHEDULER_FOR_TESTING", false, setupLog)
schedulerV2 = envutil.GetEnvBool("EXPERIMENTAL_USE_SCHEDULER_V2", false, setupLog)
prefixCacheScheduling = envutil.GetEnvBool("ENABLE_PREFIX_CACHE_SCHEDULING", false, setupLog)
)

// Default saturationdetector configuration values
const (
DefaultQueueDepthThreshold = commonconfig.DefaultQueueThresholdCritical
DefaultKVCacheUtilThreshold = commonconfig.DefaultKVCacheThreshold
// DefaultMetricsStalenessThreshold defines how old metrics can be before they
// are considered stale.
// Given the pod metrics refresh interval is 50ms, a threshold slightly above
// that should be fine.
DefaultMetricsStalenessThreshold = 200 * time.Millisecond

// Environment variable names for SaturationDetector configuration
EnvSdQueueDepthThreshold = "SD_QUEUE_DEPTH_THRESHOLD"
EnvSdKVCacheUtilThreshold = "SD_KV_CACHE_UTIL_THRESHOLD"
EnvSdMetricsStalenessThreshold = "SD_METRICS_STALENESS_THRESHOLD"
)

func loadSaturationDetectorConfig() *saturationdetector.Config {
logger := log.Log.WithName("saturation-detector-config")

cfg := &saturationdetector.Config{}

cfg.QueueDepthThreshold = envutil.GetEnvInt(EnvSdQueueDepthThreshold, DefaultQueueDepthThreshold, logger)
if cfg.QueueDepthThreshold <= 0 {
cfg.QueueDepthThreshold = DefaultQueueDepthThreshold
}

cfg.KVCacheUtilThreshold = envutil.GetEnvFloat(EnvSdKVCacheUtilThreshold, DefaultKVCacheUtilThreshold, logger)
if cfg.KVCacheUtilThreshold <= 0 || cfg.KVCacheUtilThreshold >= 1 {
cfg.KVCacheUtilThreshold = DefaultKVCacheUtilThreshold
}

cfg.MetricsStalenessThreshold = envutil.GetEnvDuration(EnvSdMetricsStalenessThreshold, DefaultMetricsStalenessThreshold, logger)
if cfg.MetricsStalenessThreshold <= 0 {
cfg.MetricsStalenessThreshold = DefaultMetricsStalenessThreshold
}

// NewDetector validates the config and assigns defaults.
logger.Info("SaturationDetector configuration loaded from env", "config", fmt.Sprintf("%+v", cfg))
return cfg
}

func loadPrefixCacheConfig() prefix.Config {
baseLogger := log.Log.WithName("env-config")

Expand Down Expand Up @@ -155,7 +197,7 @@ func run() error {
setupLog.Info("Flags processed", "flags", flags)

// --- Load Configurations from Environment Variables ---
sdConfig := saturationdetector.LoadConfigFromEnv()
sdConfig := loadSaturationDetectorConfig()

// --- Get Kubernetes Config ---
cfg, err := ctrl.GetConfig()
Expand Down Expand Up @@ -226,10 +268,6 @@ func run() error {
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
}

if reqHeaderBasedSchedulerForTesting {
scheduler = conformance_epp.NewReqHeaderBasedScheduler(datastore)
}

saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)

director := requestcontrol.NewDirector(datastore, scheduler, saturationDetector) // can call "director.WithPostResponsePlugins" to add post response plugins
Expand Down
37 changes: 0 additions & 37 deletions pkg/epp/saturationdetector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@ limitations under the License.
package saturationdetector

import (
"fmt"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"
commonconfig "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/common/config"
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
)

// Default configuration values
Expand All @@ -34,37 +31,3 @@ const (
// that should be fine.
DefaultMetricsStalenessThreshold = 200 * time.Millisecond
)

// Environment variable names for SaturationDetector configuration
const (
EnvSdQueueDepthThreshold = "SD_QUEUE_DEPTH_THRESHOLD"
EnvSdKVCacheUtilThreshold = "SD_KV_CACHE_UTIL_THRESHOLD"
EnvSdMetricsStalenessThreshold = "SD_METRICS_STALENESS_THRESHOLD"
)

// LoadConfigFromEnv loads SaturationDetector Config from environment variables.
func LoadConfigFromEnv() *Config {
// Use a default logger for initial configuration loading.
logger := log.Log.WithName("saturation-detector-config")

cfg := &Config{}

cfg.QueueDepthThreshold = envutil.GetEnvInt(EnvSdQueueDepthThreshold, DefaultQueueDepthThreshold, logger)
if cfg.QueueDepthThreshold <= 0 {
cfg.QueueDepthThreshold = DefaultQueueDepthThreshold
}

cfg.KVCacheUtilThreshold = envutil.GetEnvFloat(EnvSdKVCacheUtilThreshold, DefaultKVCacheUtilThreshold, logger)
if cfg.KVCacheUtilThreshold <= 0 || cfg.KVCacheUtilThreshold >= 1 {
cfg.KVCacheUtilThreshold = DefaultKVCacheUtilThreshold
}

cfg.MetricsStalenessThreshold = envutil.GetEnvDuration(EnvSdMetricsStalenessThreshold, DefaultMetricsStalenessThreshold, logger)
if cfg.MetricsStalenessThreshold <= 0 {
cfg.MetricsStalenessThreshold = DefaultMetricsStalenessThreshold
}

// NewDetector validates the config and assigns defaults.
logger.Info("SaturationDetector configuration loaded from env", "config", fmt.Sprintf("%+v", cfg))
return cfg
}
32 changes: 1 addition & 31 deletions pkg/epp/saturationdetector/saturationdetector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ package saturationdetector

import (
"context"
"fmt"
"os"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -77,40 +74,13 @@ func TestNewDetector(t *testing.T) {
expectedKVCacheUtilThreshold: 0.8,
expectedStalenessThreshold: 100 * time.Millisecond,
},
{
name: "invalid thresholds, fallback to default",
config: &Config{
QueueDepthThreshold: -1,
KVCacheUtilThreshold: -5,
MetricsStalenessThreshold: 0,
},
datastore: &mockDatastore{},
expectedQueueDepthThreshold: DefaultQueueDepthThreshold,
expectedKVCacheUtilThreshold: DefaultKVCacheUtilThreshold,
expectedStalenessThreshold: DefaultMetricsStalenessThreshold,
},
{
name: "kv cache threshold above range, fallback to default",
config: &Config{
QueueDepthThreshold: 10,
KVCacheUtilThreshold: 1.5,
MetricsStalenessThreshold: 100 * time.Millisecond,
},
datastore: &mockDatastore{},
expectedQueueDepthThreshold: 10,
expectedKVCacheUtilThreshold: DefaultKVCacheUtilThreshold,
expectedStalenessThreshold: 100 * time.Millisecond,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// validate configuration values are loaded from env vars properly, including the use of default values when provided value is invalid.
os.Setenv(EnvSdQueueDepthThreshold, strconv.Itoa(test.config.QueueDepthThreshold))
os.Setenv(EnvSdKVCacheUtilThreshold, fmt.Sprintf("%v", test.config.KVCacheUtilThreshold))
os.Setenv(EnvSdMetricsStalenessThreshold, test.config.MetricsStalenessThreshold.String())

detector := NewDetector(LoadConfigFromEnv(), test.datastore, logr.Discard())
detector := NewDetector(test.config, test.datastore, logr.Discard())
if detector == nil {
t.Fatalf("NewDetector() returned nil detector for valid config")
}
Expand Down