diff --git a/command/builtin/builtin_test.go b/command/builtin/builtin_test.go index 3f3afbff..45a3ef2b 100644 --- a/command/builtin/builtin_test.go +++ b/command/builtin/builtin_test.go @@ -148,6 +148,156 @@ func TestDeleteAggregate(t *testing.T) { } } +func TestDeleteAggregate_CustomEvent(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + aggregateName := "foo" + aggregateID := uuid.New() + + cmd := builtin.DeleteAggregate(aggregateName, aggregateID) + + ebus := eventbus.New() + estore := eventstore.WithBus(eventstore.New(), ebus) + repo := repository.New(estore) + reg := codec.New() + builtin.RegisterCommands(reg) + + subBus := cmdbus.New(reg, ebus) + pubBus := cmdbus.New(reg, ebus) + + runErrs, err := subBus.Run(ctx) + if err != nil { + t.Fatal(err) + } + + go panicOn(runErrs) + go panicOn(builtin.MustHandle( + ctx, + subBus, + repo, + builtin.PublishEvents(ebus, nil), + builtin.DeleteEvent("foo", func(ref aggregate.Ref) event.Event { + return event.New("custom.deleted", customDeletedEvent{Foo: "foo"}, event.Aggregate(ref.ID, ref.Name, 173)).Any() + }), + )) + + awaitCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + str, errs := event.Must(eventbus.Await[any](awaitCtx, ebus, "custom.deleted")) + + if err := pubBus.Dispatch(ctx, cmd.Any(), dispatch.Sync()); err != nil { + t.Fatalf("dispatch command: %v", err) + } + + // A "custom.deleted" event should be published + evt, err := streams.Await(ctx, str, errs) + if err != nil { + t.Fatalf("await event: %v", err) + } + + if evt.Name() != "custom.deleted" { + t.Fatalf("Event name should b %q; is %q", "custom.deleted", evt.Name()) + } + + data, ok := evt.Data().(customDeletedEvent) + if !ok { + t.Fatalf("Data() should return type %T; got %T", data, evt.Data()) + } + + if pick.AggregateName(evt) != aggregateName { + t.Fatalf("evt.AggregateName() should be %q; is %q", aggregateName, pick.AggregateName(evt)) + } + + if pick.AggregateID(evt) != aggregateID { + t.Fatalf("evt.AggregateID() should return %q; is %q", aggregateID, pick.AggregateID(evt)) + } + + if pick.AggregateVersion(evt) != 173 { + t.Fatalf("evt.AggregateVersion() should return 173; got %v", pick.AggregateVersion(evt)) + } + + if data.Foo != "foo" { + t.Fatalf("Foo should be %v; is %v", "foo", data.Foo) + } +} + +func TestDeleteAggregate_CustomEvent_MatchAll(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + aggregateName := "foo" + aggregateID := uuid.New() + + cmd := builtin.DeleteAggregate(aggregateName, aggregateID) + + ebus := eventbus.New() + estore := eventstore.WithBus(eventstore.New(), ebus) + repo := repository.New(estore) + reg := codec.New() + builtin.RegisterCommands(reg) + + subBus := cmdbus.New(reg, ebus) + pubBus := cmdbus.New(reg, ebus) + + runErrs, err := subBus.Run(ctx) + if err != nil { + t.Fatal(err) + } + + go panicOn(runErrs) + go panicOn(builtin.MustHandle( + ctx, + subBus, + repo, + builtin.PublishEvents(ebus, nil), + builtin.DeleteEvent("", func(ref aggregate.Ref) event.Event { + return event.New("custom.deleted", customDeletedEvent{Foo: "foo"}, event.Aggregate(ref.ID, ref.Name, 173)).Any() + }), + )) + + awaitCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + str, errs := event.Must(eventbus.Await[any](awaitCtx, ebus, "custom.deleted")) + + if err := pubBus.Dispatch(ctx, cmd.Any(), dispatch.Sync()); err != nil { + t.Fatalf("dispatch command: %v", err) + } + + // A "custom.deleted" event should be published + evt, err := streams.Await(ctx, str, errs) + if err != nil { + t.Fatalf("await event: %v", err) + } + + if evt.Name() != "custom.deleted" { + t.Fatalf("Event name should b %q; is %q", "custom.deleted", evt.Name()) + } + + data, ok := evt.Data().(customDeletedEvent) + if !ok { + t.Fatalf("Data() should return type %T; got %T", data, evt.Data()) + } + + if pick.AggregateName(evt) != aggregateName { + t.Fatalf("evt.AggregateName() should be %q; is %q", aggregateName, pick.AggregateName(evt)) + } + + if pick.AggregateID(evt) != aggregateID { + t.Fatalf("evt.AggregateID() should return %q; is %q", aggregateID, pick.AggregateID(evt)) + } + + if pick.AggregateVersion(evt) != 173 { + t.Fatalf("evt.AggregateVersion() should return 173; got %v", pick.AggregateVersion(evt)) + } + + if data.Foo != "foo" { + t.Fatalf("Foo should be %v; is %v", "foo", data.Foo) + } +} + func panicOn(errs <-chan error) { for err := range errs { panic(err) @@ -174,3 +324,7 @@ func (ma *mockAggregate) ApplyEvent(evt event.Event) { func newMockEvent(a aggregate.Aggregate, foo int) event.Event { return aggregate.NextEvent[any](a, "foobar", test.FoobarEventData{A: foo}) } + +type customDeletedEvent struct { + Foo string +} diff --git a/command/builtin/handler.go b/command/builtin/handler.go index 85f6c1ae..2fb4bf2c 100644 --- a/command/builtin/handler.go +++ b/command/builtin/handler.go @@ -17,7 +17,7 @@ type HandleOption func(*handleConfig) // is non-nil, the events are also inserted into store after publishing. // // The following events are published by the handler: -// - AggregateDeleted ("goes.command.aggregate.deleted") +// - AggregateDeleted ("goes.command.aggregate.deleted") (or a user-provided event, see DeleteEvent()) func PublishEvents(bus event.Bus, store event.Store) HandleOption { return func(cfg *handleConfig) { cfg.bus = bus @@ -25,6 +25,19 @@ func PublishEvents(bus event.Bus, store event.Store) HandleOption { } } +// DeleteEvent returns a HandleOption that overrides the deletion event for the +// given aggregate. By default, when the PublishEvents() option is used, a +// "goes.command.aggregate.deleted" event is published when an aggregate is +// deleted from the event store. This option calls the provided makeEvent +// function with a reference to the deleted aggregate to override the published +// event. An empty aggregate name is a wildcard for all aggregates to allow for +// overriding the deletion event for all aggregates. +func DeleteEvent(aggregateName string, makeEvent func(aggregate.Ref) event.Event) HandleOption { + return func(cfg *handleConfig) { + cfg.deleteEvents[aggregateName] = makeEvent + } +} + // MustHandle does the same as Handle, but panic if command registration fails. func MustHandle(ctx context.Context, bus command.Bus, repo aggregate.Repository, opts ...HandleOption) <-chan error { errs, err := Handle(ctx, bus, repo, opts...) @@ -42,7 +55,7 @@ func MustHandle(ctx context.Context, bus command.Bus, repo aggregate.Repository, // The following commands are handled: // - DeleteAggregateCmd ("goes.command.aggregate.delete") func Handle(ctx context.Context, bus command.Bus, repo aggregate.Repository, opts ...HandleOption) (<-chan error, error) { - var cfg handleConfig + cfg := handleConfig{deleteEvents: make(map[string]func(aggregate.Ref) event.Of[any])} for _, opt := range opts { opt(&cfg) } @@ -58,27 +71,40 @@ func Handle(ctx context.Context, bus command.Bus, repo aggregate.Repository, opt return fmt.Errorf("fetch aggregate: %w", err) } - data := AggregateDeletedData{Version: a.AggregateVersion()} - deletedEvent := event.New(AggregateDeleted, data, event.Aggregate(id, name, 0)) - if err := repo.Delete(ctx, a); err != nil { return fmt.Errorf("delete from repository: %w", err) } - if cfg.bus != nil { - if err := cfg.bus.Publish(ctx, deletedEvent.Any()); err != nil { - return fmt.Errorf("publish %q event: %w", deletedEvent.Name(), err) - } + if cfg.bus == nil { + return nil + } + + var deletedEvent event.Event + if makeEvent, ok := cfg.deleteEvents[name]; ok { + deletedEvent = makeEvent(aggregate.Ref{ID: id, Name: name}) + } else if makeEvent, ok := cfg.deleteEvents[""]; ok { + deletedEvent = makeEvent(aggregate.Ref{ID: id, Name: name}) + } else { + deletedEvent = event.New( + AggregateDeleted, + AggregateDeletedData{Version: a.AggregateVersion()}, + event.Aggregate(id, name, 0), + ).Any() + } - if cfg.store != nil { - if err := cfg.store.Insert(ctx, deletedEvent.Any()); err != nil { - return fmt.Errorf("insert %q event into event store: %w", deletedEvent.Name(), err) - } + if err := cfg.bus.Publish(ctx, deletedEvent); err != nil { + return fmt.Errorf("publish %q event: %w", deletedEvent.Name(), err) + } + + if cfg.store != nil { + if err := cfg.store.Insert(ctx, deletedEvent); err != nil { + return fmt.Errorf("insert %q event into event store: %w", deletedEvent.Name(), err) } } return nil }) + if err != nil { return nil, fmt.Errorf("handle %q commands: %w", DeleteAggregateCmd, err) } @@ -87,6 +113,7 @@ func Handle(ctx context.Context, bus command.Bus, repo aggregate.Repository, opt } type handleConfig struct { - bus event.Bus - store event.Store + bus event.Bus + store event.Store + deleteEvents map[string]func(aggregate.Ref) event.Event }