diff --git a/test/rekt/features/broker/feature.go b/test/rekt/features/broker/feature.go index dea2fe5fd46..3704da705b7 100644 --- a/test/rekt/features/broker/feature.go +++ b/test/rekt/features/broker/feature.go @@ -27,6 +27,7 @@ import ( "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/state" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" @@ -382,8 +383,20 @@ Note: the number denotes the sequence of the event that flows in this test case. */ func brokerEventTransformationForTrigger() *feature.Feature { f := feature.NewFeatureNamed("Broker event transformation for trigger") + config := BrokerEventTransformationForTriggerSetup(f) + BrokerEventTransformationForTriggerAssert(f, config) + return f +} - source := feature.MakeRandomK8sName("source") +type brokerEventTransformationConfig struct { + Broker string + Sink1 string + Sink2 string + EventToSend cloudevents.Event + TransformedEvent cloudevents.Event +} + +func BrokerEventTransformationForTriggerSetup(f *feature.Feature) brokerEventTransformationConfig { sink1 := feature.MakeRandomK8sName("sink1") sink2 := feature.MakeRandomK8sName("sink2") @@ -391,42 +404,43 @@ func brokerEventTransformationForTrigger() *feature.Feature { trigger2 := feature.MakeRandomK8sName("trigger2") // Construct original cloudevent message - eventType := "type1" - eventSource := "http://source1.com" - eventBody := `{"msg":"e2e-brokerchannel-body"}` - // Construct cloudevent message after transformation - transformedEventType := "type2" - transformedEventSource := "http://source2.com" - transformedBody := `{"msg":"transformed body"}` - - // Construct eventToSend eventToSend := cloudevents.NewEvent() - eventToSend.SetID(uuid.New().String()) - eventToSend.SetType(eventType) - eventToSend.SetSource(eventSource) - eventToSend.SetData(cloudevents.ApplicationJSON, []byte(eventBody)) + eventToSend.SetType("type1") + eventToSend.SetSource("http://source1.com") + eventToSend.SetData(cloudevents.ApplicationJSON, []byte(`{"msg":"e2e-brokerchannel-body"}`)) + + // Construct cloudevent message after transformation + transformedEvent := cloudevents.NewEvent() + transformedEvent.SetType("type2") + transformedEvent.SetSource("http://source2.com") + transformedEvent.SetData(cloudevents.ApplicationJSON, []byte(`{"msg":"transformed body"}`)) //Install the broker brokerName := feature.MakeRandomK8sName("broker") + f.Setup("Set context variables", func(ctx context.Context, t feature.T) { + state.SetOrFail(ctx, t, "brokerName", brokerName) + state.SetOrFail(ctx, t, "sink1", sink1) + state.SetOrFail(ctx, t, "sink2", sink2) + }) f.Setup("install broker", broker.Install(brokerName, broker.WithEnvConfig()...)) f.Setup("broker is ready", broker.IsReady(brokerName)) f.Setup("broker is addressable", broker.IsAddressable(brokerName)) f.Setup("install sink1", eventshub.Install(sink1, - eventshub.ReplyWithTransformedEvent(transformedEventType, transformedEventSource, transformedBody), + eventshub.ReplyWithTransformedEvent(transformedEvent.Type(), transformedEvent.Source(), string(transformedEvent.Data())), eventshub.StartReceiver), ) f.Setup("install sink2", eventshub.Install(sink2, eventshub.StartReceiver)) // filter1 filters the original events filter1 := eventingv1.TriggerFilterAttributes{ - "type": eventType, - "source": eventSource, + "type": eventToSend.Type(), + "source": eventToSend.Source(), } // filter2 filters events after transformation filter2 := eventingv1.TriggerFilterAttributes{ - "type": transformedEventType, - "source": transformedEventSource, + "type": transformedEvent.Type(), + "source": transformedEvent.Source(), } // Install the trigger1 point to Broker and transform the original events to new events @@ -446,33 +460,49 @@ func brokerEventTransformationForTrigger() *feature.Feature { )) f.Setup("trigger2 goes ready", trigger.IsReady(trigger2)) + return brokerEventTransformationConfig{ + Broker: brokerName, + Sink1: sink1, + Sink2: sink2, + EventToSend: eventToSend, + TransformedEvent: transformedEvent, + } +} + +func BrokerEventTransformationForTriggerAssert(f *feature.Feature, + cfg brokerEventTransformationConfig) { + + source := feature.MakeRandomK8sName("source") + + // Set new ID every time we send event to allow calling this function repeatedly + cfg.EventToSend.SetID(uuid.New().String()) f.Requirement("install source", eventshub.Install( source, - eventshub.StartSenderToResource(broker.GVR(), brokerName), - eventshub.InputEvent(eventToSend), + eventshub.StartSenderToResource(broker.GVR(), cfg.Broker), + eventshub.InputEvent(cfg.EventToSend), )) eventMatcher := eventasssert.MatchEvent( - test.HasSource(eventSource), - test.HasType(eventType), - test.HasData([]byte(eventBody)), + test.HasId(cfg.EventToSend.ID()), + test.HasSource(cfg.EventToSend.Source()), + test.HasType(cfg.EventToSend.Type()), + test.HasData(cfg.EventToSend.Data()), ) transformEventMatcher := eventasssert.MatchEvent( - test.HasSource(transformedEventSource), - test.HasType(transformedEventType), - test.HasData([]byte(transformedBody)), + test.HasSource(cfg.TransformedEvent.Source()), + test.HasType(cfg.TransformedEvent.Type()), + test.HasData(cfg.TransformedEvent.Data()), ) - f.Stable("Trigger2 has filtered all transformed events"). - Must("delivers original events", - eventasssert.OnStore(sink2).Match(transformEventMatcher).AtLeast(1)) - - f.Stable("Trigger2 has no original events"). - Must("delivers original events", - eventasssert.OnStore(sink2).Match(eventMatcher).Not()) - - return f - + f.Stable("Trigger has filtered all transformed events"). + Must("trigger 1 delivers original events", + eventasssert.OnStore(cfg.Sink1).Match(eventMatcher).AtLeast(1)). + Must("trigger 1 does not deliver transformed events", + eventasssert.OnStore(cfg.Sink1).Match(transformEventMatcher).Not()). + Must("trigger 2 delivers transformed events", + eventasssert.OnStore(cfg.Sink2).Match(transformEventMatcher).AtLeast(1)). + Must("trigger 2 does not deliver original events", + eventasssert.OnStore(cfg.Sink2).Match(eventMatcher).Not()) } func BrokerPreferHeaderCheck() *feature.Feature { diff --git a/test/upgrade/upgrade.go b/test/upgrade/upgrade.go index 6d00a4f6c68..cc9b6bf29f0 100644 --- a/test/upgrade/upgrade.go +++ b/test/upgrade/upgrade.go @@ -23,7 +23,10 @@ import ( "sync" "testing" + "knative.dev/eventing/pkg/apis/eventing" + brokerfeatures "knative.dev/eventing/test/rekt/features/broker" "knative.dev/eventing/test/rekt/features/channel" + brokerresources "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/channel_impl" "knative.dev/eventing/test/rekt/resources/subscription" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -37,7 +40,16 @@ import ( "knative.dev/reconciler-test/pkg/manifest" ) -var channelConfigMux = &sync.Mutex{} +var ( + channelConfigMux = &sync.Mutex{} + brokerConfigMux = &sync.Mutex{} + opts = []environment.EnvOpts{ + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + } +) // RunMainTest expects flags to be already initialized. // This function needs to be exposed, so that test cases in other repositories can call the upgrade @@ -63,7 +75,7 @@ type DurableFeature struct { EnvOpts []environment.EnvOpts setupEnv environment.Environment setupCtx context.Context - VerifyF *feature.Feature + VerifyF func() *feature.Feature Global environment.GlobalEnvironment } @@ -83,14 +95,14 @@ func (fe *DurableFeature) Setup(label string) pkgupgrade.Operation { func (fe *DurableFeature) Verify(label string) pkgupgrade.Operation { return pkgupgrade.NewOperation(label, func(c pkgupgrade.Context) { c.T.Parallel() - fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF) + fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF()) }) } func (fe *DurableFeature) VerifyAndTeardown(label string) pkgupgrade.Operation { return pkgupgrade.NewOperation(label, func(c pkgupgrade.Context) { c.T.Parallel() - fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF) + fe.setupEnv.Test(fe.setupCtx, c.T, fe.VerifyF()) // Ensures teardown of resources/namespace. fe.setupEnv.Finish() }) @@ -103,7 +115,7 @@ func (fe *DurableFeature) SetupVerifyAndTeardown(label string) pkgupgrade.Operat append(fe.EnvOpts, environment.Managed(c.T))..., ) env.Test(ctx, c.T, fe.SetupF) - env.Test(ctx, c.T, fe.VerifyF) + env.Test(ctx, c.T, fe.VerifyF()) }) } @@ -290,14 +302,29 @@ func InMemoryChannelFeature(glob environment.GlobalEnvironment) *DurableFeature setupF := feature.NewFeature() sink, ch := channel.ChannelChainSetup(setupF, 1, createSubscriberFn) - verifyF := feature.NewFeature() - channel.ChannelChainAssert(verifyF, sink, ch) + verifyF := func() *feature.Feature { + f := feature.NewFeatureNamed(setupF.Name) + channel.ChannelChainAssert(f, sink, ch) + return f + } - opts := []environment.EnvOpts{ - knative.WithKnativeNamespace(system.Namespace()), - knative.WithLoggingConfig, - knative.WithTracingConfig, - k8s.WithEventListener, + return &DurableFeature{SetupF: setupF, VerifyF: verifyF, Global: glob, EnvOpts: opts} +} + +func BrokerEventTransformationForTrigger(glob environment.GlobalEnvironment, +) *DurableFeature { + // Prevent race conditions on EnvCfg.BrokerClass when running tests in parallel. + brokerConfigMux.Lock() + defer brokerConfigMux.Unlock() + brokerresources.EnvCfg.BrokerClass = eventing.MTChannelBrokerClassValue + + setupF := feature.NewFeature() + cfg := brokerfeatures.BrokerEventTransformationForTriggerSetup(setupF) + + verifyF := func() *feature.Feature { + f := feature.NewFeatureNamed(setupF.Name) + brokerfeatures.BrokerEventTransformationForTriggerAssert(f, cfg) + return f } return &DurableFeature{SetupF: setupF, VerifyF: verifyF, Global: glob, EnvOpts: opts} diff --git a/test/upgrade/upgrade_test.go b/test/upgrade/upgrade_test.go index 2c38f02109d..a74463b0174 100644 --- a/test/upgrade/upgrade_test.go +++ b/test/upgrade/upgrade_test.go @@ -52,12 +52,16 @@ func TestEventingUpgrades(t *testing.T) { g := FeatureGroupWithUpgradeTests{ // A feature that will run the same test post-upgrade and post-downgrade. NewFeatureSmoke(InMemoryChannelFeature(global)), + NewFeatureSmoke(BrokerEventTransformationForTrigger(global)), // A feature that will be created pre-upgrade and verified/removed post-upgrade. NewFeatureOnlyUpgrade(InMemoryChannelFeature(global)), + NewFeatureOnlyUpgrade(BrokerEventTransformationForTrigger(global)), // A feature that will be created pre-upgrade, verified post-upgrade, verified and removed post-downgrade. NewFeatureUpgradeDowngrade(InMemoryChannelFeature(global)), + NewFeatureUpgradeDowngrade(BrokerEventTransformationForTrigger(global)), // A feature that will be created post-upgrade, verified and removed post-downgrade. NewFeatureOnlyDowngrade(InMemoryChannelFeature(global)), + NewFeatureOnlyDowngrade(BrokerEventTransformationForTrigger(global)), } suite := pkgupgrade.Suite{