diff --git a/CHANGELOG.md b/CHANGELOG.md index d17e633edf2..b5de70b764b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,7 +58,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio Here is an overview of all new **experimental** features: -- **General**: Add support for formula based evaluation of metric values ([#2440](https://github.com/kedacore/keda/issues/2440)) +- **General**: Add support for formula based evaluation of metric values ([#2440](https://github.com/kedacore/keda/issues/2440)|[#4998](https://github.com/kedacore/keda/pull/4998)) ### Improvements - **General**: Add apiserver Prometheus metrics to KEDA Metric Server ([#4460](https://github.com/kedacore/keda/issues/4460)) diff --git a/apis/keda/v1alpha1/scaledobject_types.go b/apis/keda/v1alpha1/scaledobject_types.go index c674fd2ae72..08cc0187c20 100644 --- a/apis/keda/v1alpha1/scaledobject_types.go +++ b/apis/keda/v1alpha1/scaledobject_types.go @@ -117,6 +117,10 @@ type AdvancedConfig struct { type ScalingModifiers struct { Formula string `json:"formula,omitempty"` Target string `json:"target,omitempty"` + // +optional + ActivationTarget string `json:"activationTarget,omitempty"` + // +optional + MetricType autoscalingv2.MetricTargetType `json:"metricType,omitempty"` } // HorizontalPodAutoscalerConfig specifies horizontal scale config diff --git a/apis/keda/v1alpha1/scaledobject_webhook.go b/apis/keda/v1alpha1/scaledobject_webhook.go index d7b9abea6d4..071630f2e16 100644 --- a/apis/keda/v1alpha1/scaledobject_webhook.go +++ b/apis/keda/v1alpha1/scaledobject_webhook.go @@ -392,31 +392,7 @@ func validateScalingModifiersTarget(so *ScaledObject) error { return fmt.Errorf("error converting target for scalingModifiers (string->float) to valid target: %w", err) } - // if target is given, composite-scaler will be passed to HPA -> all types - // need to be the same - make sure all metrics are of the same metricTargetType - - var trigType autoscalingv2.MetricTargetType - - // 
gauron99: possible TODO: more sofisticated check for trigger could be used here - // as well if solution is found (check just the right triggers that are used) - for _, trig := range so.Spec.Triggers { - if trig.Type == cpuString || trig.Type == memoryString || trig.Name == "" { - continue - } - var current autoscalingv2.MetricTargetType - if trig.MetricType == "" { - current = autoscalingv2.AverageValueMetricType // default is AverageValue - } else { - current = trig.MetricType - } - if trigType == "" { - trigType = current - } else if trigType != current { - err := fmt.Errorf("error trigger types are not the same for composite scaler: %s & %s", trigType, current) - return err - } - } - if trigType == autoscalingv2.UtilizationMetricType { + if so.Spec.Advanced.ScalingModifiers.MetricType == autoscalingv2.UtilizationMetricType { err := fmt.Errorf("error trigger type is Utilization, but it needs to be AverageValue or Value for external metrics") return err } diff --git a/apis/keda/v1alpha1/scaledobject_webhook_test.go b/apis/keda/v1alpha1/scaledobject_webhook_test.go index c20ad35d644..c02361ac432 100644 --- a/apis/keda/v1alpha1/scaledobject_webhook_test.go +++ b/apis/keda/v1alpha1/scaledobject_webhook_test.go @@ -18,6 +18,7 @@ package v1alpha1 import ( "context" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -416,7 +417,9 @@ var _ = It("shouldn't create so when stabilizationWindowSeconds exceeds 3600", f Eventually(func() error { return k8sClient.Create(context.Background(), so) - }).Should(HaveOccurred()) + }). + WithTimeout(5 * time.Second). 
+ Should(HaveOccurred()) }) var _ = It("should validate the so creation with ScalingModifiers.Formula", func() { diff --git a/config/crd/bases/keda.sh_scaledobjects.yaml b/config/crd/bases/keda.sh_scaledobjects.yaml index 86ab804d13e..c35bb68a1e2 100644 --- a/config/crd/bases/keda.sh_scaledobjects.yaml +++ b/config/crd/bases/keda.sh_scaledobjects.yaml @@ -207,8 +207,15 @@ spec: description: ScalingModifiers describes advanced scaling logic options like formula properties: + activationTarget: + type: string formula: type: string + metricType: + description: MetricTargetType specifies the type of metric + being targeted, and should be either "Value", "AverageValue", + or "Utilization" + type: string target: type: string type: object diff --git a/controllers/keda/hpa.go b/controllers/keda/hpa.go index 8f0e470782f..4dc9d3d08d9 100644 --- a/controllers/keda/hpa.go +++ b/controllers/keda/hpa.go @@ -258,19 +258,12 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context, validNumTarget, _ := strconv.ParseFloat(scaledObject.Spec.Advanced.ScalingModifiers.Target, 64) // check & get metric specs type - var validMetricType autoscalingv2.MetricTargetType - for _, metric := range metricSpecs { - if metric.External == nil { - continue - } - if validMetricType == "" { - validMetricType = metric.External.Target.Type - } else if metric.External.Target.Type != validMetricType { - err := fmt.Errorf("error metric target type is not the same for composite scaler: %s & %s", validMetricType, metric.External.Target.Type) - return nil, err - } + metricType := autoscalingv2.AverageValueMetricType + if scaledObject.Spec.Advanced.ScalingModifiers.MetricType != "" { + metricType = scaledObject.Spec.Advanced.ScalingModifiers.MetricType } - if validMetricType == autoscalingv2.UtilizationMetricType { + + if metricType == autoscalingv2.UtilizationMetricType { err := fmt.Errorf("error metric target type is Utilization, but it needs to be AverageValue or Value for external 
metrics") return nil, err } @@ -280,11 +273,11 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context, quan := resource.NewMilliQuantity(int64(validNumTarget*1000), resource.DecimalSI) correctHpaTarget := autoscalingv2.MetricTarget{ - Type: validMetricType, + Type: metricType, } - if validMetricType == autoscalingv2.AverageValueMetricType { + if metricType == autoscalingv2.AverageValueMetricType { correctHpaTarget.AverageValue = quan - } else if validMetricType == autoscalingv2.ValueMetricType { + } else if metricType == autoscalingv2.ValueMetricType { correctHpaTarget.Value = quan } compMetricName := kedav1alpha1.CompositeMetricName diff --git a/pkg/eventreason/eventreason.go b/pkg/eventreason/eventreason.go index 6b56ef79abd..6fbc854ddc6 100644 --- a/pkg/eventreason/eventreason.go +++ b/pkg/eventreason/eventreason.go @@ -50,6 +50,9 @@ const ( // KEDAScalerFailed is for event when a scaler fails for a ScaledJob or a ScaledObject KEDAScalerFailed = "KEDAScalerFailed" + // KEDAMetricSourceFailed is for event when a scaler fails as metric source for custom formula + KEDAMetricSourceFailed = "KEDAMetricSourceFailed" + // KEDAScaleTargetActivated is for event when the scale target of ScaledObject was activated KEDAScaleTargetActivated = "KEDAScaleTargetActivated" diff --git a/pkg/scalers/arangodb_scaler.go b/pkg/scalers/arangodb_scaler.go index 1a7208f5ee1..a9dd1f558fa 100644 --- a/pkg/scalers/arangodb_scaler.go +++ b/pkg/scalers/arangodb_scaler.go @@ -155,7 +155,11 @@ func parseArangoDBMetadata(config *ScalerConfig) (*arangoDBMetadata, error) { } meta.queryValue = queryValue } else { - return nil, fmt.Errorf("no queryValue given") + if config.AsMetricSource { + meta.queryValue = 0 + } else { + return nil, fmt.Errorf("no queryValue given") + } } meta.activationQueryValue = 0 diff --git a/pkg/scalers/aws_dynamodb_scaler.go b/pkg/scalers/aws_dynamodb_scaler.go index f4f82027092..05b173fd1ab 100644 --- a/pkg/scalers/aws_dynamodb_scaler.go +++ 
b/pkg/scalers/aws_dynamodb_scaler.go @@ -152,7 +152,11 @@ func parseAwsDynamoDBMetadata(config *ScalerConfig) (*awsDynamoDBMetadata, error meta.targetValue = n } else { - return nil, ErrAwsDynamoNoTargetValue + if config.AsMetricSource { + meta.targetValue = 0 + } else { + return nil, ErrAwsDynamoNoTargetValue + } } if val, ok := config.TriggerMetadata["activationTargetValue"]; ok && val != "" { diff --git a/pkg/scalers/azure_app_insights_scaler.go b/pkg/scalers/azure_app_insights_scaler.go index d704bd3bde4..b4a164dd58f 100644 --- a/pkg/scalers/azure_app_insights_scaler.go +++ b/pkg/scalers/azure_app_insights_scaler.go @@ -79,7 +79,11 @@ func parseAzureAppInsightsMetadata(config *ScalerConfig, logger logr.Logger) (*a val, err := getParameterFromConfig(config, azureAppInsightsTargetValueName, false) if err != nil { - return nil, err + if config.AsMetricSource { + meta.targetValue = 0 + } else { + return nil, err + } } targetValue, err := strconv.ParseFloat(val, 64) if err != nil { diff --git a/pkg/scalers/azure_log_analytics_scaler.go b/pkg/scalers/azure_log_analytics_scaler.go index 77cb7b74a39..167e0c83d66 100644 --- a/pkg/scalers/azure_log_analytics_scaler.go +++ b/pkg/scalers/azure_log_analytics_scaler.go @@ -182,7 +182,11 @@ func parseAzureLogAnalyticsMetadata(config *ScalerConfig) (*azureLogAnalyticsMet // Getting threshold, observe that we don't check AuthParams for threshold val, err := getParameterFromConfig(config, "threshold", false) if err != nil { - return nil, err + if config.AsMetricSource { + val = "0" + } else { + return nil, err + } } threshold, err := strconv.ParseFloat(val, 64) if err != nil { diff --git a/pkg/scalers/azure_monitor_scaler.go b/pkg/scalers/azure_monitor_scaler.go index f6a044814ec..5f66854ad33 100644 --- a/pkg/scalers/azure_monitor_scaler.go +++ b/pkg/scalers/azure_monitor_scaler.go @@ -87,7 +87,11 @@ func parseAzureMonitorMetadata(config *ScalerConfig, logger logr.Logger) (*azure } meta.targetValue = targetValue } else { - 
return nil, fmt.Errorf("no targetValue given") + if config.AsMetricSource { + meta.targetValue = 0 + } else { + return nil, fmt.Errorf("no targetValue given") + } } if val, ok := config.TriggerMetadata[activationTargetValueName]; ok && val != "" { diff --git a/pkg/scalers/cassandra_scaler.go b/pkg/scalers/cassandra_scaler.go index 3ee539d1623..976f6359047 100644 --- a/pkg/scalers/cassandra_scaler.go +++ b/pkg/scalers/cassandra_scaler.go @@ -82,7 +82,11 @@ func parseCassandraMetadata(config *ScalerConfig) (*CassandraMetadata, error) { } meta.targetQueryValue = targetQueryValue } else { - return nil, fmt.Errorf("no targetQueryValue given") + if config.AsMetricSource { + meta.targetQueryValue = 0 + } else { + return nil, fmt.Errorf("no targetQueryValue given") + } } meta.activationTargetQueryValue = 0 diff --git a/pkg/scalers/couchdb_scaler.go b/pkg/scalers/couchdb_scaler.go index 507a5848f97..3025e807ec7 100644 --- a/pkg/scalers/couchdb_scaler.go +++ b/pkg/scalers/couchdb_scaler.go @@ -114,7 +114,11 @@ func parseCouchDBMetadata(config *ScalerConfig) (*couchDBMetadata, string, error } meta.queryValue = queryValue } else { - return nil, "", fmt.Errorf("no queryValue given") + if config.AsMetricSource { + meta.queryValue = 0 + } else { + return nil, "", fmt.Errorf("no queryValue given") + } } meta.activationQueryValue = 0 diff --git a/pkg/scalers/datadog_scaler.go b/pkg/scalers/datadog_scaler.go index e021bfa7cc9..8e643fe118d 100644 --- a/pkg/scalers/datadog_scaler.go +++ b/pkg/scalers/datadog_scaler.go @@ -143,7 +143,11 @@ func parseDatadogMetadata(config *ScalerConfig, logger logr.Logger) (*datadogMet } meta.queryValue = queryValue } else { - return nil, fmt.Errorf("no queryValue given") + if config.AsMetricSource { + meta.queryValue = 0 + } else { + return nil, fmt.Errorf("no queryValue given") + } } if val, ok := config.TriggerMetadata["queryAggregator"]; ok && val != "" { diff --git a/pkg/scalers/elasticsearch_scaler.go b/pkg/scalers/elasticsearch_scaler.go index 
f06827b9f44..6e72ea4ce5b 100644 --- a/pkg/scalers/elasticsearch_scaler.go +++ b/pkg/scalers/elasticsearch_scaler.go @@ -199,7 +199,11 @@ func parseElasticsearchMetadata(config *ScalerConfig) (*elasticsearchMetadata, e targetValueString, err := GetFromAuthOrMeta(config, "targetValue") if err != nil { - return nil, err + if config.AsMetricSource { + targetValueString = "0" + } else { + return nil, err + } } targetValue, err := strconv.ParseFloat(targetValueString, 64) if err != nil { diff --git a/pkg/scalers/gcp_cloud_tasks_scaler.go b/pkg/scalers/gcp_cloud_tasks_scaler.go index 88a3382702c..e482824b68e 100644 --- a/pkg/scalers/gcp_cloud_tasks_scaler.go +++ b/pkg/scalers/gcp_cloud_tasks_scaler.go @@ -73,7 +73,6 @@ func parseGcpCloudTasksMetadata(config *ScalerConfig) (*gcpCloudTaskMetadata, er if val == "" { return nil, fmt.Errorf("no queue name given") } - meta.queueName = val } else { return nil, fmt.Errorf("no queue name given") diff --git a/pkg/scalers/influxdb_scaler.go b/pkg/scalers/influxdb_scaler.go index 36aa59230e1..8522607cee3 100644 --- a/pkg/scalers/influxdb_scaler.go +++ b/pkg/scalers/influxdb_scaler.go @@ -131,7 +131,11 @@ func parseInfluxDBMetadata(config *ScalerConfig) (*influxDBMetadata, error) { } thresholdValue = value } else { - return nil, fmt.Errorf("no threshold value given") + if config.AsMetricSource { + thresholdValue = 0 + } else { + return nil, fmt.Errorf("no threshold value given") + } } unsafeSsl = false if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { diff --git a/pkg/scalers/kubernetes_workload_scaler.go b/pkg/scalers/kubernetes_workload_scaler.go index 342a8956cee..3e2bc8fca91 100644 --- a/pkg/scalers/kubernetes_workload_scaler.go +++ b/pkg/scalers/kubernetes_workload_scaler.go @@ -73,7 +73,11 @@ func parseWorkloadMetadata(config *ScalerConfig) (*kubernetesWorkloadMetadata, e meta.podSelector = podSelector value, err := strconv.ParseFloat(config.TriggerMetadata[valueKey], 64) if err != nil || value == 0 { - return nil, 
fmt.Errorf("value must be a float greater than 0") + if config.AsMetricSource { + value = 0 + } else { + return nil, fmt.Errorf("value must be a float greater than 0") + } } meta.value = value diff --git a/pkg/scalers/loki_scaler.go b/pkg/scalers/loki_scaler.go index 601bffa5f18..f5e62dcd45e 100644 --- a/pkg/scalers/loki_scaler.go +++ b/pkg/scalers/loki_scaler.go @@ -110,7 +110,11 @@ func parseLokiMetadata(config *ScalerConfig) (meta *lokiMetadata, err error) { meta.threshold = t } else { - return nil, fmt.Errorf("no %s given", lokiThreshold) + if config.AsMetricSource { + meta.threshold = 0 + } else { + return nil, fmt.Errorf("no %s given", lokiThreshold) + } } meta.activationThreshold = 0 diff --git a/pkg/scalers/metrics_api_scaler.go b/pkg/scalers/metrics_api_scaler.go index 668288e797c..350f3981eee 100644 --- a/pkg/scalers/metrics_api_scaler.go +++ b/pkg/scalers/metrics_api_scaler.go @@ -114,7 +114,11 @@ func parseMetricsAPIMetadata(config *ScalerConfig) (*metricsAPIScalerMetadata, e } meta.targetValue = targetValue } else { - return nil, fmt.Errorf("no targetValue given in metadata") + if config.AsMetricSource { + meta.targetValue = 0 + } else { + return nil, fmt.Errorf("no targetValue given in metadata") + } } meta.activationTargetValue = 0 diff --git a/pkg/scalers/mongo_scaler.go b/pkg/scalers/mongo_scaler.go index 61328f0ab7b..271e5567c3b 100644 --- a/pkg/scalers/mongo_scaler.go +++ b/pkg/scalers/mongo_scaler.go @@ -131,7 +131,11 @@ func parseMongoDBMetadata(config *ScalerConfig) (*mongoDBMetadata, string, error } meta.queryValue = queryValue } else { - return nil, "", fmt.Errorf("no queryValue given") + if config.AsMetricSource { + meta.queryValue = 0 + } else { + return nil, "", fmt.Errorf("no queryValue given") + } } meta.activationQueryValue = 0 diff --git a/pkg/scalers/mssql_scaler.go b/pkg/scalers/mssql_scaler.go index 285b6e9acb6..353420d326e 100644 --- a/pkg/scalers/mssql_scaler.go +++ b/pkg/scalers/mssql_scaler.go @@ -113,7 +113,11 @@ func 
parseMSSQLMetadata(config *ScalerConfig) (*mssqlMetadata, error) { } meta.targetValue = targetValue } else { - return nil, ErrMsSQLNoTargetValue + if config.AsMetricSource { + meta.targetValue = 0 + } else { + return nil, ErrMsSQLNoTargetValue + } } // Activation target value diff --git a/pkg/scalers/mysql_scaler.go b/pkg/scalers/mysql_scaler.go index 5ba4b5eedc5..aa5154efd3f 100644 --- a/pkg/scalers/mysql_scaler.go +++ b/pkg/scalers/mysql_scaler.go @@ -78,7 +78,11 @@ func parseMySQLMetadata(config *ScalerConfig) (*mySQLMetadata, error) { } meta.queryValue = queryValue } else { - return nil, fmt.Errorf("no queryValue given") + if config.AsMetricSource { + meta.queryValue = 0 + } else { + return nil, fmt.Errorf("no queryValue given") + } } meta.activationQueryValue = 0 diff --git a/pkg/scalers/newrelic_scaler.go b/pkg/scalers/newrelic_scaler.go index 5090732da02..8aef0a71de8 100644 --- a/pkg/scalers/newrelic_scaler.go +++ b/pkg/scalers/newrelic_scaler.go @@ -116,7 +116,11 @@ func parseNewRelicMetadata(config *ScalerConfig, logger logr.Logger) (*newrelicM } meta.threshold = t } else { - return nil, fmt.Errorf("missing %s value", threshold) + if config.AsMetricSource { + meta.threshold = 0 + } else { + return nil, fmt.Errorf("missing %s value", threshold) + } } meta.activationThreshold = 0 diff --git a/pkg/scalers/postgresql_scaler.go b/pkg/scalers/postgresql_scaler.go index e9aa6d562a5..b0f2551fbef 100644 --- a/pkg/scalers/postgresql_scaler.go +++ b/pkg/scalers/postgresql_scaler.go @@ -73,7 +73,11 @@ func parsePostgreSQLMetadata(config *ScalerConfig) (*postgreSQLMetadata, error) } meta.targetQueryValue = targetQueryValue } else { - return nil, fmt.Errorf("no targetQueryValue given") + if config.AsMetricSource { + meta.targetQueryValue = 0 + } else { + return nil, fmt.Errorf("no targetQueryValue given") + } } meta.activationTargetQueryValue = 0 diff --git a/pkg/scalers/predictkube_scaler.go b/pkg/scalers/predictkube_scaler.go index 4ea7341a088..336a4eed342 100644 --- 
a/pkg/scalers/predictkube_scaler.go +++ b/pkg/scalers/predictkube_scaler.go @@ -405,7 +405,11 @@ func parsePredictKubeMetadata(config *ScalerConfig) (result *predictKubeMetadata } meta.threshold = threshold } else { - return nil, fmt.Errorf("no threshold given") + if config.AsMetricSource { + meta.threshold = 0 + } else { + return nil, fmt.Errorf("no threshold given") + } } meta.activationThreshold = 0 diff --git a/pkg/scalers/prometheus_scaler.go b/pkg/scalers/prometheus_scaler.go index 4cde60ec88c..f80e801207b 100644 --- a/pkg/scalers/prometheus_scaler.go +++ b/pkg/scalers/prometheus_scaler.go @@ -159,7 +159,11 @@ func parsePrometheusMetadata(config *ScalerConfig) (meta *prometheusMetadata, er meta.threshold = t } else { - return nil, fmt.Errorf("no %s given", promThreshold) + if config.AsMetricSource { + meta.threshold = 0 + } else { + return nil, fmt.Errorf("no %s given", promThreshold) + } } meta.activationThreshold = 0 diff --git a/pkg/scalers/scaler.go b/pkg/scalers/scaler.go index b90bced0be6..3aaa1bad11a 100644 --- a/pkg/scalers/scaler.go +++ b/pkg/scalers/scaler.go @@ -99,6 +99,9 @@ type ScalerConfig struct { // MetricType MetricType v2.MetricTargetType + + // When we use the scaler for composite scaler, we shouldn't require the value because it'll be ignored + AsMetricSource bool } var ( diff --git a/pkg/scalers/solr_scaler.go b/pkg/scalers/solr_scaler.go index 486eb0c4cd7..7188286901a 100644 --- a/pkg/scalers/solr_scaler.go +++ b/pkg/scalers/solr_scaler.go @@ -93,7 +93,11 @@ func parseSolrMetadata(config *ScalerConfig) (*solrMetadata, error) { } meta.targetQueryValue = targetQueryValue } else { - return nil, fmt.Errorf("no targetQueryValue given") + if config.AsMetricSource { + meta.targetQueryValue = 0 + } else { + return nil, fmt.Errorf("no targetQueryValue given") + } } meta.activationTargetQueryValue = 0 diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index db9bc8b2adb..d95f3ab2c0c 100644 --- a/pkg/scaling/scale_handler.go 
+++ b/pkg/scaling/scale_handler.go @@ -19,10 +19,12 @@ package scaling import ( "context" "fmt" + "strconv" "strings" "sync" "time" + "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -354,7 +356,14 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s return nil, err } - scalers, err := h.buildScalers(ctx, withTriggers, podTemplateSpec, containerName) + asMetricSource := false + switch obj := scalableObject.(type) { + case *kedav1alpha1.ScaledObject: + asMetricSource = obj.IsUsingModifiers() + default: + } + + scalers, err := h.buildScalers(ctx, withTriggers, podTemplateSpec, containerName, asMetricSource) if err != nil { return nil, err } @@ -553,8 +562,10 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k logger := log.WithValues("scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name) isScaledObjectActive := false - isScalerError := false + isScaledObjectError := false metricsRecord := map[string]metricscache.MetricsRecord{} + metricTriggerPairList := make(map[string]string) + var matchingMetrics []external_metrics.ExternalMetricValue cache, err := h.GetScalersCache(ctx, scaledObject) prommetrics.RecordScaledObjectError(scaledObject.Namespace, scaledObject.Name, err) @@ -576,57 +587,147 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k // Let's collect status of all scalers, no matter if any scaler raises error or is active scalers, scalerConfigs := cache.GetScalers() for scalerIndex := 0; scalerIndex < len(scalers); scalerIndex++ { - scalerName := strings.Replace(fmt.Sprintf("%T", scalers[scalerIndex]), "*scalers.", "", 1) - if scalerConfigs[scalerIndex].TriggerName != "" { - scalerName = scalerConfigs[scalerIndex].TriggerName + result := h.getScalerState(ctx, scalers[scalerIndex], scalerIndex, scalerConfigs[scalerIndex], cache, logger, scaledObject) + if 
!isScaledObjectActive { + isScaledObjectActive = result.IsActive + } + if !isScaledObjectError { + isScaledObjectError = result.IsError + } + matchingMetrics = append(matchingMetrics, result.Metrics...) + for k, v := range result.Pairs { + metricTriggerPairList[k] = v + } + for k, v := range result.Records { + metricsRecord[k] = v } + } - metricSpecs, err := cache.GetMetricSpecForScalingForScaler(ctx, scalerIndex) + // invalidate the cache for the ScaledObject, if we hit an error in any scaler + // in this case we try to build all scalers (and resolve all secrets/creds) again in the next call + if isScaledObjectError { + err := h.ClearScalersCache(ctx, scaledObject) if err != nil { - isScalerError = true - logger.Error(err, "error getting metric spec for the scaler", "scaler", scalerName) - cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + logger.Error(err, "error clearing scalers cache") } + logger.V(1).Info("scaler error encountered, clearing scaler cache") + } - for _, spec := range metricSpecs { - // if cpu/memory resource scaler has minReplicas==0 & at least one external - // trigger exists -> object can be scaled to zero - if spec.External == nil { - if len(scaledObject.Spec.Triggers) <= cpuMemCount { - isScaledObjectActive = true + // apply scaling modifiers + matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, false, cache, logger) + + // when we are using formula, we need to reevaluate if it's active here + if scaledObject.IsUsingModifiers() { + // we need to reset the activity even if there is an error + isScaledObjectActive = false + if !isScaledObjectError { + activationValue := float64(0) + if scaledObject.Spec.Advanced.ScalingModifiers.ActivationTarget != "" { + targetValue, err := strconv.ParseFloat(scaledObject.Spec.Advanced.ScalingModifiers.ActivationTarget, 64) + if err != nil { + return false, true, metricsRecord, 
fmt.Errorf("scalingModifiers.ActivationTarget parsing error %w", err) } - continue + activationValue = targetValue } - metricName := spec.External.Metric.Name - - var latency int64 - metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) - if latency != -1 { - prommetrics.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency)) + for _, metric := range matchingMetrics { + value := metric.Value.AsApproximateFloat64() + prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, kedav1alpha1.CompositeMetricName, 0, metric.MetricName, value) + prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, kedav1alpha1.CompositeMetricName, 0, metric.MetricName, value > activationValue) + if !isScaledObjectActive { + isScaledObjectActive = value > activationValue + } } - logger.V(1).Info("Getting metrics and activity from scaler", "scaler", scalerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err) + } + } - if scalerConfigs[scalerIndex].TriggerUseCachedMetrics { - metricsRecord[metricName] = metricscache.MetricsRecord{ - IsActive: isMetricActive, - Metric: metrics, - ScalerError: err, - } + // cpu/memory scaler only can scale to zero if there is any other external metric because otherwise + // it'll never scale from 0. If all the triggers are only cpu/memory, we enforce the IsActive + if len(scaledObject.Spec.Triggers) <= cpuMemCount && !isScaledObjectError { + isScaledObjectActive = true + } + return isScaledObjectActive, isScaledObjectError, metricsRecord, err +} + +// scalerState is used as return +// for the function getScalerState. 
It contains + // the state of the scaler and all the required + // info for calculating the ScaledObjectState +type scalerState struct { + // IsActive will be overridden by formula calculation + IsActive bool + IsError bool + Metrics []external_metrics.ExternalMetricValue + Pairs map[string]string + Records map[string]metricscache.MetricsRecord +} + +// getScalerState returns scalerState with the state +// for a specific scaler. The state contains if it's active or +// with errors, but also the records for the cache and the metrics +// for the custom formulas +func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, scalerIndex int, scalerConfig scalers.ScalerConfig, + cache *cache.ScalersCache, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) scalerState { + result := scalerState{ + IsActive: false, + IsError: false, + Metrics: []external_metrics.ExternalMetricValue{}, + Pairs: map[string]string{}, + Records: map[string]metricscache.MetricsRecord{}, + } + + scalerName := strings.Replace(fmt.Sprintf("%T", scaler), "*scalers.", "", 1) + if scalerConfig.TriggerName != "" { + scalerName = scalerConfig.TriggerName + } + + metricSpecs, err := cache.GetMetricSpecForScalingForScaler(ctx, scalerIndex) + if err != nil { + result.IsError = true + logger.Error(err, "error getting metric spec for the scaler", "scaler", scalerName) + cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + } + + for _, spec := range metricSpecs { + if spec.External == nil { + continue + } + + metricName := spec.External.Metric.Name + + var latency int64 + metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) + if latency != -1 { + prommetrics.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency)) + } + result.Metrics = append(result.Metrics, metrics...) 
+ logger.V(1).Info("Getting metrics and activity from scaler", "scaler", scalerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err) + + if scalerConfig.TriggerUseCachedMetrics { + result.Records[metricName] = metricscache.MetricsRecord{ + IsActive: isMetricActive, + Metric: metrics, + ScalerError: err, } + } - if err != nil { - isScalerError = true + if err != nil { + result.IsError = true + if scaledObject.IsUsingModifiers() { + logger.Error(err, "error getting metric source", "source", scalerName) + cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAMetricSourceFailed, err.Error()) + } else { logger.Error(err, "error getting scale decision", "scaler", scalerName) cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - } else { - for _, metric := range metrics { - metricValue := metric.Value.AsApproximateFloat64() - prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue) - } - + } + } else { + result.IsActive = isMetricActive + for _, metric := range metrics { + metricValue := metric.Value.AsApproximateFloat64() + prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue) + } + if !scaledObject.IsUsingModifiers() { if isMetricActive { - isScaledObjectActive = true if spec.External != nil { logger.V(1).Info("Scaler for scaledObject is active", "scaler", scalerName, "metricName", metricName) } @@ -634,23 +735,16 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k logger.V(1).Info("Scaler for scaledObject is active", "scaler", scalerName, "metricName", spec.Resource.Name) } } + prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, isMetricActive) } - prommetrics.RecordScalerError(scaledObject.Namespace, 
scaledObject.Name, scalerName, scalerIndex, metricName, err) - prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, isMetricActive) } - } - // invalidate the cache for the ScaledObject, if we hit an error in any scaler - // in this case we try to build all scalers (and resolve all secrets/creds) again in the next call - if isScalerError { - err := h.ClearScalersCache(ctx, scaledObject) + result.Pairs, err = modifiers.AddPairTriggerAndMetric(result.Pairs, scaledObject, metricName, scalerConfig.TriggerName) if err != nil { - logger.Error(err, "error clearing scalers cache") + logger.Error(err, "error pairing triggers & metrics for compositeScaler") } - logger.V(1).Info("scaler error encountered, clearing scaler cache") } - - return isScaledObjectActive, isScalerError, metricsRecord, nil + return result } // / --------------------------------------------------------------------------- /// diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index b9f1a23f2e1..58cd74ac5cc 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -36,7 +36,7 @@ import ( /// --------------------------------------------------------------------------- /// // buildScalers returns list of Scalers for the specified triggers -func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, podTemplateSpec *corev1.PodTemplateSpec, containerName string) ([]cache.ScalerBuilder, error) { +func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, podTemplateSpec *corev1.PodTemplateSpec, containerName string, asMetricSource bool) ([]cache.ScalerBuilder, error) { logger := log.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name) var err error resolvedEnv := make(map[string]string) @@ -64,6 +64,7 @@ func (h *scaleHandler) buildScalers(ctx context.Context, withTriggers 
*kedav1alp GlobalHTTPTimeout: h.globalHTTPTimeout, ScalerIndex: triggerIndex, MetricType: trigger.MetricType, + AsMetricSource: asMetricSource, } authParams, podIdentity, err := resolver.ResolveAuthRefAndPodIdentity(ctx, h.client, logger, trigger.AuthenticationRef, podTemplateSpec, withTriggers.Namespace, h.secretsLister) diff --git a/tests/internals/scaling_modifiers/scaling_modifiers_test.go b/tests/internals/scaling_modifiers/scaling_modifiers_test.go index ab02f01ea73..c8abf95a7a5 100644 --- a/tests/internals/scaling_modifiers/scaling_modifiers_test.go +++ b/tests/internals/scaling_modifiers/scaling_modifiers_test.go @@ -143,6 +143,7 @@ spec: scalingModifiers: formula: metrics_api + kw_trig target: '2' + activationTarget: '2' pollingInterval: 5 cooldownPeriod: 5 minReplicaCount: 0 @@ -154,7 +155,6 @@ spec: - type: metrics-api name: metrics_api metadata: - targetValue: "2" url: "{{.MetricsServerEndpoint}}" valueLocation: 'value' method: "query" @@ -164,7 +164,6 @@ spec: name: kw_trig metadata: podSelector: pod=workload-test - value: '1' ` workloadDeploymentTemplate = ` @@ -238,11 +237,17 @@ func TestScalingModifiers(t *testing.T) { func testFormula(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testFormula ---") + + // formula simply adds 2 metrics together (0+2=2; activationTarget = 2 -> replicas should be 0) + KubectlApplyWithTemplate(t, data, "soFallbackTemplate", soFallbackTemplate) + data.MetricValue = 0 + KubectlApplyWithTemplate(t, data, "updateMetricsTemplate", updateMetricsTemplate) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, namespace, 0, 60) + // formula simply adds 2 metrics together (3+2=5; target = 2 -> 5/2 replicas should be 3) data.MetricValue = 3 KubectlApplyWithTemplate(t, data, "updateMetricsTemplate", updateMetricsTemplate) - KubectlApplyWithTemplate(t, data, "soFallbackTemplate", soFallbackTemplate) _, err := ExecuteCommand(fmt.Sprintf("kubectl scale deployment/depl-workload-base --replicas=2 
-n %s", namespace)) assert.NoErrorf(t, err, "cannot scale workload deployment - %s", err) diff --git a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go index f86cb1e3e1a..4ff40b7041b 100644 --- a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go +++ b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go @@ -371,8 +371,8 @@ func testScalerErrors(t *testing.T, data templateData) { if val, ok := family["keda_scaler_errors"]; ok { errCounterVal1 := getErrorMetricsValue(val) - // wait for 10 seconds to correctly fetch metrics. - time.Sleep(10 * time.Second) + // wait for 20 seconds to correctly fetch metrics. + time.Sleep(20 * time.Second) family = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL)) if val, ok := family["keda_scaler_errors"]; ok {