Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ RUN go mod download
# Sources
COPY cmd/epp ./cmd
COPY pkg/epp ./pkg/epp
COPY conformance/testing-epp ./conformance/testing-epp
COPY internal ./internal
COPY api ./api
WORKDIR /src/cmd
Expand Down
10 changes: 8 additions & 2 deletions cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

conformance_epp "sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp"
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
Expand Down Expand Up @@ -111,8 +112,9 @@ var (
setupLog = ctrl.Log.WithName("setup")

// Environment variables
schedulerV2 = envutil.GetEnvBool("EXPERIMENTAL_USE_SCHEDULER_V2", false, setupLog)
prefixCacheScheduling = envutil.GetEnvBool("ENABLE_PREFIX_CACHE_SCHEDULING", false, setupLog)
schedulerV2 = envutil.GetEnvBool("EXPERIMENTAL_USE_SCHEDULER_V2", false, setupLog)
prefixCacheScheduling = envutil.GetEnvBool("ENABLE_PREFIX_CACHE_SCHEDULING", false, setupLog)
reqHeaderBasedSchedulerForTesting = envutil.GetEnvBool("ENABLE_REQ_HEADER_BASED_SCHEDULER_FOR_TESTING", false, setupLog)
)

func loadPrefixCacheConfig() prefix.Config {
Expand Down Expand Up @@ -224,6 +226,10 @@ func run() error {
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
}

if reqHeaderBasedSchedulerForTesting {
scheduler = conformance_epp.NewReqHeaderBasedScheduler(datastore)
}

saturationDetector := saturationdetector.NewDetector(sdConfig, datastore, ctrl.Log)

director := requestcontrol.NewDirector(datastore, scheduler, saturationDetector) // can call "director.WithPostResponsePlugins" to add post response plugins
Expand Down
93 changes: 93 additions & 0 deletions conformance/testing-epp/plugins/filter/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filter

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

// TestFilter runs HeaderBasedTestingFilter against table-driven cases: a
// request without the selection header, a header value that matches no pod
// address, and a header value that matches a pod address.
func TestFilter(t *testing.T) {
	// podAt builds a pod whose only populated field is its address.
	podAt := func(address string) types.Pod {
		return &types.PodMetrics{Pod: &backend.Pod{Address: address}}
	}
	// selecting builds a request whose selection header names the given endpoint.
	selecting := func(endpoint string) *types.LLMRequest {
		return &types.LLMRequest{Headers: map[string]string{headerTestEppEndPointSelectionKey: endpoint}}
	}

	cases := []struct {
		name   string
		req    *types.LLMRequest
		filter framework.Filter
		input  []types.Pod
		output []types.Pod
	}{
		{
			name:   "TestHeaderBasedFilter, header endpoint unset in request",
			req:    &types.LLMRequest{}, // Deliberately leave the header unset.
			filter: &HeaderBasedTestingFilter{},
			input:  []types.Pod{podAt("test-endpoint")},
			output: []types.Pod{},
		},
		{
			name:   "TestHeaderBasedFilter, header endpoint set in request but no match",
			req:    selecting("test-endpoint"),
			filter: &HeaderBasedTestingFilter{},
			input:  []types.Pod{podAt("test-endpoint-unmatch")},
			output: []types.Pod{},
		},
		{
			name:   "TestHeaderBasedFilter, header endpoint set",
			req:    selecting("test-endpoint"),
			filter: &HeaderBasedTestingFilter{},
			input:  []types.Pod{podAt("test-endpoint")},
			output: []types.Pod{podAt("test-endpoint")},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := tc.filter.Filter(context.Background(), tc.req, types.NewCycleState(), tc.input)

			diff := cmp.Diff(tc.output, got)
			if diff == "" {
				return
			}
			t.Errorf("Unexpected output (-want +got): %v", diff)
		})
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filter

import (
"context"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const (
headerTestEppEndPointSelectionKey = "test-epp-endpoint-selection"
)

// compile-time type assertion
var _ framework.Filter = &HeaderBasedTestingFilter{}

// NewHeaderBasedTestingFilter allocates a HeaderBasedTestingFilter and returns a pointer to it.
// It is intended for testing purposes only.
func NewHeaderBasedTestingFilter() *HeaderBasedTestingFilter {
	return new(HeaderBasedTestingFilter)
}

// HeaderBasedTestingFilter is a stateless filter that keeps only the Pods whose
// address equals the value of the "test-epp-endpoint-selection" request header.
type HeaderBasedTestingFilter struct{}

// Name reports the identifier of this filter.
func (f *HeaderBasedTestingFilter) Name() string {
	const filterName = "test-header-based"
	return filterName
}

// Filter returns the pods whose address equals the value carried in the
// request's endpoint-selection header (headerTestEppEndPointSelectionKey).
// When the header is absent, or no pod address matches, the result is an
// empty, non-nil slice. The context and cycle state arguments are not used.
func (f *HeaderBasedTestingFilter) Filter(_ context.Context, request *types.LLMRequest, _ *types.CycleState, pods []types.Pod) []types.Pod {
	// Start from an empty (non-nil) slice so that "no match" is never returned as nil.
	matched := make([]types.Pod, 0)

	if wantAddress, ok := request.Headers[headerTestEppEndPointSelectionKey]; ok {
		for i := range pods {
			if pods[i].GetPod().Address == wantAddress {
				matched = append(matched, pods[i])
			}
		}
	}
	return matched
}
34 changes: 34 additions & 0 deletions conformance/testing-epp/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import (
"sigs.k8s.io/gateway-api-inference-extension/conformance/testing-epp/plugins/filter"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
profilepicker "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile-picker"
)

// NewReqHeaderBasedScheduler builds the scheduler used by conformance tests.
// It registers a single profile, "req-header-based-profile", made of the
// header-based testing filter (which matches on the
// "test-epp-endpoint-selection" request header) and the max-score picker,
// and selects profiles with the all-profiles picker. A request without the
// header, or naming an endpoint that is not present, yields no endpoint.
func NewReqHeaderBasedScheduler(datastore scheduling.Datastore) *scheduling.Scheduler {
	const profileName = "req-header-based-profile"

	profile := framework.NewSchedulerProfile().
		WithFilters(filter.NewHeaderBasedTestingFilter()).
		WithPicker(picker.NewMaxScorePicker())

	config := scheduling.NewSchedulerConfig(
		profilepicker.NewAllProfilesPicker(),
		map[string]*framework.SchedulerProfile{profileName: profile},
	)
	return scheduling.NewSchedulerWithConfig(datastore, config)
}
122 changes: 122 additions & 0 deletions conformance/testing-epp/sheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling

import (
"context"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" // Import config for thresholds
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

// TestSchedule exercises the conformance-test scheduler with table-driven
// cases: an empty datastore, a request without the selection header, a header
// that matches no pod address, and a header that matches exactly one pod.
func TestSchedule(t *testing.T) {
	const selectionHeader = "test-epp-endpoint-selection"

	// fakePod builds a fake pod-metrics entry that carries only an address.
	fakePod := func(address string) *backendmetrics.FakePodMetrics {
		return &backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: address}}
	}
	// newRequest builds a request with the given headers and a freshly generated request ID.
	newRequest := func(headers map[string]string) *types.LLMRequest {
		return &types.LLMRequest{
			Headers:   headers,
			RequestId: uuid.NewString(),
		}
	}

	cases := []struct {
		name    string
		input   []*backendmetrics.FakePodMetrics
		req     *types.LLMRequest
		wantRes map[string]*types.Result
		wantErr bool
	}{
		{
			name:    "no pods in datastore and req header is set",
			req:     newRequest(map[string]string{selectionHeader: "random-endpoint"}),
			wantRes: nil,
			wantErr: true,
		},
		{
			name:    "req header not set",
			input:   []*backendmetrics.FakePodMetrics{fakePod("random-endpoint")},
			req:     newRequest(map[string]string{}), // Deliberately pass empty headers.
			wantRes: nil,
			wantErr: true,
		},
		{
			name:    "no pods address in datastore matches req header address",
			input:   []*backendmetrics.FakePodMetrics{fakePod("nonmatched-endpoint")},
			req:     newRequest(map[string]string{selectionHeader: "matched-endpoint"}),
			wantRes: nil,
			wantErr: true,
		},
		{
			name: "one pod address in datastore matches req header address",
			input: []*backendmetrics.FakePodMetrics{
				fakePod("nonmatched-endpoint"),
				fakePod("matched-endpoint"),
			},
			req: newRequest(map[string]string{selectionHeader: "matched-endpoint"}),
			wantRes: map[string]*types.Result{
				"req-header-based-profile": {
					TargetPod: &types.ScoredPod{
						Pod: &types.PodMetrics{
							Pod: &backend.Pod{
								Address: "matched-endpoint",
								Labels:  map[string]string{},
							},
						},
					},
				},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			store := &fakeDataStore{pods: tc.input}
			got, err := NewReqHeaderBasedScheduler(store).Schedule(context.Background(), tc.req)

			// Only the presence or absence of an error is checked, not its content.
			if gotErr := err != nil; gotErr != tc.wantErr {
				t.Errorf("Unexpected error, got %v, want %v", err, tc.wantErr)
			}

			if diff := cmp.Diff(tc.wantRes, got); diff != "" {
				t.Errorf("Unexpected output (-want +got): %v", diff)
			}
		})
	}
}

// fakeDataStore is a minimal datastore stand-in for tests that serves a fixed
// list of fake pod metrics.
type fakeDataStore struct {
	pods []*backendmetrics.FakePodMetrics
}

// PodGetAll returns every stored fake pod as a backendmetrics.PodMetrics
// value, preserving the stored order. The result is always a non-nil slice.
func (fds *fakeDataStore) PodGetAll() []backendmetrics.PodMetrics {
	all := make([]backendmetrics.PodMetrics, len(fds.pods))
	for i, p := range fds.pods {
		all[i] = p
	}
	return all
}