diff --git a/CHANGELOG.md b/CHANGELOG.md index 8083fdb0928..22b19922fb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,12 +24,13 @@ - Extend Azure Monitor scaler to support custom metrics ([#1883](https://github.com/kedacore/keda/pull/1883)) - Support non-public cloud environments in the Azure Storage Queue and Azure Storage Blob scalers ([#1863](https://github.com/kedacore/keda/pull/1863)) - Show HashiCorp Vault Address when using `kubectl get ta` or `kubectl get cta` ([#1862](https://github.com/kedacore/keda/pull/1862)) +- Add fallback functionality ([#1872](https://github.com/kedacore/keda/issues/1872)) ### Improvements - Fix READY and ACTIVE fields of ScaledJob to show status when we run `kubectl get sj` ([#1855](https://github.com/kedacore/keda/pull/1855)) - Don't panic when HashiCorp Vault path doesn't exist ([#1864](https://github.com/kedacore/keda/pull/1864)) - +- Allow influxdb `authToken`, `serverURL`, and `organizationName` to be sourced from `(Cluster)TriggerAuthentication` ([#1904](https://github.com/kedacore/keda/pull/1904)) ### Breaking Changes - TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) diff --git a/api/v1alpha1/scaledobject_types.go b/api/v1alpha1/scaledobject_types.go index b55d543cd90..e3b28bdeb9b 100644 --- a/api/v1alpha1/scaledobject_types.go +++ b/api/v1alpha1/scaledobject_types.go @@ -29,6 +29,25 @@ type ScaledObject struct { Status ScaledObjectStatus `json:"status,omitempty"` } +// HealthStatus is the status for a ScaledObject's health +type HealthStatus struct { + // +optional + NumberOfFailures *int32 `json:"numberOfFailures,omitempty"` + // +optional + Status HealthStatusType `json:"status,omitempty"` +} + +// HealthStatusType is an indication of whether the health status is happy or failing +type HealthStatusType string + +const ( + // HealthStatusHappy means the status of the health object is happy + HealthStatusHappy HealthStatusType = "Happy" + + // HealthStatusFailing means the status of the health object is failing + HealthStatusFailing HealthStatusType = "Failing" +) + // ScaledObjectSpec is the spec for a ScaledObject resource type ScaledObjectSpec struct { ScaleTargetRef *ScaleTarget `json:"scaleTargetRef"` @@ -44,6 +63,14 @@ type ScaledObjectSpec struct { Advanced *AdvancedConfig `json:"advanced,omitempty"` Triggers []ScaleTriggers `json:"triggers"` + // +optional + Fallback *Fallback `json:"fallback,omitempty"` +} + +// Fallback is the spec for fallback options +type Fallback struct { + FailureThreshold int32 `json:"failureThreshold"` + Replicas int32 `json:"replicas"` } // AdvancedConfig specifies advance scaling options @@ -79,8 +106,12 @@ type ScaleTriggers struct { Metadata map[string]string `json:"metadata"` // +optional AuthenticationRef *ScaledObjectAuthRef `json:"authenticationRef,omitempty"` + // +optional + FallbackReplicas *int32 `json:"fallback,omitempty"` } +// +k8s:openapi-gen=true + // ScaledObjectStatus is the status for a ScaledObject resource // +optional type ScaledObjectStatus struct { @@ -98,6 +129,8 @@ type ScaledObjectStatus struct { ResourceMetricNames []string `json:"resourceMetricNames,omitempty"` // +optional Conditions Conditions `json:"conditions,omitempty"` + // +optional + Health map[string]HealthStatus `json:"health,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index b5afe3ee424..3ec46460583 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -198,6 +198,21 @@ func (in *Credential) DeepCopy() *Credential { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Fallback) DeepCopyInto(out *Fallback) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Fallback. +func (in *Fallback) DeepCopy() *Fallback { + if in == nil { + return nil + } + out := new(Fallback) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GroupVersionKindResource) DeepCopyInto(out *GroupVersionKindResource) { *out = *in @@ -238,6 +253,26 @@ func (in *HashiCorpVault) DeepCopy() *HashiCorpVault { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *HealthStatus) DeepCopyInto(out *HealthStatus) { + *out = *in + if in.NumberOfFailures != nil { + in, out := &in.NumberOfFailures, &out.NumberOfFailures + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HealthStatus. +func (in *HealthStatus) DeepCopy() *HealthStatus { + if in == nil { + return nil + } + out := new(HealthStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HorizontalPodAutoscalerConfig) DeepCopyInto(out *HorizontalPodAutoscalerConfig) { *out = *in @@ -288,6 +323,11 @@ func (in *ScaleTriggers) DeepCopyInto(out *ScaleTriggers) { *out = new(ScaledObjectAuthRef) **out = **in } + if in.FallbackReplicas != nil { + in, out := &in.FallbackReplicas, &out.FallbackReplicas + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScaleTriggers. @@ -545,6 +585,11 @@ func (in *ScaledObjectSpec) DeepCopyInto(out *ScaledObjectSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Fallback != nil { + in, out := &in.Fallback, &out.Fallback + *out = new(Fallback) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScaledObjectSpec. @@ -589,6 +634,13 @@ func (in *ScaledObjectStatus) DeepCopyInto(out *ScaledObjectStatus) { *out = make(Conditions, len(*in)) copy(*out, *in) } + if in.Health != nil { + in, out := &in.Health, &out.Health + *out = make(map[string]HealthStatus, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScaledObjectStatus. diff --git a/config/crd/bases/keda.sh_scaledjobs.yaml b/config/crd/bases/keda.sh_scaledjobs.yaml index f5f23014ecf..9dd16b5dac8 100644 --- a/config/crd/bases/keda.sh_scaledjobs.yaml +++ b/config/crd/bases/keda.sh_scaledjobs.yaml @@ -6760,6 +6760,9 @@ spec: required: - name type: object + fallback: + format: int32 + type: integer metadata: additionalProperties: type: string diff --git a/config/crd/bases/keda.sh_scaledobjects.yaml b/config/crd/bases/keda.sh_scaledobjects.yaml index 7fc61cf1bf7..4321666d5e5 100644 --- a/config/crd/bases/keda.sh_scaledobjects.yaml +++ b/config/crd/bases/keda.sh_scaledobjects.yaml @@ -200,6 +200,19 @@ spec: cooldownPeriod: format: int32 type: integer + fallback: + description: Fallback is the spec for fallback options + properties: + failureThreshold: + format: int32 + type: integer + replicas: + format: int32 + type: integer + required: + - failureThreshold + - replicas + type: object maxReplicaCount: format: int32 type: integer @@ -242,6 +255,9 @@ spec: required: - name type: object + fallback: + format: int32 + type: integer metadata: additionalProperties: type: string @@ -290,6 +306,19 @@ spec: items: type: string type: array + health: + additionalProperties: + description: HealthStatus is the status for a ScaledObject's health + properties: + numberOfFailures: + format: int32 + type: integer + status: + description: HealthStatusType is an indication of whether the + health status is happy or failing + type: string + type: object + type: object lastActiveTime: format: date-time type: string diff --git a/controllers/hpa.go b/controllers/hpa.go index 13b1d545e6f..550709b628b 100644 --- a/controllers/hpa.go +++ b/controllers/hpa.go @@ -184,6 +184,9 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger, status := scaledObject.Status.DeepCopy() status.ExternalMetricNames = externalMetricNames status.ResourceMetricNames = resourceMetricNames + + updateHealthStatus(scaledObject, externalMetricNames, status) + err = kedacontrollerutil.UpdateScaledObjectStatus(r.Client, logger, scaledObject, status) if err != nil { logger.Error(err, "Error updating scaledObject status with used externalMetricNames") @@ -193,6 +196,18 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger, return scaledObjectMetricSpecs, nil } +func updateHealthStatus(scaledObject *kedav1alpha1.ScaledObject, externalMetricNames []string, status *kedav1alpha1.ScaledObjectStatus) { + health := scaledObject.Status.Health + newHealth := make(map[string]kedav1alpha1.HealthStatus) + for _, metricName := range externalMetricNames { + entry, exists := health[metricName] + if exists { + newHealth[metricName] = entry + } + } + status.Health = newHealth +} + // checkMinK8sVersionforHPABehavior min version (k8s v1.18) for HPA Behavior func (r *ScaledObjectReconciler) checkMinK8sVersionforHPABehavior(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) { if r.kubeVersion.MinorVersion < 18 { diff --git a/controllers/hpa_test.go b/controllers/hpa_test.go new file mode 100644 index 00000000000..fe796200038 --- /dev/null +++ b/controllers/hpa_test.go @@ -0,0 +1,127 @@ +package controllers + +import ( + "github.com/go-logr/logr" + "github.com/golang/mock/gomock" + "github.com/kedacore/keda/v2/api/v1alpha1" + "github.com/kedacore/keda/v2/pkg/mock/mock_client" + mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" + "github.com/kedacore/keda/v2/pkg/mock/mock_scaling" + kedascalers "github.com/kedacore/keda/v2/pkg/scalers" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/api/autoscaling/v2beta2" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("hpa", func() { + var ( + reconciler ScaledObjectReconciler + scaleHandler *mock_scaling.MockScaleHandler + client *mock_client.MockClient + statusWriter *mock_client.MockStatusWriter + scaler *mock_scalers.MockScaler + logger logr.Logger + ctrl *gomock.Controller + ) + + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + client = mock_client.NewMockClient(ctrl) + scaleHandler = mock_scaling.NewMockScaleHandler(ctrl) + scaler = mock_scalers.NewMockScaler(ctrl) + statusWriter = mock_client.NewMockStatusWriter(ctrl) + logger = logr.DiscardLogger{} + reconciler = ScaledObjectReconciler{ + Client: client, + scaleHandler: scaleHandler, + } + }) + + AfterEach(func() { + ctrl.Finish() + }) + + It("should remove deleted metric from health status", func() { + numberOfFailures := int32(87) + health := make(map[string]v1alpha1.HealthStatus) + health["another metric name"] = v1alpha1.HealthStatus{ + NumberOfFailures: &numberOfFailures, + Status: v1alpha1.HealthStatusFailing, + } + + scaledObject := setupTest(health, scaler, scaleHandler) + + var capturedScaledObject v1alpha1.ScaledObject + client.EXPECT().Status().Return(statusWriter) + statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(arg interface{}, scaledObject *v1alpha1.ScaledObject, anotherArg interface{}) { + capturedScaledObject = *scaledObject + }) + + _, err := reconciler.getScaledObjectMetricSpecs(logger, scaledObject) + + Expect(err).ToNot(HaveOccurred()) + Expect(capturedScaledObject.Status.Health).To(BeEmpty()) + }) + + It("should not remove existing metric from health status", func() { + numberOfFailures := int32(87) + health := make(map[string]v1alpha1.HealthStatus) + health["another metric name"] = v1alpha1.HealthStatus{ + NumberOfFailures: &numberOfFailures, + Status: v1alpha1.HealthStatusFailing, + } + + health["some metric name"] = v1alpha1.HealthStatus{ + NumberOfFailures: &numberOfFailures, + Status: v1alpha1.HealthStatusFailing, + } + + scaledObject := setupTest(health, scaler, scaleHandler) + + var capturedScaledObject v1alpha1.ScaledObject + client.EXPECT().Status().Return(statusWriter) + statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(arg interface{}, scaledObject *v1alpha1.ScaledObject, anotherArg interface{}) { + capturedScaledObject = *scaledObject + }) + + _, err := reconciler.getScaledObjectMetricSpecs(logger, scaledObject) + + expectedHealth := make(map[string]v1alpha1.HealthStatus) + expectedHealth["some metric name"] = v1alpha1.HealthStatus{ + NumberOfFailures: &numberOfFailures, + Status: v1alpha1.HealthStatusFailing, + } + + Expect(err).ToNot(HaveOccurred()) + Expect(capturedScaledObject.Status.Health).To(HaveLen(1)) + Expect(capturedScaledObject.Status.Health).To(Equal(expectedHealth)) + }) + +}) + +func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.MockScaler, scaleHandler *mock_scaling.MockScaleHandler) *v1alpha1.ScaledObject { + scaledObject := &v1alpha1.ScaledObject{ + ObjectMeta: v1.ObjectMeta{ + Name: "some scaled object name", + }, + Status: v1alpha1.ScaledObjectStatus{ + Health: health, + }, + } + + scalers := []kedascalers.Scaler{scaler} + metricSpec := v2beta2.MetricSpec{ + External: &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: "some metric name", + }, + }, + } + metricSpecs := []v2beta2.MetricSpec{metricSpec} + scaler.EXPECT().GetMetricSpecForScaling().Return(metricSpecs) + scaler.EXPECT().Close() + scaleHandler.EXPECT().GetScalers(gomock.Eq(scaledObject)).Return(scalers, nil) + + return scaledObject +} diff --git a/controllers/scaledobject_controller.go b/controllers/scaledobject_controller.go index 19d391ba347..161bacbd075 100644 --- a/controllers/scaledobject_controller.go +++ b/controllers/scaledobject_controller.go @@ -51,7 +51,7 @@ type ScaledObjectReconciler struct { GlobalHTTPTimeout time.Duration Recorder record.EventRecorder - scaleClient *scale.ScalesGetter + scaleClient scale.ScalesGetter restMapper meta.RESTMapper scaledObjectsGenerations *sync.Map scaleHandler scaling.ScaleHandler @@ -90,7 +90,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error { // Create Scale Client scaleClient := initScaleClient(mgr, clientset) - r.scaleClient = &scaleClient + r.scaleClient = scaleClient // Init the rest of ScaledObjectReconciler r.restMapper = mgr.GetRESTMapper() @@ -266,7 +266,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logge // not cached, let's try to detect /scale subresource // also rechecks when we need to update the status. var errScale error - scale, errScale = (*r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) + scale, errScale = (r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) if errScale != nil { // not able to get /scale subresource -> let's check if the resource even exist in the cluster unstruct := &unstructured.Unstructured{} diff --git a/controllers/scaledobject_finalizer.go b/controllers/scaledobject_finalizer.go index a4e3c72b5c7..a0ac0bc4949 100644 --- a/controllers/scaledobject_finalizer.go +++ b/controllers/scaledobject_finalizer.go @@ -30,7 +30,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled // if enabled, scale scaleTarget back to the original replica count (to the state it was before scaling with KEDA) if scaledObject.Spec.Advanced != nil && scaledObject.Spec.Advanced.RestoreToOriginalReplicaCount { - scale, err := (*r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) + scale, err := r.scaleClient.Scales(scaledObject.Namespace).Get(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { logger.V(1).Info("Failed to get scaleTarget's scale status, because it was probably deleted", "error", err) @@ -39,7 +39,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled } } else { scale.Spec.Replicas = *scaledObject.Status.OriginalReplicaCount - _, err = (*r.scaleClient).Scales(scaledObject.Namespace).Update(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{}) + _, err = r.scaleClient.Scales(scaledObject.Namespace).Update(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{}) if err != nil { logger.Error(err, "Failed to restore scaleTarget's replica count back to the original", "finalizer", scaledObjectFinalizer) } diff --git a/go.mod b/go.mod index b3f62446c60..ab341ceca5a 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,7 @@ require ( github.com/tidwall/gjson v1.8.0 github.com/xdg/scram v1.0.3 go.mongodb.org/mongo-driver v1.5.2 + golang.org/x/tools v0.1.4 // indirect google.golang.org/api v0.47.0 google.golang.org/genproto v0.0.0-20210517163617-5e0236093d7a google.golang.org/grpc v1.37.1 @@ -55,6 +56,7 @@ require ( k8s.io/metrics v0.20.7 knative.dev/pkg v0.0.0-20210525071438-e2947dba201a sigs.k8s.io/controller-runtime v0.6.5 + sigs.k8s.io/controller-tools v0.3.0 // indirect ) replace k8s.io/client-go => k8s.io/client-go v0.20.7 diff --git a/go.sum b/go.sum index f126068881a..6d122eded7d 100644 --- a/go.sum +++ b/go.sum @@ -387,6 +387,8 @@ github.com/gobuffalo/envy v1.7.0/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSC github.com/gobuffalo/flect v0.1.0/go.mod h1:d2ehjJqGOH/Kjqcoz+F7jHTBbmDb38yXA598Hb50EGs= github.com/gobuffalo/flect v0.1.1/go.mod h1:8JCgGVbRjJhVgD6399mQr4fx5rRfGKVzFjbj6RE/9UI= github.com/gobuffalo/flect v0.1.3/go.mod h1:8JCgGVbRjJhVgD6399mQr4fx5rRfGKVzFjbj6RE/9UI= +github.com/gobuffalo/flect v0.2.0/go.mod h1:W3K3X9ksuZfir8f/LrfVtWmCDQFfayuylOJ7sz/Fj80= +github.com/gobuffalo/flect v0.2.2 h1:PAVD7sp0KOdfswjAw9BpLCU9hXo7wFSzgpQ+zNeks/A= github.com/gobuffalo/flect v0.2.2/go.mod h1:vmkQwuZYhN5Pc4ljYQZzP+1sq+NEkK+lh20jmEmX3jc= github.com/gobuffalo/genny v0.0.0-20190329151137-27723ad26ef9/go.mod h1:rWs4Z12d1Zbf19rlsn0nurr75KqhYp52EAGGxTbBhNk= github.com/gobuffalo/genny v0.0.0-20190403191548-3ca520ef0d9e/go.mod h1:80lIj3kVJWwOrXWWMRzzdhW3DsrdjILVil/SFKBzF28= @@ -603,6 +605,7 @@ github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb-client-go/v2 v2.3.0 h1:4YzLWRsPUoHuQYWDwPoybaJjN01e0/k0AIQO85ymCKI= github.com/influxdata/influxdb-client-go/v2 v2.3.0/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8= @@ -777,6 +780,7 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.12.0 h1:p4oGGk2M2UJc0wWN4lHFvIB71lxsh0T/UiKCCgFADY8= github.com/onsi/gomega v1.12.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY= @@ -892,6 +896,7 @@ github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTd github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/cobra v1.0.0 h1:6m/oheQuQ13N9ks4hubMG6BnvwOeaJrqSPLahSnczz8= github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= @@ -1325,6 +1330,8 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1 h1:wGiQel/hW0NnEkJUk8lbzkX2gFJU6PFxf1v5OlCfuOs= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs= +golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1470,6 +1477,7 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20190905181640-827449938966/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= @@ -1485,19 +1493,23 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.5 h1:nI5egYTGJakVyOryqLs1cQO5dO0ksin5XXs2pspk75k= honnef.co/go/tools v0.0.1-2020.1.5/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +k8s.io/api v0.18.2/go.mod h1:SJCWI7OLzhZSvbY7U8zwNl9UA4o1fizoug34OV/2r78= k8s.io/api v0.18.6/go.mod h1:eeyxr+cwCjMdLAmr2W3RyDI0VvTawSg/3RFFBEnmZGI= k8s.io/api v0.19.7/go.mod h1:KTryDUT3l6Mtv7K2J2486PNL9DBns3wOYTkGR+iz63Y= k8s.io/api v0.20.0/go.mod h1:HyLC5l5eoS/ygQYl1BXBgFzWNlkHiAuyNAbevIn+FKg= k8s.io/api v0.20.7 h1:wOEPJ3NoimUfR9v9sAO2JosPiEP9IGFNplf7zZvYzPU= k8s.io/api v0.20.7/go.mod h1:4x0yErUkcEWYG+O0S4QdrYa2+PLEeY2M7aeQe++2nmk= +k8s.io/apiextensions-apiserver v0.18.2/go.mod h1:q3faSnRGmYimiocj6cHQ1I3WpLqmDgJFlKL37fC4ZvY= k8s.io/apiextensions-apiserver v0.18.6/go.mod h1:lv89S7fUysXjLZO7ke783xOwVTm6lKizADfvUM/SS/M= k8s.io/apiextensions-apiserver v0.19.7 h1:aV9DANMSCCYBEMbtoT/5oesrtcciQrjy9yqWVtZZL5A= k8s.io/apiextensions-apiserver v0.19.7/go.mod h1:XJNNtjISNNePDEUClHt/igzMpQcmjVVh88QH+PKztPU= +k8s.io/apimachinery v0.18.2/go.mod h1:9SnR/e11v5IbyPCGbvJViimtJ0SwHG4nfZFjU77ftcA= k8s.io/apimachinery v0.18.6/go.mod h1:OaXp26zu/5J7p0f92ASynJa1pZo06YlV9fG7BoWbCko= k8s.io/apimachinery v0.19.7/go.mod h1:6sRbGRAVY5DOCuZwB5XkqguBqpqLU6q/kOaOdk29z6Q= k8s.io/apimachinery v0.20.0/go.mod h1:WlLqWAHZGg07AeltaI0MV5uk1Omp8xaN0JGLY6gkRpU= k8s.io/apimachinery v0.20.7 h1:tBfhql7OggSCahvASeEpLRzvxc7FK77wNivi1uXCQWM= k8s.io/apimachinery v0.20.7/go.mod h1:ejZXtW1Ra6V1O5H8xPBGz+T3+4gfkTCeExAHKU57MAc= +k8s.io/apiserver v0.18.2/go.mod h1:Xbh066NqrZO8cbsoenCwyDJ1OSi8Ag8I2lezeHxzwzw= k8s.io/apiserver v0.18.6/go.mod h1:Zt2XvTHuaZjBz6EFYzpp+X4hTmgWGy8AthNVnTdm3Wg= k8s.io/apiserver v0.19.7/go.mod h1:DmWVQggNePspa+vSsVytVbS3iBSDTXdJVt0akfHacKk= k8s.io/apiserver v0.20.0/go.mod h1:6gRIWiOkvGvQt12WTYmsiYoUyYW0FXSiMdNl4m+sxY8= @@ -1505,11 +1517,13 @@ k8s.io/apiserver v0.20.7 h1:kmj4lX5evfdm8h07jRjuSANvRH0kPlXTq6LOSGT6n/k= k8s.io/apiserver v0.20.7/go.mod h1:7gbB7UjDdP1/epYBGnIUE6jWY4Wpz99cZ7igfDa9rv4= k8s.io/client-go v0.20.7 h1:Ot22456XfYAWrCWddw/quevMrFHqP7s1qT499FoumVU= k8s.io/client-go v0.20.7/go.mod h1:uGl3qh/Jy3cTF1nDoIKBqUZlRWnj/EM+/leAXETKRuA= +k8s.io/code-generator v0.18.2/go.mod h1:+UHX5rSbxmR8kzS+FAv7um6dtYrZokQvjHpDSYRVkTc= k8s.io/code-generator v0.18.6/go.mod h1:TgNEVx9hCyPGpdtCWA34olQYLkh3ok9ar7XfSsr8b6c= k8s.io/code-generator v0.19.7/go.mod h1:lwEq3YnLYb/7uVXLorOJfxg+cUu2oihFhHZ0n9NIla0= k8s.io/code-generator v0.20.0/go.mod h1:UsqdF+VX4PU2g46NC2JRs4gc+IfrctnwHb76RNbWHJg= k8s.io/code-generator v0.20.7 h1:iXz1ME6EQqoCkLefa7bcniKHu0SzgbxsFV1RlBcfypc= k8s.io/code-generator v0.20.7/go.mod h1:i6FmG+QxaLxvJsezvZp0q/gAEzzOz3U53KFibghWToU= +k8s.io/component-base v0.18.2/go.mod h1:kqLlMuhJNHQ9lz8Z7V5bxUUtjFZnrypArGl58gmDfUM= k8s.io/component-base v0.18.6/go.mod h1:knSVsibPR5K6EW2XOjEHik6sdU5nCvKMrzMt2D4In14= k8s.io/component-base v0.19.7/go.mod h1:YX8spPBgwl3I6UGcSdQiEMAqRMSUsGQOW7SEr4+Qa3U= k8s.io/component-base v0.20.0/go.mod h1:wKPj+RHnAr8LW2EIBIK7AxOHPde4gme2lzXwVSoRXeA= @@ -1530,6 +1544,7 @@ k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= k8s.io/klog/v2 v2.8.0 h1:Q3gmuM9hKEjefWFFYF0Mat+YyFJvsUyYuwyNNJ5C9Ts= k8s.io/klog/v2 v2.8.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec= +k8s.io/kube-openapi v0.0.0-20200121204235-bf4fb3bd569c/go.mod h1:GRQhZsXIAJ1xR0C9bd8UpWHZ5plfAS9fzPjJuQ6JL3E= k8s.io/kube-openapi v0.0.0-20200410145947-61e04a5be9a6/go.mod h1:GRQhZsXIAJ1xR0C9bd8UpWHZ5plfAS9fzPjJuQ6JL3E= k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6/go.mod h1:UuqjUnNftUyPE5H64/qeyjQoUZhGpeFDVdxjTeEVN2o= k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAGcJo0Tvi+dK12EcqSLqcWsryKMpfM= @@ -1559,6 +1574,8 @@ sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.15 h1:4uqm9Mv+w2MmB sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.15/go.mod h1:LEScyzhFmoF5pso/YSeBstl57mOzx9xlU9n85RGrDQg= sigs.k8s.io/controller-runtime v0.6.5 h1:DSRu6E4FBeVwd/p8niskCVWnX5TSC6ZT9L/OIWOBK7s= sigs.k8s.io/controller-runtime v0.6.5/go.mod h1:WlZNXcM0++oyaQt4B7C2lEE5JYRs8vJUzRP4N4JpdAY= +sigs.k8s.io/controller-tools v0.3.0 h1:y3YD99XOyWaXkiF1kd41uRvfp/64teWcrEZFuHxPhJ4= +sigs.k8s.io/controller-tools v0.3.0/go.mod h1:enhtKGfxZD1GFEoMgP8Fdbu+uKQ/cq1/WGJhdVChfvI= sigs.k8s.io/structured-merge-diff/v3 v3.0.0-20200116222232-67a7b8c61874/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw= sigs.k8s.io/structured-merge-diff/v3 v3.0.0/go.mod h1:PlARxl6Hbt/+BC80dRLi1qAmnMqwqDg62YvvVkZjemw= sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= diff --git a/pkg/mock/mock_scale/mock_interfaces.go b/pkg/mock/mock_scale/mock_interfaces.go new file mode 100644 index 00000000000..08aa971a0fa --- /dev/null +++ b/pkg/mock/mock_scale/mock_interfaces.go @@ -0,0 +1,122 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: /home/misha/go/pkg/mod/k8s.io/client-go@v0.20.7/scale/interfaces.go + +// Package mock_scale is a generated GoMock package. +package mock_scale + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + v1 "k8s.io/api/autoscaling/v1" + v10 "k8s.io/apimachinery/pkg/apis/meta/v1" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + scale "k8s.io/client-go/scale" +) + +// MockScalesGetter is a mock of ScalesGetter interface. +type MockScalesGetter struct { + ctrl *gomock.Controller + recorder *MockScalesGetterMockRecorder +} + +// MockScalesGetterMockRecorder is the mock recorder for MockScalesGetter. +type MockScalesGetterMockRecorder struct { + mock *MockScalesGetter +} + +// NewMockScalesGetter creates a new mock instance. +func NewMockScalesGetter(ctrl *gomock.Controller) *MockScalesGetter { + mock := &MockScalesGetter{ctrl: ctrl} + mock.recorder = &MockScalesGetterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockScalesGetter) EXPECT() *MockScalesGetterMockRecorder { + return m.recorder +} + +// Scales mocks base method. +func (m *MockScalesGetter) Scales(namespace string) scale.ScaleInterface { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Scales", namespace) + ret0, _ := ret[0].(scale.ScaleInterface) + return ret0 +} + +// Scales indicates an expected call of Scales. +func (mr *MockScalesGetterMockRecorder) Scales(namespace interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Scales", reflect.TypeOf((*MockScalesGetter)(nil).Scales), namespace) +} + +// MockScaleInterface is a mock of ScaleInterface interface. +type MockScaleInterface struct { + ctrl *gomock.Controller + recorder *MockScaleInterfaceMockRecorder +} + +// MockScaleInterfaceMockRecorder is the mock recorder for MockScaleInterface. +type MockScaleInterfaceMockRecorder struct { + mock *MockScaleInterface +} + +// NewMockScaleInterface creates a new mock instance. +func NewMockScaleInterface(ctrl *gomock.Controller) *MockScaleInterface { + mock := &MockScaleInterface{ctrl: ctrl} + mock.recorder = &MockScaleInterfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockScaleInterface) EXPECT() *MockScaleInterfaceMockRecorder { + return m.recorder +} + +// Get mocks base method. +func (m *MockScaleInterface) Get(ctx context.Context, resource schema.GroupResource, name string, opts v10.GetOptions) (*v1.Scale, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", ctx, resource, name, opts) + ret0, _ := ret[0].(*v1.Scale) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockScaleInterfaceMockRecorder) Get(ctx, resource, name, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockScaleInterface)(nil).Get), ctx, resource, name, opts) +} + +// Patch mocks base method. +func (m *MockScaleInterface) Patch(ctx context.Context, gvr schema.GroupVersionResource, name string, pt types.PatchType, data []byte, opts v10.PatchOptions) (*v1.Scale, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Patch", ctx, gvr, name, pt, data, opts) + ret0, _ := ret[0].(*v1.Scale) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Patch indicates an expected call of Patch. +func (mr *MockScaleInterfaceMockRecorder) Patch(ctx, gvr, name, pt, data, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Patch", reflect.TypeOf((*MockScaleInterface)(nil).Patch), ctx, gvr, name, pt, data, opts) +} + +// Update mocks base method. +func (m *MockScaleInterface) Update(ctx context.Context, resource schema.GroupResource, scale *v1.Scale, opts v10.UpdateOptions) (*v1.Scale, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Update", ctx, resource, scale, opts) + ret0, _ := ret[0].(*v1.Scale) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Update indicates an expected call of Update. +func (mr *MockScaleInterfaceMockRecorder) Update(ctx, resource, scale, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockScaleInterface)(nil).Update), ctx, resource, scale, opts) +} diff --git a/pkg/mock/mock_scaler/mock_scaler.go b/pkg/mock/mock_scaler/mock_scaler.go new file mode 100644 index 00000000000..917d39b3cdc --- /dev/null +++ b/pkg/mock/mock_scaler/mock_scaler.go @@ -0,0 +1,189 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/scalers/scaler.go + +// Package mock_scalers is a generated GoMock package. +package mock_scalers + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + v2beta2 "k8s.io/api/autoscaling/v2beta2" + labels "k8s.io/apimachinery/pkg/labels" + external_metrics "k8s.io/metrics/pkg/apis/external_metrics" +) + +// MockScaler is a mock of Scaler interface. +type MockScaler struct { + ctrl *gomock.Controller + recorder *MockScalerMockRecorder +} + +// MockScalerMockRecorder is the mock recorder for MockScaler. +type MockScalerMockRecorder struct { + mock *MockScaler +} + +// NewMockScaler creates a new mock instance. +func NewMockScaler(ctrl *gomock.Controller) *MockScaler { + mock := &MockScaler{ctrl: ctrl} + mock.recorder = &MockScalerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockScaler) EXPECT() *MockScalerMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockScaler) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockScalerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockScaler)(nil).Close)) +} + +// GetMetricSpecForScaling mocks base method. +func (m *MockScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricSpecForScaling") + ret0, _ := ret[0].([]v2beta2.MetricSpec) + return ret0 +} + +// GetMetricSpecForScaling indicates an expected call of GetMetricSpecForScaling. +func (mr *MockScalerMockRecorder) GetMetricSpecForScaling() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricSpecForScaling", reflect.TypeOf((*MockScaler)(nil).GetMetricSpecForScaling)) +} + +// GetMetrics mocks base method. +func (m *MockScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetrics", ctx, metricName, metricSelector) + ret0, _ := ret[0].([]external_metrics.ExternalMetricValue) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMetrics indicates an expected call of GetMetrics. +func (mr *MockScalerMockRecorder) GetMetrics(ctx, metricName, metricSelector interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetrics", reflect.TypeOf((*MockScaler)(nil).GetMetrics), ctx, metricName, metricSelector) +} + +// IsActive mocks base method. +func (m *MockScaler) IsActive(ctx context.Context) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsActive", ctx) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsActive indicates an expected call of IsActive. +func (mr *MockScalerMockRecorder) IsActive(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsActive", reflect.TypeOf((*MockScaler)(nil).IsActive), ctx) +} + +// MockPushScaler is a mock of PushScaler interface. +type MockPushScaler struct { + ctrl *gomock.Controller + recorder *MockPushScalerMockRecorder +} + +// MockPushScalerMockRecorder is the mock recorder for MockPushScaler. +type MockPushScalerMockRecorder struct { + mock *MockPushScaler +} + +// NewMockPushScaler creates a new mock instance. +func NewMockPushScaler(ctrl *gomock.Controller) *MockPushScaler { + mock := &MockPushScaler{ctrl: ctrl} + mock.recorder = &MockPushScalerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPushScaler) EXPECT() *MockPushScalerMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockPushScaler) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockPushScalerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPushScaler)(nil).Close)) +} + +// GetMetricSpecForScaling mocks base method. +func (m *MockPushScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricSpecForScaling") + ret0, _ := ret[0].([]v2beta2.MetricSpec) + return ret0 +} + +// GetMetricSpecForScaling indicates an expected call of GetMetricSpecForScaling. +func (mr *MockPushScalerMockRecorder) GetMetricSpecForScaling() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricSpecForScaling", reflect.TypeOf((*MockPushScaler)(nil).GetMetricSpecForScaling)) +} + +// GetMetrics mocks base method. +func (m *MockPushScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetrics", ctx, metricName, metricSelector) + ret0, _ := ret[0].([]external_metrics.ExternalMetricValue) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMetrics indicates an expected call of GetMetrics. +func (mr *MockPushScalerMockRecorder) GetMetrics(ctx, metricName, metricSelector interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetrics", reflect.TypeOf((*MockPushScaler)(nil).GetMetrics), ctx, metricName, metricSelector) +} + +// IsActive mocks base method. +func (m *MockPushScaler) IsActive(ctx context.Context) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsActive", ctx) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsActive indicates an expected call of IsActive. +func (mr *MockPushScalerMockRecorder) IsActive(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsActive", reflect.TypeOf((*MockPushScaler)(nil).IsActive), ctx) +} + +// Run mocks base method. +func (m *MockPushScaler) Run(ctx context.Context, active chan<- bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Run", ctx, active) +} + +// Run indicates an expected call of Run. +func (mr *MockPushScalerMockRecorder) Run(ctx, active interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockPushScaler)(nil).Run), ctx, active) +} diff --git a/pkg/provider/fallback.go b/pkg/provider/fallback.go new file mode 100644 index 00000000000..24823329be2 --- /dev/null +++ b/pkg/provider/fallback.go @@ -0,0 +1,106 @@ +package provider + +import ( + "context" + "fmt" + + kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" + "github.com/kedacore/keda/v2/pkg/scalers" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +func isFallbackEnabled(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) bool { + return scaledObject.Spec.Fallback != nil && metricSpec.External.Target.Type == v2beta2.AverageValueMetricType +} + +func (p *KedaProvider) getMetricsWithFallback(scaler scalers.Scaler, metricName string, metricSelector labels.Selector, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) ([]external_metrics.ExternalMetricValue, error) { + initHealthStatus(scaledObject) + metrics, err := scaler.GetMetrics(context.TODO(), metricName, metricSelector) + healthStatus := getHealthStatus(scaledObject, metricName) + + if err == nil { + zero := int32(0) + healthStatus.NumberOfFailures = &zero + healthStatus.Status = kedav1alpha1.HealthStatusHappy + scaledObject.Status.Health[metricName] = *healthStatus + + p.updateStatus(scaledObject) + return metrics, nil + } + + healthStatus.Status = kedav1alpha1.HealthStatusFailing + *healthStatus.NumberOfFailures++ + scaledObject.Status.Health[metricName] = *healthStatus + + p.updateStatus(scaledObject) + + if !validateFallback(scaledObject, metricSpec) { + logger.Info("Failed to validate ScaledObject Spec. Please check that parameters are positive integers") + return nil, err + } + + switch { + case !isFallbackEnabled(scaledObject, metricSpec): + return nil, err + case *healthStatus.NumberOfFailures > scaledObject.Spec.Fallback.FailureThreshold: + return doFallback(scaledObject, metricSpec, metricName, err), nil + default: + return nil, err + } +} + +func validateFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) bool { + if !isFallbackEnabled(scaledObject, metricSpec) { + return true + } + + return scaledObject.Spec.Fallback.FailureThreshold >= 0 && + scaledObject.Spec.Fallback.Replicas >= 0 +} + +func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec, metricName string, suppressedError error) []external_metrics.ExternalMetricValue { + replicas := int64(scaledObject.Spec.Fallback.Replicas) + normalisationValue, _ := metricSpec.External.Target.AverageValue.AsInt64() + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(normalisationValue*replicas, resource.DecimalSI), + Timestamp: metav1.Now(), + } + fallbackMetrics := []external_metrics.ExternalMetricValue{metric} + + logger.Info(fmt.Sprintf("Suppressing error %s, falling back to %d replicas", suppressedError, replicas)) + return fallbackMetrics +} + +func (p *KedaProvider) updateStatus(scaledObject *kedav1alpha1.ScaledObject) { + err := p.client.Status().Update(context.TODO(), scaledObject) + if err != nil { + logger.Error(err, "Error updating ScaledObject status", "Error") + } +} + +func getHealthStatus(scaledObject *kedav1alpha1.ScaledObject, metricName string) *kedav1alpha1.HealthStatus { + // Get health status for a specific metric + _, healthStatusExists := scaledObject.Status.Health[metricName] + if !healthStatusExists { + zero := int32(0) + status := kedav1alpha1.HealthStatus{ + NumberOfFailures: &zero, + Status: kedav1alpha1.HealthStatusHappy, + } + scaledObject.Status.Health[metricName] = status + } + healthStatus := scaledObject.Status.Health[metricName] + return &healthStatus +} + +func initHealthStatus(scaledObject *kedav1alpha1.ScaledObject) { + // Init health status if missing + if scaledObject.Status.Health == nil { + scaledObject.Status.Health = make(map[string]kedav1alpha1.HealthStatus) + } +} diff --git a/pkg/provider/fallback_test.go b/pkg/provider/fallback_test.go new file mode 100644 index 00000000000..a5f0b2e5859 --- /dev/null +++ b/pkg/provider/fallback_test.go @@ -0,0 +1,356 @@ +package provider + +import ( + "errors" + "fmt" + "testing" + + "github.com/go-logr/logr" + "github.com/golang/mock/gomock" + kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" + "github.com/kedacore/keda/v2/pkg/mock/mock_client" + mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" + "github.com/kedacore/keda/v2/pkg/mock/mock_scaling" + "github.com/kubernetes-sigs/custom-metrics-apiserver/pkg/provider" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/types" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/metrics/pkg/apis/external_metrics" + "sigs.k8s.io/controller-runtime/pkg/envtest/printer" +) + +const metricName = "some_metric_name" + +func TestFallback(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecsWithDefaultAndCustomReporters(t, + "Controller Suite", + []Reporter{printer.NewlineReporter{}}) +} + +var _ = Describe("fallback", func() { + var ( + scaleHandler *mock_scaling.MockScaleHandler + client *mock_client.MockClient + providerUnderTest *KedaProvider + scaler *mock_scalers.MockScaler + ctrl *gomock.Controller + ) + + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + scaleHandler = mock_scaling.NewMockScaleHandler(ctrl) + client = mock_client.NewMockClient(ctrl) + providerUnderTest = &KedaProvider{ + values: make(map[provider.CustomMetricInfo]int64), + externalMetrics: make([]externalMetric, 2, 10), + client: client, + scaleHandler: scaleHandler, + watchedNamespace: "", + } + scaler = mock_scalers.NewMockScaler(ctrl) + + logger = logr.DiscardLogger{} + }) + + AfterEach(func() { + ctrl.Finish() + }) + + It("should return the expected metric when fallback is disabled", func() { + + expectedMetricValue := int64(5) + primeGetMetrics(scaler, expectedMetricValue) + so := buildScaledObject(nil, nil) + metricSpec := createMetricSpec(3) + expectStatusUpdate(ctrl, client) + + metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ToNot(HaveOccurred()) + value, _ := metrics[0].Value.AsInt64() + Expect(value).Should(Equal(expectedMetricValue)) + }) + + It("should reset the health status when scaler metrics are available", func() { + expectedMetricValue := int64(6) + startingNumberOfFailures := int32(5) + primeGetMetrics(scaler, expectedMetricValue) + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusFailing, + }, + }, + }, + ) + + metricSpec := createMetricSpec(3) + expectStatusUpdate(ctrl, client) + + metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ToNot(HaveOccurred()) + value, _ := metrics[0].Value.AsInt64() + Expect(value).Should(Equal(expectedMetricValue)) + Expect(so.Status.Health[metricName]).To(haveFailureAndStatus(0, kedav1alpha1.HealthStatusHappy)) + }) + + It("should propagate the error when fallback is disabled", func() { + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return(nil, errors.New("Some error")) + + so := buildScaledObject(nil, nil) + metricSpec := createMetricSpec(3) + expectStatusUpdate(ctrl, client) + + _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ShouldNot(BeNil()) + Expect(err.Error()).Should(Equal("Some error")) + }) + + It("should bump the number of failures when metrics call fails", func() { + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return(nil, errors.New("Some error")) + startingNumberOfFailures := int32(0) + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusHappy, + }, + }, + }, + ) + + metricSpec := createMetricSpec(10) + expectStatusUpdate(ctrl, client) + + _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ShouldNot(BeNil()) + Expect(err.Error()).Should(Equal("Some error")) + Expect(so.Status.Health[metricName]).To(haveFailureAndStatus(1, kedav1alpha1.HealthStatusFailing)) + }) + + It("should return a normalised metric when number of failures are beyond threshold", func() { + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return(nil, errors.New("Some error")) + startingNumberOfFailures := int32(3) + expectedMetricValue := int64(100) + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusHappy, + }, + }, + }, + ) + metricSpec := createMetricSpec(10) + expectStatusUpdate(ctrl, client) + + metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ToNot(HaveOccurred()) + value, _ := metrics[0].Value.AsInt64() + Expect(value).Should(Equal(expectedMetricValue)) + Expect(so.Status.Health[metricName]).To(haveFailureAndStatus(4, kedav1alpha1.HealthStatusFailing)) + }) + + It("should behave as if fallback is disabled when the metrics spec target type is not average value metric", func() { + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + }, nil, + ) + + qty := resource.NewQuantity(int64(3), resource.DecimalSI) + metricsSpec := v2beta2.MetricSpec{ + External: &v2beta2.ExternalMetricSource{ + Target: v2beta2.MetricTarget{ + Type: v2beta2.UtilizationMetricType, + Value: qty, + }, + }, + } + + isEnabled := isFallbackEnabled(so, metricsSpec) + Expect(isEnabled).Should(BeFalse()) + }) + + It("should ignore error if we fail to update kubernetes status", func() { + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return(nil, errors.New("Some error")) + startingNumberOfFailures := int32(3) + expectedMetricValue := int64(100) + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusHappy, + }, + }, + }, + ) + metricSpec := createMetricSpec(10) + + statusWriter := mock_client.NewMockStatusWriter(ctrl) + statusWriter.EXPECT().Update(gomock.Any(), gomock.Any()).Return(errors.New("Some error")) + client.EXPECT().Status().Return(statusWriter) + + metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ToNot(HaveOccurred()) + value, _ := metrics[0].Value.AsInt64() + Expect(value).Should(Equal(expectedMetricValue)) + Expect(so.Status.Health[metricName]).To(haveFailureAndStatus(4, kedav1alpha1.HealthStatusFailing)) + }) + + It("should return error when fallback is enabled but scaledobject has invalid parameter", func() { + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return(nil, errors.New("Some error")) + startingNumberOfFailures := int32(3) + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(-3), + Replicas: int32(10), + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusHappy, + }, + }, + }, + ) + metricSpec := createMetricSpec(10) + expectStatusUpdate(ctrl, client) + + _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ShouldNot(BeNil()) + Expect(err.Error()).Should(Equal("Some error")) + }) +}) + +func haveFailureAndStatus(numberOfFailures int, status kedav1alpha1.HealthStatusType) types.GomegaMatcher { + return &healthStatusMatcher{numberOfFailures: numberOfFailures, status: status} +} + +type healthStatusMatcher struct { + numberOfFailures int + status kedav1alpha1.HealthStatusType +} + +func (h *healthStatusMatcher) Match(actual interface{}) (success bool, err error) { + switch v := actual.(type) { + case kedav1alpha1.HealthStatus: + return *v.NumberOfFailures == int32(h.numberOfFailures) && v.Status == h.status, nil + default: + return false, fmt.Errorf("expected kedav1alpha1.HealthStatus, got %v", actual) + } +} + +func (h *healthStatusMatcher) FailureMessage(actual interface{}) (message string) { + switch v := actual.(type) { + case kedav1alpha1.HealthStatus: + return fmt.Sprintf("expected HealthStatus with NumberOfFailures %d and Status %s, but got NumberOfFailures %d and Status %s", h.numberOfFailures, h.status, *v.NumberOfFailures, v.Status) + default: + return "unexpected error" + } +} + +func (h *healthStatusMatcher) NegatedFailureMessage(actual interface{}) (message string) { + switch v := actual.(type) { + case kedav1alpha1.HealthStatus: + return fmt.Sprintf("did not expect HealthStatus with NumberOfFailures %d and Status %s, but got NumberOfFailures %d and Status %s", h.numberOfFailures, h.status, *v.NumberOfFailures, v.Status) + default: + return "unexpected error" + } +} + +func expectStatusUpdate(ctrl *gomock.Controller, client *mock_client.MockClient) { + statusWriter := mock_client.NewMockStatusWriter(ctrl) + statusWriter.EXPECT().Update(gomock.Any(), gomock.Any()) + client.EXPECT().Status().Return(statusWriter) +} + +func buildScaledObject(fallbackConfig *kedav1alpha1.Fallback, status *kedav1alpha1.ScaledObjectStatus) *kedav1alpha1.ScaledObject { + scaledObject := &kedav1alpha1.ScaledObject{ + ObjectMeta: metav1.ObjectMeta{Name: "clean-up-test", Namespace: "default"}, + Spec: kedav1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &kedav1alpha1.ScaleTarget{ + Name: "myapp", + }, + Triggers: []kedav1alpha1.ScaleTriggers{ + { + Type: "cron", + Metadata: map[string]string{ + "timezone": "UTC", + "start": "0 * * * *", + "end": "1 * * * *", + "desiredReplicas": "1", + }, + }, + }, + Fallback: fallbackConfig, + }, + } + + if status != nil { + scaledObject.Status = *status + } + + return scaledObject +} + +func primeGetMetrics(scaler *mock_scalers.MockScaler, value int64) { + expectedMetric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(value, resource.DecimalSI), + Timestamp: metav1.Now(), + } + + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return([]external_metrics.ExternalMetricValue{expectedMetric}, nil) +} + +func createMetricSpec(averageValue int) v2beta2.MetricSpec { + qty := resource.NewQuantity(int64(averageValue), resource.DecimalSI) + return v2beta2.MetricSpec{ + External: &v2beta2.ExternalMetricSource{ + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: qty, + }, + }, + } +} diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index adf02a9f965..ab0bb020cbb 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -94,7 +94,8 @@ func (p *KedaProvider) GetExternalMetric(namespace string, metricSelector labels } // Filter only the desired metric if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) { - metrics, err := scaler.GetMetrics(context.TODO(), info.Metric, metricSelector) + metrics, err := p.getMetricsWithFallback(scaler, info.Metric, metricSelector, scaledObject, metricSpec) + if err != nil { logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scaler) } else { @@ -107,7 +108,6 @@ func (p *KedaProvider) GetExternalMetric(namespace string, metricSelector labels metricsServer.RecordHPAScalerError(namespace, scaledObject.Name, scalerName, scalerIndex, info.Metric, err) } } - scaler.Close() } diff --git a/pkg/scalers/influxdb_scaler.go b/pkg/scalers/influxdb_scaler.go index 8fce45f1443..fdaf2d77a6e 100644 --- a/pkg/scalers/influxdb_scaler.go +++ b/pkg/scalers/influxdb_scaler.go @@ -67,6 +67,8 @@ func parseInfluxDBMetadata(config *ScalerConfig) (*influxDBMetadata, error) { } else { return nil, fmt.Errorf("no auth token given") } + case config.AuthParams["authToken"] != "": + authToken = config.AuthParams["authToken"] default: return nil, fmt.Errorf("no auth token given") } @@ -81,6 +83,8 @@ func parseInfluxDBMetadata(config *ScalerConfig) (*influxDBMetadata, error) { } else { return nil, fmt.Errorf("no organization name given") } + case config.AuthParams["organizationName"] != "": + organizationName = config.AuthParams["organizationName"] default: return nil, fmt.Errorf("no organization name given") } @@ -93,6 +97,8 @@ func parseInfluxDBMetadata(config *ScalerConfig) (*influxDBMetadata, error) { if val, ok := config.TriggerMetadata["serverURL"]; ok { serverURL = val + } else if val, ok := config.AuthParams["serverURL"]; ok { + serverURL = val } else { return nil, fmt.Errorf("no server url given") } diff --git a/pkg/scalers/influxdb_scaler_test.go b/pkg/scalers/influxdb_scaler_test.go index 7aea02f5dc2..7c1b3d62dcd 100644 --- a/pkg/scalers/influxdb_scaler_test.go +++ b/pkg/scalers/influxdb_scaler_test.go @@ -12,8 +12,9 @@ var testInfluxDBResolvedEnv = map[string]string{ } type parseInfluxDBMetadataTestData struct { - metadata map[string]string - isError bool + metadata map[string]string + isError bool + authParams map[string]string } type influxDBMetricIdentifier struct { @@ -23,21 +24,24 @@ type influxDBMetricIdentifier struct { var testInfluxDBMetadata = []parseInfluxDBMetadataTestData{ // nothing passed - {map[string]string{}, true}, + {map[string]string{}, true, map[string]string{}}, // everything is passed in verbatim - {map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, false}, + {map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, false, map[string]string{}}, // everything is passed in (environment variables) - {map[string]string{"serverURL": "https://influxdata.com", "organizationNameFromEnv": "INFLUX_ORG", "query": "from(bucket: hello)", "thresholdValue": "10", "authTokenFromEnv": "INFLUX_TOKEN"}, false}, + {map[string]string{"serverURL": "https://influxdata.com", "organizationNameFromEnv": "INFLUX_ORG", "query": "from(bucket: hello)", "thresholdValue": "10", "authTokenFromEnv": "INFLUX_TOKEN"}, false, map[string]string{}}, // no serverURL passed - {map[string]string{"metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, true}, + {map[string]string{"metricName": "influx_metric", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, true, map[string]string{}}, // no organization name passed - {map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, true}, + {map[string]string{"serverURL": "https://influxdata.com", "metricName": "influx_metric", "query": "from(bucket: hello)", "thresholdValue": "10", "authToken": "myToken"}, true, map[string]string{}}, // no query passed - {map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "thresholdValue": "10", "authToken": "myToken"}, true}, + {map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "thresholdValue": "10", "authToken": "myToken"}, true, map[string]string{}}, // no threshold value passed - {map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "authToken": "myToken"}, true}, + {map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "authToken": "myToken"}, true, map[string]string{}}, // no auth token passed - {map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10"}, true}} + {map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "query": "from(bucket: hello)", "thresholdValue": "10"}, true, map[string]string{}}, + // authToken, organizationName, and serverURL are defined in authParams + {map[string]string{"query": "from(bucket: hello)", "thresholdValue": "10"}, false, map[string]string{"serverURL": "https://influxdata.com", "organizationName": "influx_org", "authToken": "myToken"}}, +} var influxDBMetricIdentifiers = []influxDBMetricIdentifier{ {&testInfluxDBMetadata[1], "influxdb-influx_metric"}, @@ -47,7 +51,7 @@ var influxDBMetricIdentifiers = []influxDBMetricIdentifier{ func TestInfluxDBParseMetadata(t *testing.T) { testCaseNum := 1 for _, testData := range testInfluxDBMetadata { - _, err := parseInfluxDBMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testInfluxDBResolvedEnv}) + _, err := parseInfluxDBMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testInfluxDBResolvedEnv, AuthParams: testData.authParams}) if err != nil && !testData.isError { t.Errorf("Expected success but got error for unit test # %v", testCaseNum) } diff --git a/pkg/scaling/executor/scale_executor.go b/pkg/scaling/executor/scale_executor.go index b8fd1d144ed..cdb1e93c2cb 100644 --- a/pkg/scaling/executor/scale_executor.go +++ b/pkg/scaling/executor/scale_executor.go @@ -24,19 +24,19 @@ const ( // ScaleExecutor contains methods RequestJobScale and RequestScale type ScaleExecutor interface { RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) - RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool) + RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool) } type scaleExecutor struct { client client.Client - scaleClient *scale.ScalesGetter + scaleClient scale.ScalesGetter reconcilerScheme *runtime.Scheme logger logr.Logger recorder record.EventRecorder } // NewScaleExecutor creates a ScaleExecutor object -func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, recorder record.EventRecorder) ScaleExecutor { +func NewScaleExecutor(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme, recorder record.EventRecorder) ScaleExecutor { return &scaleExecutor{ client: client, scaleClient: scaleClient, diff --git a/pkg/scaling/executor/scale_scaledobjects.go b/pkg/scaling/executor/scale_scaledobjects.go index c863db288a0..95634888863 100644 --- a/pkg/scaling/executor/scale_scaledobjects.go +++ b/pkg/scaling/executor/scale_scaledobjects.go @@ -16,7 +16,7 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" ) -func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool) { +func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool) { logger := e.logger.WithValues("scaledobject.Name", scaledObject.Name, "scaledObject.Namespace", scaledObject.Namespace, "scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name) @@ -59,6 +59,21 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al // current replica count is 0, but there is an active trigger. // scale the ScaleTarget up e.scaleFromZero(ctx, logger, scaledObject, currentScale) + case !isActive && + isError && + scaledObject.Spec.Fallback != nil && + scaledObject.Spec.Fallback.Replicas != 0: + // there are no active triggers, but a scaler responded with an error + // AND + // there is a fallback replicas count defined + + // Scale to the fallback replicas count + _, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, scaledObject.Spec.Fallback.Replicas) + if err == nil { + logger.Info("Successfully set ScaleTarget replicas count to ScaledObject fallback.replicas", + "Original Replicas Count", currentReplicas, + "New Replicas Count", scaledObject.Spec.Fallback.Replicas) + } case !isActive && currentReplicas > 0 && (scaledObject.Spec.MinReplicaCount == nil || *scaledObject.Spec.MinReplicaCount == 0): @@ -178,7 +193,7 @@ func (e *scaleExecutor) scaleFromZero(ctx context.Context, logger logr.Logger, s } func (e *scaleExecutor) getScaleTargetScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (*autoscalingv1.Scale, error) { - return (*e.scaleClient).Scales(scaledObject.Namespace).Get(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) + return e.scaleClient.Scales(scaledObject.Namespace).Get(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) } func (e *scaleExecutor) updateScaleOnScaleTarget(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale, replicas int32) (int32, error) { @@ -195,6 +210,6 @@ func (e *scaleExecutor) updateScaleOnScaleTarget(ctx context.Context, scaledObje currentReplicas := scale.Spec.Replicas scale.Spec.Replicas = replicas - _, err := (*e.scaleClient).Scales(scaledObject.Namespace).Update(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{}) + _, err := e.scaleClient.Scales(scaledObject.Namespace).Update(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{}) return currentReplicas, err } diff --git a/pkg/scaling/executor/scale_scaledobjects_test.go b/pkg/scaling/executor/scale_scaledobjects_test.go new file mode 100644 index 00000000000..5e3f7878063 --- /dev/null +++ b/pkg/scaling/executor/scale_scaledobjects_test.go @@ -0,0 +1,73 @@ +package executor + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/kedacore/keda/v2/api/v1alpha1" + "github.com/kedacore/keda/v2/pkg/mock/mock_client" + "github.com/kedacore/keda/v2/pkg/mock/mock_scale" + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + autoscalingv1 "k8s.io/api/autoscaling/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" +) + +func TestScaleToFallbackReplicasWhenNotActiveAndIsError(t *testing.T) { + ctrl := gomock.NewController(t) + client := mock_client.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(1) + mockScaleClient := mock_scale.NewMockScalesGetter(ctrl) + mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl) + statusWriter := mock_client.NewMockStatusWriter(ctrl) + + scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder) + + scaledObject := v1alpha1.ScaledObject{ + ObjectMeta: v1.ObjectMeta{ + Name: "some name", + Namespace: "some namespace", + }, + Spec: v1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &v1alpha1.ScaleTarget{ + Name: "some name", + }, + Fallback: &v1alpha1.Fallback{ + FailureThreshold: 3, + Replicas: 5, + }, + }, + Status: v1alpha1.ScaledObjectStatus{ + ScaleTargetGVKR: &v1alpha1.GroupVersionKindResource{ + Group: "apps", + Kind: "Deployment", + }, + }, + } + + numberOfReplicas := int32(2) + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).SetArg(2, appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Replicas: &numberOfReplicas, + }, + }) + + scale := &autoscalingv1.Scale{ + Spec: autoscalingv1.ScaleSpec{ + Replicas: numberOfReplicas, + }, + } + + mockScaleClient.EXPECT().Scales(gomock.Any()).Return(mockScaleInterface).Times(2) + mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil) + mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any()) + + client.EXPECT().Status().Return(statusWriter) + statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()) + + scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, true) + + assert.Equal(t, int32(5), scale.Spec.Replicas) +} diff --git a/pkg/scaling/scale_hander_test.go b/pkg/scaling/scale_hander_test.go deleted file mode 100644 index b3b78b421b7..00000000000 --- a/pkg/scaling/scale_hander_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package scaling - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "k8s.io/api/autoscaling/v2beta2" - "k8s.io/apimachinery/pkg/api/resource" -) - -func TestTargetAverageValue(t *testing.T) { - // count = 0 - specs := []v2beta2.MetricSpec{} - targetAverageValue := getTargetAverageValue(specs) - assert.Equal(t, int64(0), targetAverageValue) - // 1 1 - specs = []v2beta2.MetricSpec{ - createMetricSpec(1), - createMetricSpec(1), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, int64(1), targetAverageValue) - // 5 5 3 - specs = []v2beta2.MetricSpec{ - createMetricSpec(5), - createMetricSpec(5), - createMetricSpec(3), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, int64(4), targetAverageValue) - - // 5 5 4 - specs = []v2beta2.MetricSpec{ - createMetricSpec(5), - createMetricSpec(5), - createMetricSpec(3), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, int64(4), targetAverageValue) -} - -func createMetricSpec(averageValue int) v2beta2.MetricSpec { - qty := resource.NewQuantity(int64(averageValue), resource.DecimalSI) - return v2beta2.MetricSpec{ - External: &v2beta2.ExternalMetricSource{ - Target: v2beta2.MetricTarget{ - AverageValue: qty, - }, - }, - } -} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 0203fd7b3ff..5edae2bd2dc 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -51,7 +51,7 @@ type scaleHandler struct { } // NewScaleHandler creates a ScaleHandler object -func NewScaleHandler(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder) ScaleHandler { +func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder) ScaleHandler { return &scaleHandler{ client: client, logger: logf.Log.WithName("scalehandler"), @@ -190,7 +190,7 @@ func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav scalingMutex.Lock() switch obj := scalableObject.(type) { case *kedav1alpha1.ScaledObject: - h.scaleExecutor.RequestScale(ctx, obj, active) + h.scaleExecutor.RequestScale(ctx, obj, active, false) case *kedav1alpha1.ScaledJob: h.logger.Info("Warning: External Push Scaler does not support ScaledJob", "object", scalableObject) } @@ -214,7 +214,8 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac defer scalingMutex.Unlock() switch obj := scalableObject.(type) { case *kedav1alpha1.ScaledObject: - h.scaleExecutor.RequestScale(ctx, obj, h.checkScaledObjectScalers(ctx, scalers, obj)) + isActive, isError := h.checkScaledObjectScalers(ctx, scalers, obj) + h.scaleExecutor.RequestScale(ctx, obj, isActive, isError) case *kedav1alpha1.ScaledJob: scaledJob := scalableObject.(*kedav1alpha1.ScaledJob) isActive, scaleTo, maxScale := h.checkScaledJobScalers(ctx, scalers, scaledJob) @@ -222,29 +223,31 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac } } -func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) bool { +func (h *scaleHandler) checkScaledObjectScalers(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) (bool, bool) { isActive := false + isError := false for i, scaler := range scalers { isTriggerActive, err := scaler.IsActive(ctx) scaler.Close() if err != nil { h.logger.V(1).Info("Error getting scale decision", "Error", err) + isError = true h.recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) continue } else if isTriggerActive { isActive = true - if scaler.GetMetricSpecForScaling()[0].External != nil { - h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", scaler.GetMetricSpecForScaling()[0].External.Metric.Name) + if externalMetricsSpec := scaler.GetMetricSpecForScaling()[0].External; externalMetricsSpec != nil { + h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", externalMetricsSpec.Metric.Name) } - if scaler.GetMetricSpecForScaling()[0].Resource != nil { - h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", scaler.GetMetricSpecForScaling()[0].Resource.Name) + if resourceMetricsSpec := scaler.GetMetricSpecForScaling()[0].Resource; resourceMetricsSpec != nil { + h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", resourceMetricsSpec.Name) } closeScalers(scalers[i+1:]) break } } - return isActive + return isActive, isError } func (h *scaleHandler) checkScaledJobScalers(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go new file mode 100644 index 00000000000..192f6e45bc0 --- /dev/null +++ b/pkg/scaling/scale_handler_test.go @@ -0,0 +1,122 @@ +package scaling + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" + "github.com/kedacore/keda/v2/pkg/mock/mock_client" + mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" + "github.com/kedacore/keda/v2/pkg/scalers" + "github.com/kedacore/keda/v2/pkg/scaling/executor" + "k8s.io/client-go/tools/record" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/stretchr/testify/assert" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" +) + +func TestTargetAverageValue(t *testing.T) { + // count = 0 + specs := []v2beta2.MetricSpec{} + targetAverageValue := getTargetAverageValue(specs) + assert.Equal(t, int64(0), targetAverageValue) + // 1 1 + specs = []v2beta2.MetricSpec{ + createMetricSpec(1), + createMetricSpec(1), + } + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, int64(1), targetAverageValue) + // 5 5 3 + specs = []v2beta2.MetricSpec{ + createMetricSpec(5), + createMetricSpec(5), + createMetricSpec(3), + } + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, int64(4), targetAverageValue) + + // 5 5 4 + specs = []v2beta2.MetricSpec{ + createMetricSpec(5), + createMetricSpec(5), + createMetricSpec(3), + } + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, int64(4), targetAverageValue) +} + +func TestCheckScaledObjectScalersWithError(t *testing.T) { + ctrl := gomock.NewController(t) + client := mock_client.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(1) + + scaleHandler := &scaleHandler{ + client: client, + logger: logf.Log.WithName("scalehandler"), + scaleLoopContexts: &sync.Map{}, + scaleExecutor: executor.NewScaleExecutor(client, nil, nil, recorder), + globalHTTPTimeout: 5 * time.Second, + recorder: recorder, + } + scaler := mock_scalers.NewMockScaler(ctrl) + scalers := []scalers.Scaler{scaler} + scaledObject := &kedav1alpha1.ScaledObject{} + + scaler.EXPECT().IsActive(gomock.Any()).Return(false, errors.New("Some error")) + scaler.EXPECT().Close() + + isActive, isError := scaleHandler.checkScaledObjectScalers(context.TODO(), scalers, scaledObject) + + assert.Equal(t, false, isActive) + assert.Equal(t, true, isError) +} + +func TestCheckScaledObjectFindFirstActiveIgnoringOthers(t *testing.T) { + ctrl := gomock.NewController(t) + client := mock_client.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(1) + + scaleHandler := &scaleHandler{ + client: client, + logger: logf.Log.WithName("scalehandler"), + scaleLoopContexts: &sync.Map{}, + scaleExecutor: executor.NewScaleExecutor(client, nil, nil, recorder), + globalHTTPTimeout: 5 * time.Second, + recorder: recorder, + } + + activeScaler := mock_scalers.NewMockScaler(ctrl) + failingScaler := mock_scalers.NewMockScaler(ctrl) + scalers := []scalers.Scaler{activeScaler, failingScaler} + scaledObject := &kedav1alpha1.ScaledObject{} + + metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(1)} + + activeScaler.EXPECT().IsActive(gomock.Any()).Return(true, nil) + activeScaler.EXPECT().GetMetricSpecForScaling().Times(2).Return(metricsSpecs) + activeScaler.EXPECT().Close() + failingScaler.EXPECT().Close() + + isActive, isError := scaleHandler.checkScaledObjectScalers(context.TODO(), scalers, scaledObject) + + assert.Equal(t, true, isActive) + assert.Equal(t, false, isError) +} + +func createMetricSpec(averageValue int) v2beta2.MetricSpec { + qty := resource.NewQuantity(int64(averageValue), resource.DecimalSI) + return v2beta2.MetricSpec{ + External: &v2beta2.ExternalMetricSource{ + Target: v2beta2.MetricTarget{ + AverageValue: qty, + }, + }, + } +}