From 29c8bf5a22af36a61e398cad09d8eefd3d0645c6 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 8 Apr 2024 07:13:02 +0200 Subject: [PATCH] feat: implement a Keptn metrics provider Add a Keptn metrics provider for two resources: * KeptnMetric: Verify the value of a single metric. * Analysis (via AnalysisDefinition): Run a Keptn analysis over an interval validating SLOs. Signed-off-by: Florian Bacher --- artifacts/flagger/crd.yaml | 1 + charts/flagger/templates/rbac.yaml | 13 + cmd/flagger/main.go | 1 + docs/gitbook/usage/metrics.md | 62 +++++ go.mod | 4 +- kustomize/base/flagger/crd.yaml | 1 + kustomize/base/flagger/rbac.yaml | 13 + pkg/controller/controller.go | 5 + pkg/controller/scheduler_metrics.go | 4 +- pkg/metrics/providers/factory.go | 9 +- pkg/metrics/providers/keptn.go | 256 +++++++++++++++++++ pkg/metrics/providers/keptn_test.go | 381 ++++++++++++++++++++++++++++ 12 files changed, 741 insertions(+), 9 deletions(-) create mode 100644 pkg/metrics/providers/keptn.go create mode 100644 pkg/metrics/providers/keptn_test.go diff --git a/artifacts/flagger/crd.yaml b/artifacts/flagger/crd.yaml index 3a124a4d9..99940df06 100644 --- a/artifacts/flagger/crd.yaml +++ b/artifacts/flagger/crd.yaml @@ -1298,6 +1298,7 @@ spec: - newrelic - graphite - dynatrace + - keptn address: description: API address of this provider type: string diff --git a/charts/flagger/templates/rbac.yaml b/charts/flagger/templates/rbac.yaml index c1dfc8413..ae9c70155 100644 --- a/charts/flagger/templates/rbac.yaml +++ b/charts/flagger/templates/rbac.yaml @@ -259,6 +259,19 @@ rules: - update - patch - delete + - apiGroups: + - metrics.keptn.sh + resources: + - keptnmetrics + - analyses + verbs: + - get + - list + - watch + - create + - update + - patch + - delete - nonResourceURLs: - /version verbs: diff --git a/cmd/flagger/main.go b/cmd/flagger/main.go index c229601d3..6438187fd 100644 --- a/cmd/flagger/main.go +++ b/cmd/flagger/main.go @@ -253,6 +253,7 @@ func main() { fromEnv("EVENT_WEBHOOK_URL", eventWebhook), clusterName, noCrossNamespaceRefs, + cfg, ) // leader election context diff --git a/docs/gitbook/usage/metrics.md b/docs/gitbook/usage/metrics.md index c1e5bb673..c51fc61c2 100644 --- a/docs/gitbook/usage/metrics.md +++ b/docs/gitbook/usage/metrics.md @@ -668,3 +668,65 @@ Reference the template in the canary analysis: max: 1000 interval: 1m ``` + +## Keptn + +You can create custom metric checks using the Keptn provider. +This Provider allows to verify either the value of a single [KeptnMetric](https://keptn.sh/stable/docs/reference/crd-reference/metric/), +representing the value of a single metric, +or of a [Keptn Analysis](https://keptn.sh/stable/docs/reference/crd-reference/analysis/), +which provides a flexible grading logic for analysing and prioritising a number of different +metric values coming from different data sources. + +This provider requires [Keptn](https://keptn.sh/stable/docs/installation/) to be installed in the cluster. + +Example for a Keptn metric template: + +```yaml +apiVersion: flagger.app/v1beta1 +kind: MetricTemplate +metadata: + name: response-time + namespace: istio-system +spec: + provider: + type: keptn + query: keptnmetric/my-namespace/response-time/2m/reporter=destination +``` + +This will reference the `KeptnMetric` with the name `response-time` in +the namespace `my-namespace`, which could look like the following: + +```yaml +apiVersion: metrics.keptn.sh/v1beta1 +kind: KeptnMetric +metadata: + name: response-time + namespace: my-namespace +spec: + fetchIntervalSeconds: 10 + provider: + name: my-prometheus-keptn-provider + query: histogram_quantile(0.8, sum by(le) (rate(http_server_request_latency_seconds_bucket{status_code='200', + job='simple-go-backend'}[5m[]))) +``` + +The `query` contains the following components, which are divided by `/` characters: + +``` +//// +``` + +* **type (required)**: Must be either `keptnmetric` or `analysis`. +* **namespace (required)**: The namespace of the referenced `KeptnMetric`/`AnalysisDefinition`. +* **resource-name (required):** The name of the referenced `KeptnMetric`/`AnalysisDefinition`. +* **timeframe (optional)**: The timeframe used for the Analysis. +This will usually be set to the same value as the analysis interval of a `Canary`. +Only relevant if the `type` is set to `analysis`. +* **arguments (optional)**: Arguments to be passed to an `Analysis`. +Arguments are passed as a list of key value pairs, separated by `;` characters, +e.g. `foo=bar;bar=foo`. +Only relevant if the `type` is set to `analysis`. + +For the type `analysis`, the value returned by the provider is either `0` +(if the analysis failed), or `1` (analysis passed). diff --git a/go.mod b/go.mod index 4a606babb..0db61754d 100644 --- a/go.mod +++ b/go.mod @@ -9,12 +9,14 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/go-logr/zapr v1.3.0 github.com/google/go-cmp v0.6.0 + github.com/google/uuid v1.6.0 github.com/googleapis/gax-go/v2 v2.12.4 github.com/hashicorp/go-retryablehttp v0.7.7 github.com/influxdata/influxdb-client-go/v2 v2.13.0 github.com/prometheus/client_golang v1.19.1 github.com/stretchr/testify v1.9.0 go.uber.org/zap v1.27.0 + golang.org/x/sync v0.7.0 google.golang.org/api v0.182.0 google.golang.org/genproto v0.0.0-20240528184218-531527333157 google.golang.org/grpc v1.64.0 @@ -47,7 +49,6 @@ require ( github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/s2a-go v0.1.7 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect @@ -77,7 +78,6 @@ require ( golang.org/x/mod v0.15.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/oauth2 v0.20.0 // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/term v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect diff --git a/kustomize/base/flagger/crd.yaml b/kustomize/base/flagger/crd.yaml index 3a124a4d9..99940df06 100644 --- a/kustomize/base/flagger/crd.yaml +++ b/kustomize/base/flagger/crd.yaml @@ -1298,6 +1298,7 @@ spec: - newrelic - graphite - dynatrace + - keptn address: description: API address of this provider type: string diff --git a/kustomize/base/flagger/rbac.yaml b/kustomize/base/flagger/rbac.yaml index 69ea8445e..7e46cd99c 100644 --- a/kustomize/base/flagger/rbac.yaml +++ b/kustomize/base/flagger/rbac.yaml @@ -241,6 +241,19 @@ rules: - update - patch - delete + - apiGroups: + - metrics.keptn.sh + resources: + - keptnmetrics + - analyses + verbs: + - get + - list + - watch + - create + - update + - patch + - delete - nonResourceURLs: - /version verbs: diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 50fb63e28..cabed052a 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -21,6 +21,8 @@ import ( "sync" "time" + "k8s.io/client-go/rest" + "github.com/google/go-cmp/cmp" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -49,6 +51,7 @@ const controllerAgentName = "flagger" // Controller is managing the canary objects and schedules canary deployments type Controller struct { + kubeConfig *rest.Config kubeClient kubernetes.Interface flaggerClient clientset.Interface flaggerInformers Informers @@ -91,6 +94,7 @@ func NewController( eventWebhook string, clusterName string, noCrossNamespaceRefs bool, + kubeConfig *rest.Config, ) *Controller { logger.Debug("Creating event broadcaster") flaggerscheme.AddToScheme(scheme.Scheme) @@ -105,6 +109,7 @@ func NewController( recorder.SetInfo(version, meshProvider) ctrl := &Controller{ + kubeConfig: kubeConfig, kubeClient: kubeClient, flaggerClient: flaggerClient, flaggerInformers: flaggerInformers, diff --git a/pkg/controller/scheduler_metrics.go b/pkg/controller/scheduler_metrics.go index 9bcd66d9e..ec45ef85f 100644 --- a/pkg/controller/scheduler_metrics.go +++ b/pkg/controller/scheduler_metrics.go @@ -74,7 +74,7 @@ func (c *Controller) checkMetricProviderAvailability(canary *flaggerv1.Canary) e } factory := providers.Factory{} - provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials) + provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials, c.kubeConfig) if err != nil { return fmt.Errorf("metric template %s.%s provider %s error: %v", metric.TemplateRef.Name, namespace, template.Spec.Provider.Type, err) @@ -260,7 +260,7 @@ func (c *Controller) runMetricChecks(canary *flaggerv1.Canary) bool { } factory := providers.Factory{} - provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials) + provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials, c.kubeConfig) if err != nil { c.recordEventErrorf(canary, "Metric template %s.%s provider %s error: %v", metric.TemplateRef.Name, namespace, template.Spec.Provider.Type, err) diff --git a/pkg/metrics/providers/factory.go b/pkg/metrics/providers/factory.go index db5bd9f5b..2370d7e76 100644 --- a/pkg/metrics/providers/factory.go +++ b/pkg/metrics/providers/factory.go @@ -18,15 +18,12 @@ package providers import ( flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1" + rest "k8s.io/client-go/rest" ) type Factory struct{} -func (factory Factory) Provider( - metricInterval string, - provider flaggerv1.MetricTemplateProvider, - credentials map[string][]byte, -) (Interface, error) { +func (factory Factory) Provider(metricInterval string, provider flaggerv1.MetricTemplateProvider, credentials map[string][]byte, config *rest.Config) (Interface, error) { switch provider.Type { case "prometheus": return NewPrometheusProvider(provider, credentials) @@ -44,6 +41,8 @@ func (factory Factory) Provider( return NewInfluxdbProvider(provider, credentials) case "dynatrace": return NewDynatraceProvider(metricInterval, provider, credentials) + case "keptn": + return NewKeptnProvider(config) default: return NewPrometheusProvider(provider, credentials) } diff --git a/pkg/metrics/providers/keptn.go b/pkg/metrics/providers/keptn.go new file mode 100644 index 000000000..b81734389 --- /dev/null +++ b/pkg/metrics/providers/keptn.go @@ -0,0 +1,256 @@ +package providers + +import ( + "context" + "errors" + "fmt" + "k8s.io/klog/v2" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" +) + +// api version for the Keptn Metric CRDs +const ( + apiVersion = "v1beta1" + + groupName = "metrics.keptn.sh" + keptnMetricsResourceName = "keptnmetrics" + analysisResourceName = "analyses" +) + +var keptnMetricsResource = schema.GroupVersionResource{ + Group: groupName, + Version: apiVersion, + Resource: keptnMetricsResourceName, +} + +var analysisResource = schema.GroupVersionResource{ + Group: groupName, + Version: apiVersion, + Resource: analysisResourceName, +} + +type queryObject struct { + GroupVersionResource schema.GroupVersionResource + ResourceName string + DurationString string + Namespace string + Arguments map[string]interface{} +} + +type KeptnProvider struct { + client dynamic.Interface + analysisTimeout time.Duration +} + +func NewKeptnProvider(cfg *rest.Config) (*KeptnProvider, error) { + if cfg == nil { + return nil, errors.New("could not initialize KeptnProvider: no KubeConfig provided") + } + client, err := dynamic.NewForConfig(cfg) + if err != nil { + return nil, fmt.Errorf("could not initialize KeptnProvider: %w", err) + } + return &KeptnProvider{ + client: client, + analysisTimeout: 10 * time.Second, + }, nil +} + +// RunQuery fetches the value of a KeptnMetric or Analysis, +// based on the selector provided in the query. +// The format of the selector is the following: +// //// +func (k *KeptnProvider) RunQuery(query string) (float64, error) { + queryObj, err := parseQuery(query) + if err != nil { + return 0, err + } + + switch queryObj.GroupVersionResource.Resource { + case keptnMetricsResourceName: + return k.queryKeptnMetric(queryObj) + case analysisResourceName: + return k.queryKeptnAnalysis(queryObj) + default: + return 0, errors.New("unsupported query") + } + +} + +func (k *KeptnProvider) IsOnline() (bool, error) { + // TODO should we check for the keptn deployment to be up and running in the cluster? + return true, nil +} + +func (k *KeptnProvider) queryKeptnMetric(queryObj *queryObject) (float64, error) { + get, err := k.client.Resource(queryObj.GroupVersionResource). + Namespace(queryObj.Namespace). + Get( + context.Background(), + queryObj.ResourceName, + v1.GetOptions{}, + ) + + if err != nil { + return 0, fmt.Errorf("could not retrieve KeptnMetric %s/%s: %w", queryObj.Namespace, queryObj.ResourceName, err) + } + + if status, ok := get.Object["status"]; ok { + if statusObj, ok := status.(map[string]interface{}); ok { + if value, ok := statusObj["value"].(string); ok { + floatValue, err := strconv.ParseFloat(value, 64) + if err != nil { + return 0, fmt.Errorf("could not parse value of KeptnMetric %s/%s to float: %w", queryObj.Namespace, queryObj.ResourceName, err) + } + return floatValue, nil + } + } + } + return 0, fmt.Errorf("could not retrieve KeptnMetric - no value found in resource %s/%s", queryObj.Namespace, queryObj.ResourceName) +} + +func (k *KeptnProvider) queryKeptnAnalysis(obj *queryObject) (float64, error) { + analysis := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": fmt.Sprintf("metrics.keptn.sh/%s", apiVersion), + "kind": "Analysis", + "metadata": map[string]interface{}{ + "name": fmt.Sprintf("%s-%s", obj.ResourceName, uuid.New().String()[:6]), + "namespace": obj.Namespace, + }, + "spec": map[string]interface{}{ + "analysisDefinition": map[string]interface{}{ + "name": obj.ResourceName, + }, + "timeframe": map[string]interface{}{ + "recent": obj.DurationString, + }, + "args": obj.Arguments, + }, + }, + } + + // set the timeout to 10s - this will give Keptn enough time to reconcile the Analysis + // and store the result in the status of the resource created here. + ctx, cancel := context.WithTimeout(context.Background(), k.analysisTimeout) + defer cancel() + + createdAnalysis, err := k.client. + Resource(obj.GroupVersionResource). + Namespace(obj.Namespace). + Create(ctx, analysis, v1.CreateOptions{}) + + if err != nil { + return 0, fmt.Errorf("could not create Keptn Analysis %s/%s: %w", obj.Namespace, obj.ResourceName, err) + } + + // delete the created analysis at the end of the function + defer func() { + err := k.client. + Resource(obj.GroupVersionResource). + Namespace(obj.Namespace). + Delete( + context.TODO(), + createdAnalysis.GetName(), + v1.DeleteOptions{}, + ) + if err != nil { + klog.Errorf("Could not delete Keptn Analysis '%s': %v", createdAnalysis.GetName(), err) + } + }() + + for { + // retrieve the current state of the created Analysis resource every 1s, until + // it has been completed, and the evaluation result is available. + // We do this until the timeout of the context expires. If no result is available + // by then, we return an error. + select { + case <-ctx.Done(): + return 0, fmt.Errorf("encountered timeout while waiting for Keptn Analysis %s/%s to be finished", obj.Namespace, obj.ResourceName) + case <-time.After(time.Second): + get, err := k.client.Resource(obj.GroupVersionResource).Namespace(obj.Namespace).Get(ctx, createdAnalysis.GetName(), v1.GetOptions{}) + if err != nil { + return 0, fmt.Errorf("could not check status of created Keptn Analysis %s/%s: %w", obj.Namespace, obj.ResourceName, err) + } + statusStr, ok, err := unstructured.NestedString(get.Object, "status", "state") + if err != nil { + return 0, fmt.Errorf("could not check status of created Keptn Analysis %s/%s: %w", obj.Namespace, obj.ResourceName, err) + } + if ok && statusStr == "Completed" { + passed, ok, err := unstructured.NestedBool(get.Object, "status", "pass") + if err != nil { + return 0, fmt.Errorf("could not check status of created Keptn Analysis %s/%s: %w", obj.Namespace, obj.ResourceName, err) + } + if ok { + if passed { + return 1, nil + } + return 0, nil + } + } + } + } + +} + +func parseQuery(query string) (*queryObject, error) { + result := &queryObject{} + // sanitize the query by converting to lower case, trimming spaces and line break characters + split := strings.Split( + strings.TrimSpace( + strings.TrimSuffix( + strings.ToLower(query), + "\n", + ), + ), + "/", + ) + + if len(split) < 3 { + return nil, errors.New("unexpected query format. query must be in the format ////") + } + switch split[0] { + // take into account both singular and plural naming of resource names, to reduce probability of errors + case "keptnmetric", keptnMetricsResourceName: + result.GroupVersionResource = keptnMetricsResource + case "analysis", analysisResourceName: + result.GroupVersionResource = analysisResource + // add the duration for the Analysis, if available + if len(split) >= 4 { + result.DurationString = split[3] + } else { + //set to '1m' by default + result.DurationString = "1m" + } + + // add arguments - these are provided as a comma separated list of key/value pairs + result.Arguments = map[string]interface{}{} + if len(split) >= 5 { + args := strings.Split(split[4], ";") + + for i := 0; i < len(args); i++ { + keyValue := strings.Split(args[i], "=") + if len(keyValue) == 2 { + result.Arguments[keyValue[0]] = keyValue[1] + } + } + } + + default: + return nil, errors.New("unexpected resource kind provided in the query. must be one of: ['keptnmetric', 'analysis']") + } + + result.Namespace = split[1] + result.ResourceName = split[2] + + return result, nil +} diff --git a/pkg/metrics/providers/keptn_test.go b/pkg/metrics/providers/keptn_test.go new file mode 100644 index 000000000..84dc43ac7 --- /dev/null +++ b/pkg/metrics/providers/keptn_test.go @@ -0,0 +1,381 @@ +package providers + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/rest" +) + +func TestNewKeptnProvider(t *testing.T) { + provider, err := NewKeptnProvider(&rest.Config{}) + + require.Nil(t, err) + require.NotNil(t, provider) + + isOnline, err := provider.IsOnline() + require.NoError(t, err) + require.True(t, isOnline) +} + +func TestNewKeptnProvider_NoKubeConfig(t *testing.T) { + provider, err := NewKeptnProvider(nil) + + require.Error(t, err) + require.Nil(t, provider) +} + +func TestKeptnProvider_RunQuery_KeptnMetric(t *testing.T) { + tests := []struct { + name string + setupClient func() *fake.FakeDynamicClient + query string + want float64 + wantErr bool + }{ + { + name: "wrong query format", + setupClient: func() *fake.FakeDynamicClient { + fakeClient := fake.NewSimpleDynamicClient( + runtime.NewScheme(), + getSampleKeptnMetric("my-metric", "3.0"), + ) + return fakeClient + }, + query: "invalid/default", + want: 0, + wantErr: true, + }, + { + name: "unsupported resource type", + setupClient: func() *fake.FakeDynamicClient { + fakeClient := fake.NewSimpleDynamicClient( + runtime.NewScheme(), + getSampleKeptnMetric("my-metric", "3.0"), + ) + return fakeClient + }, + query: "invalid/default/my-metric", + want: 0, + wantErr: true, + }, + { + name: "get KeptnMetric value", + setupClient: func() *fake.FakeDynamicClient { + fakeClient := fake.NewSimpleDynamicClient( + runtime.NewScheme(), + getSampleKeptnMetric("my-metric", "3.0"), + ) + return fakeClient + }, + query: "keptnmetric/default/my-metric", + want: 3.0, + wantErr: false, + }, + { + name: "KeptnMetric not found", + setupClient: func() *fake.FakeDynamicClient { + fakeClient := fake.NewSimpleDynamicClient( + runtime.NewScheme(), + ) + return fakeClient + }, + query: "keptnmetric/default/my-metric", + want: 0, + wantErr: true, + }, + { + name: "KeptnMetric with invalid value", + setupClient: func() *fake.FakeDynamicClient { + fakeClient := fake.NewSimpleDynamicClient( + runtime.NewScheme(), + getSampleKeptnMetric("my-metric", "invalid"), + ) + return fakeClient + }, + query: "keptnmetric/default/my-metric", + want: 0, + wantErr: true, + }, + { + name: "KeptnMetric with no value", + setupClient: func() *fake.FakeDynamicClient { + keptnMetric := getSampleKeptnMetric("my-metric", "") + + data := keptnMetric.Object + delete(data, "status") + + keptnMetric.SetUnstructuredContent(data) + fakeClient := fake.NewSimpleDynamicClient( + runtime.NewScheme(), + keptnMetric, + ) + return fakeClient + }, + query: "keptnmetric/default/my-metric", + want: 0, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + k := &KeptnProvider{ + client: tt.setupClient(), + } + got, err := k.RunQuery(tt.query) + if tt.wantErr { + require.NotNil(t, err) + } else { + require.Nil(t, err) + } + + require.Equalf(t, tt.want, got, "RunQuery(%v)", tt.query) + }) + } +} + +func TestKeptnProvider_RunQueryAnalysis(t *testing.T) { + tests := []struct { + name string + setupClient func() *fake.FakeDynamicClient + // verificationFunc() will run in a separate go routine + // and check if the expected resources are created + verificationFunc func(fakeClient *fake.FakeDynamicClient) error + query string + want float64 + wantErr bool + }{ + { + name: "get passed Analysis", + setupClient: func() *fake.FakeDynamicClient { + + scheme := runtime.NewScheme() + scheme.AddKnownTypes(analysisResource.GroupVersion()) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: analysisResource.Group, Version: analysisResource.Version, Kind: "AnalysisList"}, &unstructured.UnstructuredList{}) + fakeClient := fake.NewSimpleDynamicClientWithCustomListKinds( + scheme, + map[schema.GroupVersionResource]string{ + analysisResource: "AnalysisList", + }, + ) + + return fakeClient + }, + verificationFunc: func(fakeClient *fake.FakeDynamicClient) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + return errors.New("timed out waiting for the condition") + case <-time.After(100 * time.Millisecond): + // verify the creation of the expected resource + list, err := fakeClient.Resource(analysisResource). + Namespace("default"). + List(ctx, v1.ListOptions{ + Limit: 1, + }) + if err != nil || len(list.Items) == 0 { + continue + } + createdAnalysis := list.Items[0] + require.Equal(t, map[string]interface{}{ + "analysisDefinition": map[string]interface{}{ + "name": "my-analysis", + }, + "args": map[string]interface{}{ + "foo": "bar", + "bar": "foo", + }, + "timeframe": map[string]interface{}{ + "recent": "5m", + }, + }, createdAnalysis.Object["spec"]) + + err = unstructured.SetNestedMap( + createdAnalysis.Object, + map[string]interface{}{ + "state": "Completed", + "pass": true, + }, + "status", + ) + + require.Nil(t, err) + + _, err = fakeClient.Resource(analysisResource).Namespace("default").Update(ctx, &createdAnalysis, v1.UpdateOptions{}) + + require.Nil(t, err) + return nil + } + } + }, + query: "analysis/default/my-analysis/5m/foo=bar;bar=foo", + want: 1, + wantErr: false, + }, + { + name: "get failed Analysis", + setupClient: func() *fake.FakeDynamicClient { + + scheme := runtime.NewScheme() + scheme.AddKnownTypes(analysisResource.GroupVersion()) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: analysisResource.Group, Version: analysisResource.Version, Kind: "AnalysisList"}, &unstructured.UnstructuredList{}) + fakeClient := fake.NewSimpleDynamicClientWithCustomListKinds( + scheme, + map[schema.GroupVersionResource]string{ + analysisResource: "AnalysisList", + }, + ) + + return fakeClient + }, + verificationFunc: func(fakeClient *fake.FakeDynamicClient) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + return errors.New("timed out waiting for the condition") + case <-time.After(10 * time.Millisecond): + // verify the creation of the expected resource + list, err := fakeClient.Resource(analysisResource). + Namespace("default"). + List(ctx, v1.ListOptions{ + Limit: 1, + }) + if err != nil || len(list.Items) == 0 { + continue + } + createdAnalysis := list.Items[0] + require.Equal(t, map[string]interface{}{ + "analysisDefinition": map[string]interface{}{ + "name": "my-analysis", + }, + "args": map[string]interface{}{}, + "timeframe": map[string]interface{}{ + "recent": "1m", + }, + }, createdAnalysis.Object["spec"]) + + err = unstructured.SetNestedMap( + createdAnalysis.Object, + map[string]interface{}{ + "state": "Completed", + "pass": false, + }, + "status", + ) + + require.Nil(t, err) + + _, err = fakeClient.Resource(analysisResource).Namespace("default").Update(ctx, &createdAnalysis, v1.UpdateOptions{}) + + require.Nil(t, err) + return nil + } + } + }, + query: "analysis/default/my-analysis", + want: 0, + wantErr: false, + }, + { + name: "analysis does not finish", + setupClient: func() *fake.FakeDynamicClient { + + scheme := runtime.NewScheme() + scheme.AddKnownTypes(analysisResource.GroupVersion()) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: analysisResource.Group, Version: analysisResource.Version, Kind: "AnalysisList"}, &unstructured.UnstructuredList{}) + fakeClient := fake.NewSimpleDynamicClientWithCustomListKinds( + scheme, + map[schema.GroupVersionResource]string{ + analysisResource: "AnalysisList", + }, + ) + + return fakeClient + }, + verificationFunc: func(fakeClient *fake.FakeDynamicClient) error { + return nil + }, + query: "analysis/default/my-analysis", + want: 0, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClient := tt.setupClient() + k := &KeptnProvider{ + client: fakeClient, + analysisTimeout: 1 * time.Second, + } + + ctx := context.Background() + grp, ctx := errgroup.WithContext(ctx) + + grp.Go(func() error { + return tt.verificationFunc(fakeClient) + }) + + got, err := k.RunQuery(tt.query) + if tt.wantErr { + require.NotNil(t, err) + } else { + require.Nil(t, err) + } + + err = grp.Wait() + require.Nil(t, err) + + require.Equalf(t, tt.want, got, "RunQuery(%v)", tt.query) + + // verify that all created Analysis resources have been cleaned up + list, err := fakeClient.Resource(analysisResource). + Namespace("default"). + List(ctx, v1.ListOptions{ + Limit: 1, + }) + require.NoError(t, err) + + require.Empty(t, list.Items) + }) + } +} + +func getSampleKeptnMetric(metricName, value string) *unstructured.Unstructured { + keptnMetric := &unstructured.Unstructured{} + keptnMetric.SetUnstructuredContent(map[string]interface{}{ + "apiVersion": fmt.Sprintf("metrics.keptn.sh/%s", apiVersion), + "kind": "KeptnMetric", + "metadata": map[string]interface{}{ + "name": metricName, + "namespace": "default", + }, + "spec": map[string]interface{}{ + "fetchIntervalSeconds": "2", + "provider": map[string]interface{}{ + "name": "my-provider", + }, + "query": "my-query", + }, + "status": map[string]interface{}{ + "value": value, + }, + }) + + return keptnMetric +}