Skip to content

Commit

Permalink
add logic inside operator
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Turrado <jorge.turrado@scrm.lidl>
  • Loading branch information
JorTurFer committed Sep 25, 2023
1 parent 61c99f7 commit 9ab1a44
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 62 deletions.
4 changes: 4 additions & 0 deletions apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ type AdvancedConfig struct {
type ScalingModifiers struct {
Formula string `json:"formula,omitempty"`
Target string `json:"target,omitempty"`
// +optional
ActivationTarget string `json:"activationTarget,omitempty"`
// +optional
MetricType autoscalingv2.MetricTargetType `json:"metricType,omitempty"`
}

// HorizontalPodAutoscalerConfig specifies horizontal scale config
Expand Down
26 changes: 1 addition & 25 deletions apis/keda/v1alpha1/scaledobject_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,31 +392,7 @@ func validateScalingModifiersTarget(so *ScaledObject) error {
return fmt.Errorf("error converting target for scalingModifiers (string->float) to valid target: %w", err)
}

// if target is given, composite-scaler will be passed to HPA -> all types
// need to be the same - make sure all metrics are of the same metricTargetType

var trigType autoscalingv2.MetricTargetType

// gauron99: possible TODO: more sofisticated check for trigger could be used here
// as well if solution is found (check just the right triggers that are used)
for _, trig := range so.Spec.Triggers {
if trig.Type == cpuString || trig.Type == memoryString || trig.Name == "" {
continue
}
var current autoscalingv2.MetricTargetType
if trig.MetricType == "" {
current = autoscalingv2.AverageValueMetricType // default is AverageValue
} else {
current = trig.MetricType
}
if trigType == "" {
trigType = current
} else if trigType != current {
err := fmt.Errorf("error trigger types are not the same for composite scaler: %s & %s", trigType, current)
return err
}
}
if trigType == autoscalingv2.UtilizationMetricType {
if so.Spec.Advanced.ScalingModifiers.MetricType == autoscalingv2.UtilizationMetricType {
err := fmt.Errorf("error trigger type is Utilization, but it needs to be AverageValue or Value for external metrics")
return err
}
Expand Down
1 change: 0 additions & 1 deletion apis/keda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,15 @@ spec:
description: ScalingModifiers describes advanced scaling logic
options like formula
properties:
activationTarget:
type: string
formula:
type: string
metricType:
description: MetricTargetType specifies the type of metric
being targeted, and should be either "Value", "AverageValue",
or "Utilization"
type: string
target:
type: string
type: object
Expand Down
23 changes: 8 additions & 15 deletions controllers/keda/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,19 +258,12 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context,
validNumTarget, _ := strconv.ParseFloat(scaledObject.Spec.Advanced.ScalingModifiers.Target, 64)

// check & get metric specs type
var validMetricType autoscalingv2.MetricTargetType
for _, metric := range metricSpecs {
if metric.External == nil {
continue
}
if validMetricType == "" {
validMetricType = metric.External.Target.Type
} else if metric.External.Target.Type != validMetricType {
err := fmt.Errorf("error metric target type is not the same for composite scaler: %s & %s", validMetricType, metric.External.Target.Type)
return nil, err
}
metricType := autoscalingv2.AverageValueMetricType
if scaledObject.Spec.Advanced.ScalingModifiers.MetricType != "" {
metricType = scaledObject.Spec.Advanced.ScalingModifiers.MetricType
}
if validMetricType == autoscalingv2.UtilizationMetricType {

if metricType == autoscalingv2.UtilizationMetricType {
err := fmt.Errorf("error metric target type is Utilization, but it needs to be AverageValue or Value for external metrics")
return nil, err
}
Expand All @@ -280,11 +273,11 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context,
quan := resource.NewMilliQuantity(int64(validNumTarget*1000), resource.DecimalSI)

correctHpaTarget := autoscalingv2.MetricTarget{
Type: validMetricType,
Type: metricType,
}
if validMetricType == autoscalingv2.AverageValueMetricType {
if metricType == autoscalingv2.AverageValueMetricType {
correctHpaTarget.AverageValue = quan
} else if validMetricType == autoscalingv2.ValueMetricType {
} else if metricType == autoscalingv2.ValueMetricType {
correctHpaTarget.Value = quan
}
compMetricName := kedav1alpha1.CompositeMetricName
Expand Down
3 changes: 3 additions & 0 deletions pkg/eventreason/eventreason.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ const (
// KEDAScalerFailed is for event when a scaler fails for a ScaledJob or a ScaledObject
KEDAScalerFailed = "KEDAScalerFailed"

// KEDAMetricSourceFailed is for event when a scaler fails as metric source for custom formula
KEDAMetricSourceFailed = "KEDAMetricSourceFailed"

// KEDAScaleTargetActivated is for event when the scale target of ScaledObject was activated
KEDAScaleTargetActivated = "KEDAScaleTargetActivated"

Expand Down
93 changes: 72 additions & 21 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package scaling
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -555,6 +556,8 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
isScaledObjectActive := false
isScalerError := false
metricsRecord := map[string]metricscache.MetricsRecord{}
metricTriggerPairList := make(map[string]string)
var matchingMetrics []external_metrics.ExternalMetricValue

cache, err := h.GetScalersCache(ctx, scaledObject)
prommetrics.RecordScaledObjectError(scaledObject.Namespace, scaledObject.Name, err)
Expand Down Expand Up @@ -592,9 +595,6 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
// if cpu/memory resource scaler has minReplicas==0 & at least one external
// trigger exists -> object can be scaled to zero
if spec.External == nil {
if len(scaledObject.Spec.Triggers) <= cpuMemCount {
isScaledObjectActive = true
}
continue
}

Expand All @@ -605,6 +605,7 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
if latency != -1 {
prommetrics.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, float64(latency))
}
matchingMetrics = append(matchingMetrics, metrics...)
logger.V(1).Info("Getting metrics and activity from scaler", "scaler", scalerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err)

if scalerConfigs[scalerIndex].TriggerUseCachedMetrics {
Expand All @@ -615,28 +616,47 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
}
}

if err != nil {
isScalerError = true
logger.Error(err, "error getting scale decision", "scaler", scalerName)
cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
} else {
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue)
if scaledObject.IsUsingModifiers() {
if err != nil {
isScalerError = true
logger.Error(err, "error getting metric source", "source", scalerName)
cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAMetricSourceFailed, err.Error())
} else {
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue)
}
}

if isMetricActive {
isScaledObjectActive = true
if spec.External != nil {
logger.V(1).Info("Scaler for scaledObject is active", "scaler", scalerName, "metricName", metricName)
prommetrics.RecordScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, err)
} else {
if err != nil {
isScalerError = true
logger.Error(err, "error getting scale decision", "scaler", scalerName)
cache.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
} else {
for _, metric := range metrics {
metricValue := metric.Value.AsApproximateFloat64()
prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metric.MetricName, metricValue)
}
if spec.Resource != nil {
logger.V(1).Info("Scaler for scaledObject is active", "scaler", scalerName, "metricName", spec.Resource.Name)

if isMetricActive {
isScaledObjectActive = true
if spec.External != nil {
logger.V(1).Info("Scaler for scaledObject is active", "scaler", scalerName, "metricName", metricName)
}
if spec.Resource != nil {
logger.V(1).Info("Scaler for scaledObject is active", "scaler", scalerName, "metricName", spec.Resource.Name)
}
}
}
prommetrics.RecordScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, err)
prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, isMetricActive)
}

metricTriggerPairList, err = modifiers.AddPairTriggerAndMetric(metricTriggerPairList, scaledObject, metricName, scalerConfigs[scalerIndex].TriggerName)
if err != nil {
logger.Error(err, "error pairing triggers & metrics for compositeScaler")
}
prommetrics.RecordScalerError(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, err)
prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, scalerName, scalerIndex, metricName, isMetricActive)
}
}

Expand All @@ -650,7 +670,38 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k
logger.V(1).Info("scaler error encountered, clearing scaler cache")
}

return isScaledObjectActive, isScalerError, metricsRecord, nil
// apply scaling modifiers
matchingMetrics = modifiers.HandleScalingModifiers(scaledObject, matchingMetrics, metricTriggerPairList, false, cache, logger)

// when we are using formula, we need to reevaluate if it's active here
if scaledObject.IsUsingModifiers() && !isScalerError {
isScaledObjectActive = false
activationValue := float64(0)
if scaledObject.Spec.Advanced.ScalingModifiers.ActivationTarget != "" {
targetQueryValue, err := strconv.ParseFloat(scaledObject.Spec.Advanced.ScalingModifiers.ActivationTarget, 64)
if err != nil {
return false, true, metricsRecord, fmt.Errorf("scalingModifiers.ActivationTarget parsing error %w", err)
}
activationValue = targetQueryValue
}

for _, metric := range matchingMetrics {
value := metric.Value.AsApproximateFloat64()
prommetrics.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, kedav1alpha1.CompositeMetricName, 0, metric.MetricName, value)
prommetrics.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, kedav1alpha1.CompositeMetricName, 0, metric.MetricName, value > activationValue)
if !isScaledObjectActive {
isScaledObjectActive = value > activationValue
}
}

}

// if cpu/memory resource scaler has minReplicas==0 & at least one external
// trigger exists -> object can be scaled to zero
if len(scaledObject.Spec.Triggers) <= cpuMemCount && !isScalerError {
isScaledObjectActive = true
}
return isScaledObjectActive, isScalerError, metricsRecord, err
}

// / --------------------------------------------------------------------------- ///
Expand Down

0 comments on commit 9ab1a44

Please sign in to comment.