Skip to content

Commit

Permalink
Merge pull request #278 from maxekman/117/metadata-for-events
Browse files Browse the repository at this point in the history
117 / Metadata for events
  • Loading branch information
maxekman authored Dec 22, 2020
2 parents 2c564ad + 5a9952b commit bad92fb
Show file tree
Hide file tree
Showing 21 changed files with 382 additions and 279 deletions.
11 changes: 7 additions & 4 deletions aggregatestore/events/aggregatebase.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,13 @@ func (a *AggregateBase) Events() []eh.Event {
}

// AppendEvent appends an event for later retrieval by Events().
func (a *AggregateBase) AppendEvent(t eh.EventType, data eh.EventData, timestamp time.Time) eh.Event {
e := eh.NewEventForAggregate(t, data, timestamp,
a.AggregateType(), a.EntityID(),
a.Version()+len(a.events)+1)
func (a *AggregateBase) AppendEvent(t eh.EventType, data eh.EventData, timestamp time.Time, options ...eh.EventOption) eh.Event {
options = append(options, eh.ForAggregate(
a.AggregateType(),
a.EntityID(),
a.Version()+len(a.events)+1),
)
e := eh.NewEvent(t, data, timestamp, options...)
a.events = append(a.events, e)
return e
}
6 changes: 6 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func (ct CommandType) String() string {
return string(ct)
}

// CommandIDer provides a unique command ID to be used for request tracking etc.
type CommandIDer interface {
// CommandID returns the ID of the command instance being handled.
CommandID() uuid.UUID
}

// ErrCommandNotRegistered is when no command factory was registered.
var ErrCommandNotRegistered = errors.New("command not registered")

Expand Down
75 changes: 64 additions & 11 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Event interface {
// Version of the aggregate for this event (after it has been applied).
Version() int

// Metadata is app-specific metadata such as request ID, originating user etc.
Metadata() map[string]interface{}

// A string representation of the event.
String() string
}
Expand All @@ -63,27 +66,71 @@ func (et EventType) String() string {
// EventData is any additional data for an event.
type EventData interface{}

// EventOption is an option to use when creating events.
type EventOption func(Event)

// ForAggregate adds aggregate data when creating an event.
func ForAggregate(aggregateType AggregateType, aggregateID uuid.UUID, version int) EventOption {
return func(e Event) {
if evt, ok := e.(*event); ok {
evt.aggregateType = aggregateType
evt.aggregateID = aggregateID
evt.version = version
}
}
}

// WithMetadata adds metadata when creating an event.
// Note that the values types must be supprted by the event marshalers in use.
func WithMetadata(metadata map[string]interface{}) EventOption {
return func(e Event) {
if evt, ok := e.(*event); ok {
if evt.metadata == nil {
evt.metadata = metadata
} else {
for k, v := range metadata {
evt.metadata[k] = v
}
}
}
}
}

// FromCommand adds metadat for the originating command when crating an event.
// Currently it adds the command type and optionally a command ID (if the
// CommandIDer interface is implemented).
func FromCommand(cmd Command) EventOption {
md := map[string]interface{}{
"command_type": cmd.CommandType().String(),
}
if c, ok := cmd.(CommandIDer); ok {
md["command_id"] = c.CommandID().String()
}
return WithMetadata(md)
}

// NewEvent creates a new event with a type and data, setting its timestamp.
func NewEvent(eventType EventType, data EventData, timestamp time.Time) Event {
return event{
func NewEvent(eventType EventType, data EventData, timestamp time.Time, options ...EventOption) Event {
e := &event{
eventType: eventType,
data: data,
timestamp: timestamp,
}
for _, option := range options {
if option != nil {
option(e)
}
}
return e
}

// NewEventForAggregate creates a new event with a type and data, setting its
// timestamp. It also sets the aggregate data on it.
// DEPRECATED, use NewEvent() with the WithAggregate() option instead.
func NewEventForAggregate(eventType EventType, data EventData, timestamp time.Time,
aggregateType AggregateType, aggregateID uuid.UUID, version int) Event {
return event{
eventType: eventType,
data: data,
timestamp: timestamp,
aggregateType: aggregateType,
aggregateID: aggregateID,
version: version,
}
aggregateType AggregateType, aggregateID uuid.UUID, version int, options ...EventOption) Event {
options = append(options, ForAggregate(aggregateType, aggregateID, version))
return NewEvent(eventType, data, timestamp, options...)
}

// event is an internal representation of an event, returned when the aggregate
Expand All @@ -96,6 +143,7 @@ type event struct {
aggregateType AggregateType
aggregateID uuid.UUID
version int
metadata map[string]interface{}
}

// EventType implements the EventType method of the Event interface.
Expand Down Expand Up @@ -128,6 +176,11 @@ func (e event) Version() int {
return e.version
}

// Metadata implements the Metadata method of the Event interface.
func (e event) Metadata() map[string]interface{} {
return e.metadata
}

// String implements the String method of the Event interface.
func (e event) String() string {
return fmt.Sprintf("%s@%d", e.eventType, e.version)
Expand Down
35 changes: 33 additions & 2 deletions event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,16 @@ func TestNewEvent(t *testing.T) {
}

id := uuid.New()
event = NewEventForAggregate(TestEventType, &TestEventData{"event1"}, timestamp,
TestAggregateType, id, 3)
cmd := TestCommandID{
TestID: id,
Content: "content",
CmdID: uuid.New(),
}
event = NewEvent(TestEventType, &TestEventData{"event1"}, timestamp,
ForAggregate(TestAggregateType, id, 3),
FromCommand(cmd),
WithMetadata(map[string]interface{}{"meta": "data", "num": 42}),
)
if event.EventType() != TestEventType {
t.Error("the event type should be correct:", event.EventType())
}
Expand All @@ -62,6 +70,14 @@ func TestNewEvent(t *testing.T) {
if event.Version() != 3 {
t.Error("the version should be zero:", event.Version())
}
if !reflect.DeepEqual(event.Metadata(), map[string]interface{}{
"meta": "data",
"num": 42,
"command_type": cmd.CommandType().String(),
"command_id": cmd.CmdID.String(),
}) {
t.Error("the metadata should be correct:", event.Metadata())
}
if event.String() != "TestEvent@3" {
t.Error("the string representation should be correct:", event.String())
}
Expand Down Expand Up @@ -155,3 +171,18 @@ type TestEventRegisterEmptyData struct{}
type TestEventRegisterTwiceData struct{}

type TestEventUnregisterTwiceData struct{}

type TestCommandID struct {
TestID uuid.UUID
Content string
CmdID uuid.UUID
}

var _ = Command(TestCommandID{})

func (t TestCommandID) AggregateID() uuid.UUID { return t.TestID }
func (t TestCommandID) AggregateType() AggregateType { return TestAggregateType }
func (t TestCommandID) CommandType() CommandType {
return CommandType("TestCommandID")
}
func (t TestCommandID) CommandID() uuid.UUID { return t.CmdID }
22 changes: 12 additions & 10 deletions eventbus/acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration)
// Without handler.
id := uuid.New()
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
event1 := eh.NewEventForAggregate(mocks.EventType, &mocks.EventData{Content: "event1"}, timestamp,
mocks.AggregateType, id, 1)
event1 := eh.NewEvent(mocks.EventType, &mocks.EventData{Content: "event1"}, timestamp,
eh.ForAggregate(mocks.AggregateType, id, 1),
eh.WithMetadata(map[string]interface{}{"meta": "data", "num": int32(42)}),
)
if err := bus1.HandleEvent(ctx, event1); err != nil {
t.Error("there should be no error:", err)
}
Expand All @@ -80,8 +82,8 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration)
// Event without data (tested in its own handler).
otherHandler := mocks.NewEventHandler("other-handler")
bus1.AddHandler(ctx, eh.MatchEvents{mocks.EventOtherType}, otherHandler)
eventWithoutData := eh.NewEventForAggregate(mocks.EventOtherType, nil, timestamp,
mocks.AggregateType, uuid.New(), 1)
eventWithoutData := eh.NewEvent(mocks.EventOtherType, nil, timestamp,
eh.ForAggregate(mocks.AggregateType, uuid.New(), 1))
if err := bus1.HandleEvent(ctx, eventWithoutData); err != nil {
t.Error("there should be no error:", err)
}
Expand Down Expand Up @@ -232,9 +234,9 @@ func LoadTest(t *testing.T, bus eh.EventBus) {
id := uuid.New()
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)

event1 := eh.NewEventForAggregate(
mocks.EventType, &mocks.EventData{Content: "event1"},
timestamp, mocks.AggregateType, id, 1)
event1 := eh.NewEvent(
mocks.EventType, &mocks.EventData{Content: "event1"}, timestamp,
eh.ForAggregate(mocks.AggregateType, id, 1))
if err := bus.HandleEvent(ctx, event1); err != nil {
t.Error("there should be no error:", err)
}
Expand All @@ -257,9 +259,9 @@ func Benchmark(b *testing.B, bus eh.EventBus) {
b.ResetTimer()

for n := 0; n < b.N; n++ {
event1 := eh.NewEventForAggregate(
mocks.EventType, &mocks.EventData{Content: "event1"},
timestamp, mocks.AggregateType, id, n+1)
event1 := eh.NewEvent(
mocks.EventType, &mocks.EventData{Content: "event1"}, timestamp,
eh.ForAggregate(mocks.AggregateType, id, n+1))
if err := bus.HandleEvent(ctx, event1); err != nil {
b.Error("there should be no error:", err)
}
Expand Down
63 changes: 17 additions & 46 deletions eventbus/gcp/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
EventType: event.EventType(),
Version: event.Version(),
Timestamp: event.Timestamp(),
Metadata: event.Metadata(),
Context: eh.MarshalContext(ctx),
}

Expand Down Expand Up @@ -260,8 +261,22 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex
e.RawData = nil
}

event := event{evt: e}
ctx = eh.UnmarshalContext(ctx, e.Context)
aggregateID, err := uuid.Parse(e.AggregateID)
if err != nil {
aggregateID = uuid.Nil
}
event := eh.NewEvent(
e.EventType,
e.data,
e.Timestamp,
eh.ForAggregate(
e.AggregateType,
aggregateID,
e.Version,
),
eh.WithMetadata(e.Metadata),
)

// Ignore non-matching events.
if !m.Match(event) {
Expand Down Expand Up @@ -327,50 +342,6 @@ type evt struct {
AggregateType eh.AggregateType `bson:"aggregate_type"`
AggregateID string `bson:"_id"`
Version int `bson:"version"`
Metadata map[string]interface{} `bson:"metadata"`
Context map[string]interface{} `bson:"context"`
}

// event is the private implementation of the eventhorizon.Event interface
// for a MongoDB event store.
type event struct {
evt
}

// EventType implements the EventType method of the eventhorizon.Event interface.
func (e event) EventType() eh.EventType {
return e.evt.EventType
}

// Data implements the Data method of the eventhorizon.Event interface.
func (e event) Data() eh.EventData {
return e.evt.data
}

// Timestamp implements the Timestamp method of the eventhorizon.Event interface.
func (e event) Timestamp() time.Time {
return e.evt.Timestamp
}

// AggregateType implements the AggregateType method of the eventhorizon.Event interface.
func (e event) AggregateType() eh.AggregateType {
return e.evt.AggregateType
}

// AggrgateID implements the AggrgateID method of the eventhorizon.Event interface.
func (e event) AggregateID() uuid.UUID {
id, err := uuid.Parse(e.evt.AggregateID)
if err != nil {
return uuid.Nil
}
return id
}

// Version implements the Version method of the eventhorizon.Event interface.
func (e event) Version() int {
return e.evt.Version
}

// String implements the String method of the eventhorizon.Event interface.
func (e event) String() string {
return fmt.Sprintf("%s@%d", e.evt.EventType, e.evt.Version)
}
11 changes: 7 additions & 4 deletions eventbus/local/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,16 @@ func (g *Group) publish(ctx context.Context, event eh.Event) error {
}
copier.Copy(data, event.Data())
}
eventCopy := eh.NewEventForAggregate(
eventCopy := eh.NewEvent(
event.EventType(),
data,
event.Timestamp(),
event.AggregateType(),
event.AggregateID(),
event.Version(),
eh.ForAggregate(
event.AggregateType(),
event.AggregateID(),
event.Version(),
),
eh.WithMetadata(event.Metadata()),
)
// Marshal and unmarshal the context to both simulate only sending data
// that would be sent over a network bus and also break any relationship
Expand Down
Loading

0 comments on commit bad92fb

Please sign in to comment.