Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
Expand Down Expand Up @@ -304,6 +305,7 @@ func (r *Runner) initializeScheduler() (*scheduling.Scheduler, error) {
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)

schedulerProfile := framework.NewSchedulerProfile().
WithFilters(filter.NewSubsetFilter()).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine for now, but please open an issue to allow configuring this across profiles irrespective of the source of the configuration (see the discussion we had on the issue)

WithScorers(framework.NewWeightedScorer(scorer.NewQueueScorer(), queueScorerWeight),
framework.NewWeightedScorer(scorer.NewKVCacheScorer(), kvCacheScorerWeight)).
WithPicker(picker.NewMaxScorePicker())
Expand Down
12 changes: 8 additions & 4 deletions pkg/epp/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ type RequestContext struct {
}

type Request struct {
Headers map[string]string
Body map[string]interface{}
Headers map[string]string
Body map[string]interface{}
Metadata map[string]any
}
type Response struct {
Headers map[string]string
Expand Down Expand Up @@ -141,8 +142,9 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
reqCtx := &RequestContext{
RequestState: RequestReceived,
Request: &Request{
Headers: make(map[string]string),
Body: make(map[string]interface{}),
Headers: make(map[string]string),
Body: make(map[string]interface{}),
Metadata: make(map[string]any),
},
Response: &Response{
Headers: make(map[string]string),
Expand Down Expand Up @@ -185,6 +187,8 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
}

reqCtx.Request.Metadata = requtil.ExtractMetadataValues(req)

switch v := req.Request.(type) {
case *extProcPb.ProcessingRequest_RequestHeaders:
if requestId := requtil.ExtractHeaderValue(v, requtil.RequestIdHeaderKey); len(requestId) > 0 {
Expand Down
13 changes: 7 additions & 6 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,15 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
}

// Prepare LLMRequest (needed for both saturation detection and Scheduler)
reqCtx.SchedulingRequest = &schedulingtypes.LLMRequest{
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
TargetModel: reqCtx.ResolvedTargetModel,
Prompt: prompt,
Headers: reqCtx.Request.Headers,
}
reqCtx.SchedulingRequest = schedulingtypes.NewLLMRequest(
reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
reqCtx.ResolvedTargetModel,
prompt,
reqCtx.Request.Headers,
reqCtx.Request.Metadata)

logger = logger.WithValues("model", reqCtx.Model, "resolvedTargetModel", reqCtx.ResolvedTargetModel, "criticality", requestCriticality)

ctx = log.IntoContext(ctx, logger)
logger.V(logutil.DEBUG).Info("LLM request assembled")

Expand Down
139 changes: 139 additions & 0 deletions pkg/epp/scheduling/framework/plugins/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,142 @@ func TestLoRASoftAffinityDistribution(t *testing.T) {
actualAvailablePercent, availableLowerBound, availableUpperBound)
}
}

func TestSubsettingFilter(t *testing.T) {
var makeFilterMetadata = func(data []interface{}) map[string]any {
return map[string]any{
"envoy.lb.subset_hint": map[string]any{
"x-gateway-destination-endpoint-subset": data,
},
}
}

tests := []struct {
name string
metadata map[string]any
filter framework.Filter
input []types.Pod
output []types.Pod
}{
{
name: "SubsetFilter, filter not present — return all pods",
filter: &SubsetFilter{},
metadata: map[string]any{},
input: []types.Pod{
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.1"},
},
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.2"},
},
},
output: []types.Pod{
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.1"},
},
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.2"},
},
},
},
{
name: "SubsetFilter, namespace present filter not present — return all pods",
filter: &SubsetFilter{},
metadata: map[string]any{"envoy.lb.subset_hint": map[string]any{}},
input: []types.Pod{
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.1"},
},
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.2"},
},
},
output: []types.Pod{
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.1"},
},
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.2"},
},
},
},
{
name: "SubsetFilter, filter present with empty list — return no pods",
filter: &SubsetFilter{},
metadata: makeFilterMetadata([]interface{}{}),
input: []types.Pod{
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.1"},
},
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.2"},
},
},
output: []types.Pod{},
},
{
name: "SubsetFilter, subset with one matching pod",
metadata: makeFilterMetadata([]interface{}{"10.0.0.1"}),
filter: &SubsetFilter{},
input: []types.Pod{
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.1"},
},
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.2"},
},
},
output: []types.Pod{
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.1"},
},
},
},
{
name: "SubsetFilter, subset with multiple matching pods",
metadata: makeFilterMetadata([]interface{}{"10.0.0.1", "10.0.0.2", "10.0.0.3"}),
filter: &SubsetFilter{},
input: []types.Pod{
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.1"},
},
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.2"},
},
},
output: []types.Pod{
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.1"},
},
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.2"},
},
},
},
{
name: "SubsetFilter, subset with no matching pods",
metadata: makeFilterMetadata([]interface{}{"10.0.0.3"}),
filter: &SubsetFilter{},
input: []types.Pod{
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.1"},
},
&types.PodMetrics{
Pod: &backend.Pod{Address: "10.0.0.2"},
},
},
output: []types.Pod{},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
req := types.NewLLMRequest(uuid.NewString(), "", "", nil, test.metadata)
got := test.filter.Filter(context.Background(), types.NewCycleState(), req, test.input)

if diff := cmp.Diff(test.output, got); diff != "" {
t.Errorf("Unexpected output (-want +got): %v", diff)
}
})
}
}
89 changes: 89 additions & 0 deletions pkg/epp/scheduling/framework/plugins/filter/subsetting_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filter

import (
"context"
"strings"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const (
SubsetFilterType = "subset"

subsetHintKey = "x-gateway-destination-endpoint-subset"
subsetHintNamespace = "envoy.lb.subset_hint"
)

// compile-time type assertion
var _ framework.Filter = &SubsetFilter{}

// NewSubsetFilter initializes a new SubsetFilter.
func NewSubsetFilter() *SubsetFilter {
return &SubsetFilter{}
}

// SubsetFilter filters Pods based on the subset hint provided by the proxy via filterMetadata.
type SubsetFilter struct{}

// Name returns the name of the filter.
func (f *SubsetFilter) Name() string {
return "subset-hint"
}

// Type returns the type of the filter.
func (f *SubsetFilter) Type() string {
return SubsetFilterType
}

// Filter filters out pods that are not in the subset provided in filterMetadata.
func (f *SubsetFilter) Filter(_ context.Context, _ *types.CycleState, request *types.LLMRequest, pods []types.Pod) []types.Pod {
// Check if subset namespace key is present in the metadata map
subsetMap, found := request.GetMetadata()[subsetHintNamespace].(map[string]any)
if !found {
return pods
}

// Check if endpoint key is present in the subset map and ensure there is at least one value
endpointSubsetList, found := subsetMap[subsetHintKey].([]interface{})
if !found {
return pods
} else if len(endpointSubsetList) == 0 {
return []types.Pod{}
}

// Create a map of endpoint addrs for easy lookup
endpoints := make(map[string]bool)
for _, endpoint := range endpointSubsetList {
// Extract address from endpoint
// The endpoint is formatted as "<address>:<port>" (ex. "10.0.1.0:8080")
epStr := strings.Split(endpoint.(string), ":")[0]
endpoints[epStr] = true
}

// Filter based on address
filteredPods := []types.Pod{}
for _, pod := range pods {
if _, found := endpoints[pod.GetPod().Address]; found {
filteredPods = append(filteredPods, pod)
}
}

return filteredPods
}
3 changes: 2 additions & 1 deletion pkg/epp/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func NewScheduler() *Scheduler {
// it's possible to call NewSchedulerWithConfig to pass a different scheduler config.
// For build time plugins changes, it's recommended to call in main.go to NewSchedulerWithConfig.
loraAffinityFilter := filter.NewLoraAffinityFilter(config.Conf.LoraAffinityThreshold)
endpointSubsetFilter := filter.NewSubsetFilter()
leastQueueFilter := filter.NewLeastQueueFilter()
leastKvCacheFilter := filter.NewLeastKVCacheFilter()

Expand All @@ -70,7 +71,7 @@ func NewScheduler() *Scheduler {
}

defaultProfile := framework.NewSchedulerProfile().
WithFilters(lowLatencyFilter).
WithFilters(endpointSubsetFilter, lowLatencyFilter).
WithPicker(&picker.RandomPicker{})

profileHandler := profile.NewSingleProfileHandler()
Expand Down
17 changes: 17 additions & 0 deletions pkg/epp/scheduling/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,29 @@ type LLMRequest struct {
Prompt string
// Headers is a map of the request headers.
Headers map[string]string

// metadata is a map of metadata in the request
metadata map[string]any
}

func NewLLMRequest(reqID, targetModel, prompt string, headers map[string]string, metadata map[string]any) *LLMRequest {
return &LLMRequest{
RequestId: reqID,
TargetModel: targetModel,
Prompt: prompt,
Headers: headers,
metadata: metadata,
}
}

func (r *LLMRequest) String() string {
return fmt.Sprintf("RequestID: %s, TargetModel: %s, PromptLength: %d, Headers: %v", r.RequestId, r.TargetModel, len(r.Prompt), r.Headers)
}

func (r *LLMRequest) GetMetadata() map[string]any {
return r.metadata
}

type Pod interface {
GetPod() *backend.Pod
GetMetrics() *backendmetrics.MetricsState
Expand Down
31 changes: 31 additions & 0 deletions pkg/epp/util/request/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package request

import (
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
)

func ExtractMetadataValues(req *extProcPb.ProcessingRequest) map[string]any {
metadata := make(map[string]any)
if req != nil && req.MetadataContext != nil && req.MetadataContext.FilterMetadata != nil {
for key, val := range req.MetadataContext.FilterMetadata {
metadata[key] = val.AsMap()
}
}
return metadata
}
Loading