From 401940199000388ef636faecb022de8c2fff506f Mon Sep 17 00:00:00 2001 From: Tobo Atchou Date: Fri, 19 May 2023 09:38:25 +0200 Subject: [PATCH] refactor: util functions for Status & Conditions handling (#4487) Signed-off-by: Tobo Atchou --- CHANGELOG.md | 1 + controllers/keda/hpa.go | 5 +- controllers/keda/scaledjob_controller.go | 6 +- controllers/keda/scaledobject_controller.go | 7 +- pkg/scaling/executor/scale_executor.go | 74 ++++++++++----------- pkg/scaling/executor/scale_scaledobjects.go | 3 +- {controllers/keda => pkg}/util/status.go | 63 +++++++++++++----- 7 files changed, 95 insertions(+), 64 deletions(-) rename {controllers/keda => pkg}/util/status.go (51%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d76cf2b337..001677880a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,7 @@ New deprecation(s): - **General**: Fix odd number of arguments passed as key-value pairs for logging ([#4368](https://github.com/kedacore/keda/issues/4368)) - **General**: Automatically scale test clusters in/out to reduce environmental footprint & improve cost-efficiency ([#4456](https://github.com/kedacore/keda/pull/4456)) - **General**: Use default metrics provider from sigs.k8s.io/custom-metrics-apiserver ([#4473](https://github.com/kedacore/keda/pull/4473)) +- **General**: Refactor several functions for Status & Conditions handling into pkg util functions ([#2906](https://github.com/kedacore/keda/pull/2906)) ## v2.10.0 diff --git a/controllers/keda/hpa.go b/controllers/keda/hpa.go index 4ee7a2da9d8..febeb0964b4 100644 --- a/controllers/keda/hpa.go +++ b/controllers/keda/hpa.go @@ -32,6 +32,7 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util" "github.com/kedacore/keda/v2/pkg/scaling/executor" + kedautil "github.com/kedacore/keda/v2/pkg/util" version "github.com/kedacore/keda/v2/version" ) @@ -60,7 +61,7 @@ func (r *ScaledObjectReconciler) createAndDeployNewHPA(ctx context.Context, logg status := scaledObject.Status.DeepCopy() status.HpaName = hpaName - err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status) + err = kedautil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status) if err != nil { logger.Error(err, "Error updating scaledObject status with used hpaName") return err @@ -237,7 +238,7 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context, updateHealthStatus(scaledObject, externalMetricNames, status) - err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status) + err = kedautil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status) if err != nil { logger.Error(err, "Error updating scaledObject status with used externalMetricNames") return nil, err diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index 4adb1ee4f53..806f095e455 100644 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -39,10 +39,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" - kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util" "github.com/kedacore/keda/v2/pkg/eventreason" "github.com/kedacore/keda/v2/pkg/prommetrics" "github.com/kedacore/keda/v2/pkg/scaling" + kedautil "github.com/kedacore/keda/v2/pkg/util" ) // +kubebuilder:rbac:groups=keda.sh,resources=scaledjobs;scaledjobs/finalizers;scaledjobs/status,verbs="*" @@ -124,7 +124,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( // ensure Status Conditions are initialized if !scaledJob.Status.Conditions.AreInitialized() { conditions := kedav1alpha1.GetInitializedConditions() - if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, conditions); err != nil { + if err := kedautil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, conditions); err != nil { return ctrl.Result{}, err } } @@ -152,7 +152,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg) } - if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, &conditions); err != nil { + if err := kedautil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, &conditions); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, err diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index 3a293fef371..058859411c9 100644 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -46,6 +46,7 @@ import ( "github.com/kedacore/keda/v2/pkg/eventreason" "github.com/kedacore/keda/v2/pkg/prommetrics" "github.com/kedacore/keda/v2/pkg/scaling" + kedautil "github.com/kedacore/keda/v2/pkg/util" ) // +kubebuilder:rbac:groups=keda.sh,resources=scaledobjects;scaledobjects/finalizers;scaledobjects/status,verbs="*" @@ -166,7 +167,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request // ensure Status Conditions are initialized if !scaledObject.Status.Conditions.AreInitialized() { conditions := kedav1alpha1.GetInitializedConditions() - if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil { + if err := kedautil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil { return ctrl.Result{}, err } } @@ -188,7 +189,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySucccesReason, msg) } - if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil { + if err := kedautil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil { return ctrl.Result{}, err } @@ -325,7 +326,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte status.PausedReplicaCount = nil } - if err := kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status); err != nil { + if err := kedautil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status); err != nil { return gvkr, err } logger.Info("Detected resource targeted for scaling", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name) diff --git a/pkg/scaling/executor/scale_executor.go b/pkg/scaling/executor/scale_executor.go index 78ca082d840..2e0d6a83e31 100644 --- a/pkg/scaling/executor/scale_executor.go +++ b/pkg/scaling/executor/scale_executor.go @@ -29,6 +29,7 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + kedautil "github.com/kedacore/keda/v2/pkg/util" ) const ( @@ -62,52 +63,47 @@ func NewScaleExecutor(client runtimeclient.Client, scaleClient scale.ScalesGette } func (e *scaleExecutor) updateLastActiveTime(ctx context.Context, logger logr.Logger, object interface{}) error { - var patch runtimeclient.Patch - now := metav1.Now() - runtimeObj := object.(runtimeclient.Object) - switch obj := runtimeObj.(type) { - case *kedav1alpha1.ScaledObject: - patch = runtimeclient.MergeFrom(obj.DeepCopy()) - obj.Status.LastActiveTime = &now - case *kedav1alpha1.ScaledJob: - patch = runtimeclient.MergeFrom(obj.DeepCopy()) - obj.Status.LastActiveTime = &now - default: - err := fmt.Errorf("unknown scalable object type %v", obj) - logger.Error(err, "Failed to patch Objects Status") - return err - } - - err := e.client.Status().Patch(ctx, runtimeObj, patch) - if err != nil { - logger.Error(err, "Failed to patch Objects Status") + transform := func(runtimeObj runtimeclient.Object, target interface{}) error { + now, ok := target.(metav1.Time) + if !ok { + return fmt.Errorf("transform target is not metav1.Time type %v", target) + } + switch obj := runtimeObj.(type) { + case *kedav1alpha1.ScaledObject: + obj.Status.LastActiveTime = &now + case *kedav1alpha1.ScaledJob: + obj.Status.LastActiveTime = &now + default: + } + return nil } - return err + return kedautil.TransformObject(ctx, e.client, logger, object, now, transform) } func (e *scaleExecutor) setCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string, setCondition func(kedav1alpha1.Conditions, metav1.ConditionStatus, string, string)) error { - var patch runtimeclient.Patch - - runtimeObj := object.(runtimeclient.Object) - switch obj := runtimeObj.(type) { - case *kedav1alpha1.ScaledObject: - patch = runtimeclient.MergeFrom(obj.DeepCopy()) - setCondition(obj.Status.Conditions, status, reason, message) - case *kedav1alpha1.ScaledJob: - patch = runtimeclient.MergeFrom(obj.DeepCopy()) - setCondition(obj.Status.Conditions, status, reason, message) - default: - err := fmt.Errorf("unknown scalable object type %v", obj) - logger.Error(err, "Failed to patch Objects Status") - return err + type transformStruct struct { + status metav1.ConditionStatus + reason string + message string } - - err := e.client.Status().Patch(ctx, runtimeObj, patch) - if err != nil { - logger.Error(err, "Failed to patch Objects Status") + transform := func(runtimeObj runtimeclient.Object, target interface{}) error { + transformObj := target.(*transformStruct) + switch obj := runtimeObj.(type) { + case *kedav1alpha1.ScaledObject: + setCondition(obj.Status.Conditions, transformObj.status, transformObj.reason, transformObj.message) + case *kedav1alpha1.ScaledJob: + setCondition(obj.Status.Conditions, transformObj.status, transformObj.reason, transformObj.message) + default: + } + return nil + } + target := transformStruct{ + status: status, + reason: reason, + message: message, } - return err + return kedautil.TransformObject(ctx, e.client, logger, object, &target, transform) } func (e *scaleExecutor) setReadyCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string) error { diff --git a/pkg/scaling/executor/scale_scaledobjects.go b/pkg/scaling/executor/scale_scaledobjects.go index 1411f2b186f..10cbf2470dc 100644 --- a/pkg/scaling/executor/scale_scaledobjects.go +++ b/pkg/scaling/executor/scale_scaledobjects.go @@ -31,6 +31,7 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util" "github.com/kedacore/keda/v2/pkg/eventreason" + kedautil "github.com/kedacore/keda/v2/pkg/util" ) func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool) { @@ -106,7 +107,7 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al return } status.PausedReplicaCount = pausedCount - err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status) + err = kedautil.UpdateScaledObjectStatus(ctx, e.client, logger, scaledObject, status) if err != nil { logger.Error(err, "error updating status paused replica count") return diff --git a/controllers/keda/util/status.go b/pkg/util/status.go similarity index 51% rename from controllers/keda/util/status.go rename to pkg/util/status.go index 3e36d848c74..479ead8b083 100644 --- a/controllers/keda/util/status.go +++ b/pkg/util/status.go @@ -1,5 +1,5 @@ /* -Copyright 2021 The KEDA Authors +Copyright 2023 The KEDA Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -28,36 +28,67 @@ import ( // SetStatusConditions patches given object with passed list of conditions based on the object's type or returns an error. func SetStatusConditions(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, object interface{}, conditions *kedav1alpha1.Conditions) error { + transform := func(runtimeObj runtimeclient.Object, target interface{}) error { + conditions, ok := target.(*kedav1alpha1.Conditions) + if !ok { + return fmt.Errorf("transform target is not kedav1alpha1.Conditions type %v", target) + } + switch obj := runtimeObj.(type) { + case *kedav1alpha1.ScaledObject: + obj.Status.Conditions = *conditions + case *kedav1alpha1.ScaledJob: + obj.Status.Conditions = *conditions + default: + } + return nil + } + return TransformObject(ctx, client, logger, object, conditions, transform) +} + +// UpdateScaledObjectStatus patches the given ScaledObject with the updated status passed to it or returns an error. +func UpdateScaledObjectStatus(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error { + transform := func(runtimeObj runtimeclient.Object, target interface{}) error { + status, ok := target.(*kedav1alpha1.ScaledObjectStatus) + if !ok { + return fmt.Errorf("transform target is not kedav1alpha1.ScaledObjectStatus type %v", target) + } + switch obj := runtimeObj.(type) { + case *kedav1alpha1.ScaledObject: + obj.Status = *status + default: + } + return nil + } + return TransformObject(ctx, client, logger, scaledObject, status, transform) +} + +// TransformObject patches the given object with the targeted passed to it through a transformer function or returns an error. +func TransformObject(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, object interface{}, target interface{}, transform func(runtimeclient.Object, interface{}) error) error { var patch runtimeclient.Patch runtimeObj := object.(runtimeclient.Object) switch obj := runtimeObj.(type) { case *kedav1alpha1.ScaledObject: patch = runtimeclient.MergeFrom(obj.DeepCopy()) - obj.Status.Conditions = *conditions + if err := transform(obj, target); err != nil { + logger.Error(err, "failed to patch ScaledObject") + return err + } case *kedav1alpha1.ScaledJob: patch = runtimeclient.MergeFrom(obj.DeepCopy()) - obj.Status.Conditions = *conditions + if err := transform(obj, target); err != nil { + logger.Error(err, "failed to patch ScaledJob") + return err + } default: err := fmt.Errorf("unknown scalable object type %v", obj) - logger.Error(err, "Failed to patch Objects Status with Conditions") + logger.Error(err, "failed to patch Objects") return err } err := client.Status().Patch(ctx, runtimeObj, patch) if err != nil { - logger.Error(err, "Failed to patch Objects Status with Conditions") - } - return err -} - -// UpdateScaledObjectStatus patches the given ScaledObject with the updated status passed to it or returns an error. -func UpdateScaledObjectStatus(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error { - patch := runtimeclient.MergeFrom(scaledObject.DeepCopy()) - scaledObject.Status = *status - err := client.Status().Patch(ctx, scaledObject, patch) - if err != nil { - logger.Error(err, "Failed to patch ScaledObjects Status") + logger.Error(err, "failed to patch Objects") } return err }