Skip to content

Commit

Permalink
Refactored fallback logic to only patch status when the fallback is e… (
Browse files Browse the repository at this point in the history
#5659)

* refactored fallback logic to only patch status when the fallback is enabled

Signed-off-by: Bharath Guvvala <bharath.raghavendra@gmail.com>

* removed newline

Signed-off-by: Bharath Guvvala <bharath.raghavendra@gmail.com>

* formatted imports

Signed-off-by: Bharath Guvvala <bharath.raghavendra@gmail.com>

* WIP: verifying test failure fixes

Signed-off-by: Bharath Guvvala <bharath.raghavendra@gmail.com>

* WIP: verifying test failure fixes

Signed-off-by: Bharath Guvvala <bharath.raghavendra@gmail.com>

* WIP: verifying test failure fixes

Signed-off-by: Bharath Guvvala <bharath.raghavendra@gmail.com>

* WIP: verifying test failure fixes

Signed-off-by: Bharath Guvvala <bharath.raghavendra@gmail.com>

* WIP: verifying test failure fixes

Signed-off-by: Bharath Guvvala <bharath.raghavendra@gmail.com>

* Update pkg/fallback/fallback.go

Co-authored-by: Zbynek Roubalik <zroubalik@gmail.com>
Signed-off-by: Bharath Raghavendra Reddy Guvvala <bharath.raghavendra@gmail.com>

* fixed test failures

Signed-off-by: Bharath Guvvala <bharath.raghavendra@gmail.com>

* fixed test failures

Signed-off-by: Bharath Guvvala <bharath.raghavendra@gmail.com>

---------

Signed-off-by: Bharath Guvvala <bharath.raghavendra@gmail.com>
Signed-off-by: Bharath Raghavendra Reddy Guvvala <bharath.raghavendra@gmail.com>
Co-authored-by: Zbynek Roubalik <zroubalik@gmail.com>
  • Loading branch information
bharathguvvala and zroubalik authored Aug 2, 2024
1 parent 885d0ac commit 0d7c3bf
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 25 deletions.
5 changes: 5 additions & 0 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/kedacore/keda/v2/pkg/common/message"
"github.com/kedacore/keda/v2/pkg/eventemitter"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/fallback"
"github.com/kedacore/keda/v2/pkg/metricscollector"
"github.com/kedacore/keda/v2/pkg/scaling"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
Expand Down Expand Up @@ -204,6 +205,10 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySuccessReason, msg)
}

if scaledObject.Spec.Fallback == nil || !fallback.HasValidFallback(scaledObject) {
conditions.SetFallbackCondition(metav1.ConditionFalse, "NoFallbackFound", "No fallbacks are active on this scaled object")
}

if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return ctrl.Result{}, err
Expand Down
17 changes: 9 additions & 8 deletions pkg/fallback/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, me
switch {
case !isFallbackEnabled(scaledObject, metricSpec):
return nil, false, suppressedError
case !validateFallback(scaledObject):
case !HasValidFallback(scaledObject):
log.Info("Failed to validate ScaledObject Spec. Please check that parameters are positive integers", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name)
return nil, false, suppressedError
case *healthStatus.NumberOfFailures > scaledObject.Spec.Fallback.FailureThreshold:
Expand All @@ -82,11 +82,7 @@ func GetMetricsWithFallback(ctx context.Context, client runtimeclient.Client, me
}
}

func fallbackExistsInScaledObject(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpec) bool {
if !isFallbackEnabled(scaledObject, metricSpec) || !validateFallback(scaledObject) {
return false
}

func fallbackExistsInScaledObject(scaledObject *kedav1alpha1.ScaledObject) bool {
for _, element := range scaledObject.Status.Health {
if element.Status == kedav1alpha1.HealthStatusFailing && *element.NumberOfFailures > scaledObject.Spec.Fallback.FailureThreshold {
return true
Expand All @@ -96,7 +92,7 @@ func fallbackExistsInScaledObject(scaledObject *kedav1alpha1.ScaledObject, metri
return false
}

func validateFallback(scaledObject *kedav1alpha1.ScaledObject) bool {
func HasValidFallback(scaledObject *kedav1alpha1.ScaledObject) bool {
modifierChecking := true
if scaledObject.IsUsingModifiers() {
value, err := strconv.ParseInt(scaledObject.Spec.Advanced.ScalingModifiers.Target, 10, 64)
Expand Down Expand Up @@ -132,7 +128,12 @@ func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2.MetricSpe
func updateStatus(ctx context.Context, client runtimeclient.Client, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus, metricSpec v2.MetricSpec) {
patch := runtimeclient.MergeFrom(scaledObject.DeepCopy())

if fallbackExistsInScaledObject(scaledObject, metricSpec) {
if !isFallbackEnabled(scaledObject, metricSpec) || !HasValidFallback(scaledObject) {
log.V(1).Info("Fallback is not enabled, hence skipping the health update to the scaledobject", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name)
return
}

if fallbackExistsInScaledObject(scaledObject) {
status.Conditions.SetFallbackCondition(metav1.ConditionTrue, "FallbackExists", "At least one trigger is falling back on this scaled object")
} else {
status.Conditions.SetFallbackCondition(metav1.ConditionFalse, "NoFallbackFound", "No fallbacks are active on this scaled object")
Expand Down
41 changes: 36 additions & 5 deletions pkg/fallback/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ var _ = Describe("fallback", func() {
primeGetMetrics(scaler, expectedMetricValue)
so := buildScaledObject(nil, nil)
metricSpec := createMetricSpec(3)
expectStatusPatch(ctrl, client)

metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec)
Expand All @@ -77,7 +76,7 @@ var _ = Describe("fallback", func() {
Expect(value).Should(Equal(expectedMetricValue))
})

It("should reset the health status when scaler metrics are available", func() {
It("should reset the health status when scaler metrics are available when fallback is enabled", func() {
expectedMetricValue := float64(6)
startingNumberOfFailures := int32(5)
primeGetMetrics(scaler, expectedMetricValue)
Expand Down Expand Up @@ -109,12 +108,41 @@ var _ = Describe("fallback", func() {
Expect(so.Status.Health[metricName]).To(haveFailureAndStatus(0, kedav1alpha1.HealthStatusHappy))
})

It("should not reset the health status when fallback is not enabled", func() {
expectedMetricValue := float64(6)
startingNumberOfFailures := int32(5)
primeGetMetrics(scaler, expectedMetricValue)

so := buildScaledObject(
nil,
&kedav1alpha1.ScaledObjectStatus{
Health: map[string]kedav1alpha1.HealthStatus{
metricName: {
NumberOfFailures: &startingNumberOfFailures,
Status: kedav1alpha1.HealthStatusFailing,
},
},
},
)

metricSpec := createMetricSpec(3)
expectNoStatusPatch(ctrl)

metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
metrics, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec)

Expect(err).ToNot(HaveOccurred())
value := metrics[0].Value.AsApproximateFloat64()
Expect(value).Should(Equal(expectedMetricValue))
Expect(so.Status.Health[metricName]).To(haveFailureAndStatus(5, kedav1alpha1.HealthStatusFailing))
})

It("should propagate the error when fallback is disabled", func() {
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Eq(metricName)).Return(nil, false, errors.New("some error"))

so := buildScaledObject(nil, nil)
metricSpec := createMetricSpec(3)
expectStatusPatch(ctrl, client)
expectNoStatusPatch(ctrl)

metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec)
Expand Down Expand Up @@ -259,7 +287,6 @@ var _ = Describe("fallback", func() {
},
)
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec)
Expand Down Expand Up @@ -327,7 +354,6 @@ var _ = Describe("fallback", func() {
},
)
metricSpec := createMetricSpec(10)
expectStatusPatch(ctrl, client)

metrics, _, err := scaler.GetMetricsAndActivity(context.Background(), metricName)
_, _, err = GetMetricsWithFallback(context.Background(), client, metrics, err, metricName, so, metricSpec)
Expand Down Expand Up @@ -380,6 +406,11 @@ func expectStatusPatch(ctrl *gomock.Controller, client *mock_client.MockClient)
client.EXPECT().Status().Return(statusWriter)
}

func expectNoStatusPatch(ctrl *gomock.Controller) {
statusWriter := mock_client.NewMockStatusWriter(ctrl)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(0)
}

func buildScaledObject(fallbackConfig *kedav1alpha1.Fallback, status *kedav1alpha1.ScaledObjectStatus) *kedav1alpha1.ScaledObject {
scaledObject := &kedav1alpha1.ScaledObject{
ObjectMeta: metav1.ObjectMeta{Name: "clean-up-test", Namespace: "default"},
Expand Down
23 changes: 11 additions & 12 deletions pkg/scaling/scale_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func TestGetScaledObjectMetrics_DirectCall(t *testing.T) {
recorder := record.NewFakeRecorder(1)
mockClient := mock_client.NewMockClient(ctrl)
mockExecutor := mock_executor.NewMockScaleExecutor(ctrl)
mockStatusWriter := mock_client.NewMockStatusWriter(ctrl)

metricsSpecs := []v2.MetricSpec{createMetricSpec(10, metricName)}
metricValue := scalers.GenerateMetricInMili(metricName, float64(10))
Expand Down Expand Up @@ -130,8 +129,7 @@ func TestGetScaledObjectMetrics_DirectCall(t *testing.T) {
mockExecutor.EXPECT().RequestScale(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any())
sh.checkScalers(context.TODO(), &scaledObject, &sync.RWMutex{})

mockClient.EXPECT().Status().Return(mockStatusWriter)
mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
expectNoStatusPatch(ctrl)
scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs)
// hitting directly GetMetricsAndActivity()
scaler.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return([]external_metrics.ExternalMetricValue{metricValue}, true, nil)
Expand All @@ -153,7 +151,6 @@ func TestGetScaledObjectMetrics_FromCache(t *testing.T) {
recorder := record.NewFakeRecorder(1)
mockClient := mock_client.NewMockClient(ctrl)
mockExecutor := mock_executor.NewMockScaleExecutor(ctrl)
mockStatusWriter := mock_client.NewMockStatusWriter(ctrl)

metricsSpecs := []v2.MetricSpec{createMetricSpec(10, metricName)}
metricValue := scalers.GenerateMetricInMili(metricName, float64(10))
Expand Down Expand Up @@ -221,8 +218,7 @@ func TestGetScaledObjectMetrics_FromCache(t *testing.T) {
mockExecutor.EXPECT().RequestScale(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any())
sh.checkScalers(context.TODO(), &scaledObject, &sync.RWMutex{})

mockClient.EXPECT().Status().Return(mockStatusWriter)
mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
expectNoStatusPatch(ctrl)
scaler.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs)
// hitting cache here instead of calling GetMetricsAndActivity()
metrics, err := sh.GetScaledObjectMetrics(context.TODO(), scaledObjectName, scaledObjectNamespace, metricName)
Expand Down Expand Up @@ -259,7 +255,6 @@ func TestGetScaledObjectMetrics_InParallel(t *testing.T) {
recorder := record.NewFakeRecorder(1)
mockClient := mock_client.NewMockClient(ctrl)
mockExecutor := mock_executor.NewMockScaleExecutor(ctrl)
mockStatusWriter := mock_client.NewMockStatusWriter(ctrl)

scalerCollection := []*mock_scalers.MockScaler{}

Expand Down Expand Up @@ -353,8 +348,8 @@ func TestGetScaledObjectMetrics_InParallel(t *testing.T) {
return true
}, 1*time.Second, 400*time.Millisecond, "timeout exceeded: scalers not processed in parallel during `checkScalers`")

mockClient.EXPECT().Status().Times(len(metricNames)).Return(mockStatusWriter)
mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(len(metricNames)).Return(nil)
expectNoStatusPatch(ctrl)

for i := 0; i < len(metricNames); i++ {
i := i
scalerCollection[i].EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecFn(i))
Expand Down Expand Up @@ -916,7 +911,6 @@ func TestScalingModifiersFormula(t *testing.T) {
recorder := record.NewFakeRecorder(1)
mockClient := mock_client.NewMockClient(ctrl)
mockExecutor := mock_executor.NewMockScaleExecutor(ctrl)
mockStatusWriter := mock_client.NewMockStatusWriter(ctrl)

metricsSpecs1 := []v2.MetricSpec{createMetricSpec(2, metricName1)}
metricsSpecs2 := []v2.MetricSpec{createMetricSpec(5, metricName2)}
Expand Down Expand Up @@ -1007,8 +1001,8 @@ func TestScalingModifiersFormula(t *testing.T) {
mockExecutor.EXPECT().RequestScale(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any())
sh.checkScalers(context.TODO(), &scaledObject, &sync.RWMutex{})

mockClient.EXPECT().Status().Return(mockStatusWriter).Times(2)
mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(2)
expectNoStatusPatch(ctrl)

scaler1.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs1)
scaler2.EXPECT().GetMetricSpecForScaling(gomock.Any()).Return(metricsSpecs2)
scaler1.EXPECT().GetMetricsAndActivity(gomock.Any(), gomock.Any()).Return([]external_metrics.ExternalMetricValue{metricValue1, metricValue2}, true, nil)
Expand All @@ -1032,3 +1026,8 @@ func createMetricSpec(averageValue int64, metricName string) v2.MetricSpec {
},
}
}

func expectNoStatusPatch(ctrl *gomock.Controller) {
statusWriter := mock_client.NewMockStatusWriter(ctrl)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(0)
}

0 comments on commit 0d7c3bf

Please sign in to comment.