Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions conformance/testing-epp/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
func TestSchedule(t *testing.T) {
tests := []struct {
name string
input []backendmetrics.PodMetrics
input []types.Pod
req *types.LLMRequest
wantRes *types.SchedulingResult
err bool
Expand All @@ -47,7 +47,7 @@ func TestSchedule(t *testing.T) {
},
{
name: "req header not set",
input: []backendmetrics.PodMetrics{
input: []types.Pod{
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "random-endpoint"}},
},
req: &types.LLMRequest{
Expand All @@ -59,7 +59,7 @@ func TestSchedule(t *testing.T) {
},
{
name: "no pods address from the candidate pods matches req header address",
input: []backendmetrics.PodMetrics{
input: []types.Pod{
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
},
req: &types.LLMRequest{
Expand All @@ -71,7 +71,7 @@ func TestSchedule(t *testing.T) {
},
{
name: "one pod address from the candidate pods matches req header address",
input: []backendmetrics.PodMetrics{
input: []types.Pod{
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "nonmatched-endpoint"}},
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{Address: "matched-endpoint"}},
},
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestSchedule(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scheduler := NewReqHeaderBasedScheduler()
got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input))
got, err := scheduler.Schedule(context.Background(), test.req, test.input)
if test.err != (err != nil) {
t.Errorf("Unexpected error, got %v, want %v", err, test.err)
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,13 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet

subsetMap, found := requestMetadata[subsetHintNamespace].(map[string]any)
if !found {
return schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll())
return d.toSchedulerPodMetrics(d.datastore.PodGetAll())
}

// Check if endpoint key is present in the subset map and ensure there is at least one value
endpointSubsetList, found := subsetMap[subsetHintKey].([]any)
if !found {
return schedulingtypes.ToSchedulerPodMetrics(d.datastore.PodGetAll())
return d.toSchedulerPodMetrics(d.datastore.PodGetAll())
} else if len(endpointSubsetList) == 0 {
loggerTrace.Info("found empty subset filter in request metadata, filtering all pods")
return []schedulingtypes.Pod{}
Expand All @@ -227,7 +227,7 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet

loggerTrace.Info("filtered candidate pods by subset filtering", "podTotalCount", podTotalCount, "filteredCount", len(podFitleredList))

return schedulingtypes.ToSchedulerPodMetrics(podFitleredList)
return d.toSchedulerPodMetrics(podFitleredList)
}

// prepareRequest populates the RequestContext and calls the registered PreRequest plugins
Expand Down Expand Up @@ -257,6 +257,15 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC
return reqCtx, nil
}

func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []schedulingtypes.Pod {
pm := make([]schedulingtypes.Pod, len(pods))
for i, pod := range pods {
pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone()}
}

return pm
}

func (d *Director) HandleResponse(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) {
response := &Response{
RequestId: reqCtx.Request.Headers[requtil.RequestIdHeaderKey],
Expand Down
31 changes: 15 additions & 16 deletions pkg/epp/scheduling/framework/scheduler_profile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
k8stypes "k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" // Import config for thresholds
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)
Expand All @@ -45,7 +44,7 @@ func TestSchedulePlugins(t *testing.T) {
tests := []struct {
name string
profile *SchedulerProfile
input []backendmetrics.PodMetrics
input []types.Pod
wantTargetPod k8stypes.NamespacedName
targetPodScore float64
// Number of expected pods to score (after filter)
Expand All @@ -59,10 +58,10 @@ func TestSchedulePlugins(t *testing.T) {
WithScorers(NewWeightedScorer(tp1, 1), NewWeightedScorer(tp2, 1)).
WithPicker(pickerPlugin).
WithPostCyclePlugins(tp1, tp2),
input: []backendmetrics.PodMetrics{
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
input: []types.Pod{
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
},
wantTargetPod: k8stypes.NamespacedName{Name: "pod1"},
targetPodScore: 1.1,
Expand All @@ -76,10 +75,10 @@ func TestSchedulePlugins(t *testing.T) {
WithScorers(NewWeightedScorer(tp1, 60), NewWeightedScorer(tp2, 40)).
WithPicker(pickerPlugin).
WithPostCyclePlugins(tp1, tp2),
input: []backendmetrics.PodMetrics{
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
input: []types.Pod{
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
},
wantTargetPod: k8stypes.NamespacedName{Name: "pod1"},
targetPodScore: 50,
Expand All @@ -93,10 +92,10 @@ func TestSchedulePlugins(t *testing.T) {
WithScorers(NewWeightedScorer(tp1, 1), NewWeightedScorer(tp2, 1)).
WithPicker(pickerPlugin).
WithPostCyclePlugins(tp1, tp2),
input: []backendmetrics.PodMetrics{
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
&backendmetrics.FakePodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
input: []types.Pod{
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}},
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}},
&types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}}},
},
numPodsToScore: 0,
err: true, // no available pods to server after filter all
Expand All @@ -123,7 +122,7 @@ func TestSchedulePlugins(t *testing.T) {
RequestId: uuid.NewString(),
}
// Run profile cycle
got, err := test.profile.Run(context.Background(), request, types.NewCycleState(), types.ToSchedulerPodMetrics(test.input))
got, err := test.profile.Run(context.Background(), request, types.NewCycleState(), test.input)

// Validate error state
if test.err != (err != nil) {
Expand All @@ -136,7 +135,7 @@ func TestSchedulePlugins(t *testing.T) {

// Validate output
wantPod := &types.PodMetrics{
Pod: &backend.Pod{NamespacedName: test.wantTargetPod, Labels: make(map[string]string)},
Pod: &backend.Pod{NamespacedName: test.wantTargetPod},
}
wantRes := &types.ProfileRunResult{
TargetPod: wantPod,
Expand Down
23 changes: 11 additions & 12 deletions pkg/epp/scheduling/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestSchedule(t *testing.T) {
tests := []struct {
name string
req *types.LLMRequest
input []backendmetrics.PodMetrics
input []types.Pod
wantRes *types.SchedulingResult
err bool
}{
Expand All @@ -43,7 +43,7 @@ func TestSchedule(t *testing.T) {
TargetModel: "any-model",
RequestId: uuid.NewString(),
},
input: []backendmetrics.PodMetrics{},
input: []types.Pod{},
wantRes: nil,
err: true,
},
Expand All @@ -55,10 +55,10 @@ func TestSchedule(t *testing.T) {
},
// pod2 will be picked because it has relatively low queue size, with the requested
// model being active, and has low KV cache.
input: []backendmetrics.PodMetrics{
&backendmetrics.FakePodMetrics{
input: []types.Pod{
&types.PodMetrics{
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}},
Metrics: &backendmetrics.MetricsState{
MetricsState: &backendmetrics.MetricsState{
WaitingQueueSize: 0,
KVCacheUsagePercent: 0.2,
MaxActiveModels: 2,
Expand All @@ -68,9 +68,9 @@ func TestSchedule(t *testing.T) {
},
},
},
&backendmetrics.FakePodMetrics{
&types.PodMetrics{
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}},
Metrics: &backendmetrics.MetricsState{
MetricsState: &backendmetrics.MetricsState{
WaitingQueueSize: 3,
KVCacheUsagePercent: 0.1,
MaxActiveModels: 2,
Expand All @@ -80,9 +80,9 @@ func TestSchedule(t *testing.T) {
},
},
},
&backendmetrics.FakePodMetrics{
&types.PodMetrics{
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod3"}},
Metrics: &backendmetrics.MetricsState{
MetricsState: &backendmetrics.MetricsState{
WaitingQueueSize: 10,
KVCacheUsagePercent: 0.2,
MaxActiveModels: 2,
Expand All @@ -97,7 +97,7 @@ func TestSchedule(t *testing.T) {
"default": {
TargetPod: &types.ScoredPod{
Pod: &types.PodMetrics{
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}, Labels: make(map[string]string)},
Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}},
MetricsState: &backendmetrics.MetricsState{
WaitingQueueSize: 3,
KVCacheUsagePercent: 0.1,
Expand All @@ -106,7 +106,6 @@ func TestSchedule(t *testing.T) {
"foo": 1,
"critical": 1,
},
WaitingModels: map[string]int{},
},
},
},
Expand All @@ -120,7 +119,7 @@ func TestSchedule(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
scheduler := NewScheduler()
got, err := scheduler.Schedule(context.Background(), test.req, types.ToSchedulerPodMetrics(test.input))
got, err := scheduler.Schedule(context.Background(), test.req, test.input)
if test.err != (err != nil) {
t.Errorf("Unexpected error, got %v, want %v", err, test.err)
}
Expand Down
8 changes: 0 additions & 8 deletions pkg/epp/scheduling/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,6 @@ type PodMetrics struct {
*backendmetrics.MetricsState
}

func ToSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []Pod {
pm := make([]Pod, 0, len(pods))
for _, pod := range pods {
pm = append(pm, &PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone()})
}
return pm
}

// ProfileRunResult captures the profile run result.
type ProfileRunResult struct {
TargetPod Pod
Expand Down