@@ -17,6 +17,7 @@ limitations under the License.
17
17
package epp
18
18
19
19
import (
20
+ "encoding/json"
20
21
"fmt"
21
22
"strconv"
22
23
"strings"
@@ -26,9 +27,12 @@ import (
26
27
"github.com/google/go-cmp/cmp/cmpopts"
27
28
"github.com/onsi/ginkgo/v2"
28
29
"github.com/onsi/gomega"
30
+ corev1 "k8s.io/api/core/v1"
31
+ "k8s.io/apimachinery/pkg/api/errors"
29
32
"k8s.io/apimachinery/pkg/types"
30
33
"k8s.io/utils/ptr"
31
- "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
34
+ client "sigs.k8s.io/controller-runtime/pkg/client"
35
+ v1alpha2 "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
32
36
testutils "sigs.k8s.io/gateway-api-inference-extension/test/utils"
33
37
)
34
38
@@ -51,38 +55,57 @@ var _ = ginkgo.Describe("InferencePool", func() {
51
55
// After each spec: run cleanupInferModelResources, then poll until a Get for
// the InferenceModel reports NotFound, so the spec only finishes once the
// resource can no longer be fetched.
ginkgo.AfterEach(func() {
	ginkgo.By("Deleting the InferenceModel test resource.")
	cleanupInferModelResources()
	gomega.Eventually(func() error {
		err := cli.Get(ctx, types.NamespacedName{Namespace: infModel.Namespace, Name: infModel.Name}, infModel)
		switch {
		case err == nil:
			// The object was fetched successfully, so it has not been deleted yet.
			return fmt.Errorf("InferenceModel resource still exists")
		case !errors.IsNotFound(err):
			// Any failure other than NotFound does not prove the resource is
			// gone; surface it so the poll retries instead of passing silently.
			return err
		}
		// NotFound: the resource is confirmed deleted.
		return nil
	}, existsTimeout, interval).Should(gomega.Succeed())
})
55
69
56
70
ginkgo .When ("The Inference Extension is running" , func () {
57
71
ginkgo .It ("Should route traffic to target model servers" , func () {
58
72
for _ , t := range []struct {
59
73
api string
60
- promptOrMessages string
74
+ promptOrMessages any
61
75
}{
62
76
{
63
77
api : "/completions" ,
64
78
promptOrMessages : "Write as if you were a critic: San Francisco" ,
65
79
},
66
80
{
67
- api : "/chat/completions" ,
68
- promptOrMessages : `[{"role": "user", "content": "Write as if you were a critic: San Francisco"}]` ,
81
+ api : "/chat/completions" ,
82
+ promptOrMessages : []map [string ]any {
83
+ {
84
+ "role" : "user" ,
85
+ "content" : "Write as if you were a critic: San Francisco" ,
86
+ },
87
+ },
69
88
},
70
89
{
71
90
api : "/chat/completions" ,
72
- promptOrMessages : `[{"role": "user", "content": "Write as if you were a critic: San Francisco"},` +
73
- `{"role": "assistant", "content": "Okay, let's see..."},` +
74
- `{"role": "user", "content": "Now summarize your thoughts."}]` ,
91
+ promptOrMessages : []map [string ]any {
92
+ {
93
+ "role" : "user" ,
94
+ "content" : "Write as if you were a critic: San Francisco" ,
95
+ },
96
+ {"role" : "assistant" , "content" : "Okay, let's see..." },
97
+ {"role" : "user" , "content" : "Now summarize your thoughts." },
98
+ },
75
99
},
76
100
} {
77
- ginkgo .By ("Verifying connectivity through the inference extension with " +
78
- t .api + " api and prompt/messages: " + t .promptOrMessages )
101
+ ginkgo .By (fmt .Sprintf ("Verifying connectivity through the inference extension with %s api and prompt/messages: %v" , t .api , t .promptOrMessages ))
79
102
80
103
// Ensure the expected responses include the inferencemodel target model names.
81
104
var expected []string
82
105
for _ , m := range infModel .Spec .TargetModels {
83
106
expected = append (expected , m .Name )
84
107
}
85
- curlCmd := getCurlCommand (envoyName , nsName , envoyPort , modelName , curlTimeout , t .api , t .promptOrMessages )
108
+ curlCmd := getCurlCommand (envoyName , nsName , envoyPort , modelName , curlTimeout , t .api , t .promptOrMessages , false )
86
109
87
110
actual := make (map [string ]int )
88
111
gomega .Eventually (func () error {
@@ -106,11 +129,103 @@ var _ = ginkgo.Describe("InferencePool", func() {
106
129
if ! cmp .Equal (got , expected , cmpopts .SortSlices (func (a , b string ) bool { return a < b })) {
107
130
return fmt .Errorf ("actual (%v) != expected (%v); resp=%q" , got , expected , resp )
108
131
}
109
-
110
132
return nil
111
133
}, readyTimeout , curlInterval ).Should (gomega .Succeed ())
112
134
}
113
135
})
136
+
137
+ ginkgo .It ("Should expose EPP metrics after generating traffic" , func () {
138
+ // Define the metrics we expect to see
139
+ expectedMetrics := []string {
140
+ "inference_model_request_total" ,
141
+ "inference_model_request_error_total" ,
142
+ "inference_model_request_duration_seconds" ,
143
+ // TODO: normalized_time_per_output_token_seconds is not actually recorded yet
144
+ // "normalized_time_per_output_token_seconds",
145
+ "inference_model_request_sizes" ,
146
+ "inference_model_response_sizes" ,
147
+ "inference_model_input_tokens" ,
148
+ "inference_model_output_tokens" ,
149
+ "inference_pool_average_kv_cache_utilization" ,
150
+ "inference_pool_average_queue_size" ,
151
+ "inference_pool_per_pod_queue_size" ,
152
+ "inference_model_running_requests" ,
153
+ "inference_pool_ready_pods" ,
154
+ "inference_extension_info" ,
155
+ }
156
+
157
+ // Generate traffic by sending requests through the inference extension
158
+ ginkgo .By ("Generating traffic through the inference extension" )
159
+ curlCmd := getCurlCommand (envoyName , nsName , envoyPort , modelName , curlTimeout , "/completions" , "Write as if you were a critic: San Francisco" , true )
160
+
161
+ // Run the curl command multiple times to generate some metrics data
162
+ for i := 0 ; i < 5 ; i ++ {
163
+ _ , err := testutils .ExecCommandInPod (ctx , cfg , scheme , kubeCli , nsName , "curl" , "curl" , curlCmd )
164
+ gomega .Expect (err ).NotTo (gomega .HaveOccurred ())
165
+ }
166
+
167
+ // modify the curl command to generate some error metrics
168
+ curlCmd [len (curlCmd )- 1 ] = "invalid input"
169
+ for i := 0 ; i < 5 ; i ++ {
170
+ _ , err := testutils .ExecCommandInPod (ctx , cfg , scheme , kubeCli , nsName , "curl" , "curl" , curlCmd )
171
+ gomega .Expect (err ).NotTo (gomega .HaveOccurred ())
172
+ }
173
+
174
+ // Now scrape metrics from the EPP endpoint via the curl pod
175
+ ginkgo .By ("Scraping metrics from the EPP endpoint" )
176
+
177
+ // Get Pod IP instead of Service
178
+ podList := & corev1.PodList {}
179
+ err := cli .List (ctx , podList , client .InNamespace (nsName ), client.MatchingLabels {"app" : inferExtName })
180
+ gomega .Expect (err ).NotTo (gomega .HaveOccurred ())
181
+ gomega .Expect (podList .Items ).NotTo (gomega .BeEmpty ())
182
+ podIP := podList .Items [0 ].Status .PodIP
183
+ gomega .Expect (podIP ).NotTo (gomega .BeEmpty ())
184
+
185
+ // Get the authorization token for reading metrics
186
+ token := ""
187
+ gomega .Eventually (func () error {
188
+ token , err = getMetricsReaderToken (cli )
189
+ if err != nil {
190
+ return err
191
+ }
192
+ if token == "" {
193
+ return fmt .Errorf ("token not found" )
194
+ }
195
+ return nil
196
+ }, existsTimeout , interval ).Should (gomega .Succeed ())
197
+
198
+ // Construct the metric scraping curl command using Pod IP
199
+ metricScrapeCmd := []string {
200
+ "curl" ,
201
+ "-i" ,
202
+ "--max-time" ,
203
+ strconv .Itoa ((int )(curlTimeout .Seconds ())),
204
+ "-H" ,
205
+ "Authorization: Bearer " + token ,
206
+ fmt .Sprintf ("http://%s:%d/metrics" , podIP , 9090 ),
207
+ }
208
+
209
+ ginkgo .By ("Verifying that all expected metrics are present." )
210
+ gomega .Eventually (func () error {
211
+ // Execute the metrics scrape command inside the curl pod
212
+ resp , err := testutils .ExecCommandInPod (ctx , cfg , scheme , kubeCli , nsName , "curl" , "curl" , metricScrapeCmd )
213
+ if err != nil {
214
+ return err
215
+ }
216
+ // Verify that we got a 200 OK responsecurl
217
+ if ! strings .Contains (resp , "200 OK" ) {
218
+ return fmt .Errorf ("did not get 200 OK: %s" , resp )
219
+ }
220
+ // Check if all expected metrics are present in the metrics output
221
+ for _ , metric := range expectedMetrics {
222
+ if ! strings .Contains (resp , metric ) {
223
+ return fmt .Errorf ("expected metric %s not found in metrics output" , metric )
224
+ }
225
+ }
226
+ return nil
227
+ }, readyTimeout , curlInterval ).Should (gomega .Succeed ())
228
+ })
114
229
})
115
230
})
116
231
@@ -130,16 +245,38 @@ func newInferenceModel(ns string) *v1alpha2.InferenceModel {
130
245
Obj ()
131
246
}
132
247
248
+ func getMetricsReaderToken (k8sClient client.Client ) (string , error ) {
249
+ secret := & corev1.Secret {}
250
+ err := k8sClient .Get (ctx , types.NamespacedName {Namespace : nsName , Name : metricsReaderSecretName }, secret )
251
+ if err != nil {
252
+ return "" , err
253
+ }
254
+ return string (secret .Data ["token" ]), nil
255
+ }
256
+
133
257
// getCurlCommand returns the command, as a slice of strings, for curl'ing
134
258
// the test model server at the given name, namespace, port, and model name.
135
- func getCurlCommand (name , ns , port , model string , timeout time.Duration , api string , promptOrMessages string ) []string {
136
- var body string
259
+ func getCurlCommand (name , ns , port , model string , timeout time.Duration , api string , promptOrMessages any , streaming bool ) []string {
260
+ body := map [string ]any {
261
+ "model" : model ,
262
+ "max_tokens" : 100 ,
263
+ "temperature" : 0 ,
264
+ }
265
+ body ["model" ] = model
137
266
switch api {
138
267
case "/completions" :
139
- body = fmt . Sprintf ( `{"model": "%s", " prompt": "%s", "max_tokens": 100, "temperature": 0}` , model , promptOrMessages )
268
+ body [ " prompt"] = promptOrMessages
140
269
case "/chat/completions" :
141
- body = fmt .Sprintf (`{"model": "%s", "messages": %s, "max_tokens": 100, "temperature": 0}` , model , promptOrMessages )
270
+ body ["messages" ] = promptOrMessages
271
+ }
272
+ if streaming {
273
+ body ["stream" ] = true
274
+ body ["stream_options" ] = map [string ]any {
275
+ "include_usage" : true ,
276
+ }
142
277
}
278
+ b , err := json .Marshal (body )
279
+ gomega .Expect (err ).NotTo (gomega .HaveOccurred ())
143
280
return []string {
144
281
"curl" ,
145
282
"-i" ,
@@ -149,6 +286,6 @@ func getCurlCommand(name, ns, port, model string, timeout time.Duration, api str
149
286
"-H" ,
150
287
"Content-Type: application/json" ,
151
288
"-d" ,
152
- body ,
289
+ string ( b ) ,
153
290
}
154
291
}
0 commit comments