diff --git a/test/e2e_new/kafka_source_test.go b/test/e2e_new/kafka_source_test.go index b4d03253bf..f918847cb8 100644 --- a/test/e2e_new/kafka_source_test.go +++ b/test/e2e_new/kafka_source_test.go @@ -242,6 +242,22 @@ func TestKafkaSourceKedaScaling(t *testing.T) { } +func TestKafkaSourceScaledObject(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.WithPollTimings(5*time.Second, 4*time.Minute), + environment.Managed(t), + ) + + env.Test(ctx, t, features.KafkaSourceScaledObjectHasNoEmptyAuthRef()) + +} + func TestKafkaSourceTLSSink(t *testing.T) { t.Parallel() diff --git a/test/rekt/features/keda_scaling.go b/test/rekt/features/keda_scaling.go index df70549aa2..0020c661ee 100644 --- a/test/rekt/features/keda_scaling.go +++ b/test/rekt/features/keda_scaling.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + "knative.dev/eventing-kafka-broker/control-plane/pkg/autoscaler/keda" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing/test/rekt/resources/trigger" @@ -39,6 +40,7 @@ import ( "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasink" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkasource" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkatopic" + kedaclient "knative.dev/eventing-kafka-broker/third_party/pkg/client/injection/client" eventingclient "knative.dev/eventing/pkg/client/injection/client" "knative.dev/eventing/test/rekt/resources/broker" subscriptionresources "knative.dev/eventing/test/rekt/resources/subscription" @@ -51,6 +53,44 @@ import ( "knative.dev/reconciler-test/pkg/resources/service" ) +func KafkaSourceScaledObjectHasNoEmptyAuthRef() *feature.Feature { + f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda") + + // we need to ensure that autoscaling is enabled for the rest of the feature to work + f.Prerequisite("Autoscaling is enabled", kafkafeatureflags.AutoscalingEnabled()) + + kafkaSource := feature.MakeRandomK8sName("kafka-source") + topic := feature.MakeRandomK8sName("topic") + kafkaSink := feature.MakeRandomK8sName("kafkaSink") + receiver := feature.MakeRandomK8sName("eventshub-receiver") + + event := cetest.FullEvent() + event.SetID(uuid.New().String()) + + f.Setup("install kafka topic", kafkatopic.Install(topic)) + f.Setup("topic is ready", kafkatopic.IsReady(topic)) + + // Binary content mode is default for Kafka Sink. + f.Setup("install kafkasink", kafkasink.Install(kafkaSink, topic, testpkg.BootstrapServersPlaintextArr)) + f.Setup("kafkasink is ready", kafkasink.IsReady(kafkaSink)) + + f.Setup("install eventshub receiver", eventshub.Install(receiver, eventshub.StartReceiver)) + + kafkaSourceOpts := []manifest.CfgFn{ + kafkasource.WithSink(service.AsDestinationRef(receiver)), + kafkasource.WithTopics([]string{topic}), + kafkasource.WithBootstrapServers(testingpkg.BootstrapServersPlaintextArr), + } + + f.Setup("install kafka source", kafkasource.Install(kafkaSource, kafkaSourceOpts...)) + f.Setup("kafka source is ready", kafkasource.IsReady(kafkaSource)) + + // after the event is sent, the source should scale down to zero replicas + f.Alpha("kafka source consumergroup scaled object").MustNot("have an authentication ref set on the trigger", verifyScaledObjectTriggerRef(getKafkaSourceCg(kafkaSource))) + + return f +} + func KafkaSourceScalesToZeroWithKeda() *feature.Feature { f := feature.NewFeatureNamed("KafkaSourceScalesToZeroWithKeda") @@ -265,3 +305,35 @@ func verifyConsumerGroupReplicas(getConsumerGroupName getCgName, expectedReplica } } } + +func verifyScaledObjectTriggerRef(getConsumerGroupName getCgName) feature.StepFn { + return func(ctx context.Context, t feature.T) { + kedaClient := kedaclient.Get(ctx) + internalsClient := consumergroupclient.Get(ctx) + ns := environment.FromContext(ctx).Namespace() + + cgName, err := getConsumerGroupName(ctx) + if err != nil { + t.Fatal(err) + } + + cg, err := internalsClient.InternalV1alpha1().ConsumerGroups(ns).Get(ctx, cgName, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + so, err := kedaClient.KedaV1alpha1().ScaledObjects(ns).Get(ctx, keda.GenerateScaledObjectName(cg), metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + if so.Spec.Triggers != nil { + for _, trig := range so.Spec.Triggers { + if trig.AuthenticationRef != nil { + t.Fatal("trigger on scaled object should have no authentication ref but there is an authentication ref") + } + } + } + + } +}