Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions test/e2e/epp/e2e_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package epp

import (
"context"
"errors"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -70,6 +71,8 @@ const (
envoyPort = "8081"
// inferExtName is the name of the inference extension test resources.
inferExtName = "vllm-llama3-8b-instruct-epp"
// metricsReaderSecretName is the name of the metrics reader secret which stores sa token to read epp metrics.
metricsReaderSecretName = "inference-gateway-sa-metrics-reader-secret"
// clientManifest is the manifest for the client test resources.
clientManifest = "../../testdata/client.yaml"
// modelServerSecretManifest is the manifest for the model server secret resource.
Expand All @@ -82,6 +85,8 @@ const (
inferExtManifest = "../../testdata/inferencepool-e2e.yaml"
// envoyManifest is the manifest for the envoy proxy test resources.
envoyManifest = "../../testdata/envoy.yaml"
// metricsRbacManifest is the manifest for the rbac resources for testing metrics.
metricsRbacManifest = "../../testdata/metrics-rbac.yaml"
// modelServerManifestFilepathEnvVar is the env var that holds absolute path to the manifest for the model server test resource.
modelServerManifestFilepathEnvVar = "MANIFEST_PATH"
)
Expand Down Expand Up @@ -133,6 +138,7 @@ func setupInfra() {
createInferExt(cli, inferExtManifest)
createClient(cli, clientManifest)
createEnvoy(cli, envoyManifest)
createMetricsRbac(cli, metricsRbacManifest)
// Run this step last, as it requires additional time for the model server to become ready.
createModelServer(cli, modelServerManifestArray, modelServerManifestPath)
}
Expand Down Expand Up @@ -259,6 +265,30 @@ func createClient(k8sClient client.Client, filePath string) {
testutils.PodReady(ctx, k8sClient, pod, readyTimeout, interval)
}

// createMetricsRbac creates the metrics RBAC resources from the manifest file.
func createMetricsRbac(k8sClient client.Client, filePath string) {
inManifests := readYaml(filePath)
ginkgo.By("Replacing placeholder namespace with E2E_NS environment variable")
outManifests := []string{}
for _, m := range inManifests {
outManifests = append(outManifests, strings.ReplaceAll(m, "$E2E_NS", nsName))
}
ginkgo.By("Creating RBAC resources for scraping metrics from manifest: " + filePath)
createObjsFromYaml(k8sClient, outManifests)

// wait for sa token to exist
testutils.EventuallyExists(ctx, func() error {
token, err := getMetricsReaderToken(k8sClient)
if err != nil {
return err
}
if len(token) == 0 {
return errors.New("failed to get metrics reader token")
}
return nil
}, existsTimeout, interval)
}

// createModelServer creates the model server resources used for testing from the given filePaths.
func createModelServer(k8sClient client.Client, modelServerManifestArray []string, deployPath string) {
ginkgo.By("Creating model server resources from manifest: " + deployPath)
Expand Down
170 changes: 154 additions & 16 deletions test/e2e/epp/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package epp

import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
Expand All @@ -26,9 +28,12 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
client "sigs.k8s.io/controller-runtime/pkg/client"
v1alpha2 "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
testutils "sigs.k8s.io/gateway-api-inference-extension/test/utils"
)

Expand All @@ -51,38 +56,57 @@ var _ = ginkgo.Describe("InferencePool", func() {
ginkgo.AfterEach(func() {
ginkgo.By("Deleting the InferenceModel test resource.")
cleanupInferModelResources()
gomega.Eventually(func() error {
err := cli.Get(ctx, types.NamespacedName{Namespace: infModel.Namespace, Name: infModel.Name}, infModel)
if err == nil {
return errors.New("InferenceModel resource still exists")
}
if !k8serrors.IsNotFound(err) {
return nil
}
return nil
}, existsTimeout, interval).Should(gomega.Succeed())
})

ginkgo.When("The Inference Extension is running", func() {
ginkgo.It("Should route traffic to target model servers", func() {
for _, t := range []struct {
api string
promptOrMessages string
promptOrMessages any
}{
{
api: "/completions",
promptOrMessages: "Write as if you were a critic: San Francisco",
},
{
api: "/chat/completions",
promptOrMessages: `[{"role": "user", "content": "Write as if you were a critic: San Francisco"}]`,
api: "/chat/completions",
promptOrMessages: []map[string]any{
{
"role": "user",
"content": "Write as if you were a critic: San Francisco",
},
},
},
{
api: "/chat/completions",
promptOrMessages: `[{"role": "user", "content": "Write as if you were a critic: San Francisco"},` +
`{"role": "assistant", "content": "Okay, let's see..."},` +
`{"role": "user", "content": "Now summarize your thoughts."}]`,
promptOrMessages: []map[string]any{
{
"role": "user",
"content": "Write as if you were a critic: San Francisco",
},
{"role": "assistant", "content": "Okay, let's see..."},
{"role": "user", "content": "Now summarize your thoughts."},
},
},
} {
ginkgo.By("Verifying connectivity through the inference extension with " +
t.api + " api and prompt/messages: " + t.promptOrMessages)
ginkgo.By(fmt.Sprintf("Verifying connectivity through the inference extension with %s api and prompt/messages: %v", t.api, t.promptOrMessages))

// Ensure the expected responses include the inferencemodel target model names.
var expected []string
for _, m := range infModel.Spec.TargetModels {
expected = append(expected, m.Name)
}
curlCmd := getCurlCommand(envoyName, nsName, envoyPort, modelName, curlTimeout, t.api, t.promptOrMessages)
curlCmd := getCurlCommand(envoyName, nsName, envoyPort, modelName, curlTimeout, t.api, t.promptOrMessages, false)

actual := make(map[string]int)
gomega.Eventually(func() error {
Expand All @@ -106,11 +130,103 @@ var _ = ginkgo.Describe("InferencePool", func() {
if !cmp.Equal(got, expected, cmpopts.SortSlices(func(a, b string) bool { return a < b })) {
return fmt.Errorf("actual (%v) != expected (%v); resp=%q", got, expected, resp)
}

return nil
}, readyTimeout, curlInterval).Should(gomega.Succeed())
}
})

ginkgo.It("Should expose EPP metrics after generating traffic", func() {
// Define the metrics we expect to see
expectedMetrics := []string{
"inference_model_request_total",
"inference_model_request_error_total",
"inference_model_request_duration_seconds",
// TODO: normalized_time_per_output_token_seconds is not actually recorded yet
// "normalized_time_per_output_token_seconds",
"inference_model_request_sizes",
"inference_model_response_sizes",
"inference_model_input_tokens",
"inference_model_output_tokens",
"inference_pool_average_kv_cache_utilization",
"inference_pool_average_queue_size",
"inference_pool_per_pod_queue_size",
"inference_model_running_requests",
"inference_pool_ready_pods",
"inference_extension_info",
}

// Generate traffic by sending requests through the inference extension
ginkgo.By("Generating traffic through the inference extension")
curlCmd := getCurlCommand(envoyName, nsName, envoyPort, modelName, curlTimeout, "/completions", "Write as if you were a critic: San Francisco", true)

// Run the curl command multiple times to generate some metrics data
for i := 0; i < 5; i++ {
_, err := testutils.ExecCommandInPod(ctx, cfg, scheme, kubeCli, nsName, "curl", "curl", curlCmd)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

// modify the curl command to generate some error metrics
curlCmd[len(curlCmd)-1] = "invalid input"
for i := 0; i < 5; i++ {
_, err := testutils.ExecCommandInPod(ctx, cfg, scheme, kubeCli, nsName, "curl", "curl", curlCmd)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

// Now scrape metrics from the EPP endpoint via the curl pod
ginkgo.By("Scraping metrics from the EPP endpoint")

// Get Pod IP instead of Service
podList := &corev1.PodList{}
err := cli.List(ctx, podList, client.InNamespace(nsName), client.MatchingLabels{"app": inferExtName})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(podList.Items).NotTo(gomega.BeEmpty())
podIP := podList.Items[0].Status.PodIP
gomega.Expect(podIP).NotTo(gomega.BeEmpty())

// Get the authorization token for reading metrics
token := ""
gomega.Eventually(func() error {
token, err = getMetricsReaderToken(cli)
if err != nil {
return err
}
if token == "" {
return errors.New("token not found")
}
return nil
}, existsTimeout, interval).Should(gomega.Succeed())

// Construct the metric scraping curl command using Pod IP
metricScrapeCmd := []string{
"curl",
"-i",
"--max-time",
strconv.Itoa((int)(curlTimeout.Seconds())),
"-H",
"Authorization: Bearer " + token,
fmt.Sprintf("http://%s:%d/metrics", podIP, 9090),
}

ginkgo.By("Verifying that all expected metrics are present.")
gomega.Eventually(func() error {
// Execute the metrics scrape command inside the curl pod
resp, err := testutils.ExecCommandInPod(ctx, cfg, scheme, kubeCli, nsName, "curl", "curl", metricScrapeCmd)
if err != nil {
return err
}
// Verify that we got a 200 OK responsecurl
if !strings.Contains(resp, "200 OK") {
return fmt.Errorf("did not get 200 OK: %s", resp)
}
// Check if all expected metrics are present in the metrics output
for _, metric := range expectedMetrics {
if !strings.Contains(resp, metric) {
return fmt.Errorf("expected metric %s not found in metrics output", metric)
}
}
return nil
}, readyTimeout, curlInterval).Should(gomega.Succeed())
})
})
})

Expand All @@ -130,16 +246,38 @@ func newInferenceModel(ns string) *v1alpha2.InferenceModel {
Obj()
}

func getMetricsReaderToken(k8sClient client.Client) (string, error) {
secret := &corev1.Secret{}
err := k8sClient.Get(ctx, types.NamespacedName{Namespace: nsName, Name: metricsReaderSecretName}, secret)
if err != nil {
return "", err
}
return string(secret.Data["token"]), nil
}

// getCurlCommand returns the command, as a slice of strings, for curl'ing
// the test model server at the given name, namespace, port, and model name.
func getCurlCommand(name, ns, port, model string, timeout time.Duration, api string, promptOrMessages string) []string {
var body string
func getCurlCommand(name, ns, port, model string, timeout time.Duration, api string, promptOrMessages any, streaming bool) []string {
body := map[string]any{
"model": model,
"max_tokens": 100,
"temperature": 0,
}
body["model"] = model
switch api {
case "/completions":
body = fmt.Sprintf(`{"model": "%s", "prompt": "%s", "max_tokens": 100, "temperature": 0}`, model, promptOrMessages)
body["prompt"] = promptOrMessages
case "/chat/completions":
body = fmt.Sprintf(`{"model": "%s", "messages": %s, "max_tokens": 100, "temperature": 0}`, model, promptOrMessages)
body["messages"] = promptOrMessages
}
if streaming {
body["stream"] = true
body["stream_options"] = map[string]any{
"include_usage": true,
}
}
b, err := json.Marshal(body)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return []string{
"curl",
"-i",
Expand All @@ -149,6 +287,6 @@ func getCurlCommand(name, ns, port, model string, timeout time.Duration, api str
"-H",
"Content-Type: application/json",
"-d",
body,
string(b),
}
}
37 changes: 37 additions & 0 deletions test/testdata/metrics-rbac.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: inference-gateway-metrics-reader
rules:
- nonResourceURLs:
- /metrics
verbs:
- get
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: inference-gateway-sa-metrics-reader
namespace: $E2E_NS
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: inference-gateway-sa-metrics-reader-role-binding
subjects:
- kind: ServiceAccount
name: inference-gateway-sa-metrics-reader
namespace: $E2E_NS
roleRef:
kind: ClusterRole
name: inference-gateway-metrics-reader
apiGroup: rbac.authorization.k8s.io
---
apiVersion: v1
kind: Secret
metadata:
name: inference-gateway-sa-metrics-reader-secret
namespace: $E2E_NS
annotations:
kubernetes.io/service-account.name: inference-gateway-sa-metrics-reader
type: kubernetes.io/service-account-token
22 changes: 22 additions & 0 deletions test/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ func DeleteClusterResources(ctx context.Context, cli client.Client) error {
if err != nil && !apierrors.IsNotFound(err) {
return err
}
metricsReaderBinding := &rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "inference-gateway-sa-metrics-reader-role-binding",
},
}
err = cli.Delete(ctx, metricsReaderBinding, client.PropagationPolicy(metav1.DeletePropagationForeground))
if err != nil && !apierrors.IsNotFound(err) {
return err
}
metricsReaderRole := &rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{
Name: "inference-gateway-metrics-reader",
},
}
err = cli.Delete(ctx, metricsReaderRole, client.PropagationPolicy(metav1.DeletePropagationForeground))
if err != nil && !apierrors.IsNotFound(err) {
return err
}
model := &apiextv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: "inferencemodels.inference.networking.x-k8s.io",
Expand Down Expand Up @@ -106,6 +124,10 @@ func DeleteNamespacedResources(ctx context.Context, cli client.Client, ns string
if err != nil && !apierrors.IsNotFound(err) {
return err
}
err = cli.DeleteAllOf(ctx, &corev1.ServiceAccount{}, client.InNamespace(ns), client.PropagationPolicy(metav1.DeletePropagationForeground))
if err != nil && !apierrors.IsNotFound(err) {
return err
}
err = cli.DeleteAllOf(ctx, &v1alpha2.InferencePool{}, client.InNamespace(ns), client.PropagationPolicy(metav1.DeletePropagationForeground))
if err != nil && !apierrors.IsNotFound(err) {
return err
Expand Down