Skip to content

Commit

Permalink
Provide CloudEvents around the management of ScaledObjects resources (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
SpiritZhou authored Jul 10, 2024
1 parent 22fd250 commit 5cbe424
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Add --ca-dir flag to KEDA operator to specify directories with CA certificates for scalers to authenticate TLS connections (defaults to /custom/ca) ([#5860](https://github.com/kedacore/keda/issues/5860))
- **General**: Declarative parsing of scaler config ([#5037](https://github.com/kedacore/keda/issues/5037)|[#5797](https://github.com/kedacore/keda/issues/5797))
- **General**: Introduce new Splunk Scaler ([#5904](https://github.com/kedacore/keda/issues/5904))
- **General**: Provide CloudEvents around the management of ScaledObjects resources ([#3522](https://github.com/kedacore/keda/issues/3522))
- **General**: Remove deprecated Kustomize commonLabels ([#5888](https://github.com/kedacore/keda/pull/5888))
- **General**: Support for Kubernetes v1.30 ([#5828](https://github.com/kedacore/keda/issues/5828))

Expand Down
3 changes: 3 additions & 0 deletions apis/eventing/v1alpha1/cloudevent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (

// ScaledObjectFailedType is for event when creating ScaledObject failed
ScaledObjectFailedType CloudEventType = "keda.scaledobject.failed.v1"

// ScaledObjectFailedType is for event when removed ScaledObject
ScaledObjectRemovedType CloudEventType = "keda.scaledobject.removed.v1"
)

var AllEventTypes = []CloudEventType{ScaledObjectFailedType, ScaledObjectReadyType}
1 change: 0 additions & 1 deletion cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func main() {
if err = (&kedacontrollers.ScaledObjectReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: eventRecorder,
ScaleClient: scaleClient,
ScaleHandler: scaledHandler,
EventEmitter: eventEmitter,
Expand Down
20 changes: 9 additions & 11 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/scale"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -70,7 +69,6 @@ import (
type ScaledObjectReconciler struct {
Client client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
ScaleClient scale.ScalesGetter
ScaleHandler scaling.ScaleHandler
EventEmitter eventemitter.EventHandler
Expand Down Expand Up @@ -119,8 +117,8 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
if r.Scheme == nil {
return fmt.Errorf("ScaledObjectReconciler.Scheme is not initialized")
}
if r.Recorder == nil {
return fmt.Errorf("ScaledObjectReconciler.Recorder is not initialized")
if r.EventEmitter == nil {
return fmt.Errorf("ScaledObjectReconciler.EventEmitter is not initialized")
}
// Start controller
return ctrl.NewControllerManagedBy(mgr).
Expand Down Expand Up @@ -184,7 +182,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
if !scaledObject.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil {
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return ctrl.Result{}, err
}
}
Expand All @@ -196,18 +194,18 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
reqLogger.Error(err, msg)
conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg)
conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledObject check failed")
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, msg)
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, msg)
} else {
wasReady := conditions.GetReadyCondition()
if wasReady.IsFalse() || wasReady.IsUnknown() {
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectReadyType, eventreason.ScaledObjectReady, message.ScalerReadyMsg)
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectReadyType, eventreason.ScaledObjectReady, message.ScalerReadyMsg)
}
reqLogger.V(1).Info(msg)
conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySuccessReason, msg)
}

if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -359,7 +357,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
if err != nil {
msg := "Failed to parse Group, Version, Kind, Resource"
logger.Error(err, msg, "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectUpdateFailed, msg)
r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error())
return gvkr, err
}
gvkString := gvkr.GVKString()
Expand Down Expand Up @@ -396,12 +394,12 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, message.ScaleTargetNotFoundMsg, "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNotFoundMsg)
r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNotFoundMsg)
return gvkr, err
}
// resource exist but doesn't expose /scale subresource
logger.Error(errScale, message.ScaleTargetNoSubresourceMsg, "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNoSubresourceMsg)
r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNoSubresourceMsg)
return gvkr, errScale
}
isScalableCache.Store(gr.String(), true)
Expand Down
4 changes: 3 additions & 1 deletion controllers/keda/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1"
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/common/message"
"github.com/kedacore/keda/v2/pkg/eventreason"
)

Expand Down Expand Up @@ -86,7 +88,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge
}

logger.Info("Successfully finalized ScaledObject")
r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectDeleted, "ScaledObject was deleted")
r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectRemovedType, eventreason.ScaledObjectDeleted, message.ScaledObjectRemoved)
return nil
}

Expand Down
1 change: 0 additions & 1 deletion controllers/keda/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ var _ = BeforeSuite(func() {
err = (&ScaledObjectReconciler{
Client: k8sManager.GetClient(),
Scheme: k8sManager.GetScheme(),
Recorder: k8sManager.GetEventRecorderFor("keda-operator"),
ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil),
ScaleClient: scaleClient,
EventEmitter: eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default", nil),
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ const (
ScaleTargetNotFoundMsg = "Target resource doesn't exist"

ScaleTargetNoSubresourceMsg = "Target resource doesn't expose /scale subresource"

ScaledObjectRemoved = "ScaledObject was deleted"
)
6 changes: 3 additions & 3 deletions pkg/eventemitter/eventemitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type EventEmitter struct {
type EventHandler interface {
DeleteCloudEventSource(cloudEventSource *eventingv1alpha1.CloudEventSource) error
HandleCloudEventSource(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error
Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason string, message string)
Emit(object runtime.Object, namesapce string, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason string, message string)
}

// EventDataHandler defines the behavior for different event handlers
Expand Down Expand Up @@ -325,7 +325,7 @@ func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource
}

// Emit is emitting event to both local kubernetes and custom CloudEventSource handler. After emit event to local kubernetes, event will inqueue and waitng for handler's consuming.
func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason, message string) {
func (e *EventEmitter) Emit(object runtime.Object, namesapce string, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason, message string) {
e.recorder.Event(object, eventType, reason, message)

e.eventHandlersCacheLock.RLock()
Expand All @@ -337,7 +337,7 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam
objectName, _ := meta.NewAccessor().Name(object)
objectType, _ := meta.NewAccessor().Kind(object)
eventData := eventdata.EventData{
Namespace: namesapce.Namespace,
Namespace: namesapce,
CloudEventType: cloudeventType,
ObjectName: strings.ToLower(objectName),
ObjectType: strings.ToLower(objectType),
Expand Down
107 changes: 106 additions & 1 deletion tests/internals/cloudevent_source/cloudevent_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var _ = godotenv.Load("../../.env")
var (
namespace = fmt.Sprintf("%s-ns", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
deploymentName = fmt.Sprintf("%s-d", testName)
clientName = fmt.Sprintf("%s-client", testName)
cloudeventSourceName = fmt.Sprintf("%s-ce", testName)
cloudeventSourceErrName = fmt.Sprintf("%s-ce-err", testName)
Expand All @@ -43,6 +44,7 @@ var (
type templateData struct {
TestNamespace string
ScaledObject string
DeploymentName string
ClientName string
CloudEventSourceName string
CloudeventSourceErrName string
Expand Down Expand Up @@ -210,6 +212,56 @@ spec:
excludedEventTypes:
- keda.scaledobject.failed.v1
`

deploymentTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{.DeploymentName}}
namespace: {{.TestNamespace}}
labels:
deploy: {{.DeploymentName}}
spec:
replicas: 1
selector:
matchLabels:
pod: {{.DeploymentName}}
template:
metadata:
labels:
pod: {{.DeploymentName}}
spec:
containers:
- name: nginx
image: 'nginxinc/nginx-unprivileged'
`

scaledObjectTemplate = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: {{.ScaledObject}}
namespace: {{.TestNamespace}}
spec:
scaleTargetRef:
name: {{.DeploymentName}}
pollingInterval: 5
cooldownPeriod: 5
minReplicaCount: 1
maxReplicaCount: 10
advanced:
horizontalPodAutoscalerConfig:
behavior:
scaleDown:
stabilizationWindowSeconds: 15
triggers:
- type: cron
metadata:
timezone: Etc/UTC
start: 3 * * * *
end: 5 * * * *
desiredReplicas: '4'
`
)

func TestScaledObjectGeneral(t *testing.T) {
Expand All @@ -223,6 +275,7 @@ func TestScaledObjectGeneral(t *testing.T) {
assert.True(t, WaitForAllPodRunningInNamespace(t, kc, namespace, 5, 20), "all pods should be running")

testErrEventSourceEmitValue(t, kc, data)
testEventSourceEmitValue(t, kc, data)
testErrEventSourceExcludeValue(t, kc, data)
testErrEventSourceIncludeValue(t, kc, data)
testErrEventSourceCreation(t, kc, data)
Expand Down Expand Up @@ -258,8 +311,16 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem
foundEvents = append(foundEvents, cloudEvent)
data := map[string]string{}
err := cloudEvent.DataAs(&data)
t.Log("--- test emitting eventsource about scaledobject err---", "message", data["message"])

assert.NoError(t, err)
assert.Equal(t, data["message"], "ScaledObject doesn't have correct scaleTargetRef specification")
assert.Condition(t, func() bool {
if data["message"] == "ScaledObject doesn't have correct scaleTargetRef specification" || data["message"] == "Target resource doesn't exist" {
return true
}
return false
}, "get filtered event")

assert.Equal(t, cloudEvent.Type(), "keda.scaledobject.failed.v1")
assert.Equal(t, cloudEvent.Source(), expectedSource)
assert.Equal(t, cloudEvent.DataContentType(), "application/json")
Expand All @@ -272,6 +333,49 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem
assert.NotEmpty(t, foundEvents)
}

func testEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data templateData) {
t.Log("--- test emitting eventsource about scaledobject removed---")
KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
KubectlApplyWithTemplate(t, data, "deploymentTemplate", deploymentTemplate)

// wait 15 seconds to ensure event propagation
time.Sleep(5 * time.Second)
KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate)
time.Sleep(10 * time.Second)

out, outErr, err := ExecCommandOnSpecificPod(t, clientName, namespace, fmt.Sprintf("curl -X GET %s/getCloudEvent/%s", cloudEventHTTPServiceURL, "ScaledObjectDeleted"))
assert.NotEmpty(t, out)
assert.Empty(t, outErr)
assert.NoError(t, err, "dont expect error requesting ")

cloudEvents := []cloudevents.Event{}
err = json.Unmarshal([]byte(out), &cloudEvents)

assert.NoError(t, err, "dont expect error unmarshaling the cloudEvents")
assert.Greater(t, len(cloudEvents), 0, "cloudEvents should have at least 1 item")

foundEvents := []cloudevents.Event{}

for _, cloudEvent := range cloudEvents {
if cloudEvent.Subject() == expectedSubject {
foundEvents = append(foundEvents, cloudEvent)
data := map[string]string{}
err := cloudEvent.DataAs(&data)

assert.NoError(t, err)
assert.Equal(t, data["message"], "ScaledObject was deleted")
assert.Equal(t, cloudEvent.Type(), "keda.scaledobject.removed.v1")
assert.Equal(t, cloudEvent.Source(), expectedSource)
assert.Equal(t, cloudEvent.DataContentType(), "application/json")

if lastCloudEventTime.Before(cloudEvent.Time()) {
lastCloudEventTime = cloudEvent.Time()
}
}
}
assert.NotEmpty(t, foundEvents)
}

// tests error events not emitted by
func testErrEventSourceExcludeValue(t *testing.T, _ *kubernetes.Clientset, data templateData) {
t.Log("--- test emitting eventsource about scaledobject err with exclude filter---")
Expand Down Expand Up @@ -362,6 +466,7 @@ func getTemplateData() (templateData, []Template) {
return templateData{
TestNamespace: namespace,
ScaledObject: scaledObjectName,
DeploymentName: deploymentName,
ClientName: clientName,
CloudEventSourceName: cloudeventSourceName,
CloudeventSourceErrName: cloudeventSourceErrName,
Expand Down

0 comments on commit 5cbe424

Please sign in to comment.