Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"strings"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)
Expand All @@ -38,20 +39,19 @@ var _ framework.Filter = &HeaderBasedTestingFilter{}
// NewHeaderBasedTestingFilter initializes a new HeaderBasedTestingFilter.
// This should only be used for testing purposes.
func NewHeaderBasedTestingFilter() *HeaderBasedTestingFilter {
return &HeaderBasedTestingFilter{}
return &HeaderBasedTestingFilter{
tn: plugins.TypedName{Type: "header-based-testing", Name: "header-based-testing-filter"},
}
}

// HeaderBasedTestingFilter filters Pods based on an address specified in the "test-epp-endpoint-selection" request header.
type HeaderBasedTestingFilter struct{}

// Type returns the type of the filter.
func (f *HeaderBasedTestingFilter) Type() string {
return "header-based-testing"
type HeaderBasedTestingFilter struct {
tn plugins.TypedName
}

// Name returns the type of the filter.
func (f *HeaderBasedTestingFilter) Name() string {
return "header-based-testing-filter"
// TypedName returns the type and name tuple of this plugin instance.
func (f *HeaderBasedTestingFilter) TypedName() plugins.TypedName {
return f.tn
}

// Filter selects pods that match the IP addresses specified in the request header.
Expand Down
73 changes: 44 additions & 29 deletions pkg/epp/common/config/loader/configloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,15 +555,18 @@ schedulingProfiles:
var _ framework.Filter = &test1{}

type test1 struct {
tn plugins.TypedName
Threshold int `json:"threshold"`
}

func (f *test1) Type() string {
return test1Type
func newTest1() *test1 {
return &test1{
tn: plugins.TypedName{Type: test1Type, Name: "test-1"},
}
}

func (f *test1) Name() string {
return "test-1"
func (f *test1) TypedName() plugins.TypedName {
return f.tn
}

// Filter filters out pods that don't meet the filter criteria.
Expand All @@ -575,14 +578,18 @@ func (f *test1) Filter(_ context.Context, _ *types.CycleState, _ *types.LLMReque
var _ framework.Scorer = &test2{}
var _ framework.PostCycle = &test2{}

type test2 struct{}
type test2 struct {
tn plugins.TypedName
}

func (f *test2) Type() string {
return test2Type
func newTest2() *test2 {
return &test2{
tn: plugins.TypedName{Type: test2Type, Name: "test-2"},
}
}

func (f *test2) Name() string {
return "test-2"
func (m *test2) TypedName() plugins.TypedName {
return m.tn
}

func (m *test2) Score(_ context.Context, _ *types.CycleState, _ *types.LLMRequest, _ []types.Pod) map[types.Pod]float64 {
Expand All @@ -594,14 +601,18 @@ func (m *test2) PostCycle(_ context.Context, _ *types.CycleState, _ *types.Profi
// compile-time type validation
var _ framework.Picker = &testPicker{}

type testPicker struct{}
type testPicker struct {
tn plugins.TypedName
}

func (p *testPicker) Type() string {
return testPickerType
func newTestPicker() *testPicker {
return &testPicker{
tn: plugins.TypedName{Type: testPickerType, Name: "test-picker"},
}
}

func (p *testPicker) Name() string {
return "test-picker"
func (p *testPicker) TypedName() plugins.TypedName {
return p.tn
}

func (p *testPicker) Pick(_ context.Context, _ *types.CycleState, _ []*types.ScoredPod) *types.ProfileRunResult {
Expand All @@ -611,14 +622,18 @@ func (p *testPicker) Pick(_ context.Context, _ *types.CycleState, _ []*types.Sco
// compile-time type validation
var _ framework.ProfileHandler = &testProfileHandler{}

type testProfileHandler struct{}
type testProfileHandler struct {
tn plugins.TypedName
}

func (p *testProfileHandler) Type() string {
return testProfileHandlerType
func newTestProfileHandler() *testProfileHandler {
return &testProfileHandler{
tn: plugins.TypedName{Type: testProfileHandlerType, Name: "test-profile-handler"},
}
}

func (p *testProfileHandler) Name() string {
return "test-profile-handler"
func (p *testProfileHandler) TypedName() plugins.TypedName {
return p.tn
}

func (p *testProfileHandler) Pick(_ context.Context, _ *types.CycleState, _ *types.LLMRequest, _ map[string]*framework.SchedulerProfile, _ map[string]*types.ProfileRunResult) map[string]*framework.SchedulerProfile {
Expand All @@ -631,28 +646,28 @@ func (p *testProfileHandler) ProcessResults(_ context.Context, _ *types.CycleSta

func registerTestPlugins() {
plugins.Register(test1Type,
func(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) {
result := test1{}
err := json.Unmarshal(parameters, &result)
return &result, err
func(_ string, parameters json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
result := newTest1()
err := json.Unmarshal(parameters, result)
return result, err
},
)

plugins.Register(test2Type,
func(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) {
return &test2{}, nil
func(_ string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return newTest2(), nil
},
)

plugins.Register(testPickerType,
func(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) {
return &testPicker{}, nil
func(_ string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return newTestPicker(), nil
},
)

plugins.Register(testProfileHandlerType,
func(name string, parameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) {
return &testProfileHandler{}, nil
func(_ string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
return newTestProfileHandler(), nil
},
)
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/epp/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ import (
// Plugin defines the interface for a plugin.
// This interface should be embedded in all plugins across the code.
type Plugin interface {
// Type returns the type of the plugin.
Type() string
// Name returns the name of this plugin instance.
Name() string
// TypedName returns the type and name tuple of this plugin instance.
TypedName() TypedName
}

// Handle provides plugins a set of standard data and tools to work with
Expand Down
38 changes: 38 additions & 0 deletions pkg/epp/plugins/typedname.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package plugins

// TypedName is a utility struct that pairs a plugin's type with the name of
// a specific plugin instance.
// Plugins can hold a TypedName and return it from their TypedName() method
// to satisfy the Plugin interface without repeating boilerplate.
type TypedName struct {
	// Type is the type of the plugin.
	Type string
	// Name is the name of this plugin instance.
	Name string
}

const (
	// Separator is the delimiter placed between the type and the name in the
	// string form of a TypedName.
	Separator = "/"
)

// String returns the type and name rendered as
// "<type>/<name>".
//
// It is declared on the value receiver so that both TypedName and *TypedName
// have String in their method set; callers frequently hold a TypedName by
// value (for example, the result of Plugin.TypedName()) and pass it straight
// to loggers or formatters.
func (tn TypedName) String() string {
	return tn.Type + Separator + tn.Name
}
8 changes: 4 additions & 4 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,18 +312,18 @@ func RandomWeightedDraw(logger logr.Logger, model *v1alpha2.InferenceModel, seed
func (d *Director) runPreRequestPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, schedulingResult *schedulingtypes.SchedulingResult,
targetPort int) {
for _, plugin := range d.preRequestPlugins {
log.FromContext(ctx).V(logutil.DEBUG).Info("Running pre-request plugin", "plugin", plugin.Type())
log.FromContext(ctx).V(logutil.DEBUG).Info("Running pre-request plugin", "plugin", plugin.TypedName().Type)
before := time.Now()
plugin.PreRequest(ctx, request, schedulingResult, targetPort)
metrics.RecordRequestControlPluginProcessingLatency(PreRequestPluginType, plugin.Type(), time.Since(before))
metrics.RecordRequestControlPluginProcessingLatency(PreRequestPluginType, plugin.TypedName().Type, time.Since(before))
}
}

func (d *Director) runPostResponsePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) {
for _, plugin := range d.postResponsePlugins {
log.FromContext(ctx).V(logutil.DEBUG).Info("Running post-response plugin", "plugin", plugin.Type())
log.FromContext(ctx).V(logutil.DEBUG).Info("Running post-response plugin", "plugin", plugin.TypedName().Type)
before := time.Now()
plugin.PostResponse(ctx, request, response, targetPod)
metrics.RecordRequestControlPluginProcessingLatency(PostResponsePluginType, plugin.Type(), time.Since(before))
metrics.RecordRequestControlPluginProcessingLatency(PostResponsePluginType, plugin.TypedName().Type, time.Since(before))
}
}
22 changes: 16 additions & 6 deletions pkg/epp/requestcontrol/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
Expand Down Expand Up @@ -653,9 +654,7 @@ func pointer(v int32) *int32 {
}

func TestDirector_HandleResponse(t *testing.T) {
pr1 := &testPostResponse{
TypeRes: "pr1",
}
pr1 := newTestPostResponse("pr1")

ctx := logutil.NewTestLoggerIntoContext(context.Background())
ds := datastore.NewDatastore(t.Context(), nil)
Expand Down Expand Up @@ -691,14 +690,25 @@ func TestDirector_HandleResponse(t *testing.T) {
}
}

const (
testPostResponseType = "test-post-response"
)

type testPostResponse struct {
TypeRes string
tn plugins.TypedName
lastRespOnResponse *Response
lastTargetPodOnResponse string
}

func (p *testPostResponse) Type() string { return p.TypeRes }
func (p *testPostResponse) Name() string { return "test-post-response" }
func newTestPostResponse(name string) *testPostResponse {
return &testPostResponse{
tn: plugins.TypedName{Type: testPostResponseType, Name: name},
}
}

func (p *testPostResponse) TypedName() plugins.TypedName {
return p.tn
}

func (p *testPostResponse) PostResponse(_ context.Context, _ *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) {
p.lastRespOnResponse = response
Expand Down
26 changes: 11 additions & 15 deletions pkg/epp/scheduling/framework/plugins/filter/decision_tree_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ const (
// compile-time type assertion
var _ framework.Filter = &DecisionTreeFilter{}

// DecisionTreeFilter applies current fitler, and then recursively applies next filters
// DecisionTreeFilter applies current filter, and then recursively applies next filters
// depending on the success or failure of the current filter.
// It can be used to construct a flow chart algorithm.
// Since a DecisionTreeFilter takes on the type and name of the current filter,
// it does not embed a fixed plugins.TypedName.
type DecisionTreeFilter struct {
Current framework.Filter
// NextOnSuccess filter will be applied after successfully applying the current filter.
Expand Down Expand Up @@ -131,20 +133,14 @@ func loadDecisionTreeEntry(entry *decisionTreeFilterEntry, handle plugins.Handle
return nil, errors.New("either pluginRef or decisionTree must be specified")
}

// Type returns the type of the filter.
func (f *DecisionTreeFilter) Type() string {
func (f *DecisionTreeFilter) TypedName() plugins.TypedName {
if f == nil {
return "nil"
// TODO: this keeps the previous behavior ("nil"/"") - not sure
// why done this way.
// Change to empty TypedName or some more meaningful values?
return plugins.TypedName{Type: "nil", Name: ""}
}
return f.Current.Type()
}

// Name returns the name of the filter.
func (f *DecisionTreeFilter) Name() string {
if f == nil {
return ""
}
return f.Current.Name()
return f.Current.TypedName()
}

// Filter filters out pods that don't meet the filter criteria.
Expand All @@ -161,7 +157,7 @@ func (f *DecisionTreeFilter) Filter(ctx context.Context, cycleState *types.Cycle
if f.NextOnSuccess != nil {
next = f.NextOnSuccess
}
loggerTrace.Info("Filter succeeded", "filter", f.Type(), "next", next.Type(), "filteredPodCount", len(filteredPod))
loggerTrace.Info("Filter succeeded", "filter", f.TypedName(), "next", next.TypedName(), "filteredPodCount", len(filteredPod))
// On success, pass the filtered result to the next filter.
return next.Filter(ctx, cycleState, request, filteredPod)
} else {
Expand All @@ -172,7 +168,7 @@ func (f *DecisionTreeFilter) Filter(ctx context.Context, cycleState *types.Cycle
if f.NextOnFailure != nil {
next = f.NextOnFailure
}
loggerTrace.Info("Filter failed", "filter", f.Type(), "next", next.Type())
loggerTrace.Info("Filter failed", "filter", f.TypedName(), "next", next.TypedName())
// On failure, pass the initial set of pods to the next filter.
return next.Filter(ctx, cycleState, request, pods)
}
Expand Down
Loading