Skip to content

Commit

Permalink
test: verify that scaledobjects have no authenticationref when it is …
Browse files Browse the repository at this point in the history
…empty (#3944)

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 authored Jun 25, 2024
1 parent a73d4e6 commit b5bb25a
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 0 deletions.
16 changes: 16 additions & 0 deletions test/e2e_new/kafka_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,22 @@ func TestKafkaSourceKedaScaling(t *testing.T) {

}

func TestKafkaSourceScaledObject(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.WithPollTimings(5*time.Second, 4*time.Minute),
environment.Managed(t),
)

env.Test(ctx, t, features.KafkaSourceScaledObjectHasNoEmptyAuthRef())

}

func TestKafkaSourceTLSSink(t *testing.T) {

t.Parallel()
Expand Down
72 changes: 72 additions & 0 deletions test/rekt/features/keda_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"

"knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler/keda"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"

"knative.dev/eventing/test/rekt/resources/trigger"
Expand All @@ -39,6 +40,7 @@ import (
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasource"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic"
kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/test/rekt/resources/broker"
subscriptionresources "knative.dev/eventing/test/rekt/resources/subscription"
Expand All @@ -51,6 +53,44 @@ import (
"knative.dev/reconciler-test/pkg/resources/service"
)

func KafkaSourceScaledObjectHasNoEmptyAuthRef() *feature.Feature {
f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda")

// we need to ensure that autoscaling is enabled for the rest of the feature to work
f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled())

kafkaSource := feature.MakeRandomK8sName("kafka-source")
topic := feature.MakeRandomK8sName("topic")
kafkaSink := feature.MakeRandomK8sName("kafkaSink")
receiver := feature.MakeRandomK8sName("eventshub-receiver")

event := cetest.FullEvent()
event.SetID(uuid.New().String())

f.Setup("install kafka topic", kafkatopic.Install(topic))
f.Setup("topic is ready", kafkatopic.IsReady(topic))

// Binary content mode is default for Kafka Sink.
f.Setup("install kafkasink", kafkasink.Install(kafkaSink, topic, testpkg.BootstrapServersPlaintextArr))
f.Setup("kafkasink is ready", kafkasink.IsReady(kafkaSink))

f.Setup("install eventshub receiver", eventshub.Install(receiver, eventshub.StartReceiver))

kafkaSourceOpts := []manifest.CfgFn{
kafkasource.WithSink(service.AsDestinationRef(receiver)),
kafkasource.WithTopics([]string{topic}),
kafkasource.WithBootstrapServers(testingpkg.BootstrapServersPlaintextArr),
}

f.Setup("install kafka source", kafkasource.Install(kafkaSource, kafkaSourceOpts...))
f.Setup("kafka source is ready", kafkasource.IsReady(kafkaSource))

// after the event is sent, the source should scale down to zero replicas
f.Alpha("kafka source consumergroup scaled object").MustNot("have an authentication ref set on the trigger", verifyScaledObjectTriggerRef(getKafkaSourceCg(kafkaSource)))

return f
}

func KafkaSourceScalesToZeroWithKeda() *feature.Feature {
f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda")

Expand Down Expand Up @@ -265,3 +305,35 @@ func verifyConsumerGroupReplicas(getConsumerGroupName getCgName, expectedReplica
}
}
}

func verifyScaledObjectTriggerRef(getConsumerGroupName getCgName) feature.StepFn {
return func(ctx context.Context, t feature.T) {
kedaClient := kedaclient.Get(ctx)
internalsClient := consumergroupclient.Get(ctx)
ns := environment.FromContext(ctx).Namespace()

cgName, err := getConsumerGroupName(ctx)
if err != nil {
t.Fatal(err)
}

cg, err := internalsClient.InternalV1alpha1().ConsumerGroups(ns).Get(ctx, cgName, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

so, err := kedaClient.KedaV1alpha1().ScaledObjects(ns).Get(ctx, keda.GenerateScaledObjectName(cg), metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

if so.Spec.Triggers != nil {
for _, trig := range so.Spec.Triggers {
if trig.AuthenticationRef != nil {
t.Fatal("trigger on scaled object should have no authentication ref but there is an authentication ref")
}
}
}

}
}

0 comments on commit b5bb25a

Please sign in to comment.