Skip to content

Commit

Permalink
Merge branch 'main' into annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
keegancwinchester authored Nov 7, 2022
2 parents ee28d9b + bcc85ec commit 5972bc2
Show file tree
Hide file tree
Showing 17 changed files with 477 additions and 124 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### New

- **General**: Expand Prometheus metric with label "ScalerName" to distinguish different triggers. The scaleName is defined per Trigger.Name ([#3588](https://github.com/kedacore/keda/issues/3588)
- **General**: Expand Prometheus metric with label "ScalerName" to distinguish different triggers. The scaleName is defined per Trigger.Name ([#3588](https://github.com/kedacore/keda/issues/3588))
- **General:** Introduce new Loki Scaler ([#3699](https://github.com/kedacore/keda/issues/3699))
- **General**: Add ratelimitting parameters to keda manager to allow override of client defaults ([#3730](https://github.com/kedacore/keda/issues/2920))
- **General**: Provide Prometheus metric with indication of total number of custom resources per namespace for each custom resource type (CRD). ([#2637](https://github.com/kedacore/keda/issues/2637)|[#2638](https://github.com/kedacore/keda/issues/2638)|[#2639](https://github.com/kedacore/keda/issues/2639))
- **General**: Provide Prometheus metric with indication of total number of triggers per trigger type in `ScaledJob`/`ScaledObject`. ([#3663](https://github.com/kedacore/keda/issues/3663))
- **AWS Scalers**: Add setting AWS endpoint url. ([#3337](https://github.com/kedacore/keda/issues/3337))
- **Azure Service Bus Scaler**: Add support for Shared Access Signature (SAS) tokens for authentication. ([#2920](https://github.com/kedacore/keda/issues/2920))
Expand All @@ -59,6 +60,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **GCP Storage Scaler:** Add prefix and delimiter support ([#3756](https://github.com/kedacore/keda/issues/3756))
- **Prometheus Scaler:** Introduce skipping of certificate check for unsigned certs ([#2310](https://github.com/kedacore/keda/issues/2310))
- **Event Hubs Scaler:** Support Azure Active Direcotry Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569))
- **Metrics API Scaler:** Add unsafeSsl paramater to skip certificate validation when connecting over HTTPS ([#3728](https://github.com/kedacore/keda/discussions/3728))

### Fixes

Expand Down
56 changes: 49 additions & 7 deletions controllers/keda/clustertriggerauthentication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package keda

import (
"context"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
Expand All @@ -31,13 +31,27 @@ import (

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/eventreason"
"github.com/kedacore/keda/v2/pkg/metrics"
)

// ClusterTriggerAuthenticationReconciler reconciles a ClusterTriggerAuthentication object
type ClusterTriggerAuthenticationReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
record.EventRecorder
}

type clusterTriggerAuthMetricsData struct {
namespace string
}

var (
clusterTriggerAuthMetricsMap map[string]clusterTriggerAuthMetricsData
clusterTriggerAuthMetricsLock *sync.Mutex
)

func init() {
clusterTriggerAuthMetricsMap = make(map[string]clusterTriggerAuthMetricsData)
clusterTriggerAuthMetricsLock = &sync.Mutex{}
}

// +kubebuilder:rbac:groups=keda.sh,resources=clustertriggerauthentications;clustertriggerauthentications/status,verbs="*"
Expand All @@ -52,17 +66,21 @@ func (r *ClusterTriggerAuthenticationReconciler) Reconcile(ctx context.Context,
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
reqLogger.Error(err, "Failed ot get ClusterTriggerAuthentication")
reqLogger.Error(err, "Failed to get ClusterTriggerAuthentication")
return ctrl.Result{}, err
}

if clusterTriggerAuthentication.GetDeletionTimestamp() != nil {
r.Recorder.Event(clusterTriggerAuthentication, corev1.EventTypeNormal, eventreason.ClusterTriggerAuthenticationDeleted, "ClusterTriggerAuthentication was deleted")
return ctrl.Result{}, nil
return ctrl.Result{}, r.finalizeClusterTriggerAuthentication(ctx, reqLogger, clusterTriggerAuthentication, req.NamespacedName.String())
}

if err := r.ensureFinalizer(ctx, reqLogger, clusterTriggerAuthentication); err != nil {
return ctrl.Result{}, err
}
r.updateMetrics(clusterTriggerAuthentication, req.NamespacedName.String())

if clusterTriggerAuthentication.ObjectMeta.Generation == 1 {
r.Recorder.Event(clusterTriggerAuthentication, corev1.EventTypeNormal, eventreason.ClusterTriggerAuthenticationAdded, "New ClusterTriggerAuthentication configured")
r.EventRecorder.Event(clusterTriggerAuthentication, corev1.EventTypeNormal, eventreason.ClusterTriggerAuthenticationAdded, "New ClusterTriggerAuthentication configured")
}
return ctrl.Result{}, nil
}
Expand All @@ -73,3 +91,27 @@ func (r *ClusterTriggerAuthenticationReconciler) SetupWithManager(mgr ctrl.Manag
For(&kedav1alpha1.ClusterTriggerAuthentication{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}

func (r *ClusterTriggerAuthenticationReconciler) updateMetrics(clusterTriggerAuth *kedav1alpha1.ClusterTriggerAuthentication, namespacedName string) {
clusterTriggerAuthMetricsLock.Lock()
defer clusterTriggerAuthMetricsLock.Unlock()

if metricsData, ok := clusterTriggerAuthMetricsMap[namespacedName]; ok {
metrics.DecrementCRDTotal(metrics.ClusterTriggerAuthenticationResource, metricsData.namespace)
}

metrics.IncrementCRDTotal(metrics.ClusterTriggerAuthenticationResource, clusterTriggerAuth.Namespace)
clusterTriggerAuthMetricsMap[namespacedName] = clusterTriggerAuthMetricsData{namespace: clusterTriggerAuth.Namespace}
}

// this method is idempotent, so it can be called multiple times without side-effects
func (r *ClusterTriggerAuthenticationReconciler) UpdateMetricsOnDelete(namespacedName string) {
clusterTriggerAuthMetricsLock.Lock()
defer clusterTriggerAuthMetricsLock.Unlock()

if metricsData, ok := clusterTriggerAuthMetricsMap[namespacedName]; ok {
metrics.DecrementCRDTotal(metrics.ClusterTriggerAuthenticationResource, metricsData.namespace)
}

delete(clusterTriggerAuthMetricsMap, namespacedName)
}
19 changes: 19 additions & 0 deletions controllers/keda/clustertriggerauthentication_finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package keda

import (
"context"

"github.com/go-logr/logr"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/controllers/keda/util"
)

func (r *ClusterTriggerAuthenticationReconciler) ensureFinalizer(ctx context.Context, logger logr.Logger, clusterTriggerAuth *kedav1alpha1.ClusterTriggerAuthentication) error {
return util.EnsureAuthenticationResourceFinalizer(ctx, logger, r, clusterTriggerAuth)
}

func (r *ClusterTriggerAuthenticationReconciler) finalizeClusterTriggerAuthentication(ctx context.Context, logger logr.Logger,
clusterTriggerAuth *kedav1alpha1.ClusterTriggerAuthentication, namespacedName string) error {
return util.FinalizeAuthenticationResource(ctx, logger, r, clusterTriggerAuth, namespacedName)
}
73 changes: 29 additions & 44 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,19 @@ type ScaledJobReconciler struct {
scaleHandler scaling.ScaleHandler
}

type scaledJobMetricsData struct {
namespace string
triggerTypes []string
}

var (
scaledJobTriggers map[string][]string
scaledJobTriggersLock *sync.Mutex
scaledJobMetricsMap map[string]scaledJobMetricsData
scaledJobMetricsLock *sync.Mutex
)

func init() {
scaledJobTriggers = make(map[string][]string)
scaledJobTriggersLock = &sync.Mutex{}
scaledJobMetricsMap = make(map[string]scaledJobMetricsData)
scaledJobMetricsLock = &sync.Mutex{}
}

// SetupWithManager initializes the ScaledJobReconciler instance and starts a new controller managed by the passed Manager instance.
Expand Down Expand Up @@ -111,7 +116,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
if scaledJob.GetDeletionTimestamp() != nil {
return ctrl.Result{}, r.finalizeScaledJob(ctx, reqLogger, scaledJob, req.NamespacedName.String())
}
r.updateTriggerTotals(reqLogger, scaledJob, req.NamespacedName.String())
r.updateMetrics(scaledJob, req.NamespacedName.String())

// ensure finalizer is set on this CR
if err := r.ensureFinalizer(ctx, reqLogger, scaledJob); err != nil {
Expand Down Expand Up @@ -272,64 +277,44 @@ func (r *ScaledJobReconciler) stopScaleLoop(ctx context.Context, logger logr.Log
return nil
}

// scaledJobGenerationChanged returns true if ScaledJob's Generation was changed, ie. ScaledJob.Spec was changed
func (r *ScaledJobReconciler) scaledJobGenerationChanged(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (bool, error) {
key, err := cache.MetaNamespaceKeyFunc(scaledJob)
if err != nil {
logger.Error(err, "Error getting key for scaledJob")
return true, err
}

value, loaded := r.scaledJobGenerations.Load(key)
if loaded {
generation := value.(int64)
if generation == scaledJob.Generation {
return false, nil
}
}
return true, nil
}

func (r *ScaledJobReconciler) updateTriggerTotals(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, namespacedName string) {
specChanged, err := r.scaledJobGenerationChanged(logger, scaledJob)
if err != nil {
logger.Error(err, "failed to update trigger totals")
return
}

if !specChanged {
return
}
func (r *ScaledJobReconciler) updateMetrics(scaledJob *kedav1alpha1.ScaledJob, namespacedName string) {
scaledJobMetricsLock.Lock()
defer scaledJobMetricsLock.Unlock()

scaledJobTriggersLock.Lock()
defer scaledJobTriggersLock.Unlock()
metricsData, ok := scaledJobMetricsMap[namespacedName]

if triggerTypes, ok := scaledJobTriggers[namespacedName]; ok {
for _, triggerType := range triggerTypes {
if ok {
metrics.DecrementCRDTotal(metrics.ScaledJobResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
metrics.DecrementTriggerTotal(triggerType)
}
}

metrics.IncrementCRDTotal(metrics.ScaledJobResource, scaledJob.Namespace)
metricsData.namespace = scaledJob.Namespace

triggerTypes := make([]string, len(scaledJob.Spec.Triggers))
for _, trigger := range scaledJob.Spec.Triggers {
metrics.IncrementTriggerTotal(trigger.Type)
triggerTypes = append(triggerTypes, trigger.Type)
}
metricsData.triggerTypes = triggerTypes

scaledJobTriggers[namespacedName] = triggerTypes
scaledJobMetricsMap[namespacedName] = metricsData
}

func (r *ScaledJobReconciler) updateTriggerTotalsOnDelete(namespacedName string) {
scaledJobTriggersLock.Lock()
defer scaledJobTriggersLock.Unlock()
func (r *ScaledJobReconciler) updateMetricsOnDelete(namespacedName string) {
scaledJobMetricsLock.Lock()
defer scaledJobMetricsLock.Unlock()

if triggerTypes, ok := scaledJobTriggers[namespacedName]; ok {
for _, triggerType := range triggerTypes {
if metricsData, ok := scaledJobMetricsMap[namespacedName]; ok {
metrics.DecrementCRDTotal(metrics.ScaledJobResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
metrics.DecrementTriggerTotal(triggerType)
}
}

delete(scaledJobTriggers, namespacedName)
delete(scaledJobMetricsMap, namespacedName)
}
//stopScaleLoop when scaledJob.Status.IsPaused constant is set
func (r *ScaledJobReconciler) Pause(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/scaledjob_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr
return err
}

r.updateTriggerTotalsOnDelete(namespacedName)
r.updateMetricsOnDelete(namespacedName)
}

logger.Info("Successfully finalized ScaledJob")
Expand Down
55 changes: 29 additions & 26 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,17 @@ type ScaledObjectReconciler struct {
kubeVersion kedautil.K8sVersion
}

type scaledObjectMetricsData struct {
namespace string
triggerTypes []string
}

var (
// A cache mapping "resource.group" to true or false if we know if this resource is scalable.
isScalableCache *sync.Map

scaledObjectTriggers map[string][]string
scaledObjectTriggersLock *sync.Mutex
scaledObjectMetricsMap map[string]scaledObjectMetricsData
scaledObjectMetricsLock *sync.Mutex
)

func init() {
Expand All @@ -91,8 +96,8 @@ func init() {
isScalableCache.Store("deployments.apps", true)
isScalableCache.Store("statefulsets.apps", true)

scaledObjectTriggers = make(map[string][]string)
scaledObjectTriggersLock = &sync.Mutex{}
scaledObjectMetricsMap = make(map[string]scaledObjectMetricsData)
scaledObjectMetricsLock = &sync.Mutex{}
}

// SetupWithManager initializes the ScaledObjectReconciler instance and starts a new controller managed by the passed Manager instance.
Expand Down Expand Up @@ -177,7 +182,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
if scaledObject.GetDeletionTimestamp() != nil {
return ctrl.Result{}, r.finalizeScaledObject(ctx, reqLogger, scaledObject, req.NamespacedName.String())
}
r.updateTriggerTotals(reqLogger, scaledObject, req.NamespacedName.String())
r.updateMetrics(scaledObject, req.NamespacedName.String())

// ensure finalizer is set on this CR
if err := r.ensureFinalizer(ctx, reqLogger, scaledObject); err != nil {
Expand Down Expand Up @@ -480,44 +485,42 @@ func (r *ScaledObjectReconciler) scaledObjectGenerationChanged(logger logr.Logge
return true, nil
}

func (r *ScaledObjectReconciler) updateTriggerTotals(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, namespacedName string) {
specChanged, err := r.scaledObjectGenerationChanged(logger, scaledObject)
if err != nil {
logger.Error(err, "failed to update trigger totals")
return
}
func (r *ScaledObjectReconciler) updateMetrics(scaledObject *kedav1alpha1.ScaledObject, namespacedName string) {
scaledObjectMetricsLock.Lock()
defer scaledObjectMetricsLock.Unlock()

if !specChanged {
return
}

scaledObjectTriggersLock.Lock()
defer scaledObjectTriggersLock.Unlock()
metricsData, ok := scaledObjectMetricsMap[namespacedName]

if triggerTypes, ok := scaledObjectTriggers[namespacedName]; ok {
for _, triggerType := range triggerTypes {
if ok {
metrics.DecrementCRDTotal(metrics.ScaledObjectResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
metrics.DecrementTriggerTotal(triggerType)
}
}

metrics.IncrementCRDTotal(metrics.ScaledObjectResource, scaledObject.Namespace)
metricsData.namespace = scaledObject.Namespace

triggerTypes := make([]string, len(scaledObject.Spec.Triggers))
for _, trigger := range scaledObject.Spec.Triggers {
metrics.IncrementTriggerTotal(trigger.Type)
triggerTypes = append(triggerTypes, trigger.Type)
}
metricsData.triggerTypes = triggerTypes

scaledObjectTriggers[namespacedName] = triggerTypes
scaledObjectMetricsMap[namespacedName] = metricsData
}

func (r *ScaledObjectReconciler) updateTriggerTotalsOnDelete(namespacedName string) {
scaledObjectTriggersLock.Lock()
defer scaledObjectTriggersLock.Unlock()
func (r *ScaledObjectReconciler) updateMetricsOnDelete(namespacedName string) {
scaledObjectMetricsLock.Lock()
defer scaledObjectMetricsLock.Unlock()

if triggerTypes, ok := scaledObjectTriggers[namespacedName]; ok {
for _, triggerType := range triggerTypes {
if metricsData, ok := scaledObjectMetricsMap[namespacedName]; ok {
metrics.DecrementCRDTotal(metrics.ScaledObjectResource, metricsData.namespace)
for _, triggerType := range metricsData.triggerTypes {
metrics.DecrementTriggerTotal(triggerType)
}
}

delete(scaledObjectTriggers, namespacedName)
delete(scaledObjectMetricsMap, namespacedName)
}
2 changes: 1 addition & 1 deletion controllers/keda/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge
return err
}

r.updateTriggerTotalsOnDelete(namespacedName)
r.updateMetricsOnDelete(namespacedName)
}

logger.Info("Successfully finalized ScaledObject")
Expand Down
Loading

0 comments on commit 5972bc2

Please sign in to comment.