Skip to content

Commit 74d12ca

Browse files
committed
Add subsetting logic for epp
1 parent d7e1b64 commit 74d12ca

File tree

12 files changed

+445
-18
lines changed

12 files changed

+445
-18
lines changed

cmd/epp/runner/runner.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4545
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4646
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
47+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
4748
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
4849
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
4950
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
@@ -270,11 +271,13 @@ func (r *Runner) initializeScheduler(datastore datastore.Datastore) (*scheduling
270271
if schedulerV2 {
271272
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
272273
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
274+
endpointSubsetFilter := filter.NewSubsetFilter()
273275

274276
schedulerProfile := framework.NewSchedulerProfile().
275277
WithScorers(framework.NewWeightedScorer(scorer.NewQueueScorer(), queueScorerWeight),
276278
framework.NewWeightedScorer(scorer.NewKVCacheScorer(), kvCacheScorerWeight)).
277-
WithPicker(picker.NewMaxScorePicker())
279+
WithPicker(picker.NewMaxScorePicker()).
280+
WithFilters(endpointSubsetFilter)
278281

279282
if prefixCacheScheduling {
280283
prefixScorerWeight := envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", prefix.DefaultScorerWeight, setupLog)

pkg/epp/handlers/server.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,9 @@ type RequestContext struct {
111111
}
112112

113113
type Request struct {
114-
Headers map[string]string
115-
Body map[string]interface{}
114+
Headers map[string]string
115+
Body map[string]interface{}
116+
Metadata map[string]any
116117
}
117118
type Response struct {
118119
Headers map[string]string
@@ -141,8 +142,9 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
141142
reqCtx := &RequestContext{
142143
RequestState: RequestReceived,
143144
Request: &Request{
144-
Headers: make(map[string]string),
145-
Body: make(map[string]interface{}),
145+
Headers: make(map[string]string),
146+
Body: make(map[string]interface{}),
147+
Metadata: make(map[string]any),
146148
},
147149
Response: &Response{
148150
Headers: make(map[string]string),
@@ -185,6 +187,8 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
185187
return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
186188
}
187189

190+
reqCtx.Request.Metadata = requtil.ExtractMetadataValues(req, requtil.MetadataSubsetKey)
191+
188192
switch v := req.Request.(type) {
189193
case *extProcPb.ProcessingRequest_RequestHeaders:
190194
if requestId := requtil.ExtractHeaderValue(v, requtil.RequestIdHeaderKey); len(requestId) > 0 {

pkg/epp/requestcontrol/director.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
117117
Prompt: prompt,
118118
Headers: reqCtx.Request.Headers,
119119
}
120+
reqCtx.SchedulingRequest.SetMetadata(reqCtx.Request.Metadata)
121+
120122
logger = logger.WithValues(
121123
"model", reqCtx.Model,
122124
"resolvedTargetModel", reqCtx.ResolvedTargetModel,

pkg/epp/scheduling/framework/plugins/filter/filter_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,3 +247,108 @@ func TestLoRASoftAffinityDistribution(t *testing.T) {
247247
actualAvailablePercent, availableLowerBound, availableUpperBound)
248248
}
249249
}
250+
251+
func TestSubsettingFilter(t *testing.T) {
252+
var makeFilterMetadata = func(data []interface{}) map[string]any {
253+
return map[string]any{
254+
"envoy.lb.subset_hint": map[string]any{
255+
"x-gateway-destination-endpoint-subset": data,
256+
},
257+
}
258+
}
259+
260+
tests := []struct {
261+
name string
262+
metadata map[string]any
263+
filter framework.Filter
264+
input []types.Pod
265+
output []types.Pod
266+
}{
267+
{
268+
name: "SubsetFilter, filter not present — return all pods",
269+
filter: &SubsetFilter{},
270+
metadata: map[string]any{},
271+
input: []types.Pod{
272+
&types.PodMetrics{
273+
Pod: &backend.Pod{Address: "10.0.0.1"},
274+
},
275+
&types.PodMetrics{
276+
Pod: &backend.Pod{Address: "10.0.0.2"},
277+
},
278+
},
279+
output: []types.Pod{
280+
&types.PodMetrics{
281+
Pod: &backend.Pod{Address: "10.0.0.1"},
282+
},
283+
&types.PodMetrics{
284+
Pod: &backend.Pod{Address: "10.0.0.2"},
285+
},
286+
},
287+
},
288+
{
289+
name: "SubsetFilter, subset with one matching pod",
290+
metadata: makeFilterMetadata([]interface{}{"10.0.0.1"}),
291+
filter: &SubsetFilter{},
292+
input: []types.Pod{
293+
&types.PodMetrics{
294+
Pod: &backend.Pod{Address: "10.0.0.1"},
295+
},
296+
&types.PodMetrics{
297+
Pod: &backend.Pod{Address: "10.0.0.2"},
298+
},
299+
},
300+
output: []types.Pod{
301+
&types.PodMetrics{
302+
Pod: &backend.Pod{Address: "10.0.0.1"},
303+
},
304+
},
305+
},
306+
{
307+
name: "SubsetFilter, subset with multiple matching pods",
308+
metadata: makeFilterMetadata([]interface{}{"10.0.0.1", "10.0.0.2", "10.0.0.3"}),
309+
filter: &SubsetFilter{},
310+
input: []types.Pod{
311+
&types.PodMetrics{
312+
Pod: &backend.Pod{Address: "10.0.0.1"},
313+
},
314+
&types.PodMetrics{
315+
Pod: &backend.Pod{Address: "10.0.0.2"},
316+
},
317+
},
318+
output: []types.Pod{
319+
&types.PodMetrics{
320+
Pod: &backend.Pod{Address: "10.0.0.1"},
321+
},
322+
&types.PodMetrics{
323+
Pod: &backend.Pod{Address: "10.0.0.2"},
324+
},
325+
},
326+
},
327+
{
328+
name: "SubsetFilter, subset with no matching pods",
329+
metadata: makeFilterMetadata([]interface{}{"10.0.0.3"}),
330+
filter: &SubsetFilter{},
331+
input: []types.Pod{
332+
&types.PodMetrics{
333+
Pod: &backend.Pod{Address: "10.0.0.1"},
334+
},
335+
&types.PodMetrics{
336+
Pod: &backend.Pod{Address: "10.0.0.2"},
337+
},
338+
},
339+
output: []types.Pod{},
340+
},
341+
}
342+
343+
for _, test := range tests {
344+
t.Run(test.name, func(t *testing.T) {
345+
req := &types.LLMRequest{}
346+
req.SetMetadata(test.metadata)
347+
got := test.filter.Filter(context.Background(), req, types.NewCycleState(), test.input)
348+
349+
if diff := cmp.Diff(test.output, got); diff != "" {
350+
t.Errorf("Unexpected output (-want +got): %v", diff)
351+
}
352+
})
353+
}
354+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package filter
18+
19+
import (
20+
"context"
21+
"strings"
22+
23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
25+
)
26+
27+
const (
28+
subsetHintNamespace = "envoy.lb.subset_hint"
29+
subsetHintKey = "x-gateway-destination-endpoint-subset"
30+
)
31+
32+
// compile-time type assertion
33+
var _ framework.Filter = &SubsetFilter{}
34+
35+
// NewSubsetFilter initializes a new SubsetFilter.
36+
func NewSubsetFilter() *SubsetFilter {
37+
return &SubsetFilter{}
38+
}
39+
40+
// SubsetFilter filters Pods based on the subset hint provided by the proxy via filterMetadata.
41+
type SubsetFilter struct{}
42+
43+
// Name returns the name of the filter.
44+
func (f *SubsetFilter) Name() string {
45+
return "subset-hint"
46+
}
47+
48+
// Filter filters out pods that are not in the subset provided in filterMetadata.
49+
func (f *SubsetFilter) Filter(_ context.Context, request *types.LLMRequest, _ *types.CycleState, pods []types.Pod) []types.Pod {
50+
// Check if envoy.lb.subset_hint is present in the metadata map
51+
subsetMap, found := request.GetMetadata()[subsetHintNamespace].(map[string]any)
52+
if !found {
53+
return pods
54+
}
55+
56+
// Check if endpoint key is present in the subset map and ensure there is at least one value
57+
endpointSubsetList, found := subsetMap[subsetHintKey].([]interface{})
58+
if !found || len(endpointSubsetList) == 0 {
59+
return pods
60+
}
61+
62+
// Create a map of endpoint addys for easy lookup
63+
endpoints := make(map[string]bool)
64+
for _, endpoint := range endpointSubsetList {
65+
epStr := strings.Split(endpoint.(string), ":")[0]
66+
endpoints[epStr] = true
67+
}
68+
69+
// Filter based on address
70+
filteredPods := []types.Pod{}
71+
for _, pod := range pods {
72+
if _, found := endpoints[pod.GetPod().Address]; found {
73+
filteredPods = append(filteredPods, pod)
74+
}
75+
}
76+
77+
return filteredPods
78+
}

pkg/epp/scheduling/scheduler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func NewScheduler(datastore Datastore) *Scheduler {
3838
// When the scheduler is initialized with NewScheduler function, the below config will be used as default.
3939
// it's possible to call NewSchedulerWithConfig to pass a different scheduler config.
4040
// For build time plugins changes, it's recommended to call in main.go to NewSchedulerWithConfig.
41+
endpointSubsetFilter := filter.NewSubsetFilter()
4142
loraAffinityFilter := filter.NewLoraAffinityFilter()
4243
leastQueueFilter := filter.NewLeastQueueFilter()
4344
leastKvCacheFilter := filter.NewLeastKVCacheFilter()
@@ -65,7 +66,7 @@ func NewScheduler(datastore Datastore) *Scheduler {
6566
}
6667

6768
defaultProfile := framework.NewSchedulerProfile().
68-
WithFilters(lowLatencyFilter).
69+
WithFilters(endpointSubsetFilter, lowLatencyFilter).
6970
WithPicker(&picker.RandomPicker{})
7071

7172
profilePicker := profilepicker.NewSingleProfileHandler()

pkg/epp/scheduling/types/types.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,23 @@ type LLMRequest struct {
3333
Prompt string
3434
// Headers is a map of the request headers.
3535
Headers map[string]string
36+
37+
// metadata is a map of the filter metadata carried by the request
38+
metadata map[string]any
3639
}
3740

3841
// String renders the request as a single line containing its request ID,
// target model, prompt length in bytes and headers.
func (r *LLMRequest) String() string {
	const layout = "RequestID: %s, TargetModel: %s, PromptLength: %d, Headers: %v"
	promptLength := len(r.Prompt)
	return fmt.Sprintf(layout, r.RequestId, r.TargetModel, promptLength, r.Headers)
}
4144

45+
// GetMetadata returns the request's metadata map as it is stored on the
// request; no copy is made.
func (r *LLMRequest) GetMetadata() map[string]any {
46+
return r.metadata
47+
}
48+
49+
// SetMetadata replaces the request's metadata map with the given map. The map
// is stored as-is (not copied), so later changes made by the caller are
// visible through GetMetadata.
func (r *LLMRequest) SetMetadata(metadata map[string]any) {
50+
r.metadata = metadata
51+
}
52+
4253
type Pod interface {
4354
GetPod() *backend.Pod
4455
GetMetrics() *backendmetrics.MetricsState

pkg/epp/util/request/metadata.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package request
18+
19+
import (
20+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
21+
)
22+
23+
const (
24+
MetadataSubsetKey = "envoy.lb.subset_hint"
25+
)
26+
27+
func ExtractMetadataValues(req *extProcPb.ProcessingRequest, metadataKey string) map[string]any {
28+
metadata := make(map[string]any)
29+
if req != nil && req.MetadataContext != nil && req.MetadataContext.FilterMetadata != nil {
30+
for key, val := range req.MetadataContext.FilterMetadata {
31+
if key == metadataKey {
32+
// envoy.lb.subset_hint key is always a map[string]any
33+
metadata[key] = val.AsMap()
34+
}
35+
}
36+
}
37+
return metadata
38+
}

0 commit comments

Comments
 (0)