From 019e141880c6ed06c721564e0ebb0d57aa8ec0de Mon Sep 17 00:00:00 2001 From: Rushen Wang <45029442+dovics@users.noreply.github.com> Date: Mon, 21 Oct 2024 15:58:47 +0800 Subject: [PATCH] Refactor InfluxDB scaler config (#6240) * Refactor InfluxDB scaler config Signed-off-by: wangrushen * fix: typo in influxdv unsafessl Signed-off-by: wangrushen --------- Signed-off-by: wangrushen --- pkg/scalers/influxdb_scaler.go | 131 +++++----------------------- pkg/scalers/influxdb_scaler_test.go | 2 +- 2 files changed, 25 insertions(+), 108 deletions(-) diff --git a/pkg/scalers/influxdb_scaler.go b/pkg/scalers/influxdb_scaler.go index 4f552999ed8..fba5c841a7c 100644 --- a/pkg/scalers/influxdb_scaler.go +++ b/pkg/scalers/influxdb_scaler.go @@ -3,7 +3,6 @@ package scalers import ( "context" "fmt" - "strconv" "github.com/go-logr/logr" influxdb2 "github.com/influxdata/influxdb-client-go/v2" @@ -23,14 +22,15 @@ type influxDBScaler struct { } type influxDBMetadata struct { - authToken string - organizationName string - query string - serverURL string - unsafeSsl bool - thresholdValue float64 - activationThresholdValue float64 - triggerIndex int + AuthToken string `keda:"name=authToken, order=triggerMetadata;resolvedEnv;authParams"` + OrganizationName string `keda:"name=organizationName, order=triggerMetadata;resolvedEnv;authParams"` + Query string `keda:"name=query, order=triggerMetadata"` + ServerURL string `keda:"name=serverURL, order=triggerMetadata;authParams"` + UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional"` + ThresholdValue float64 `keda:"name=thresholdValue, order=triggerMetadata, optional"` + ActivationThresholdValue float64 `keda:"name=activationThresholdValue, order=triggerMetadata, optional"` + + triggerIndex int } // NewInfluxDBScaler creates a new influx db scaler @@ -49,9 +49,9 @@ func NewInfluxDBScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { logger.Info("starting up influxdb client") client := influxdb2.NewClientWithOptions( - meta.serverURL, - meta.authToken, - influxdb2.DefaultOptions().SetTLSConfig(util.CreateTLSClientConfig(meta.unsafeSsl))) + meta.ServerURL, + meta.AuthToken, + influxdb2.DefaultOptions().SetTLSConfig(util.CreateTLSClientConfig(meta.UnsafeSsl))) return &influxDBScaler{ client: client, @@ -63,100 +63,17 @@ func NewInfluxDBScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { // parseInfluxDBMetadata parses the metadata passed in from the ScaledObject config func parseInfluxDBMetadata(config *scalersconfig.ScalerConfig) (*influxDBMetadata, error) { - var authToken string - var organizationName string - var query string - var serverURL string - var unsafeSsl bool - var thresholdValue float64 - var activationThresholdValue float64 - - val, ok := config.TriggerMetadata["authToken"] - switch { - case ok && val != "": - authToken = val - case config.TriggerMetadata["authTokenFromEnv"] != "": - if val, ok := config.ResolvedEnv[config.TriggerMetadata["authTokenFromEnv"]]; ok { - authToken = val - } else { - return nil, fmt.Errorf("no auth token given") - } - case config.AuthParams["authToken"] != "": - authToken = config.AuthParams["authToken"] - default: - return nil, fmt.Errorf("no auth token given") - } - - val, ok = config.TriggerMetadata["organizationName"] - switch { - case ok && val != "": - organizationName = val - case config.TriggerMetadata["organizationNameFromEnv"] != "": - if val, ok := config.ResolvedEnv[config.TriggerMetadata["organizationNameFromEnv"]]; ok { - organizationName = val - } else { - return nil, fmt.Errorf("no organization name given") - } - case config.AuthParams["organizationName"] != "": - organizationName = config.AuthParams["organizationName"] - default: - return nil, fmt.Errorf("no organization name given") - } - - if val, ok := config.TriggerMetadata["query"]; ok { - query = val - } else { - return nil, fmt.Errorf("no query provided") - } - - if val, ok := config.TriggerMetadata["serverURL"]; ok { - serverURL = val - } else if val, ok := config.AuthParams["serverURL"]; ok { - serverURL = val - } else { - return nil, fmt.Errorf("no server url given") - } - - if val, ok := config.TriggerMetadata["activationThresholdValue"]; ok { - value, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("activationThresholdValue: failed to parse activationThresholdValue %w", err) - } - activationThresholdValue = value + meta := &influxDBMetadata{} + meta.triggerIndex = config.TriggerIndex + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing influxdb metadata: %w", err) } - if val, ok := config.TriggerMetadata["thresholdValue"]; ok { - value, err := strconv.ParseFloat(val, 64) - if err != nil { - return nil, fmt.Errorf("thresholdValue: failed to parse thresholdValue length %w", err) - } - thresholdValue = value - } else { - if config.AsMetricSource { - thresholdValue = 0 - } else { - return nil, fmt.Errorf("no threshold value given") - } - } - unsafeSsl = false - if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { - parsedVal, err := strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("error parsing unsafeSsl: %w", err) - } - unsafeSsl = parsedVal + if meta.ThresholdValue == 0 && !config.AsMetricSource { + return nil, fmt.Errorf("no threshold value given") } - return &influxDBMetadata{ - authToken: authToken, - organizationName: organizationName, - query: query, - serverURL: serverURL, - thresholdValue: thresholdValue, - activationThresholdValue: activationThresholdValue, - unsafeSsl: unsafeSsl, - triggerIndex: config.TriggerIndex, - }, nil + return meta, nil } // Close closes the connection of the client to the server @@ -192,25 +109,25 @@ func queryInfluxDB(ctx context.Context, queryAPI api.QueryAPI, query string) (fl // GetMetricsAndActivity connects to influxdb via the client and returns a value based on the query func (s *influxDBScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { // Grab QueryAPI to make queries to influxdb instance - queryAPI := s.client.QueryAPI(s.metadata.organizationName) + queryAPI := s.client.QueryAPI(s.metadata.OrganizationName) - value, err := queryInfluxDB(ctx, queryAPI, s.metadata.query) + value, err := queryInfluxDB(ctx, queryAPI, s.metadata.Query) if err != nil { return []external_metrics.ExternalMetricValue{}, false, err } metric := GenerateMetricInMili(metricName, value) - return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.activationThresholdValue, nil + return []external_metrics.ExternalMetricValue{metric}, value > s.metadata.ActivationThresholdValue, nil } // GetMetricSpecForScaling returns the metric spec for the Horizontal Pod Autoscaler func (s *influxDBScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("influxdb-%s", s.metadata.organizationName))), + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("influxdb-%s", s.metadata.OrganizationName))), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.thresholdValue), + Target: GetMetricTargetMili(s.metricType, s.metadata.ThresholdValue), } metricSpec := v2.MetricSpec{ External: externalMetric, Type: externalMetricType, diff --git a/pkg/scalers/influxdb_scaler_test.go b/pkg/scalers/influxdb_scaler_test.go index d238a222a54..6cf23680cd0 100644 --- a/pkg/scalers/influxdb_scaler_test.go +++ b/pkg/scalers/influxdb_scaler_test.go @@ -46,7 +46,7 @@ var testInfluxDBMetadata = []parseInfluxDBMetadataTestData{ {map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, true, map[string]string{}}, // 9 authToken, organizationName, and serverURL are defined in authParams {map[string]string{"query": "from(bucket: hello)", "thresholdValue": "10", "unsafeSsl": "false"}, false, map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "authToken": "myToken"}}, - // 10 no sunsafeSsl value passed + // 10 no unsafeSsl value passed {map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, false, map[string]string{}}, // 11 wrong activationThreshold valuequeryInfluxDB {map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "activationThresholdValue": "aa", "authToken": "myToken", "unsafeSsl": "false"}, true, map[string]string{}},