Skip to content

Commit

Permalink
Add optional metadata to events
Browse files Browse the repository at this point in the history
  • Loading branch information
maxekman committed Dec 14, 2020
1 parent 9027504 commit 438b101
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 11 deletions.
19 changes: 19 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Event interface {
// Version of the aggregate for this event (after it has been applied).
Version() int

// Metadata is app-specific metadata such as request ID, originating user etc.
Metadata() map[string]interface{}

// A string representation of the event.
String() string
}
Expand Down Expand Up @@ -77,6 +80,16 @@ func ForAggregate(aggregateType AggregateType, aggregateID uuid.UUID, version in
}
}

// WithMetadata adds metadata when creating an event.
// Note that the values types must be supprted by the event marshalers in use.
func WithMetadata(metadata map[string]interface{}) EventOption {
return func(e Event) {
if evt, ok := e.(*event); ok {
evt.metadata = metadata
}
}
}

// NewEvent creates a new event with a type and data, setting its timestamp.
func NewEvent(eventType EventType, data EventData, timestamp time.Time, options ...EventOption) Event {
e := &event{
Expand Down Expand Up @@ -109,6 +122,7 @@ type event struct {
aggregateType AggregateType
aggregateID uuid.UUID
version int
metadata map[string]interface{}
}

// EventType implements the EventType method of the Event interface.
Expand Down Expand Up @@ -141,6 +155,11 @@ func (e event) Version() int {
return e.version
}

// Metadata implements the Metadata method of the Event interface.
func (e event) Metadata() map[string]interface{} {
return e.metadata
}

// String implements the String method of the Event interface.
func (e event) String() string {
return fmt.Sprintf("%s@%d", e.eventType, e.version)
Expand Down
7 changes: 6 additions & 1 deletion event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func TestNewEvent(t *testing.T) {

id := uuid.New()
event = NewEvent(TestEventType, &TestEventData{"event1"}, timestamp,
ForAggregate(TestAggregateType, id, 3))
ForAggregate(TestAggregateType, id, 3),
WithMetadata(map[string]interface{}{"meta": "data", "num": 42}),
)
if event.EventType() != TestEventType {
t.Error("the event type should be correct:", event.EventType())
}
Expand All @@ -62,6 +64,9 @@ func TestNewEvent(t *testing.T) {
if event.Version() != 3 {
t.Error("the version should be zero:", event.Version())
}
if !reflect.DeepEqual(event.Metadata(), map[string]interface{}{"meta": "data", "num": 42}) {
t.Error("the metadata should be correct:", event.Metadata())
}
if event.String() != "TestEvent@3" {
t.Error("the string representation should be correct:", event.String())
}
Expand Down
4 changes: 3 additions & 1 deletion eventbus/acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration)
id := uuid.New()
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
event1 := eh.NewEvent(mocks.EventType, &mocks.EventData{Content: "event1"}, timestamp,
eh.ForAggregate(mocks.AggregateType, id, 1))
eh.ForAggregate(mocks.AggregateType, id, 1),
eh.WithMetadata(map[string]interface{}{"meta": "data", "num": int32(42)}),
)
if err := bus1.HandleEvent(ctx, event1); err != nil {
t.Error("there should be no error:", err)
}
Expand Down
3 changes: 3 additions & 0 deletions eventbus/gcp/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
EventType: event.EventType(),
Version: event.Version(),
Timestamp: event.Timestamp(),
Metadata: event.Metadata(),
Context: eh.MarshalContext(ctx),
}

Expand Down Expand Up @@ -276,6 +277,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex
aggregateID,
e.Version,
),
eh.WithMetadata(e.Metadata),
)

// Ignore non-matching events.
Expand Down Expand Up @@ -342,5 +344,6 @@ type evt struct {
AggregateType eh.AggregateType `bson:"aggregate_type"`
AggregateID string `bson:"_id"`
Version int `bson:"version"`
Metadata map[string]interface{} `bson:"metadata"`
Context map[string]interface{} `bson:"context"`
}
1 change: 1 addition & 0 deletions eventbus/local/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func (g *Group) publish(ctx context.Context, event eh.Event) error {
event.AggregateID(),
event.Version(),
),
eh.WithMetadata(event.Metadata()),
)
// Marshal and unmarshal the context to both simulate only sending data
// that would be sent over a network bus and also break any relationship
Expand Down
6 changes: 4 additions & 2 deletions eventstore/acceptanece_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ func AcceptanceTest(t *testing.T, ctx context.Context, store eh.EventStore) []eh
t.Error("there should be a ErrIncerrectEventVersion error:", err)
}

// Save event, version 2.
// Save event, version 2, with metadata.
event2 := eh.NewEvent(mocks.EventType, &mocks.EventData{Content: "event2"}, timestamp,
eh.ForAggregate(mocks.AggregateType, id, 2))
eh.ForAggregate(mocks.AggregateType, id, 2),
eh.WithMetadata(map[string]interface{}{"meta": "data", "num": int32(42)}),
)
err = store.Save(ctx, []eh.Event{event2}, 1)
if err != nil {
t.Error("there should be no error:", err)
Expand Down
2 changes: 2 additions & 0 deletions eventstore/memory/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) err
e.AggregateID(),
e.Version(),
),
eh.WithMetadata(e.Metadata()),
)
}
}
Expand Down Expand Up @@ -265,5 +266,6 @@ func copyEvent(ctx context.Context, event eh.Event) (eh.Event, error) {
event.AggregateID(),
event.Version(),
),
eh.WithMetadata(event.Metadata()),
), nil
}
17 changes: 10 additions & 7 deletions eventstore/mongodb/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error)
e.AggregateID,
e.Version,
),
eh.WithMetadata(e.Metadata),
)
events[i] = event
}
Expand Down Expand Up @@ -361,13 +362,14 @@ type aggregateRecord struct {
// evt is the internal event record for the MongoDB event store used
// to save and load events from the DB.
type evt struct {
EventType eh.EventType `bson:"event_type"`
RawData bson.Raw `bson:"data,omitempty"`
data eh.EventData `bson:"-"`
Timestamp time.Time `bson:"timestamp"`
AggregateType eh.AggregateType `bson:"aggregate_type"`
AggregateID uuid.UUID `bson:"_id"`
Version int `bson:"version"`
EventType eh.EventType `bson:"event_type"`
RawData bson.Raw `bson:"data,omitempty"`
data eh.EventData `bson:"-"`
Timestamp time.Time `bson:"timestamp"`
AggregateType eh.AggregateType `bson:"aggregate_type"`
AggregateID uuid.UUID `bson:"_id"`
Version int `bson:"version"`
Metadata map[string]interface{} `bson:"metadata"`
}

// newEvt returns a new evt for an event.
Expand All @@ -378,6 +380,7 @@ func newEvt(ctx context.Context, event eh.Event) (*evt, error) {
AggregateType: event.AggregateType(),
AggregateID: event.AggregateID(),
Version: event.Version(),
Metadata: event.Metadata(),
}

// Marshal event data if there is any.
Expand Down
6 changes: 6 additions & 0 deletions mocks/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func CompareEvents(e1, e2 eh.Event) error {
if !reflect.DeepEqual(e1.Data(), e2.Data()) {
return fmt.Errorf("incorrect event data: %s (should be %s)", e1.Data(), e2.Data())
}
if !reflect.DeepEqual(e1.Metadata(), e2.Metadata()) {
return fmt.Errorf("incorrect event metadata: %s (should be %s)", e1.Metadata(), e2.Metadata())
}
return nil
}

Expand Down Expand Up @@ -64,6 +67,9 @@ func EqualEvents(evts1, evts2 []eh.Event) bool {
if e1.Version() != e2.Version() {
return false
}
if !reflect.DeepEqual(e1.Metadata(), e2.Metadata()) {
return false
}
}

return true
Expand Down

0 comments on commit 438b101

Please sign in to comment.