Skip to content

Commit

Permalink
Add support for Trigger Authentication for InfluxDB (kedacore#1904)
Browse files Browse the repository at this point in the history
Signed-off-by: misha <mishamo@gmail.com>
  • Loading branch information
acobaugh authored and mishamo committed Jun 28, 2021
1 parent 6530be7 commit 12f8576
Show file tree
Hide file tree
Showing 24 changed files with 1,309 additions and 85 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
- Extend Azure Monitor scaler to support custom metrics ([#1883](https://github.com/kedacore/keda/pull/1883))
- Support non-public cloud environments in the Azure Storage Queue and Azure Storage Blob scalers ([#1863](https://github.com/kedacore/keda/pull/1863))
- Show HashiCorp Vault Address when using `kubectl get ta` or `kubectl get cta` ([#1862](https://github.com/kedacore/keda/pull/1862))
- Add fallback functionality ([#1872](https://github.com/kedacore/keda/issues/1872))

### Improvements

- Fix READY and ACTIVE fields of ScaledJob to show status when we run `kubectl get sj` ([#1855](https://github.com/kedacore/keda/pull/1855))
- Don't panic when HashiCorp Vault path doesn't exist ([#1864](https://github.com/kedacore/keda/pull/1864))

- Allow influxdb `authToken`, `serverURL`, and `organizationName` to be sourced from `(Cluster)TriggerAuthentication` ([#1904](https://github.com/kedacore/keda/pull/1904))
### Breaking Changes

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
Expand Down
33 changes: 33 additions & 0 deletions api/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,25 @@ type ScaledObject struct {
Status ScaledObjectStatus `json:"status,omitempty"`
}

// HealthStatus is the status for a ScaledObject's health
type HealthStatus struct {
// +optional
NumberOfFailures *int32 `json:"numberOfFailures,omitempty"`
// +optional
Status HealthStatusType `json:"status,omitempty"`
}

// HealthStatusType is an indication of whether the health status is happy or failing
type HealthStatusType string

const (
// HealthStatusHappy means the status of the health object is happy
HealthStatusHappy HealthStatusType = "Happy"

// HealthStatusFailing means the status of the health object is failing
HealthStatusFailing HealthStatusType = "Failing"
)

// ScaledObjectSpec is the spec for a ScaledObject resource
type ScaledObjectSpec struct {
ScaleTargetRef *ScaleTarget `json:"scaleTargetRef"`
Expand All @@ -44,6 +63,14 @@ type ScaledObjectSpec struct {
Advanced *AdvancedConfig `json:"advanced,omitempty"`

Triggers []ScaleTriggers `json:"triggers"`
// +optional
Fallback *Fallback `json:"fallback,omitempty"`
}

// Fallback is the spec for fallback options
type Fallback struct {
FailureThreshold int32 `json:"failureThreshold"`
Replicas int32 `json:"replicas"`
}

// AdvancedConfig specifies advance scaling options
Expand Down Expand Up @@ -79,8 +106,12 @@ type ScaleTriggers struct {
Metadata map[string]string `json:"metadata"`
// +optional
AuthenticationRef *ScaledObjectAuthRef `json:"authenticationRef,omitempty"`
// +optional
FallbackReplicas *int32 `json:"fallback,omitempty"`
}

// +k8s:openapi-gen=true

// ScaledObjectStatus is the status for a ScaledObject resource
// +optional
type ScaledObjectStatus struct {
Expand All @@ -98,6 +129,8 @@ type ScaledObjectStatus struct {
ResourceMetricNames []string `json:"resourceMetricNames,omitempty"`
// +optional
Conditions Conditions `json:"conditions,omitempty"`
// +optional
Health map[string]HealthStatus `json:"health,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
52 changes: 52 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions config/crd/bases/keda.sh_scaledjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6760,6 +6760,9 @@ spec:
required:
- name
type: object
fallback:
format: int32
type: integer
metadata:
additionalProperties:
type: string
Expand Down
29 changes: 29 additions & 0 deletions config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,19 @@ spec:
cooldownPeriod:
format: int32
type: integer
fallback:
description: Fallback is the spec for fallback options
properties:
failureThreshold:
format: int32
type: integer
replicas:
format: int32
type: integer
required:
- failureThreshold
- replicas
type: object
maxReplicaCount:
format: int32
type: integer
Expand Down Expand Up @@ -242,6 +255,9 @@ spec:
required:
- name
type: object
fallback:
format: int32
type: integer
metadata:
additionalProperties:
type: string
Expand Down Expand Up @@ -290,6 +306,19 @@ spec:
items:
type: string
type: array
health:
additionalProperties:
description: HealthStatus is the status for a ScaledObject's health
properties:
numberOfFailures:
format: int32
type: integer
status:
description: HealthStatusType is an indication of whether the
health status is happy or failing
type: string
type: object
type: object
lastActiveTime:
format: date-time
type: string
Expand Down
15 changes: 15 additions & 0 deletions controllers/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger,
status := scaledObject.Status.DeepCopy()
status.ExternalMetricNames = externalMetricNames
status.ResourceMetricNames = resourceMetricNames

updateHealthStatus(scaledObject, externalMetricNames, status)

err = kedacontrollerutil.UpdateScaledObjectStatus(r.Client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "Error updating scaledObject status with used externalMetricNames")
Expand All @@ -193,6 +196,18 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger,
return scaledObjectMetricSpecs, nil
}

func updateHealthStatus(scaledObject *kedav1alpha1.ScaledObject, externalMetricNames []string, status *kedav1alpha1.ScaledObjectStatus) {
health := scaledObject.Status.Health
newHealth := make(map[string]kedav1alpha1.HealthStatus)
for _, metricName := range externalMetricNames {
entry, exists := health[metricName]
if exists {
newHealth[metricName] = entry
}
}
status.Health = newHealth
}

// checkMinK8sVersionforHPABehavior min version (k8s v1.18) for HPA Behavior
func (r *ScaledObjectReconciler) checkMinK8sVersionforHPABehavior(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) {
if r.kubeVersion.MinorVersion < 18 {
Expand Down
127 changes: 127 additions & 0 deletions controllers/hpa_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package controllers

import (
"github.com/go-logr/logr"
"github.com/golang/mock/gomock"
"github.com/kedacore/keda/v2/api/v1alpha1"
"github.com/kedacore/keda/v2/pkg/mock/mock_client"
mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler"
"github.com/kedacore/keda/v2/pkg/mock/mock_scaling"
kedascalers "github.com/kedacore/keda/v2/pkg/scalers"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = Describe("hpa", func() {
var (
reconciler ScaledObjectReconciler
scaleHandler *mock_scaling.MockScaleHandler
client *mock_client.MockClient
statusWriter *mock_client.MockStatusWriter
scaler *mock_scalers.MockScaler
logger logr.Logger
ctrl *gomock.Controller
)

BeforeEach(func() {
ctrl = gomock.NewController(GinkgoT())
client = mock_client.NewMockClient(ctrl)
scaleHandler = mock_scaling.NewMockScaleHandler(ctrl)
scaler = mock_scalers.NewMockScaler(ctrl)
statusWriter = mock_client.NewMockStatusWriter(ctrl)
logger = logr.DiscardLogger{}
reconciler = ScaledObjectReconciler{
Client: client,
scaleHandler: scaleHandler,
}
})

AfterEach(func() {
ctrl.Finish()
})

It("should remove deleted metric from health status", func() {
numberOfFailures := int32(87)
health := make(map[string]v1alpha1.HealthStatus)
health["another metric name"] = v1alpha1.HealthStatus{
NumberOfFailures: &numberOfFailures,
Status: v1alpha1.HealthStatusFailing,
}

scaledObject := setupTest(health, scaler, scaleHandler)

var capturedScaledObject v1alpha1.ScaledObject
client.EXPECT().Status().Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(arg interface{}, scaledObject *v1alpha1.ScaledObject, anotherArg interface{}) {
capturedScaledObject = *scaledObject
})

_, err := reconciler.getScaledObjectMetricSpecs(logger, scaledObject)

Expect(err).ToNot(HaveOccurred())
Expect(capturedScaledObject.Status.Health).To(BeEmpty())
})

It("should not remove existing metric from health status", func() {
numberOfFailures := int32(87)
health := make(map[string]v1alpha1.HealthStatus)
health["another metric name"] = v1alpha1.HealthStatus{
NumberOfFailures: &numberOfFailures,
Status: v1alpha1.HealthStatusFailing,
}

health["some metric name"] = v1alpha1.HealthStatus{
NumberOfFailures: &numberOfFailures,
Status: v1alpha1.HealthStatusFailing,
}

scaledObject := setupTest(health, scaler, scaleHandler)

var capturedScaledObject v1alpha1.ScaledObject
client.EXPECT().Status().Return(statusWriter)
statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(arg interface{}, scaledObject *v1alpha1.ScaledObject, anotherArg interface{}) {
capturedScaledObject = *scaledObject
})

_, err := reconciler.getScaledObjectMetricSpecs(logger, scaledObject)

expectedHealth := make(map[string]v1alpha1.HealthStatus)
expectedHealth["some metric name"] = v1alpha1.HealthStatus{
NumberOfFailures: &numberOfFailures,
Status: v1alpha1.HealthStatusFailing,
}

Expect(err).ToNot(HaveOccurred())
Expect(capturedScaledObject.Status.Health).To(HaveLen(1))
Expect(capturedScaledObject.Status.Health).To(Equal(expectedHealth))
})

})

func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.MockScaler, scaleHandler *mock_scaling.MockScaleHandler) *v1alpha1.ScaledObject {
scaledObject := &v1alpha1.ScaledObject{
ObjectMeta: v1.ObjectMeta{
Name: "some scaled object name",
},
Status: v1alpha1.ScaledObjectStatus{
Health: health,
},
}

scalers := []kedascalers.Scaler{scaler}
metricSpec := v2beta2.MetricSpec{
External: &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: "some metric name",
},
},
}
metricSpecs := []v2beta2.MetricSpec{metricSpec}
scaler.EXPECT().GetMetricSpecForScaling().Return(metricSpecs)
scaler.EXPECT().Close()
scaleHandler.EXPECT().GetScalers(gomock.Eq(scaledObject)).Return(scalers, nil)

return scaledObject
}
Loading

0 comments on commit 12f8576

Please sign in to comment.