From 93dc6baab97b18cb45894093c8e2a16aae5c5875 Mon Sep 17 00:00:00 2001 From: Zbynek Roubalik Date: Fri, 4 Nov 2022 16:56:35 -0400 Subject: [PATCH 1/3] chore: use kedacore hosted images for prom e2e tests (#3824) Signed-off-by: Zbynek Roubalik --- tests/scalers/prometheus/prometheus_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/scalers/prometheus/prometheus_test.go b/tests/scalers/prometheus/prometheus_test.go index d5e9261b60c..53ee04fea03 100644 --- a/tests/scalers/prometheus/prometheus_test.go +++ b/tests/scalers/prometheus/prometheus_test.go @@ -65,7 +65,7 @@ spec: spec: containers: - name: prom-test-app - image: quay.io/zroubalik/prometheus-app:latest + image: ghcr.io/kedacore/tests-prometheus:latest imagePullPolicy: IfNotPresent securityContext: allowPrivilegeEscalation: false @@ -98,7 +98,7 @@ spec: spec: containers: - name: prom-test-app - image: quay.io/zroubalik/prometheus-app:latest + image: ghcr.io/kedacore/tests-prometheus:latest imagePullPolicy: IfNotPresent securityContext: allowPrivilegeEscalation: false From 4653241e8eb2d1d8ecfe76e2070ecaac9c346264 Mon Sep 17 00:00:00 2001 From: Garret Wyman Date: Fri, 4 Nov 2022 17:27:49 -0400 Subject: [PATCH 2/3] Metrics api unsafessl (#3823) Signed-off-by: Garret Wyman Co-authored-by: Jorge Turrado Ferrero --- CHANGELOG.md | 1 + pkg/scalers/metrics_api_scaler.go | 13 +++++++++++-- pkg/scalers/metrics_api_scaler_test.go | 6 ++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 144e37954cd..8fdf78eff2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **GCP Storage Scaler:** Add prefix and delimiter support ([#3756](https://github.com/kedacore/keda/issues/3756)) - **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310)) - **Event Hubs Scaler:** Support Azure Active Direcotry Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569)) +- **Metrics API Scaler:** Add unsafeSsl paramater to skip certificate validation when connecting over HTTPS ([#3728](https://github.com/kedacore/keda/discussions/3728)) ### Fixes diff --git a/pkg/scalers/metrics_api_scaler.go b/pkg/scalers/metrics_api_scaler.go index 0779cd39eac..f8e7fa33532 100644 --- a/pkg/scalers/metrics_api_scaler.go +++ b/pkg/scalers/metrics_api_scaler.go @@ -33,6 +33,7 @@ type metricsAPIScalerMetadata struct { activationTargetValue float64 url string valueLocation string + unsafeSsl bool // apiKeyAuth enableAPIKeyAuth bool @@ -76,14 +77,13 @@ func NewMetricsAPIScaler(config *ScalerConfig) (Scaler, error) { return nil, fmt.Errorf("error parsing metric API metadata: %s", err) } - httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false) + httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.unsafeSsl) if meta.enableTLS || len(meta.ca) > 0 { config, err := kedautil.NewTLSConfig(meta.cert, meta.key, meta.ca) if err != nil { return nil, err } - httpClient.Transport = &http.Transport{TLSClientConfig: config} } @@ -99,6 +99,15 @@ func parseMetricsAPIMetadata(config *ScalerConfig) (*metricsAPIScalerMetadata, e meta := metricsAPIScalerMetadata{} meta.scalerIndex = config.ScalerIndex + meta.unsafeSsl = false + if val, ok := config.TriggerMetadata["unsafeSsl"]; ok { + unsafeSsl, err := strconv.ParseBool(val) + if err != nil { + return nil, fmt.Errorf("error parsing unsafeSsl: %s", err) + } + meta.unsafeSsl = unsafeSsl + } + if val, ok := config.TriggerMetadata["targetValue"]; ok { targetValue, err := strconv.ParseFloat(val, 64) if err != nil { diff --git a/pkg/scalers/metrics_api_scaler_test.go b/pkg/scalers/metrics_api_scaler_test.go index 755550d29cd..badefdfe1c9 100644 --- a/pkg/scalers/metrics_api_scaler_test.go +++ b/pkg/scalers/metrics_api_scaler_test.go @@ -67,6 +67,12 @@ var testMetricsAPIAuthMetadata = []metricAPIAuthMetadataTestData{ {map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric", "targetValue": "42", "authMode": "bearer"}, map[string]string{"token": "bearerTokenValue"}, false}, // fail bearerAuth without token {map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric", "targetValue": "42", "authMode": "bearer"}, map[string]string{}, true}, + // success unsafeSsl true + {map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric", "targetValue": "42", "unsafeSsl": "true"}, map[string]string{}, false}, + // success unsafeSsl false + {map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric", "targetValue": "42", "unsafeSsl": "false"}, map[string]string{}, false}, + // failed unsafeSsl non bool + {map[string]string{"url": "http://dummy:1230/api/v1/", "valueLocation": "metric", "targetValue": "42", "unsafeSsl": "yes"}, map[string]string{}, true}, } func TestParseMetricsAPIMetadata(t *testing.T) { From bcc85ec96386b182cf9a62f15470d09b086b947d Mon Sep 17 00:00:00 2001 From: Vighnesh Shenoy Date: Mon, 7 Nov 2022 14:36:09 +0530 Subject: [PATCH 3/3] feat: Provide Prometheus metrics for total number of custom resources. (#3719) --- CHANGELOG.md | 3 +- ...clustertriggerauthentication_controller.go | 56 +++++- .../clustertriggerauthentication_finalizer.go | 19 ++ controllers/keda/scaledjob_controller.go | 73 +++----- controllers/keda/scaledjob_finalizer.go | 2 +- controllers/keda/scaledobject_controller.go | 55 +++--- controllers/keda/scaledobject_finalizer.go | 2 +- .../keda/triggerauthentication_controller.go | 56 +++++- .../keda/triggerauthentication_finalizer.go | 19 ++ controllers/keda/util/finalizer.go | 73 ++++++++ main.go | 10 +- pkg/metrics/operator_prom_metrics.go | 33 ++++ tests/clean-crds.sh | 4 +- .../prometheus_metrics_test.go | 172 +++++++++++++++--- 14 files changed, 457 insertions(+), 120 deletions(-) create mode 100644 controllers/keda/clustertriggerauthentication_finalizer.go create mode 100644 controllers/keda/triggerauthentication_finalizer.go create mode 100644 controllers/keda/util/finalizer.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fdf78eff2d..f29a8402d90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,9 +38,10 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New -- **General**: Expand Prometheus metric with label "ScalerName" to distinguish different triggers. The scaleName is defined per Trigger.Name ([#3588](https://github.com/kedacore/keda/issues/3588) +- **General**: Expand Prometheus metric with label "ScalerName" to distinguish different triggers. The scaleName is defined per Trigger.Name ([#3588](https://github.com/kedacore/keda/issues/3588)) - **General:** Introduce new Loki Scaler ([#3699](https://github.com/kedacore/keda/issues/3699)) - **General**: Add ratelimitting parameters to keda manager to allow override of client defaults ([#3730](https://github.com/kedacore/keda/issues/2920)) +- **General**: Provide Prometheus metric with indication of total number of custom resources per namespace for each custom resource type (CRD). ([#2637](https://github.com/kedacore/keda/issues/2637)|[#2638](https://github.com/kedacore/keda/issues/2638)|[#2639](https://github.com/kedacore/keda/issues/2639)) - **General**: Provide Prometheus metric with indication of total number of triggers per trigger type in `ScaledJob`/`ScaledObject`. ([#3663](https://github.com/kedacore/keda/issues/3663)) - **AWS Scalers**: Add setting AWS endpoint url. ([#3337](https://github.com/kedacore/keda/issues/3337)) - **Azure Service Bus Scaler**: Add support for Shared Access Signature (SAS) tokens for authentication. ([#2920](https://github.com/kedacore/keda/issues/2920)) diff --git a/controllers/keda/clustertriggerauthentication_controller.go b/controllers/keda/clustertriggerauthentication_controller.go index 7f4a4c104ae..8e9f6b392d2 100644 --- a/controllers/keda/clustertriggerauthentication_controller.go +++ b/controllers/keda/clustertriggerauthentication_controller.go @@ -18,10 +18,10 @@ package keda import ( "context" + "sync" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -31,13 +31,27 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventreason" + "github.com/kedacore/keda/v2/pkg/metrics" ) // ClusterTriggerAuthenticationReconciler reconciles a ClusterTriggerAuthentication object type ClusterTriggerAuthenticationReconciler struct { client.Client - Scheme *runtime.Scheme - Recorder record.EventRecorder + record.EventRecorder +} + +type clusterTriggerAuthMetricsData struct { + namespace string +} + +var ( + clusterTriggerAuthMetricsMap map[string]clusterTriggerAuthMetricsData + clusterTriggerAuthMetricsLock *sync.Mutex +) + +func init() { + clusterTriggerAuthMetricsMap = make(map[string]clusterTriggerAuthMetricsData) + clusterTriggerAuthMetricsLock = &sync.Mutex{} } // +kubebuilder:rbac:groups=keda.sh,resources=clustertriggerauthentications;clustertriggerauthentications/status,verbs="*" @@ -52,17 +66,21 @@ func (r *ClusterTriggerAuthenticationReconciler) Reconcile(ctx context.Context, if errors.IsNotFound(err) { return ctrl.Result{}, nil } - reqLogger.Error(err, "Failed ot get ClusterTriggerAuthentication") + reqLogger.Error(err, "Failed to get ClusterTriggerAuthentication") return ctrl.Result{}, err } if clusterTriggerAuthentication.GetDeletionTimestamp() != nil { - r.Recorder.Event(clusterTriggerAuthentication, corev1.EventTypeNormal, eventreason.ClusterTriggerAuthenticationDeleted, "ClusterTriggerAuthentication was deleted") - return ctrl.Result{}, nil + return ctrl.Result{}, r.finalizeClusterTriggerAuthentication(ctx, reqLogger, clusterTriggerAuthentication, req.NamespacedName.String()) } + if err := r.ensureFinalizer(ctx, reqLogger, clusterTriggerAuthentication); err != nil { + return ctrl.Result{}, err + } + r.updateMetrics(clusterTriggerAuthentication, req.NamespacedName.String()) + if clusterTriggerAuthentication.ObjectMeta.Generation == 1 { - r.Recorder.Event(clusterTriggerAuthentication, corev1.EventTypeNormal, eventreason.ClusterTriggerAuthenticationAdded, "New ClusterTriggerAuthentication configured") + r.EventRecorder.Event(clusterTriggerAuthentication, corev1.EventTypeNormal, eventreason.ClusterTriggerAuthenticationAdded, "New ClusterTriggerAuthentication configured") } return ctrl.Result{}, nil } @@ -73,3 +91,27 @@ func (r *ClusterTriggerAuthenticationReconciler) SetupWithManager(mgr ctrl.Manag For(&kedav1alpha1.ClusterTriggerAuthentication{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Complete(r) } + +func (r *ClusterTriggerAuthenticationReconciler) updateMetrics(clusterTriggerAuth *kedav1alpha1.ClusterTriggerAuthentication, namespacedName string) { + clusterTriggerAuthMetricsLock.Lock() + defer clusterTriggerAuthMetricsLock.Unlock() + + if metricsData, ok := clusterTriggerAuthMetricsMap[namespacedName]; ok { + metrics.DecrementCRDTotal(metrics.ClusterTriggerAuthenticationResource, metricsData.namespace) + } + + metrics.IncrementCRDTotal(metrics.ClusterTriggerAuthenticationResource, clusterTriggerAuth.Namespace) + clusterTriggerAuthMetricsMap[namespacedName] = clusterTriggerAuthMetricsData{namespace: clusterTriggerAuth.Namespace} +} + +// this method is idempotent, so it can be called multiple times without side-effects +func (r *ClusterTriggerAuthenticationReconciler) UpdateMetricsOnDelete(namespacedName string) { + clusterTriggerAuthMetricsLock.Lock() + defer clusterTriggerAuthMetricsLock.Unlock() + + if metricsData, ok := clusterTriggerAuthMetricsMap[namespacedName]; ok { + metrics.DecrementCRDTotal(metrics.ClusterTriggerAuthenticationResource, metricsData.namespace) + } + + delete(clusterTriggerAuthMetricsMap, namespacedName) +} diff --git a/controllers/keda/clustertriggerauthentication_finalizer.go b/controllers/keda/clustertriggerauthentication_finalizer.go new file mode 100644 index 00000000000..4fff965b325 --- /dev/null +++ b/controllers/keda/clustertriggerauthentication_finalizer.go @@ -0,0 +1,19 @@ +package keda + +import ( + "context" + + "github.com/go-logr/logr" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + "github.com/kedacore/keda/v2/controllers/keda/util" +) + +func (r *ClusterTriggerAuthenticationReconciler) ensureFinalizer(ctx context.Context, logger logr.Logger, clusterTriggerAuth *kedav1alpha1.ClusterTriggerAuthentication) error { + return util.EnsureAuthenticationResourceFinalizer(ctx, logger, r, clusterTriggerAuth) +} + +func (r *ClusterTriggerAuthenticationReconciler) finalizeClusterTriggerAuthentication(ctx context.Context, logger logr.Logger, + clusterTriggerAuth *kedav1alpha1.ClusterTriggerAuthentication, namespacedName string) error { + return util.FinalizeAuthenticationResource(ctx, logger, r, clusterTriggerAuth, namespacedName) +} diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index ecaacc7c8d8..2b940607b10 100644 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -58,14 +58,19 @@ type ScaledJobReconciler struct { scaleHandler scaling.ScaleHandler } +type scaledJobMetricsData struct { + namespace string + triggerTypes []string +} + var ( - scaledJobTriggers map[string][]string - scaledJobTriggersLock *sync.Mutex + scaledJobMetricsMap map[string]scaledJobMetricsData + scaledJobMetricsLock *sync.Mutex ) func init() { - scaledJobTriggers = make(map[string][]string) - scaledJobTriggersLock = &sync.Mutex{} + scaledJobMetricsMap = make(map[string]scaledJobMetricsData) + scaledJobMetricsLock = &sync.Mutex{} } // SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance. @@ -107,7 +112,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( if scaledJob.GetDeletionTimestamp() != nil { return ctrl.Result{}, r.finalizeScaledJob(ctx, reqLogger, scaledJob, req.NamespacedName.String()) } - r.updateTriggerTotals(reqLogger, scaledJob, req.NamespacedName.String()) + r.updateMetrics(scaledJob, req.NamespacedName.String()) // ensure finalizer is set on this CR if err := r.ensureFinalizer(ctx, reqLogger, scaledJob); err != nil { @@ -263,62 +268,42 @@ func (r *ScaledJobReconciler) stopScaleLoop(ctx context.Context, logger logr.Log return nil } -// scaledJobGenerationChanged returns true if ScaledJob's Generation was changed, ie. ScaledJob.Spec was changed -func (r *ScaledJobReconciler) scaledJobGenerationChanged(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (bool, error) { - key, err := cache.MetaNamespaceKeyFunc(scaledJob) - if err != nil { - logger.Error(err, "Error getting key for scaledJob") - return true, err - } - - value, loaded := r.scaledJobGenerations.Load(key) - if loaded { - generation := value.(int64) - if generation == scaledJob.Generation { - return false, nil - } - } - return true, nil -} - -func (r *ScaledJobReconciler) updateTriggerTotals(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, namespacedName string) { - specChanged, err := r.scaledJobGenerationChanged(logger, scaledJob) - if err != nil { - logger.Error(err, "failed to update trigger totals") - return - } - - if !specChanged { - return - } +func (r *ScaledJobReconciler) updateMetrics(scaledJob *kedav1alpha1.ScaledJob, namespacedName string) { + scaledJobMetricsLock.Lock() + defer scaledJobMetricsLock.Unlock() - scaledJobTriggersLock.Lock() - defer scaledJobTriggersLock.Unlock() + metricsData, ok := scaledJobMetricsMap[namespacedName] - if triggerTypes, ok := scaledJobTriggers[namespacedName]; ok { - for _, triggerType := range triggerTypes { + if ok { + metrics.DecrementCRDTotal(metrics.ScaledJobResource, metricsData.namespace) + for _, triggerType := range metricsData.triggerTypes { metrics.DecrementTriggerTotal(triggerType) } } + metrics.IncrementCRDTotal(metrics.ScaledJobResource, scaledJob.Namespace) + metricsData.namespace = scaledJob.Namespace + triggerTypes := make([]string, len(scaledJob.Spec.Triggers)) for _, trigger := range scaledJob.Spec.Triggers { metrics.IncrementTriggerTotal(trigger.Type) triggerTypes = append(triggerTypes, trigger.Type) } + metricsData.triggerTypes = triggerTypes - scaledJobTriggers[namespacedName] = triggerTypes + scaledJobMetricsMap[namespacedName] = metricsData } -func (r *ScaledJobReconciler) updateTriggerTotalsOnDelete(namespacedName string) { - scaledJobTriggersLock.Lock() - defer scaledJobTriggersLock.Unlock() +func (r *ScaledJobReconciler) updateMetricsOnDelete(namespacedName string) { + scaledJobMetricsLock.Lock() + defer scaledJobMetricsLock.Unlock() - if triggerTypes, ok := scaledJobTriggers[namespacedName]; ok { - for _, triggerType := range triggerTypes { + if metricsData, ok := scaledJobMetricsMap[namespacedName]; ok { + metrics.DecrementCRDTotal(metrics.ScaledJobResource, metricsData.namespace) + for _, triggerType := range metricsData.triggerTypes { metrics.DecrementTriggerTotal(triggerType) } } - delete(scaledJobTriggers, namespacedName) + delete(scaledJobMetricsMap, namespacedName) } diff --git a/controllers/keda/scaledjob_finalizer.go b/controllers/keda/scaledjob_finalizer.go index faf63bb1f7c..21872f5ab98 100644 --- a/controllers/keda/scaledjob_finalizer.go +++ b/controllers/keda/scaledjob_finalizer.go @@ -50,7 +50,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr return err } - r.updateTriggerTotalsOnDelete(namespacedName) + r.updateMetricsOnDelete(namespacedName) } logger.Info("Successfully finalized ScaledJob") diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index 5ffbe9f19f1..f853f94419f 100644 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -77,12 +77,17 @@ type ScaledObjectReconciler struct { kubeVersion kedautil.K8sVersion } +type scaledObjectMetricsData struct { + namespace string + triggerTypes []string +} + var ( // A cache mapping "resource.group" to true or false if we know if this resource is scalable. isScalableCache *sync.Map - scaledObjectTriggers map[string][]string - scaledObjectTriggersLock *sync.Mutex + scaledObjectMetricsMap map[string]scaledObjectMetricsData + scaledObjectMetricsLock *sync.Mutex ) func init() { @@ -91,8 +96,8 @@ func init() { isScalableCache.Store("deployments.apps", true) isScalableCache.Store("statefulsets.apps", true) - scaledObjectTriggers = make(map[string][]string) - scaledObjectTriggersLock = &sync.Mutex{} + scaledObjectMetricsMap = make(map[string]scaledObjectMetricsData) + scaledObjectMetricsLock = &sync.Mutex{} } // SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance. @@ -177,7 +182,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request if scaledObject.GetDeletionTimestamp() != nil { return ctrl.Result{}, r.finalizeScaledObject(ctx, reqLogger, scaledObject, req.NamespacedName.String()) } - r.updateTriggerTotals(reqLogger, scaledObject, req.NamespacedName.String()) + r.updateMetrics(scaledObject, req.NamespacedName.String()) // ensure finalizer is set on this CR if err := r.ensureFinalizer(ctx, reqLogger, scaledObject); err != nil { @@ -480,44 +485,42 @@ func (r *ScaledObjectReconciler) scaledObjectGenerationChanged(logger logr.Logge return true, nil } -func (r *ScaledObjectReconciler) updateTriggerTotals(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, namespacedName string) { - specChanged, err := r.scaledObjectGenerationChanged(logger, scaledObject) - if err != nil { - logger.Error(err, "failed to update trigger totals") - return - } +func (r *ScaledObjectReconciler) updateMetrics(scaledObject *kedav1alpha1.ScaledObject, namespacedName string) { + scaledObjectMetricsLock.Lock() + defer scaledObjectMetricsLock.Unlock() - if !specChanged { - return - } - - scaledObjectTriggersLock.Lock() - defer scaledObjectTriggersLock.Unlock() + metricsData, ok := scaledObjectMetricsMap[namespacedName] - if triggerTypes, ok := scaledObjectTriggers[namespacedName]; ok { - for _, triggerType := range triggerTypes { + if ok { + metrics.DecrementCRDTotal(metrics.ScaledObjectResource, metricsData.namespace) + for _, triggerType := range metricsData.triggerTypes { metrics.DecrementTriggerTotal(triggerType) } } + metrics.IncrementCRDTotal(metrics.ScaledObjectResource, scaledObject.Namespace) + metricsData.namespace = scaledObject.Namespace + triggerTypes := make([]string, len(scaledObject.Spec.Triggers)) for _, trigger := range scaledObject.Spec.Triggers { metrics.IncrementTriggerTotal(trigger.Type) triggerTypes = append(triggerTypes, trigger.Type) } + metricsData.triggerTypes = triggerTypes - scaledObjectTriggers[namespacedName] = triggerTypes + scaledObjectMetricsMap[namespacedName] = metricsData } -func (r *ScaledObjectReconciler) updateTriggerTotalsOnDelete(namespacedName string) { - scaledObjectTriggersLock.Lock() - defer scaledObjectTriggersLock.Unlock() +func (r *ScaledObjectReconciler) updateMetricsOnDelete(namespacedName string) { + scaledObjectMetricsLock.Lock() + defer scaledObjectMetricsLock.Unlock() - if triggerTypes, ok := scaledObjectTriggers[namespacedName]; ok { - for _, triggerType := range triggerTypes { + if metricsData, ok := scaledObjectMetricsMap[namespacedName]; ok { + metrics.DecrementCRDTotal(metrics.ScaledObjectResource, metricsData.namespace) + for _, triggerType := range metricsData.triggerTypes { metrics.DecrementTriggerTotal(triggerType) } } - delete(scaledObjectTriggers, namespacedName) + delete(scaledObjectMetricsMap, namespacedName) } diff --git a/controllers/keda/scaledobject_finalizer.go b/controllers/keda/scaledobject_finalizer.go index 77a74fdb314..24ee3563dcc 100644 --- a/controllers/keda/scaledobject_finalizer.go +++ b/controllers/keda/scaledobject_finalizer.go @@ -79,7 +79,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge return err } - r.updateTriggerTotalsOnDelete(namespacedName) + r.updateMetricsOnDelete(namespacedName) } logger.Info("Successfully finalized ScaledObject") diff --git a/controllers/keda/triggerauthentication_controller.go b/controllers/keda/triggerauthentication_controller.go index 0b66e5780c0..b4d4f3d6e52 100644 --- a/controllers/keda/triggerauthentication_controller.go +++ b/controllers/keda/triggerauthentication_controller.go @@ -18,10 +18,10 @@ package keda import ( "context" + "sync" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -31,13 +31,27 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/pkg/eventreason" + "github.com/kedacore/keda/v2/pkg/metrics" ) // TriggerAuthenticationReconciler reconciles a TriggerAuthentication object type TriggerAuthenticationReconciler struct { client.Client - Scheme *runtime.Scheme - Recorder record.EventRecorder + record.EventRecorder +} + +type triggerAuthMetricsData struct { + namespace string +} + +var ( + triggerAuthMetricsMap map[string]triggerAuthMetricsData + triggerAuthMetricsLock *sync.Mutex +) + +func init() { + triggerAuthMetricsMap = make(map[string]triggerAuthMetricsData) + triggerAuthMetricsLock = &sync.Mutex{} } // +kubebuilder:rbac:groups=keda.sh,resources=triggerauthentications;triggerauthentications/status,verbs="*" @@ -52,17 +66,21 @@ func (r *TriggerAuthenticationReconciler) Reconcile(ctx context.Context, req ctr if errors.IsNotFound(err) { return ctrl.Result{}, nil } - reqLogger.Error(err, "Failed ot get TriggerAuthentication") + reqLogger.Error(err, "Failed to get TriggerAuthentication") return ctrl.Result{}, err } if triggerAuthentication.GetDeletionTimestamp() != nil { - r.Recorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationDeleted, "TriggerAuthentication was deleted") - return ctrl.Result{}, nil + return ctrl.Result{}, r.finalizeTriggerAuthentication(ctx, reqLogger, triggerAuthentication, req.NamespacedName.String()) } + if err := r.ensureFinalizer(ctx, reqLogger, triggerAuthentication); err != nil { + return ctrl.Result{}, err + } + r.updateMetrics(triggerAuthentication, req.NamespacedName.String()) + if triggerAuthentication.ObjectMeta.Generation == 1 { - r.Recorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationAdded, "New TriggerAuthentication configured") + r.EventRecorder.Event(triggerAuthentication, corev1.EventTypeNormal, eventreason.TriggerAuthenticationAdded, "New TriggerAuthentication configured") } return ctrl.Result{}, nil @@ -74,3 +92,27 @@ func (r *TriggerAuthenticationReconciler) SetupWithManager(mgr ctrl.Manager) err For(&kedav1alpha1.TriggerAuthentication{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Complete(r) } + +func (r *TriggerAuthenticationReconciler) updateMetrics(triggerAuth *kedav1alpha1.TriggerAuthentication, namespacedName string) { + triggerAuthMetricsLock.Lock() + defer triggerAuthMetricsLock.Unlock() + + if metricsData, ok := triggerAuthMetricsMap[namespacedName]; ok { + metrics.DecrementCRDTotal(metrics.TriggerAuthenticationResource, metricsData.namespace) + } + + metrics.IncrementCRDTotal(metrics.TriggerAuthenticationResource, triggerAuth.Namespace) + triggerAuthMetricsMap[namespacedName] = triggerAuthMetricsData{namespace: triggerAuth.Namespace} +} + +// this method is idempotent, so it can be called multiple times without side-effects +func (r *TriggerAuthenticationReconciler) UpdateMetricsOnDelete(namespacedName string) { + triggerAuthMetricsLock.Lock() + defer triggerAuthMetricsLock.Unlock() + + if metricsData, ok := triggerAuthMetricsMap[namespacedName]; ok { + metrics.DecrementCRDTotal(metrics.TriggerAuthenticationResource, metricsData.namespace) + } + + delete(triggerAuthMetricsMap, namespacedName) +} diff --git a/controllers/keda/triggerauthentication_finalizer.go b/controllers/keda/triggerauthentication_finalizer.go new file mode 100644 index 00000000000..916f1e0d5d2 --- /dev/null +++ b/controllers/keda/triggerauthentication_finalizer.go @@ -0,0 +1,19 @@ +package keda + +import ( + "context" + + "github.com/go-logr/logr" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + "github.com/kedacore/keda/v2/controllers/keda/util" +) + +func (r *TriggerAuthenticationReconciler) ensureFinalizer(ctx context.Context, logger logr.Logger, triggerAuth *kedav1alpha1.TriggerAuthentication) error { + return util.EnsureAuthenticationResourceFinalizer(ctx, logger, r, triggerAuth) +} + +func (r *TriggerAuthenticationReconciler) finalizeTriggerAuthentication(ctx context.Context, logger logr.Logger, + triggerAuth *kedav1alpha1.TriggerAuthentication, namespacedName string) error { + return util.FinalizeAuthenticationResource(ctx, logger, r, triggerAuth, namespacedName) +} diff --git a/controllers/keda/util/finalizer.go b/controllers/keda/util/finalizer.go new file mode 100644 index 00000000000..3873db4c7bb --- /dev/null +++ b/controllers/keda/util/finalizer.go @@ -0,0 +1,73 @@ +package util + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + "github.com/kedacore/keda/v2/pkg/eventreason" +) + +const ( + authenticationFinalizer = "finalizer.keda.sh" +) + +type authenticationReconciler interface { + client.Client + record.EventRecorder + UpdateMetricsOnDelete(string) +} + +func EnsureAuthenticationResourceFinalizer(ctx context.Context, logger logr.Logger, reconciler authenticationReconciler, authResource client.Object) error { + var authResourceType string + switch authResource.(type) { + case *kedav1alpha1.TriggerAuthentication: + authResourceType = "TriggerAuthentication" + case *kedav1alpha1.ClusterTriggerAuthentication: + authResourceType = "ClusterTriggerAuthentication" + } + + if !Contains(authResource.GetFinalizers(), authenticationFinalizer) { + logger.Info(fmt.Sprintf("Adding Finalizer for the %s", authResourceType)) + authResource.SetFinalizers(append(authResource.GetFinalizers(), authenticationFinalizer)) + + // Update CR + err := reconciler.Update(ctx, authResource) + if err != nil { + logger.Error(err, fmt.Sprintf("Failed to update %s with a finalizer", authResourceType), "finalizer", authenticationFinalizer) + return err + } + } + return nil +} + +func FinalizeAuthenticationResource(ctx context.Context, logger logr.Logger, reconciler authenticationReconciler, authResource client.Object, namespacedName string) error { + var authResourceType, reason string + switch authResource.(type) { + case *kedav1alpha1.TriggerAuthentication: + authResourceType = "TriggerAuthentication" + reason = eventreason.TriggerAuthenticationDeleted + case *kedav1alpha1.ClusterTriggerAuthentication: + authResourceType = "ClusterTriggerAuthentication" + reason = eventreason.ClusterTriggerAuthenticationDeleted + } + + if Contains(authResource.GetFinalizers(), authenticationFinalizer) { + authResource.SetFinalizers(Remove(authResource.GetFinalizers(), authenticationFinalizer)) + if err := reconciler.Update(ctx, authResource); err != nil { + logger.Error(err, fmt.Sprintf("Failed to update %s after removing a finalizer", authResourceType), "finalizer", authenticationFinalizer) + return err + } + + reconciler.UpdateMetricsOnDelete(namespacedName) + } + + logger.Info(fmt.Sprintf("Successfully finalized %s", authResourceType)) + reconciler.Event(authResource, corev1.EventTypeNormal, reason, fmt.Sprintf("%s was deleted", authResourceType)) + return nil +} diff --git a/main.go b/main.go index 9e091e81324..2c547f1493c 100644 --- a/main.go +++ b/main.go @@ -169,17 +169,15 @@ func main() { os.Exit(1) } if err = (&kedacontrollers.TriggerAuthenticationReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Recorder: eventRecorder, + Client: mgr.GetClient(), + EventRecorder: eventRecorder, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "TriggerAuthentication") os.Exit(1) } if err = (&kedacontrollers.ClusterTriggerAuthenticationReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Recorder: eventRecorder, + Client: mgr.GetClient(), + EventRecorder: eventRecorder, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ClusterTriggerAuthentication") os.Exit(1) diff --git a/pkg/metrics/operator_prom_metrics.go b/pkg/metrics/operator_prom_metrics.go index ede0c1f4e7e..f00fb5cc4fa 100644 --- a/pkg/metrics/operator_prom_metrics.go +++ b/pkg/metrics/operator_prom_metrics.go @@ -21,6 +21,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/metrics" ) +const ( + ClusterTriggerAuthenticationResource = "cluster_trigger_authentication" + TriggerAuthenticationResource = "trigger_authentication" + ScaledObjectResource = "scaled_object" + ScaledJobResource = "scaled_job" +) + var ( triggerTotalsGaugeVec = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -30,10 +37,20 @@ var ( }, []string{"type"}, ) + + crdTotalsGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "keda_operator", + Subsystem: "resource", + Name: "totals", + }, + []string{"type", "namespace"}, + ) ) func init() { metrics.Registry.MustRegister(triggerTotalsGaugeVec) + metrics.Registry.MustRegister(crdTotalsGaugeVec) } func IncrementTriggerTotal(triggerType string) { @@ -47,3 +64,19 @@ func DecrementTriggerTotal(triggerType string) { triggerTotalsGaugeVec.WithLabelValues(triggerType).Dec() } } + +func IncrementCRDTotal(crdType, namespace string) { + if namespace == "" { + namespace = "default" + } + + crdTotalsGaugeVec.WithLabelValues(crdType, namespace).Inc() +} + +func DecrementCRDTotal(crdType, namespace string) { + if namespace == "" { + namespace = "default" + } + + crdTotalsGaugeVec.WithLabelValues(crdType, namespace).Dec() +} diff --git a/tests/clean-crds.sh b/tests/clean-crds.sh index bcaf925e09a..d092d1b077b 100755 --- a/tests/clean-crds.sh +++ b/tests/clean-crds.sh @@ -1,9 +1,9 @@ #! /bin/bash -echo "Cleaning up scaled objects and jobs before undeploying KEDA" +echo "Cleaning up CRDs before undeploying KEDA" while read -r namespace do - resources=$(kubectl get so,sj -n $namespace -o name) + resources=$(kubectl get so,sj,ta,cta -n $namespace -o name) if [[ -n "$resources" ]] then kubectl delete $resources -n $namespace diff --git a/tests/internals/prometheus_metrics/prometheus_metrics_test.go b/tests/internals/prometheus_metrics/prometheus_metrics_test.go index 3db46563a56..6168a9fe7df 100644 --- a/tests/internals/prometheus_metrics/prometheus_metrics_test.go +++ b/tests/internals/prometheus_metrics/prometheus_metrics_test.go @@ -15,6 +15,7 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "github.com/kedacore/keda/v2/pkg/metrics" . "github.com/kedacore/keda/v2/tests/helper" ) @@ -33,6 +34,7 @@ var ( ) type templateData struct { + TestName string TestNamespace string DeploymentName string ScaledObjectName string @@ -176,6 +178,51 @@ spec: targetPort: 8080 selector: app: keda-operator +` + + authenticationTemplate = ` +apiVersion: v1 +kind: Secret +metadata: + name: {{.TestName}}-secret + namespace: {{.TestNamespace}} +type: Opaque +stringData: + key: value +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TestName}}-ta1 + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: param + name: {{.TestName}}-secret + key: key +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TestName}}-ta2 + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: param + name: {{.TestName}}-secret + key: key +--- +apiVersion: keda.sh/v1alpha1 +kind: ClusterTriggerAuthentication +metadata: + name: {{.TestName}}-cta + namespace: {{.TestNamespace}} +spec: + secretTargetRef: + - parameter: param + name: {{.TestName}}-secret + key: key +--- ` ) @@ -194,7 +241,7 @@ func TestScaler(t *testing.T) { "replica count should be 2 after 2 minute") testHPAScalerMetricValue(t) - testTriggerTotalMetric(t, kc, data) + testOperatorMetrics(t, kc, data) // cleanup DeleteKubernetesResources(t, kc, testNamespace, data, templates) @@ -202,6 +249,7 @@ func TestScaler(t *testing.T) { func getTemplateData() (templateData, []Template) { return templateData{ + TestName: testName, TestNamespace: testNamespace, DeploymentName: deploymentName, ScaledObjectName: scaledObjectName, @@ -215,6 +263,7 @@ func getTemplateData() (templateData, []Template) { {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, {Name: "clientTemplate", Config: clientTemplate}, {Name: "serviceTemplate", Config: serviceTemplate}, + {Name: "authenticatioNTemplate", Config: authenticationTemplate}, } } @@ -225,10 +274,10 @@ func fetchAndParsePrometheusMetrics(t *testing.T, cmd string) map[string]*promMo parser := expfmt.TextParser{} // Ensure EOL reader := strings.NewReader(strings.ReplaceAll(out, "\r\n", "\n")) - family, err := parser.TextToMetricFamilies(reader) + families, err := parser.TextToMetricFamilies(reader) assert.NoErrorf(t, err, "cannot parse metrics - %s", err) - return family + return families } func testHPAScalerMetricValue(t *testing.T) { @@ -254,29 +303,52 @@ func testHPAScalerMetricValue(t *testing.T) { } } -func testTriggerTotalMetric(t *testing.T, kc *kubernetes.Clientset, data templateData) { - t.Log("--- testing trigger total metric ---") - testTriggerTotalMetricValue(t, getTriggerTotalsManually(t, kc)) +func testOperatorMetrics(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing operator metrics ---") + testOperatorMetricValues(t, kc) KubectlApplyWithTemplate(t, data, "cronScaledJobTemplate", cronScaledJobTemplate) - testTriggerTotalMetricValue(t, getTriggerTotalsManually(t, kc)) + testOperatorMetricValues(t, kc) KubectlDeleteWithTemplate(t, data, "cronScaledJobTemplate", cronScaledJobTemplate) - testTriggerTotalMetricValue(t, getTriggerTotalsManually(t, kc)) + testOperatorMetricValues(t, kc) } -func getTriggerTotalsManually(t *testing.T, kc *kubernetes.Clientset) map[string]int { +func getOperatorMetricsManually(t *testing.T, kc *kubernetes.Clientset) (map[string]int, map[string]map[string]int) { kedaKc := GetKedaKubernetesClient(t) triggerTotals := make(map[string]int) + crTotals := map[string]map[string]int{ + "scaled_object": {}, + "scaled_job": {}, + "trigger_authentication": {}, + "cluster_trigger_authentication": {}, + } namespaceList, err := kc.CoreV1().Namespaces().List(context.Background(), v1.ListOptions{}) assert.NoErrorf(t, err, "failed to list namespaces - %s", err) + clusterTriggerAuthenticationList, err := kedaKc.ClusterTriggerAuthentications().List(context.Background(), v1.ListOptions{}) + assert.NoErrorf(t, err, "failed to list clusterTriggerAuthentications with err - %s") + + for _, clusterTriggerAuth := range clusterTriggerAuthenticationList.Items { + namespace := clusterTriggerAuth.Namespace + if namespace == "" { + namespace = "default" + } + crTotals[metrics.ClusterTriggerAuthenticationResource][namespace]++ + } + for _, namespace := range namespaceList.Items { + namespaceName := namespace.Name + if namespace.Name == "" { + namespaceName = "default" + } + scaledObjectList, err := kedaKc.ScaledObjects(namespace.Name).List(context.Background(), v1.ListOptions{}) assert.NoErrorf(t, err, "failed to list scaledObjects in namespace - %s with err - %s", namespace.Name, err) + crTotals[metrics.ScaledObjectResource][namespaceName] = len(scaledObjectList.Items) for _, scaledObject := range scaledObjectList.Items { for _, trigger := range scaledObject.Spec.Triggers { triggerTotals[trigger.Type]++ @@ -286,34 +358,84 @@ func getTriggerTotalsManually(t *testing.T, kc *kubernetes.Clientset) map[string scaledJobList, err := kedaKc.ScaledJobs(namespace.Name).List(context.Background(), v1.ListOptions{}) assert.NoErrorf(t, err, "failed to list scaledJobs in namespace - %s with err - %s", namespace.Name, err) + crTotals[metrics.ScaledJobResource][namespaceName] = len(scaledJobList.Items) for _, scaledJob := range scaledJobList.Items { for _, trigger := range scaledJob.Spec.Triggers { triggerTotals[trigger.Type]++ } } + + triggerAuthList, err := kedaKc.TriggerAuthentications(namespace.Name).List(context.Background(), v1.ListOptions{}) + assert.NoErrorf(t, err, "failed to list triggerAuthentications in namespace - %s with err - %s", namespace.Name, err) + + crTotals[metrics.TriggerAuthenticationResource][namespaceName] = len(triggerAuthList.Items) } - return triggerTotals + return triggerTotals, crTotals } -func testTriggerTotalMetricValue(t *testing.T, expected map[string]int) { - family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure http://%s.keda:8080/metrics", serviceName)) +func testOperatorMetricValues(t *testing.T, kc *kubernetes.Clientset) { + families := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure http://%s.keda:8080/metrics", serviceName)) + expectedTriggerTotals, expectedCrTotals := getOperatorMetricsManually(t, kc) - if val, ok := family["keda_operator_trigger_totals"]; ok { - var found bool - metrics := val.GetMetric() - for _, metric := range metrics { - labels := metric.GetLabel() - for _, label := range labels { - if *label.Name == "type" { - assert.Equalf(t, float64(expected[*label.Value]), *metric.Gauge.Value, "expected %f got %f for type %s", - float64(expected[*label.Value]), *metric.Gauge.Value, *label.Value) - found = true - } + checkTriggerTotalValues(t, families, expectedTriggerTotals) + checkCRTotalValues(t, families, expectedCrTotals) +} + +func checkTriggerTotalValues(t *testing.T, families map[string]*promModel.MetricFamily, expected map[string]int) { + t.Log("--- testing trigger total metrics ---") + + family, ok := families["keda_operator_trigger_totals"] + if !ok { + t.Errorf("metric not available") + return + } + + metrics := family.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == "type" { + triggerType := *label.Value + metricValue := *metric.Gauge.Value + expectedMetricValue := float64(expected[triggerType]) + + assert.Equalf(t, expectedMetricValue, metricValue, "expected %f got %f for trigger type %s", + expectedMetricValue, metricValue, triggerType) + + delete(expected, triggerType) } } - assert.Equal(t, true, found) - } else { + } + + assert.Equal(t, 0, len(expected)) +} + +func checkCRTotalValues(t *testing.T, families map[string]*promModel.MetricFamily, expected map[string]map[string]int) { + t.Log("--- testing resource total metrics ---") + + family, ok := families["keda_operator_resource_totals"] + if !ok { t.Errorf("metric not available") + return + } + + metrics := family.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + var namespace, crType string + for _, label := range labels { + if *label.Name == "type" { + crType = *label.Value + } else if *label.Name == "namespace" { + namespace = *label.Value + } + } + + metricValue := *metric.Gauge.Value + expectedMetricValue := float64(expected[crType][namespace]) + + assert.Equalf(t, expectedMetricValue, metricValue, "expected %f got %f for cr type %s & namespace %s", + expectedMetricValue, metricValue, crType, namespace) } }