Skip to content

Commit

Permalink
feat(command/builtin): allow custom deletion events (closes #19)
Browse files Browse the repository at this point in the history
  • Loading branch information
bounoable committed Mar 3, 2022
1 parent 9c71568 commit df156dc
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 15 deletions.
154 changes: 154 additions & 0 deletions command/builtin/builtin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,156 @@ func TestDeleteAggregate(t *testing.T) {
}
}

func TestDeleteAggregate_CustomEvent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

aggregateName := "foo"
aggregateID := uuid.New()

cmd := builtin.DeleteAggregate(aggregateName, aggregateID)

ebus := eventbus.New()
estore := eventstore.WithBus(eventstore.New(), ebus)
repo := repository.New(estore)
reg := codec.New()
builtin.RegisterCommands(reg)

subBus := cmdbus.New(reg, ebus)
pubBus := cmdbus.New(reg, ebus)

runErrs, err := subBus.Run(ctx)
if err != nil {
t.Fatal(err)
}

go panicOn(runErrs)
go panicOn(builtin.MustHandle(
ctx,
subBus,
repo,
builtin.PublishEvents(ebus, nil),
builtin.DeleteEvent("foo", func(ref aggregate.Ref) event.Event {
return event.New("custom.deleted", customDeletedEvent{Foo: "foo"}, event.Aggregate(ref.ID, ref.Name, 173)).Any()
}),
))

awaitCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

str, errs := event.Must(eventbus.Await[any](awaitCtx, ebus, "custom.deleted"))

if err := pubBus.Dispatch(ctx, cmd.Any(), dispatch.Sync()); err != nil {
t.Fatalf("dispatch command: %v", err)
}

// A "custom.deleted" event should be published
evt, err := streams.Await(ctx, str, errs)
if err != nil {
t.Fatalf("await event: %v", err)
}

if evt.Name() != "custom.deleted" {
t.Fatalf("Event name should b %q; is %q", "custom.deleted", evt.Name())
}

data, ok := evt.Data().(customDeletedEvent)
if !ok {
t.Fatalf("Data() should return type %T; got %T", data, evt.Data())
}

if pick.AggregateName(evt) != aggregateName {
t.Fatalf("evt.AggregateName() should be %q; is %q", aggregateName, pick.AggregateName(evt))
}

if pick.AggregateID(evt) != aggregateID {
t.Fatalf("evt.AggregateID() should return %q; is %q", aggregateID, pick.AggregateID(evt))
}

if pick.AggregateVersion(evt) != 173 {
t.Fatalf("evt.AggregateVersion() should return 173; got %v", pick.AggregateVersion(evt))
}

if data.Foo != "foo" {
t.Fatalf("Foo should be %v; is %v", "foo", data.Foo)
}
}

func TestDeleteAggregate_CustomEvent_MatchAll(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

aggregateName := "foo"
aggregateID := uuid.New()

cmd := builtin.DeleteAggregate(aggregateName, aggregateID)

ebus := eventbus.New()
estore := eventstore.WithBus(eventstore.New(), ebus)
repo := repository.New(estore)
reg := codec.New()
builtin.RegisterCommands(reg)

subBus := cmdbus.New(reg, ebus)
pubBus := cmdbus.New(reg, ebus)

runErrs, err := subBus.Run(ctx)
if err != nil {
t.Fatal(err)
}

go panicOn(runErrs)
go panicOn(builtin.MustHandle(
ctx,
subBus,
repo,
builtin.PublishEvents(ebus, nil),
builtin.DeleteEvent("", func(ref aggregate.Ref) event.Event {
return event.New("custom.deleted", customDeletedEvent{Foo: "foo"}, event.Aggregate(ref.ID, ref.Name, 173)).Any()
}),
))

awaitCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

str, errs := event.Must(eventbus.Await[any](awaitCtx, ebus, "custom.deleted"))

if err := pubBus.Dispatch(ctx, cmd.Any(), dispatch.Sync()); err != nil {
t.Fatalf("dispatch command: %v", err)
}

// A "custom.deleted" event should be published
evt, err := streams.Await(ctx, str, errs)
if err != nil {
t.Fatalf("await event: %v", err)
}

if evt.Name() != "custom.deleted" {
t.Fatalf("Event name should b %q; is %q", "custom.deleted", evt.Name())
}

data, ok := evt.Data().(customDeletedEvent)
if !ok {
t.Fatalf("Data() should return type %T; got %T", data, evt.Data())
}

if pick.AggregateName(evt) != aggregateName {
t.Fatalf("evt.AggregateName() should be %q; is %q", aggregateName, pick.AggregateName(evt))
}

if pick.AggregateID(evt) != aggregateID {
t.Fatalf("evt.AggregateID() should return %q; is %q", aggregateID, pick.AggregateID(evt))
}

if pick.AggregateVersion(evt) != 173 {
t.Fatalf("evt.AggregateVersion() should return 173; got %v", pick.AggregateVersion(evt))
}

if data.Foo != "foo" {
t.Fatalf("Foo should be %v; is %v", "foo", data.Foo)
}
}

func panicOn(errs <-chan error) {
for err := range errs {
panic(err)
Expand All @@ -174,3 +324,7 @@ func (ma *mockAggregate) ApplyEvent(evt event.Event) {
func newMockEvent(a aggregate.Aggregate, foo int) event.Event {
return aggregate.NextEvent[any](a, "foobar", test.FoobarEventData{A: foo})
}

type customDeletedEvent struct {
Foo string
}
57 changes: 42 additions & 15 deletions command/builtin/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,27 @@ type HandleOption func(*handleConfig)
// is non-nil, the events are also inserted into store after publishing.
//
// The following events are published by the handler:
// - AggregateDeleted ("goes.command.aggregate.deleted")
// - AggregateDeleted ("goes.command.aggregate.deleted") (or a user-provided event, see DeleteEvent())
func PublishEvents(bus event.Bus, store event.Store) HandleOption {
return func(cfg *handleConfig) {
cfg.bus = bus
cfg.store = store
}
}

// DeleteEvent returns a HandleOption that overrides the deletion event for the
// given aggregate. By default, when the PublishEvents() option is used, a
// "goes.command.aggregate.deleted" event is published when an aggregate is
// deleted from the event store. This option calls the provided makeEvent
// function with a reference to the deleted aggregate to override the published
// event. An empty aggregate name is a wildcard for all aggregates to allow for
// overriding the deletion event for all aggregates.
func DeleteEvent(aggregateName string, makeEvent func(aggregate.Ref) event.Event) HandleOption {
return func(cfg *handleConfig) {
cfg.deleteEvents[aggregateName] = makeEvent
}
}

// MustHandle does the same as Handle, but panic if command registration fails.
func MustHandle(ctx context.Context, bus command.Bus, repo aggregate.Repository, opts ...HandleOption) <-chan error {
errs, err := Handle(ctx, bus, repo, opts...)
Expand All @@ -42,7 +55,7 @@ func MustHandle(ctx context.Context, bus command.Bus, repo aggregate.Repository,
// The following commands are handled:
// - DeleteAggregateCmd ("goes.command.aggregate.delete")
func Handle(ctx context.Context, bus command.Bus, repo aggregate.Repository, opts ...HandleOption) (<-chan error, error) {
var cfg handleConfig
cfg := handleConfig{deleteEvents: make(map[string]func(aggregate.Ref) event.Of[any])}
for _, opt := range opts {
opt(&cfg)
}
Expand All @@ -58,27 +71,40 @@ func Handle(ctx context.Context, bus command.Bus, repo aggregate.Repository, opt
return fmt.Errorf("fetch aggregate: %w", err)
}

data := AggregateDeletedData{Version: a.AggregateVersion()}
deletedEvent := event.New(AggregateDeleted, data, event.Aggregate(id, name, 0))

if err := repo.Delete(ctx, a); err != nil {
return fmt.Errorf("delete from repository: %w", err)
}

if cfg.bus != nil {
if err := cfg.bus.Publish(ctx, deletedEvent.Any()); err != nil {
return fmt.Errorf("publish %q event: %w", deletedEvent.Name(), err)
}
if cfg.bus == nil {
return nil
}

var deletedEvent event.Event
if makeEvent, ok := cfg.deleteEvents[name]; ok {
deletedEvent = makeEvent(aggregate.Ref{ID: id, Name: name})
} else if makeEvent, ok := cfg.deleteEvents[""]; ok {
deletedEvent = makeEvent(aggregate.Ref{ID: id, Name: name})
} else {
deletedEvent = event.New(
AggregateDeleted,
AggregateDeletedData{Version: a.AggregateVersion()},
event.Aggregate(id, name, 0),
).Any()
}

if cfg.store != nil {
if err := cfg.store.Insert(ctx, deletedEvent.Any()); err != nil {
return fmt.Errorf("insert %q event into event store: %w", deletedEvent.Name(), err)
}
if err := cfg.bus.Publish(ctx, deletedEvent); err != nil {
return fmt.Errorf("publish %q event: %w", deletedEvent.Name(), err)
}

if cfg.store != nil {
if err := cfg.store.Insert(ctx, deletedEvent); err != nil {
return fmt.Errorf("insert %q event into event store: %w", deletedEvent.Name(), err)
}
}

return nil
})

if err != nil {
return nil, fmt.Errorf("handle %q commands: %w", DeleteAggregateCmd, err)
}
Expand All @@ -87,6 +113,7 @@ func Handle(ctx context.Context, bus command.Bus, repo aggregate.Repository, opt
}

type handleConfig struct {
bus event.Bus
store event.Store
bus event.Bus
store event.Store
deleteEvents map[string]func(aggregate.Ref) event.Event
}

0 comments on commit df156dc

Please sign in to comment.