diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c0c5757651..4b646580924 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) - **General**: Add --ca-dir flag to KEDA operator to specify directories with CA certificates for scalers to authenticate TLS connections (defaults to /custom/ca) ([#5860](https://github.com/kedacore/keda/issues/5860)) +- **General**: Add Dynatrace Scaler ([#5685](https://github.com/kedacore/keda/pull/5685)) - **General**: Declarative parsing of scaler config ([#5037](https://github.com/kedacore/keda/issues/5037)|[#5797](https://github.com/kedacore/keda/issues/5797)) - **General**: Introduce new Splunk Scaler ([#5904](https://github.com/kedacore/keda/issues/5904)) - **General**: Provide CloudEvents around the management of ScaledObjects resources ([#3522](https://github.com/kedacore/keda/issues/3522)) diff --git a/pkg/scalers/dynatrace_scaler.go b/pkg/scalers/dynatrace_scaler.go new file mode 100644 index 00000000000..76e2099eb3e --- /dev/null +++ b/pkg/scalers/dynatrace_scaler.go @@ -0,0 +1,195 @@ +package scalers + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + neturl "net/url" + "strings" + + "github.com/go-logr/logr" + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/metrics/pkg/apis/external_metrics" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +const ( + dynatraceMetricDataPointsAPI = "api/v2/metrics/query" +) + +type dynatraceScaler struct { + metricType v2.MetricTargetType + metadata *dynatraceMetadata + httpClient *http.Client + logger logr.Logger +} + +type dynatraceMetadata struct { + Host string `keda:"name=host, order=triggerMetadata;authParams"` + Token string `keda:"name=token, order=authParams"` + MetricSelector string `keda:"name=metricSelector, order=triggerMetadata"` + FromTimestamp string `keda:"name=from, order=triggerMetadata, default=now-2h, optional"` + Threshold float64 `keda:"name=threshold, order=triggerMetadata"` + ActivationThreshold float64 `keda:"name=activationThreshold, order=triggerMetadata, optional"` + TriggerIndex int +} + +// Model of relevant part of Dynatrace's Metric Data Points API Response +// as per https://docs.dynatrace.com/docs/dynatrace-api/environment-api/metric-v2/get-data-points#definition--MetricData +type dynatraceResponse struct { + Result []struct { + Data []struct { + Values []float64 `json:"values"` + } `json:"data"` + } `json:"result"` +} + +func NewDynatraceScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { + metricType, err := GetMetricTargetType(config) + if err != nil { + return nil, fmt.Errorf("error getting scaler metric type: %w", err) + } + + logger := InitializeLogger(config, "dynatrace_scaler") + + meta, err := parseDynatraceMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing dynatrace metadata: %w", err) + } + + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) + + logMsg := fmt.Sprintf("Initializing Dynatrace Scaler (Host: %s)", meta.Host) + + logger.Info(logMsg) + + return &dynatraceScaler{ + metricType: metricType, + metadata: meta, + httpClient: httpClient, + logger: logger}, nil +} + +func parseDynatraceMetadata(config *scalersconfig.ScalerConfig) (*dynatraceMetadata, error) { + meta := dynatraceMetadata{} + + meta.TriggerIndex = config.TriggerIndex + if err := config.TypedConfig(&meta); err != nil { + return nil, fmt.Errorf("error parsing dynatrace metadata: %w", err) + } + return &meta, nil +} + +func (s *dynatraceScaler) Close(context.Context) error { + if s.httpClient != nil { + s.httpClient.CloseIdleConnections() + } + return nil +} + +// Validate that response object contains the minimum expected structure +// as per https://docs.dynatrace.com/docs/dynatrace-api/environment-api/metric-v2/get-data-points#definition--MetricData +func validateDynatraceResponse(response *dynatraceResponse) error { + if len(response.Result) == 0 { + return errors.New("dynatrace response does not contain any results") + } + if len(response.Result[0].Data) == 0 { + return errors.New("dynatrace response does not contain any metric series") + } + if len(response.Result[0].Data[0].Values) == 0 { + return errors.New("dynatrace response does not contain any values for the metric series") + } + return nil +} + +func (s *dynatraceScaler) GetMetricValue(ctx context.Context) (float64, error) { + /* + * Build request + */ + var req *http.Request + var err error + + // Append host information to appropriate API endpoint + // Trailing slashes are removed from provided host information to avoid double slashes in the URL + dynatraceAPIURL := fmt.Sprintf("%s/%s", strings.TrimRight(s.metadata.Host, "/"), dynatraceMetricDataPointsAPI) + + // Add query parameters to the URL + url, _ := neturl.Parse(dynatraceAPIURL) + queryString := url.Query() + queryString.Set("metricSelector", s.metadata.MetricSelector) + queryString.Set("from", s.metadata.FromTimestamp) + url.RawQuery = queryString.Encode() + + req, err = http.NewRequestWithContext(ctx, "GET", url.String(), nil) + if err != nil { + return 0, err + } + + // Authentication header as per https://docs.dynatrace.com/docs/dynatrace-api/basics/dynatrace-api-authentication#authenticate + req.Header.Add("Authorization", fmt.Sprintf("Api-Token %s", s.metadata.Token)) + + /* + * Execute request + */ + r, err := s.httpClient.Do(req) + if err != nil { + return 0, err + } + defer r.Body.Close() + + if r.StatusCode != http.StatusOK { + msg := fmt.Sprintf("%s: api returned %d", r.Request.URL.Path, r.StatusCode) + return 0, errors.New(msg) + } + + /* + * Parse response + */ + b, err := io.ReadAll(r.Body) + if err != nil { + return 0, err + } + var dynatraceResponse *dynatraceResponse + err = json.Unmarshal(b, &dynatraceResponse) + if err != nil { + return -1, fmt.Errorf("unable to parse Dynatrace Metric Data Points API response: %w", err) + } + + err = validateDynatraceResponse(dynatraceResponse) + if err != nil { + return 0, err + } + + return dynatraceResponse.Result[0].Data[0].Values[0], nil +} + +func (s *dynatraceScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + val, err := s.GetMetricValue(ctx) + + if err != nil { + s.logger.Error(err, "error executing Dynatrace query") + return []external_metrics.ExternalMetricValue{}, false, err + } + + metric := GenerateMetricInMili(metricName, val) + + return []external_metrics.ExternalMetricValue{metric}, val > s.metadata.ActivationThreshold, nil +} + +func (s *dynatraceScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + externalMetric := &v2.ExternalMetricSource{ + Metric: v2.MetricIdentifier{ + Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, kedautil.NormalizeString("dynatrace")), + }, + Target: GetMetricTargetMili(s.metricType, s.metadata.Threshold), + } + metricSpec := v2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2.MetricSpec{metricSpec} +} diff --git a/pkg/scalers/dynatrace_scaler_test.go b/pkg/scalers/dynatrace_scaler_test.go new file mode 100644 index 00000000000..a29a3de56d6 --- /dev/null +++ b/pkg/scalers/dynatrace_scaler_test.go @@ -0,0 +1,74 @@ +package scalers + +import ( + "context" + "fmt" + "testing" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" +) + +type dynatraceMetadataTestData struct { + metadata map[string]string + authParams map[string]string + errorCase bool +} + +type dynatraceMetricIdentifier struct { + metadataTestData *dynatraceMetadataTestData + triggerIndex int + name string +} + +var testDynatraceMetadata = []dynatraceMetadataTestData{ + {map[string]string{}, map[string]string{}, true}, + // all properly formed + {map[string]string{"threshold": "100", "from": "now-3d", "metricSelector": "MyCustomEvent:filter(eq(\"someProperty\",\"someValue\")):count:splitBy(\"dt.entity.process_group\"):fold"}, map[string]string{"host": "http://dummy:1234", "token": "dummy"}, false}, + // malformed threshold + {map[string]string{"threshold": "abc", "from": "now-3d", "metricSelector": "MyCustomEvent:filter(eq(\"someProperty\",\"someValue\")):count:splitBy(\"dt.entity.process_group\"):fold"}, map[string]string{"host": "http://dummy:1234", "token": "dummy"}, true}, + // malformed activationThreshold + {map[string]string{"activationThreshold": "abc", "threshold": "100", "from": "now-3d", "metricSelector": "MyCustomEvent:filter(eq(\"someProperty\",\"someValue\")):count:splitBy(\"dt.entity.process_group\"):fold"}, map[string]string{"host": "http://dummy:1234", "token": "dummy"}, true}, + // missing threshold + {map[string]string{"metricSelector": "MyCustomEvent:filter(eq(\"someProperty\",\"someValue\")):count:splitBy(\"dt.entity.process_group\"):fold"}, map[string]string{"host": "http://dummy:1234", "token": "dummy"}, true}, + // missing metricsSelector + {map[string]string{"threshold": "100"}, map[string]string{"host": "http://dummy:1234", "token": "dummy"}, true}, + // missing token (must come from auth params) + {map[string]string{"token": "foo", "threshold": "100", "from": "now-3d", "metricSelector": "MyCustomEvent:filter(eq(\"someProperty\",\"someValue\")):count:splitBy(\"dt.entity.process_group\"):fold"}, map[string]string{"host": "http://dummy:1234"}, true}, +} + +var dynatraceMetricIdentifiers = []dynatraceMetricIdentifier{ + {&testDynatraceMetadata[1], 0, "s0-dynatrace"}, + {&testDynatraceMetadata[1], 1, "s1-dynatrace"}, +} + +func TestDynatraceParseMetadata(t *testing.T) { + for _, testData := range testDynatraceMetadata { + _, err := parseDynatraceMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + if err != nil && !testData.errorCase { + fmt.Printf("X: %s", testData.metadata) + t.Error("Expected success but got error", err) + } + if testData.errorCase && err == nil { + fmt.Printf("X: %s", testData.metadata) + t.Error("Expected error but got success") + } + } +} +func TestDynatraceGetMetricSpecForScaling(t *testing.T) { + for _, testData := range dynatraceMetricIdentifiers { + meta, err := parseDynatraceMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: testData.metadataTestData.authParams, TriggerIndex: testData.triggerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockNewRelicScaler := dynatraceScaler{ + metadata: meta, + httpClient: nil, + } + + metricSpec := mockNewRelicScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Error("Wrong External metric source name:", metricName) + } + } +} diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 702521f7719..1f4549c7ffa 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -162,6 +162,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewCronScaler(config) case "datadog": return scalers.NewDatadogScaler(ctx, config) + case "dynatrace": + return scalers.NewDynatraceScaler(config) case "elasticsearch": return scalers.NewElasticsearchScaler(config) case "etcd": diff --git a/tests/scalers/dynatrace/dynatrace_test.go b/tests/scalers/dynatrace/dynatrace_test.go new file mode 100644 index 00000000000..7d20231a91a --- /dev/null +++ b/tests/scalers/dynatrace/dynatrace_test.go @@ -0,0 +1,231 @@ +//go:build e2e +// +build e2e + +package dynatrace_test + +import ( + "bytes" + "encoding/base64" + "fmt" + "net/http" + "os" + "testing" + "time" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +const ( + testName = "dynatrace-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + secretName = fmt.Sprintf("%s-secret", testName) + triggerAuthName = fmt.Sprintf("%s-ta", testName) + dynatraceHost = os.Getenv("DYNATRACE_HOST") + dynatraceToken = os.Getenv("DYNATRACE_METRICS_TOKEN") + dynatraceInjestHost = fmt.Sprintf("%s/api/v2/metrics/ingest", dynatraceHost) + dynatraceMetricName = fmt.Sprintf("metric-%d", GetRandomNumber()) + minReplicaCount = 0 + maxReplicaCount = 2 +) + +type templateData struct { + TestNamespace string + DeploymentName string + ScaledObjectName string + TriggerAuthName string + SecretName string + DynatraceToken string + DynatraceHost string + MinReplicaCount string + MaxReplicaCount string + MetricName string +} + +const ( + secretTemplate = `apiVersion: v1 +kind: Secret +metadata: + name: {{.SecretName}} + namespace: {{.TestNamespace}} +data: + apiToken: {{.DynatraceToken}} +` + + triggerAuthenticationTemplate = `apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthName}} + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: token + name: {{.SecretName}} + key: apiToken +` + deploymentTemplate = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + replicas: 0 + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - name: prom-test-app + image: tbickford/simple-web-app-prometheus:a13ade9 + imagePullPolicy: IfNotPresent +` + + scaledObjectTemplate = `apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + minReplicaCount: {{.MinReplicaCount}} + maxReplicaCount: {{.MaxReplicaCount}} + pollingInterval: 1 + cooldownPeriod: 1 + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 10 + triggers: + - type: dynatrace + metadata: + host: {{.DynatraceHost}} + threshold: "2" + activationThreshold: "3" + metricSelector: "{{.MetricName}}:max" + from: now-2m + authenticationRef: + name: {{.TriggerAuthName}} +` +) + +func TestDynatraceScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + require.NotEmpty(t, dynatraceToken, "DYNATRACE_METRICS_TOKEN env variable is required for dynatrace tests") + require.NotEmpty(t, dynatraceHost, "DYNATRACE_HOST env variable is required for dynatrace tests") + + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + t.Cleanup(func() { + DeleteKubernetesResources(t, testNamespace, data, templates) + }) + + // Create kubernetes resources + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 1), + "replica count should be %s after a minute", minReplicaCount) + + // test scaling + testActivation(t, kc) + testScaleOut(t, kc) + testScaleIn(t, kc) +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing activation ---") + stopCh := make(chan struct{}) + go setMetricValue(t, 1, stopCh) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 120) + close(stopCh) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale out ---") + stopCh := make(chan struct{}) + go setMetricValue(t, 10, stopCh) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3), + "replica count should be %d after 2 minutes", maxReplicaCount) + close(stopCh) +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale in ---") + + stopCh := make(chan struct{}) + go setMetricValue(t, 0, stopCh) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, minReplicaCount, 60, 3), + "replica count should be %d after 2 minutes", minReplicaCount) + close(stopCh) +} + +func setMetricValue(t *testing.T, value float64, stopCh <-chan struct{}) { + metric := fmt.Sprintf("%s %f", dynatraceMetricName, value) + for { + select { + case <-stopCh: + return + default: + time.Sleep(time.Second) + req, err := http.NewRequest("POST", dynatraceInjestHost, bytes.NewBufferString(metric)) + req.Header.Add("'Content-Type", "text/plain") + if err != nil { + t.Log("Invalid injection request") + continue + } + req.Header.Add("Authorization", fmt.Sprintf("Api-Token %s", dynatraceToken)) + r, err := http.DefaultClient.Do(req) + if err != nil { + t.Log("Error executing request") + continue + } + defer r.Body.Close() + if r.StatusCode != http.StatusAccepted { + msg := fmt.Sprintf("%s: api returned %d", r.Request.URL.Path, r.StatusCode) + t.Log(msg) + } + } + } +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + TriggerAuthName: triggerAuthName, + ScaledObjectName: scaledObjectName, + SecretName: secretName, + MinReplicaCount: fmt.Sprintf("%v", minReplicaCount), + MaxReplicaCount: fmt.Sprintf("%v", maxReplicaCount), + DynatraceToken: base64.StdEncoding.EncodeToString([]byte(dynatraceToken)), + DynatraceHost: dynatraceHost, + MetricName: dynatraceMetricName, + }, []Template{ + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "triggerAuthenticationTemplate", Config: triggerAuthenticationTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +}