diff --git a/aggregate.go b/aggregate.go index 92cb0eb7..5c65f594 100644 --- a/aggregate.go +++ b/aggregate.go @@ -88,7 +88,7 @@ type AggregateStoreError struct { } // Error implements the Error method of the error interface. -func (e AggregateStoreError) Error() string { +func (e *AggregateStoreError) Error() string { str := "aggregate store: " if e.Op != "" { @@ -113,12 +113,12 @@ func (e AggregateStoreError) Error() string { } // Unwrap implements the errors.Unwrap method. -func (e AggregateStoreError) Unwrap() error { +func (e *AggregateStoreError) Unwrap() error { return e.Err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e AggregateStoreError) Cause() error { +func (e *AggregateStoreError) Cause() error { return e.Unwrap() } @@ -129,17 +129,17 @@ type AggregateError struct { } // Error implements the Error method of the errors.Error interface. -func (e AggregateError) Error() string { +func (e *AggregateError) Error() string { return "aggregate error: " + e.Err.Error() } // Unwrap implements the errors.Unwrap method. -func (e AggregateError) Unwrap() error { +func (e *AggregateError) Unwrap() error { return e.Err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e AggregateError) Cause() error { +func (e *AggregateError) Cause() error { return e.Unwrap() } diff --git a/aggregatestore/events/aggregatestore.go b/aggregatestore/events/aggregatestore.go index b3df8e18..ad10732c 100644 --- a/aggregatestore/events/aggregatestore.go +++ b/aggregatestore/events/aggregatestore.go @@ -61,7 +61,7 @@ func NewAggregateStore(store eh.EventStore) (*AggregateStore, error) { func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateType, id uuid.UUID) (eh.Aggregate, error) { agg, err := eh.CreateAggregate(aggregateType, id) if err != nil { - return nil, eh.AggregateStoreError{ + return nil, &eh.AggregateStoreError{ Err: err, Op: eh.AggregateStoreOpLoad, AggregateType: aggregateType, @@ -71,7 +71,7 @@ func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateTyp a, ok := agg.(VersionedAggregate) if !ok { - return nil, eh.AggregateStoreError{ + return nil, &eh.AggregateStoreError{ Err: ErrAggregateNotVersioned, Op: eh.AggregateStoreOpLoad, AggregateType: aggregateType, @@ -81,7 +81,7 @@ func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateTyp events, err := r.store.Load(ctx, a.EntityID()) if err != nil && !errors.Is(err, eh.ErrAggregateNotFound) { - return nil, eh.AggregateStoreError{ + return nil, &eh.AggregateStoreError{ Err: err, Op: eh.AggregateStoreOpLoad, AggregateType: aggregateType, @@ -90,7 +90,7 @@ func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateTyp } if err := r.applyEvents(ctx, a, events); err != nil { - return nil, eh.AggregateStoreError{ + return nil, &eh.AggregateStoreError{ Err: err, Op: eh.AggregateStoreOpLoad, AggregateType: aggregateType, @@ -106,7 +106,7 @@ func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateTyp func (r *AggregateStore) Save(ctx context.Context, agg eh.Aggregate) error { a, ok := agg.(VersionedAggregate) if !ok { - return eh.AggregateStoreError{ + return &eh.AggregateStoreError{ Err: ErrAggregateNotVersioned, Op: eh.AggregateStoreOpSave, AggregateType: agg.AggregateType(), @@ -121,7 +121,7 @@ func (r *AggregateStore) Save(ctx context.Context, agg eh.Aggregate) error { } if err := r.store.Save(ctx, events, a.AggregateVersion()); err != nil { - return eh.AggregateStoreError{ + return &eh.AggregateStoreError{ Err: err, Op: eh.AggregateStoreOpSave, AggregateType: agg.AggregateType(), @@ -134,7 +134,7 @@ func (r *AggregateStore) Save(ctx context.Context, agg eh.Aggregate) error { // Apply the events in case the aggregate needs to be further used // after this save. Currently it is not reused. if err := r.applyEvents(ctx, a, events); err != nil { - return eh.AggregateStoreError{ + return &eh.AggregateStoreError{ Err: err, Op: eh.AggregateStoreOpSave, AggregateType: agg.AggregateType(), diff --git a/aggregatestore/events/aggregatestore_test.go b/aggregatestore/events/aggregatestore_test.go index e96a1da8..4cd16f43 100644 --- a/aggregatestore/events/aggregatestore_test.go +++ b/aggregatestore/events/aggregatestore_test.go @@ -203,7 +203,7 @@ func TestAggregateStore_SaveEvents(t *testing.T) { storeErr := errors.New("store error") eventStore.Err = storeErr err = store.Save(ctx, agg) - aggStoreErr := eh.AggregateStoreError{} + aggStoreErr := &eh.AggregateStoreError{} if !errors.As(err, &aggStoreErr) || !errors.Is(err, storeErr) { t.Error("there should be an aggregate store error:", err) } @@ -214,7 +214,7 @@ func TestAggregateStore_SaveEvents(t *testing.T) { aggErr := errors.New("aggregate error") agg.err = aggErr err = store.Save(ctx, agg) - aggStoreErr = eh.AggregateStoreError{} + aggStoreErr = &eh.AggregateStoreError{} if !errors.As(err, &aggStoreErr) || !errors.Is(err, aggErr) { t.Error("there should be an aggregate store error:", err) } diff --git a/aggregatestore/model/aggregatestore_test.go b/aggregatestore/model/aggregatestore_test.go index b2eeb554..af288b7c 100644 --- a/aggregatestore/model/aggregatestore_test.go +++ b/aggregatestore/model/aggregatestore_test.go @@ -55,7 +55,7 @@ func TestAggregateStore_LoadNotFound(t *testing.T) { ctx := context.Background() id := uuid.New() - repo.LoadErr = eh.RepoError{Err: eh.ErrEntityNotFound} + repo.LoadErr = &eh.RepoError{Err: eh.ErrEntityNotFound} agg, err := store.Load(ctx, AggregateType, id) if err != nil { t.Fatal("there should be no error:", err) diff --git a/command_check.go b/command_check.go index 6052fc72..4e580e01 100644 --- a/command_check.go +++ b/command_check.go @@ -33,7 +33,7 @@ type CommandFieldError struct { } // Error implements the Error method of the error interface. -func (c CommandFieldError) Error() string { +func (c *CommandFieldError) Error() string { return "missing field: " + c.Field } @@ -62,7 +62,7 @@ func CheckCommand(cmd Command) error { } if zero { - return CommandFieldError{field.Name} + return &CommandFieldError{field.Name} } } return nil diff --git a/commandhandler/aggregate/commandhandler.go b/commandhandler/aggregate/commandhandler.go index 2fa32ab9..d5006cb6 100644 --- a/commandhandler/aggregate/commandhandler.go +++ b/commandhandler/aggregate/commandhandler.go @@ -67,7 +67,7 @@ func (h *CommandHandler) HandleCommand(ctx context.Context, cmd eh.Command) erro } if err = a.HandleCommand(ctx, cmd); err != nil { - return eh.AggregateError{Err: err} + return &eh.AggregateError{Err: err} } return h.store.Save(ctx, a) diff --git a/commandhandler/aggregate/commandhandler_test.go b/commandhandler/aggregate/commandhandler_test.go index dccf9197..bd7d2611 100644 --- a/commandhandler/aggregate/commandhandler_test.go +++ b/commandhandler/aggregate/commandhandler_test.go @@ -99,7 +99,7 @@ func TestCommandHandler_ErrorInHandler(t *testing.T) { Content: "command1", } err := h.HandleCommand(context.Background(), cmd) - var aggregateErr eh.AggregateError + aggregateErr := &eh.AggregateError{} if !errors.As(err, &aggregateErr) || !errors.Is(err, commandErr) { t.Error("there should be a command error:", err) } diff --git a/eventbus.go b/eventbus.go index 71ba0a8e..2f0e9e86 100644 --- a/eventbus.go +++ b/eventbus.go @@ -31,7 +31,7 @@ type EventBus interface { AddHandler(context.Context, EventMatcher, EventHandler) error // Errors returns an error channel where async handling errors are sent. - Errors() <-chan EventBusError + Errors() <-chan *EventBusError // Close closes the EventBus and waits for all handlers to finish. Close() error @@ -58,7 +58,7 @@ type EventBusError struct { } // Error implements the Error method of the error interface. -func (e EventBusError) Error() string { +func (e *EventBusError) Error() string { str := "event bus: " if e.Err != nil { @@ -75,11 +75,11 @@ func (e EventBusError) Error() string { } // Unwrap implements the errors.Unwrap method. -func (e EventBusError) Unwrap() error { +func (e *EventBusError) Unwrap() error { return e.Err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e EventBusError) Cause() error { +func (e *EventBusError) Cause() error { return e.Unwrap() } diff --git a/eventbus/gcp/eventbus.go b/eventbus/gcp/eventbus.go index 1c1949b4..c1e6d681 100644 --- a/eventbus/gcp/eventbus.go +++ b/eventbus/gcp/eventbus.go @@ -38,7 +38,7 @@ type EventBus struct { topic *pubsub.Topic registered map[eh.EventHandlerType]struct{} registeredMu sync.RWMutex - errCh chan eh.EventBusError + errCh chan *eh.EventBusError cctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -52,7 +52,7 @@ func NewEventBus(projectID, appID string, options ...Option) (*EventBus, error) b := &EventBus{ appID: appID, registered: map[eh.EventHandlerType]struct{}{}, - errCh: make(chan eh.EventBusError, 100), + errCh: make(chan *eh.EventBusError, 100), cctx: ctx, cancel: cancel, codec: &json.EventCodec{}, @@ -206,7 +206,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event } // Errors implements the Errors method of the eventhorizon.EventBus interface. -func (b *EventBus) Errors() <-chan eh.EventBusError { +func (b *EventBus) Errors() <-chan *eh.EventBusError { return b.errCh } @@ -231,7 +231,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, sub *pubsub.Subs if err := sub.Receive(b.cctx, b.handler(m, h)); err != nil { err = fmt.Errorf("could not receive: %w", err) select { - case b.errCh <- eh.EventBusError{Err: err}: + case b.errCh <- &eh.EventBusError{Err: err}: default: log.Printf("eventhorizon: missed error in GCP event bus: %s", err) } @@ -250,7 +250,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex if err != nil { err = fmt.Errorf("could not unmarshal event: %w", err) select { - case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}: default: log.Printf("eventhorizon: missed error in GCP event bus: %s", err) } @@ -268,7 +268,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex if err := h.HandleEvent(ctx, event); err != nil { err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err) select { - case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}: default: log.Printf("eventhorizon: missed error in GCP event bus: %s", err) } diff --git a/eventbus/kafka/eventbus.go b/eventbus/kafka/eventbus.go index 84c13d94..c0da3592 100644 --- a/eventbus/kafka/eventbus.go +++ b/eventbus/kafka/eventbus.go @@ -39,7 +39,7 @@ type EventBus struct { writer *kafka.Writer registered map[eh.EventHandlerType]struct{} registeredMu sync.RWMutex - errCh chan eh.EventBusError + errCh chan *eh.EventBusError cctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -55,7 +55,7 @@ func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) { appID: appID, topic: appID + "_events", registered: map[eh.EventHandlerType]struct{}{}, - errCh: make(chan eh.EventBusError, 100), + errCh: make(chan *eh.EventBusError, 100), cctx: ctx, cancel: cancel, codec: &json.EventCodec{}, @@ -217,7 +217,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event } // Errors implements the Errors method of the eventhorizon.EventBus interface. -func (b *EventBus) Errors() <-chan eh.EventBusError { +func (b *EventBus) Errors() <-chan *eh.EventBusError { return b.errCh } @@ -249,7 +249,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) } else if err != nil { err = fmt.Errorf("could not fetch message: %w", err) select { - case b.errCh <- eh.EventBusError{Err: err}: + case b.errCh <- &eh.EventBusError{Err: err}: default: log.Printf("eventhorizon: missed error in Kafka event bus: %s", err) } @@ -258,8 +258,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) continue } - var noBusError eh.EventBusError - if err := handler(b.cctx, msg); err != noBusError { + if err := handler(b.cctx, msg); err != nil { select { case b.errCh <- err: default: @@ -272,7 +271,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) if err := r.CommitMessages(context.Background(), msg); err != nil { err = fmt.Errorf("could not commit message: %w", err) select { - case b.errCh <- eh.EventBusError{Err: err}: + case b.errCh <- &eh.EventBusError{Err: err}: default: log.Printf("eventhorizon: missed error in Kafka event bus: %s", err) } @@ -284,11 +283,11 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) } } -func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) func(ctx context.Context, msg kafka.Message) eh.EventBusError { - return func(ctx context.Context, msg kafka.Message) eh.EventBusError { +func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) func(ctx context.Context, msg kafka.Message) *eh.EventBusError { + return func(ctx context.Context, msg kafka.Message) *eh.EventBusError { event, ctx, err := b.codec.UnmarshalEvent(ctx, msg.Value) if err != nil { - return eh.EventBusError{ + return &eh.EventBusError{ Err: fmt.Errorf("could not unmarshal event: %w", err), Ctx: ctx, } @@ -296,18 +295,18 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader // Ignore non-matching events. if !m.Match(event) { - return eh.EventBusError{} + return nil } // Handle the event if it did match. if err := h.HandleEvent(ctx, event); err != nil { - return eh.EventBusError{ + return &eh.EventBusError{ Err: fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err), Ctx: ctx, Event: event, } } - return eh.EventBusError{} + return nil } } diff --git a/eventbus/local/eventbus.go b/eventbus/local/eventbus.go index 3f3c2c4f..623a56ed 100644 --- a/eventbus/local/eventbus.go +++ b/eventbus/local/eventbus.go @@ -34,7 +34,7 @@ type EventBus struct { group *Group registered map[eh.EventHandlerType]struct{} registeredMu sync.RWMutex - errCh chan eh.EventBusError + errCh chan *eh.EventBusError cctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -48,7 +48,7 @@ func NewEventBus(options ...Option) *EventBus { b := &EventBus{ group: NewGroup(), registered: map[eh.EventHandlerType]struct{}{}, - errCh: make(chan eh.EventBusError, 100), + errCh: make(chan *eh.EventBusError, 100), cctx: ctx, cancel: cancel, codec: &json.EventCodec{}, @@ -127,7 +127,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event } // Errors implements the Errors method of the eventhorizon.EventBus interface. -func (b *EventBus) Errors() <-chan eh.EventBusError { +func (b *EventBus) Errors() <-chan *eh.EventBusError { return b.errCh } @@ -160,7 +160,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte if err != nil { err = fmt.Errorf("could not unmarshal event: %w", err) select { - case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}: default: log.Printf("eventhorizon: missed error in local event bus: %s", err) } @@ -176,7 +176,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte if err := h.HandleEvent(ctx, event); err != nil { err = fmt.Errorf("could not handle event (%s): %s", h.HandlerType(), err.Error()) select { - case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}: default: log.Printf("eventhorizon: missed error in local event bus: %s", err) } diff --git a/eventbus/nats/eventbus.go b/eventbus/nats/eventbus.go index 8a547018..1ff631a1 100644 --- a/eventbus/nats/eventbus.go +++ b/eventbus/nats/eventbus.go @@ -38,7 +38,7 @@ type EventBus struct { connOpts []nats.Option registered map[eh.EventHandlerType]struct{} registeredMu sync.RWMutex - errCh chan eh.EventBusError + errCh chan *eh.EventBusError cctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -53,7 +53,7 @@ func NewEventBus(url, appID string, options ...Option) (*EventBus, error) { appID: appID, streamName: appID + "_events", registered: map[eh.EventHandlerType]struct{}{}, - errCh: make(chan eh.EventBusError, 100), + errCh: make(chan *eh.EventBusError, 100), cctx: ctx, cancel: cancel, codec: &json.EventCodec{}, @@ -179,7 +179,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event } // Errors implements the Errors method of the eventhorizon.EventBus interface. -func (b *EventBus) Errors() <-chan eh.EventBusError { +func (b *EventBus) Errors() <-chan *eh.EventBusError { return b.errCh } @@ -215,7 +215,7 @@ func (b *EventBus) handler(ctx context.Context, m eh.EventMatcher, h eh.EventHan if err != nil { err = fmt.Errorf("could not unmarshal event: %w", err) select { - case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}: default: log.Printf("eventhorizon: missed error in NATS event bus: %s", err) } @@ -233,7 +233,7 @@ func (b *EventBus) handler(ctx context.Context, m eh.EventMatcher, h eh.EventHan if err := h.HandleEvent(ctx, event); err != nil { err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err) select { - case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}: default: log.Printf("eventhorizon: missed error in NATS event bus: %s", err) } diff --git a/eventbus/redis/eventbus.go b/eventbus/redis/eventbus.go index e6b08237..155c8a6c 100644 --- a/eventbus/redis/eventbus.go +++ b/eventbus/redis/eventbus.go @@ -39,7 +39,7 @@ type EventBus struct { clientOpts *redis.Options registered map[eh.EventHandlerType]struct{} registeredMu sync.RWMutex - errCh chan eh.EventBusError + errCh chan *eh.EventBusError cctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -55,7 +55,7 @@ func NewEventBus(addr, appID, clientID string, options ...Option) (*EventBus, er clientID: clientID, streamName: appID + "_events", registered: map[eh.EventHandlerType]struct{}{}, - errCh: make(chan eh.EventBusError, 100), + errCh: make(chan *eh.EventBusError, 100), cctx: ctx, cancel: cancel, codec: &json.EventCodec{}, @@ -178,7 +178,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event } // Errors implements the Errors method of the eventhorizon.EventBus interface. -func (b *EventBus) Errors() <-chan eh.EventBusError { +func (b *EventBus) Errors() <-chan *eh.EventBusError { return b.errCh } @@ -208,7 +208,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, groupName string } else if err != nil { err = fmt.Errorf("could not receive: %w", err) select { - case b.errCh <- eh.EventBusError{Err: err}: + case b.errCh <- &eh.EventBusError{Err: err}: default: log.Printf("eventhorizon: missed error in Redis event bus: %s", err) } @@ -236,7 +236,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName strin if err != nil { err = fmt.Errorf("could not unmarshal event: %w", err) select { - case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}: default: log.Printf("eventhorizon: missed error in Redis event bus: %s", err) } @@ -250,7 +250,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName strin if err != nil { err = fmt.Errorf("could not ack event: %w", err) select { - case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}: default: log.Printf("eventhorizon: missed error in Redis event bus: %s", err) } @@ -262,7 +262,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName strin if err := h.HandleEvent(ctx, event); err != nil { err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err) select { - case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}: default: log.Printf("eventhorizon: missed error in Redis event bus: %s", err) } @@ -274,7 +274,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName strin if err != nil { err = fmt.Errorf("could not ack event: %w", err) select { - case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}: default: log.Printf("eventhorizon: missed error in Redis event bus: %s", err) } diff --git a/eventbus_test.go b/eventbus_test.go index 48cabc87..2c3d5c3d 100644 --- a/eventbus_test.go +++ b/eventbus_test.go @@ -56,7 +56,7 @@ func TestEventBusError(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - busError := EventBusError{ + busError := &EventBusError{ Err: tc.err, Event: tc.event, } diff --git a/eventhandler.go b/eventhandler.go index c080066d..bf638d43 100644 --- a/eventhandler.go +++ b/eventhandler.go @@ -68,7 +68,7 @@ type EventHandlerError struct { } // Error implements the Error method of the errors.Error interface. -func (e EventHandlerError) Error() string { +func (e *EventHandlerError) Error() string { str := "could not handle event: " if e.Err != nil { @@ -85,11 +85,11 @@ func (e EventHandlerError) Error() string { } // Unwrap implements the errors.Unwrap method. -func (e EventHandlerError) Unwrap() error { +func (e *EventHandlerError) Unwrap() error { return e.Err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e EventHandlerError) Cause() error { +func (e *EventHandlerError) Cause() error { return e.Unwrap() } diff --git a/eventhandler/projector/eventhandler.go b/eventhandler/projector/eventhandler.go index 3724cbfb..730c7f3a 100644 --- a/eventhandler/projector/eventhandler.go +++ b/eventhandler/projector/eventhandler.go @@ -65,7 +65,7 @@ type Error struct { } // Error implements the Error method of the errors.Error interface. -func (e Error) Error() string { +func (e *Error) Error() string { str := "projector '" + e.Projector + "': " if e.Err != nil { @@ -86,12 +86,12 @@ func (e Error) Error() string { } // Unwrap implements the errors.Unwrap method. -func (e Error) Unwrap() error { +func (e *Error) Unwrap() error { return e.Err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e Error) Cause() error { +func (e *Error) Cause() error { return e.Unwrap() } @@ -195,7 +195,7 @@ retryOnce: entity, err := h.repo.Find(findCtx, id) if errors.Is(err, eh.ErrEntityNotFound) { if h.factoryFn == nil { - return Error{ + return &Error{ Err: ErrModelNotSet, Projector: h.projector.ProjectorType().String(), Event: event, @@ -210,14 +210,14 @@ retryOnce: goto retryOnce } - return Error{ + return &Error{ Err: fmt.Errorf("could not load entity with correct version: %w", err), Projector: h.projector.ProjectorType().String(), Event: event, EntityID: id, } } else if err != nil { - return Error{ + return &Error{ Err: fmt.Errorf("could not load entity: %w", err), Projector: h.projector.ProjectorType().String(), Event: event, @@ -243,7 +243,7 @@ retryOnce: goto retryOnce } - return Error{ + return &Error{ Err: eh.ErrIncorrectEntityVersion, Projector: h.projector.ProjectorType().String(), Event: event, @@ -256,7 +256,7 @@ retryOnce: // Run the projection, which will possibly increment the version. newEntity, err := h.projector.Project(ctx, event, entity) if err != nil { - return Error{ + return &Error{ Err: fmt.Errorf("could not project: %w", err), Projector: h.projector.ProjectorType().String(), Event: event, @@ -269,7 +269,7 @@ retryOnce: if newEntity, ok := newEntity.(eh.Versionable); ok { entityVersion = newEntity.AggregateVersion() if newEntity.AggregateVersion() != event.Version() { - return Error{ + return &Error{ Err: ErrIncorrectProjectedEntityVersion, Projector: h.projector.ProjectorType().String(), Event: event, @@ -282,7 +282,7 @@ retryOnce: // Update or remove the model. if newEntity != nil { if newEntity.EntityID() != id { - return Error{ + return &Error{ Err: fmt.Errorf("incorrect entity ID after projection"), Projector: h.projector.ProjectorType().String(), Event: event, @@ -291,7 +291,7 @@ retryOnce: } } if err := h.repo.Save(ctx, newEntity); err != nil { - return Error{ + return &Error{ Err: fmt.Errorf("could not save: %w", err), Projector: h.projector.ProjectorType().String(), Event: event, @@ -301,7 +301,7 @@ retryOnce: } } else { if err := h.repo.Remove(ctx, id); err != nil { - return Error{ + return &Error{ Err: fmt.Errorf("could not remove: %w", err), Projector: h.projector.ProjectorType().String(), Event: event, diff --git a/eventhandler/projector/eventhandler_test.go b/eventhandler/projector/eventhandler_test.go index aa9366a3..a4eba76a 100644 --- a/eventhandler/projector/eventhandler_test.go +++ b/eventhandler/projector/eventhandler_test.go @@ -46,7 +46,7 @@ func TestEventHandler_CreateModel(t *testing.T) { entity := &mocks.SimpleModel{ ID: id, } - repo.LoadErr = eh.RepoError{ + repo.LoadErr = &eh.RepoError{ Err: eh.ErrEntityNotFound, } projector.newEntity = entity @@ -151,15 +151,10 @@ func TestEventHandler_UpdateModelWithVersion(t *testing.T) { // Handling a future event with a gap in versions should produce an error. futureEvent := eh.NewEvent(mocks.EventType, eventData, timestamp, eh.ForAggregate(mocks.AggregateType, id, 8)) - expectedErr := Error{ - Err: eh.ErrIncorrectEntityVersion, - Projector: TestProjectorType.String(), - Event: futureEvent, - EntityID: id, - EntityVersion: 1, - } + errType := &Error{} err := handler.HandleEvent(ctx, futureEvent) - if !errors.Is(err, expectedErr) { + if !errors.As(err, &errType) || !errors.Is(err, eh.ErrIncorrectEntityVersion) { + // if err != expectedErr { t.Error("there should be an error:", err) } @@ -171,15 +166,9 @@ func TestEventHandler_UpdateModelWithVersion(t *testing.T) { Version: 3, Content: "version 1", } - expectedErr = Error{ - Err: ErrIncorrectProjectedEntityVersion, - Projector: TestProjectorType.String(), - Event: nextEvent, - EntityID: id, - EntityVersion: 3, - } + errType = &Error{} err = handler.HandleEvent(ctx, nextEvent) - if !errors.Is(err, expectedErr) { + if !errors.As(err, &errType) || !errors.Is(err, ErrIncorrectProjectedEntityVersion) { t.Error("there should be an error:", err) } @@ -362,7 +351,7 @@ func TestEventHandler_LoadError(t *testing.T) { err := handler.HandleEvent(ctx, event) - projectError := Error{} + projectError := &Error{} if !errors.As(err, &projectError) || !errors.Is(err, loadErr) { t.Error("there should be an error:", err) } @@ -389,7 +378,7 @@ func TestEventHandler_SaveError(t *testing.T) { err := handler.HandleEvent(ctx, event) - projectError := Error{} + projectError := &Error{} if !errors.As(err, &projectError) || !errors.Is(err, saveErr) { t.Error("there should be an error:", err) } @@ -416,7 +405,7 @@ func TestEventHandler_ProjectError(t *testing.T) { err := handler.HandleEvent(ctx, event) - projectError := Error{} + projectError := &Error{} if !errors.As(err, &projectError) || !errors.Is(err, projectErr) { t.Error("there should be an error:", err) } diff --git a/eventhandler/saga/eventhandler.go b/eventhandler/saga/eventhandler.go index 9a329f1f..1057bb01 100644 --- a/eventhandler/saga/eventhandler.go +++ b/eventhandler/saga/eventhandler.go @@ -58,17 +58,17 @@ type Error struct { } // Error implements the Error method of the errors.Error interface. -func (e Error) Error() string { +func (e *Error) Error() string { return fmt.Sprintf("%s: %s", e.Saga, e.Err) } // Unwrap implements the errors.Unwrap method. -func (e Error) Unwrap() error { +func (e *Error) Unwrap() error { return e.Err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e Error) Cause() error { +func (e *Error) Cause() error { return e.Unwrap() } @@ -89,7 +89,7 @@ func (h *EventHandler) HandlerType() eh.EventHandlerType { func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error { // Run the saga which can issue commands on the provided command handler. if err := h.saga.RunSaga(ctx, event, h.commandHandler); err != nil { - return Error{ + return &Error{ Err: err, Saga: h.saga.SagaType().String(), } diff --git a/eventstore.go b/eventstore.go index d7d80283..e7b2deee 100644 --- a/eventstore.go +++ b/eventstore.go @@ -67,7 +67,7 @@ type EventStoreError struct { } // Error implements the Error method of the errors.Error interface. -func (e EventStoreError) Error() string { +func (e *EventStoreError) Error() string { str := "event store: " if e.Op != "" { @@ -100,11 +100,11 @@ func (e EventStoreError) Error() string { } // Unwrap implements the errors.Unwrap method. -func (e EventStoreError) Unwrap() error { +func (e *EventStoreError) Unwrap() error { return e.Err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e EventStoreError) Cause() error { +func (e *EventStoreError) Cause() error { return e.Unwrap() } diff --git a/eventstore/acceptanece_testing.go b/eventstore/acceptanece_testing.go index e5c9790e..a94b8cf4 100644 --- a/eventstore/acceptanece_testing.go +++ b/eventstore/acceptanece_testing.go @@ -44,7 +44,7 @@ func AcceptanceTest(t *testing.T, store eh.EventStore, ctx context.Context) []eh // Save no events. err := store.Save(ctx, []eh.Event{}, 0) - eventStoreErr := eh.EventStoreError{} + eventStoreErr := &eh.EventStoreError{} if !errors.As(err, &eventStoreErr) || eventStoreErr.Err.Error() != "no events" { t.Error("there should be a event store error:", err) } @@ -65,7 +65,7 @@ func AcceptanceTest(t *testing.T, store eh.EventStore, ctx context.Context) []eh // Try to save same event twice. err = store.Save(ctx, []eh.Event{event1}, 1) - eventStoreErr = eh.EventStoreError{} + eventStoreErr = &eh.EventStoreError{} if !errors.As(err, &eventStoreErr) || eventStoreErr.Err.Error() != "invalid event version" { t.Error("there should be a event store error:", err) } @@ -115,7 +115,7 @@ func AcceptanceTest(t *testing.T, store eh.EventStore, ctx context.Context) []eh // Load events for non-existing aggregate. events, err := store.Load(ctx, uuid.New()) - eventStoreErr = eh.EventStoreError{} + eventStoreErr = &eh.EventStoreError{} if !errors.As(err, &eventStoreErr) || !errors.Is(err, eh.ErrAggregateNotFound) { t.Error("there should be a not found error:", err) } diff --git a/eventstore/maintenance_testing.go b/eventstore/maintenance_testing.go index a2817caa..3edce3f0 100644 --- a/eventstore/maintenance_testing.go +++ b/eventstore/maintenance_testing.go @@ -63,7 +63,7 @@ func MaintenanceAcceptanceTest(t *testing.T, store eh.EventStore, storeMaintenan eventWithInvalidVersion := eh.NewEvent(mocks.EventType, &mocks.EventData{Content: "event20"}, timestamp, eh.ForAggregate(mocks.AggregateType, id, 20)) err = storeMaintenance.Replace(ctx, eventWithInvalidVersion) - eventStoreErr := eh.EventStoreError{} + eventStoreErr := &eh.EventStoreError{} if !errors.As(err, &eventStoreErr) || eventStoreErr.Err.Error() != "could not find original event" { t.Error("there should be a event store error:", err) } diff --git a/eventstore/memory/eventmaintenance.go b/eventstore/memory/eventmaintenance.go index 39df9b71..3a7e5def 100644 --- a/eventstore/memory/eventmaintenance.go +++ b/eventstore/memory/eventmaintenance.go @@ -30,7 +30,7 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error { aggregate, ok := s.db[id] if !ok { s.dbMu.RUnlock() - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: eh.ErrAggregateNotFound, Op: eh.EventStoreOpReplace, AggregateID: id, @@ -42,7 +42,7 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error { // Create the event record for the Database. e, err := copyEvent(ctx, event) if err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not copy event: %w", err), Op: eh.EventStoreOpReplace, AggregateID: id, @@ -59,7 +59,7 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error { } } if idx == -1 { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not find original event"), Op: eh.EventStoreOpReplace, AggregateID: id, diff --git a/eventstore/memory/eventstore.go b/eventstore/memory/eventstore.go index 9d5d2315..13ffa367 100644 --- a/eventstore/memory/eventstore.go +++ b/eventstore/memory/eventstore.go @@ -69,7 +69,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio if s.eventHandler != nil { for _, e := range events { if err := s.eventHandler.HandleEvent(ctx, e); err != nil { - return eh.EventHandlerError{ + return &eh.EventHandlerError{ Err: err, Event: e, } @@ -86,7 +86,7 @@ func (s *EventStore) save(ctx context.Context, events []eh.Event, originalVersio defer s.dbMu.Unlock() if len(events) == 0 { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("no events"), Op: eh.EventStoreOpSave, } @@ -101,7 +101,7 @@ func (s *EventStore) save(ctx context.Context, events []eh.Event, originalVersio for i, event := range events { // Only accept events belonging to the same aggregate. if event.AggregateID() != id { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("event has different aggregate ID"), Op: eh.EventStoreOpSave, AggregateType: at, @@ -112,7 +112,7 @@ func (s *EventStore) save(ctx context.Context, events []eh.Event, originalVersio } if event.AggregateType() != at { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("event has different aggregate type"), Op: eh.EventStoreOpSave, AggregateType: at, @@ -124,7 +124,7 @@ func (s *EventStore) save(ctx context.Context, events []eh.Event, originalVersio // Only accept events that apply to the correct aggregate version. if event.Version() != originalVersion+i+1 { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("invalid event version"), Op: eh.EventStoreOpSave, AggregateType: at, @@ -137,7 +137,7 @@ func (s *EventStore) save(ctx context.Context, events []eh.Event, originalVersio // Create the event record with timestamp. e, err := copyEvent(ctx, event) if err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not copy event: %w", err), Op: eh.EventStoreOpSave, AggregateType: at, @@ -165,7 +165,7 @@ func (s *EventStore) save(ctx context.Context, events []eh.Event, originalVersio // since loading the aggregate). if aggregate, ok := s.db[id]; ok { if aggregate.Version != originalVersion { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("invalid original aggregate version, new version: %d", aggregate.Version), Op: eh.EventStoreOpSave, AggregateType: at, @@ -192,7 +192,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) aggregate, ok := s.db[id] if !ok { - return nil, eh.EventStoreError{ + return nil, &eh.EventStoreError{ Err: eh.ErrAggregateNotFound, Op: eh.EventStoreOpLoad, AggregateID: id, @@ -203,7 +203,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) for i, event := range aggregate.Events { e, err := copyEvent(ctx, event) if err != nil { - return nil, eh.EventStoreError{ + return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not copy event: %w", err), Op: eh.EventStoreOpLoad, AggregateType: e.AggregateType(), @@ -233,16 +233,15 @@ func (s *EventStore) Close() error { // copyEvent duplicates an event. func copyEvent(ctx context.Context, event eh.Event) (eh.Event, error) { - // Copy data if there is any. var data eh.EventData + + // Copy data if there is any. if event.Data() != nil { var err error if data, err = eh.CreateEventData(event.EventType()); err != nil { return nil, fmt.Errorf("could not create event data: %w", err) - // return nil, eh.EventStoreError{ - // Err: fmt.Errorf("could not create event data: %w", err), - // } } + copier.Copy(data, event.Data()) } diff --git a/eventstore/mongodb/eventmaintenance.go b/eventstore/mongodb/eventmaintenance.go index 7ca19fae..7eda35f8 100644 --- a/eventstore/mongodb/eventmaintenance.go +++ b/eventstore/mongodb/eventmaintenance.go @@ -35,7 +35,7 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error { // First check if the aggregate exists, the not found error in the update // query can mean both that the aggregate or the event is not found. if n, err := s.aggregates.CountDocuments(ctx, bson.M{"_id": id}); n == 0 { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: eh.ErrAggregateNotFound, Op: eh.EventStoreOpReplace, AggregateType: at, @@ -44,7 +44,7 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error { Events: []eh.Event{event}, } } else if err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not check aggregate existence: %w", err), Op: eh.EventStoreOpReplace, AggregateType: at, @@ -70,7 +70,7 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error { "$set": bson.M{"events.$": *e}, }, ); err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: err, Op: eh.EventStoreOpReplace, AggregateType: at, @@ -79,7 +79,7 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error { Events: []eh.Event{event}, } } else if r.MatchedCount == 0 { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not find original event"), Op: eh.EventStoreOpReplace, AggregateType: at, @@ -104,7 +104,7 @@ func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) err "$set": bson.M{"events.$.event_type": to.String()}, }, ); err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not update events of type '%s': %w", from, err), Op: eh.EventStoreOpRename, } @@ -116,7 +116,7 @@ func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) err // Clear clears the event storage. func (s *EventStore) Clear(ctx context.Context) error { if err := s.aggregates.Drop(ctx); err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: err, Op: eh.EventStoreOpRename, } diff --git a/eventstore/mongodb/eventstore.go b/eventstore/mongodb/eventstore.go index 61e5b198..d73868b2 100644 --- a/eventstore/mongodb/eventstore.go +++ b/eventstore/mongodb/eventstore.go @@ -96,7 +96,7 @@ func WithEventHandler(h eh.EventHandler) Option { // Save implements the Save method of the eventhorizon.EventStore interface. func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersion int) error { if len(events) == 0 { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("no events"), Op: eh.EventStoreOpSave, } @@ -111,7 +111,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio for i, event := range events { // Only accept events belonging to the same aggregate. if event.AggregateID() != id { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("event has different aggregate ID"), Op: eh.EventStoreOpSave, AggregateType: at, @@ -122,7 +122,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio } if event.AggregateType() != at { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("event has different aggregate type"), Op: eh.EventStoreOpSave, AggregateType: at, @@ -134,7 +134,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio // Only accept events that apply to the correct aggregate version. if event.Version() != originalVersion+i+1 { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("invalid event version"), Op: eh.EventStoreOpSave, AggregateType: at, @@ -147,7 +147,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio // Create the event record for the DB. e, err := newEvt(ctx, event) if err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not copy event: %w", err), Op: eh.EventStoreOpSave, AggregateType: at, @@ -169,7 +169,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio } if _, err := s.aggregates.InsertOne(ctx, aggregate); err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not insert: %w", err), Op: eh.EventStoreOpSave, AggregateType: at, @@ -192,7 +192,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio "$inc": bson.M{"version": len(dbEvents)}, }, ); err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not update: %w", err), Op: eh.EventStoreOpSave, AggregateType: at, @@ -201,7 +201,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio Events: events, } } else if r.MatchedCount == 0 { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("invalid original aggregate version, new version %d", originalVersion), Op: eh.EventStoreOpSave, AggregateType: at, @@ -216,7 +216,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio if s.eventHandler != nil { for _, e := range events { if err := s.eventHandler.HandleEvent(ctx, e); err != nil { - return eh.EventHandlerError{ + return &eh.EventHandlerError{ Err: err, Event: e, } @@ -236,7 +236,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) err = eh.ErrAggregateNotFound } - return nil, eh.EventStoreError{ + return nil, &eh.EventStoreError{ Err: err, Op: eh.EventStoreOpLoad, AggregateID: id, @@ -250,7 +250,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) if len(e.RawData) > 0 { var err error if e.data, err = eh.CreateEventData(e.EventType); err != nil { - return nil, eh.EventStoreError{ + return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not create event data: %w", err), Op: eh.EventStoreOpLoad, AggregateType: e.AggregateType, @@ -261,7 +261,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) } if err := bson.Unmarshal(e.RawData, e.data); err != nil { - return nil, eh.EventStoreError{ + return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not unmarshal event data: %w", err), Op: eh.EventStoreOpLoad, AggregateType: e.AggregateType, diff --git a/eventstore/mongodb_v2/eventmaintenance.go b/eventstore/mongodb_v2/eventmaintenance.go index e599e8cc..2aaa41df 100644 --- a/eventstore/mongodb_v2/eventmaintenance.go +++ b/eventstore/mongodb_v2/eventmaintenance.go @@ -35,7 +35,7 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error { sess, err := s.client.StartSession(nil) if err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not start transaction: %w", err), Op: eh.EventStoreOpSave, AggregateType: at, @@ -94,7 +94,7 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error { return nil, nil }); err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: err, Op: eh.EventStoreOpReplace, AggregateType: at, @@ -119,7 +119,7 @@ func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) err "$set": bson.M{"event_type": to.String()}, }, ); err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not update events of type '%s': %w", from, err), Op: eh.EventStoreOpRename, } @@ -131,13 +131,13 @@ func (s *EventStore) RenameEvent(ctx context.Context, from, to eh.EventType) err // Clear clears the event storage. func (s *EventStore) Clear(ctx context.Context) error { if err := s.events.Drop(ctx); err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not clear events collection: %w", err), } } if err := s.streams.Drop(ctx); err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not clear streams collection: %w", err), } } diff --git a/eventstore/mongodb_v2/eventstore.go b/eventstore/mongodb_v2/eventstore.go index d765f777..d339c817 100644 --- a/eventstore/mongodb_v2/eventstore.go +++ b/eventstore/mongodb_v2/eventstore.go @@ -122,7 +122,7 @@ func WithEventHandler(h eh.EventHandler) Option { // Save implements the Save method of the eventhorizon.EventStore interface. func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersion int) error { if len(events) == 0 { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("no events"), Op: eh.EventStoreOpSave, } @@ -137,7 +137,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio for i, event := range events { // Only accept events belonging to the same aggregate. if event.AggregateID() != id { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("event has different aggregate"), Op: eh.EventStoreOpSave, AggregateType: at, @@ -149,7 +149,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio // Only accept events that apply to the correct aggregate version. if event.Version() != originalVersion+i+1 { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("invalid event version"), Op: eh.EventStoreOpSave, AggregateType: at, @@ -170,7 +170,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio sess, err := s.client.StartSession(nil) if err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: fmt.Errorf("could not start transaction: %w", err), Op: eh.EventStoreOpSave, AggregateType: at, @@ -274,7 +274,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio return nil, nil }); err != nil { - return eh.EventStoreError{ + return &eh.EventStoreError{ Err: err, Op: eh.EventStoreOpSave, AggregateType: at, @@ -288,7 +288,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio if s.eventHandler != nil { for _, e := range events { if err := s.eventHandler.HandleEvent(ctx, e); err != nil { - return eh.EventHandlerError{ + return &eh.EventHandlerError{ Err: err, Event: e, } @@ -303,7 +303,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) { cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id}) if err != nil { - return nil, eh.EventStoreError{ + return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not find event: %w", err), Op: eh.EventStoreOpLoad, AggregateID: id, @@ -315,7 +315,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) for cursor.Next(ctx) { var e evt if err := cursor.Decode(&e); err != nil { - return nil, eh.EventStoreError{ + return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not decode event: %w", err), Op: eh.EventStoreOpLoad, AggregateID: id, @@ -327,7 +327,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) if len(e.RawData) > 0 { var err error if e.data, err = eh.CreateEventData(e.EventType); err != nil { - return nil, eh.EventStoreError{ + return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not create event data: %w", err), Op: eh.EventStoreOpLoad, AggregateType: e.AggregateType, @@ -338,7 +338,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) } if err := bson.Unmarshal(e.RawData, e.data); err != nil { - return nil, eh.EventStoreError{ + return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not unmarshal event data: %w", err), Op: eh.EventStoreOpLoad, AggregateType: e.AggregateType, @@ -366,7 +366,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) } if len(events) == 0 { - return nil, eh.EventStoreError{ + return nil, &eh.EventStoreError{ Err: eh.ErrAggregateNotFound, Op: eh.EventStoreOpLoad, AggregateID: id, @@ -425,7 +425,7 @@ func newEvt(ctx context.Context, event eh.Event) (*evt, error) { e.RawData, err = bson.Marshal(event.Data()) if err != nil { - return nil, eh.EventStoreError{ + return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not marshal event data: %w", err), } } diff --git a/examples/todomvc/backend/handler/handler_test.go b/examples/todomvc/backend/handler/handler_test.go index be5b4eb2..bd53d0a5 100644 --- a/examples/todomvc/backend/handler/handler_test.go +++ b/examples/todomvc/backend/handler/handler_test.go @@ -224,7 +224,7 @@ func TestDelete(t *testing.T) { cancelTimeout() _, err = todoRepo.Find(ctx, id) - repoErr := eh.RepoError{} + repoErr := &eh.RepoError{} if !errors.As(err, &repoErr) || !errors.Is(err, eh.ErrEntityNotFound) { t.Error("there should be a not found error:", err) } diff --git a/middleware/commandhandler/async/middleware.go b/middleware/commandhandler/async/middleware.go index f62dbb5b..90ea5e3a 100644 --- a/middleware/commandhandler/async/middleware.go +++ b/middleware/commandhandler/async/middleware.go @@ -23,14 +23,14 @@ import ( // NewMiddleware returns a new async handling middleware that returns any errors // on a error channel. -func NewMiddleware() (eh.CommandHandlerMiddleware, chan Error) { - errCh := make(chan Error, 20) +func NewMiddleware() (eh.CommandHandlerMiddleware, chan *Error) { + errCh := make(chan *Error, 20) return eh.CommandHandlerMiddleware(func(h eh.CommandHandler) eh.CommandHandler { return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error { go func() { if err := h.HandleCommand(ctx, cmd); err != nil { // Always try to deliver errors. - errCh <- Error{err, ctx, cmd} + errCh <- &Error{err, ctx, cmd} } }() return nil @@ -49,16 +49,16 @@ type Error struct { } // Error implements the Error method of the error interface. -func (e Error) Error() string { +func (e *Error) Error() string { return fmt.Sprintf("%s (%s): %s", e.Command.CommandType(), e.Command.AggregateID(), e.Err.Error()) } // Unwrap implements the errors.Unwrap method. -func (e Error) Unwrap() error { +func (e *Error) Unwrap() error { return e.Err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e Error) Cause() error { +func (e *Error) Cause() error { return e.Unwrap() } diff --git a/middleware/commandhandler/scheduler/middleware.go b/middleware/commandhandler/scheduler/middleware.go index d2304d00..168069e2 100644 --- a/middleware/commandhandler/scheduler/middleware.go +++ b/middleware/commandhandler/scheduler/middleware.go @@ -24,8 +24,8 @@ import ( // NewMiddleware returns a new async handling middleware that returns any errors // on a error channel. -func NewMiddleware() (eh.CommandHandlerMiddleware, chan Error) { - errCh := make(chan Error, 20) +func NewMiddleware() (eh.CommandHandlerMiddleware, chan *Error) { + errCh := make(chan *Error, 20) return eh.CommandHandlerMiddleware(func(h eh.CommandHandler) eh.CommandHandler { return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error { // Delayed command execution if there is time set. @@ -44,7 +44,7 @@ func NewMiddleware() (eh.CommandHandlerMiddleware, chan Error) { if err != nil { // Always try to deliver errors. - errCh <- Error{err, ctx, cmd} + errCh <- &Error{err, ctx, cmd} } }() return nil @@ -91,16 +91,16 @@ type Error struct { } // Error implements the Error method of the error interface. -func (e Error) Error() string { +func (e *Error) Error() string { return fmt.Sprintf("%s (%s): %s", e.Command.CommandType(), e.Command.AggregateID(), e.Err.Error()) } // Unwrap implements the errors.Unwrap method. -func (e Error) Unwrap() error { +func (e *Error) Unwrap() error { return e.Err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e Error) Cause() error { +func (e *Error) Cause() error { return e.Unwrap() } diff --git a/middleware/commandhandler/scheduler/middleware_test.go b/middleware/commandhandler/scheduler/middleware_test.go index 452dbb8f..ee2f9320 100644 --- a/middleware/commandhandler/scheduler/middleware_test.go +++ b/middleware/commandhandler/scheduler/middleware_test.go @@ -103,7 +103,7 @@ func TestMiddleware_Errors(t *testing.T) { if len(inner.Commands) != 0 { t.Error("the command should not have been handled yet:", inner.Commands) } - var err Error + var err *Error select { case err = <-errCh: case <-time.After(10 * time.Millisecond): @@ -133,7 +133,7 @@ func TestMiddleware_ContextCanceled(t *testing.T) { if len(inner.Commands) != 0 { t.Error("the command should not have been handled yet:", inner.Commands) } - var err Error + var err *Error select { case err = <-errCh: case <-time.After(10 * time.Millisecond): @@ -164,7 +164,7 @@ func TestMiddleware_ContextDeadline(t *testing.T) { if len(inner.Commands) != 0 { t.Error("the command should not have been handled yet:", inner.Commands) } - var err Error + var err *Error select { case err = <-errCh: case <-time.After(10 * time.Millisecond): diff --git a/middleware/commandhandler/validate/middleware.go b/middleware/commandhandler/validate/middleware.go index 320f2fb1..6f92f14b 100644 --- a/middleware/commandhandler/validate/middleware.go +++ b/middleware/commandhandler/validate/middleware.go @@ -43,7 +43,7 @@ func NewMiddleware() eh.CommandHandlerMiddleware { // Call the validation method if it exists. if c, ok := cmd.(Command); ok { if err := c.Validate(); err != nil { - return Error{err} + return &Error{err} } } @@ -59,17 +59,17 @@ type Error struct { } // Error implements the Error method of the error interface. -func (e Error) Error() string { +func (e *Error) Error() string { return fmt.Sprintf("invalid command: %s", e.err.Error()) } // Unwrap implements the errors.Unwrap method. -func (e Error) Unwrap() error { +func (e *Error) Unwrap() error { return e.err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e Error) Cause() error { +func (e *Error) Cause() error { return e.Unwrap() } diff --git a/middleware/commandhandler/validate/middleware_test.go b/middleware/commandhandler/validate/middleware_test.go index 5cd7b426..e95a7019 100644 --- a/middleware/commandhandler/validate/middleware_test.go +++ b/middleware/commandhandler/validate/middleware_test.go @@ -52,7 +52,7 @@ func TestMiddleware_WithValidationError(t *testing.T) { e := errors.New("a validation error") c := CommandWithValidation(cmd, func() error { return e }) err := h.HandleCommand(context.Background(), c) - var validateErr Error + validateErr := &Error{} if !errors.As(err, &validateErr) { t.Error("there should be a validate error:", err) } diff --git a/middleware/eventhandler/async/middleware.go b/middleware/eventhandler/async/middleware.go index f61976da..7a634aac 100644 --- a/middleware/eventhandler/async/middleware.go +++ b/middleware/eventhandler/async/middleware.go @@ -23,8 +23,8 @@ import ( // NewMiddleware returns a new async handling middleware that returns any errors // on a error channel. -func NewMiddleware() (eh.EventHandlerMiddleware, chan Error) { - errCh := make(chan Error, 20) +func NewMiddleware() (eh.EventHandlerMiddleware, chan *Error) { + errCh := make(chan *Error, 20) return eh.EventHandlerMiddleware(func(h eh.EventHandler) eh.EventHandler { return &eventHandler{h, errCh} }), errCh @@ -32,7 +32,7 @@ func NewMiddleware() (eh.EventHandlerMiddleware, chan Error) { type eventHandler struct { eh.EventHandler - errCh chan Error + errCh chan *Error } // HandleEvent implements the HandleEvent method of the EventHandler. @@ -40,7 +40,7 @@ func (h *eventHandler) HandleEvent(ctx context.Context, event eh.Event) error { go func() { if err := h.EventHandler.HandleEvent(ctx, event); err != nil { // Always try to deliver errors. - h.errCh <- Error{err, ctx, event} + h.errCh <- &Error{err, ctx, event} } }() return nil @@ -57,16 +57,16 @@ type Error struct { } // Error implements the Error method of the error interface. -func (e Error) Error() string { +func (e *Error) Error() string { return fmt.Sprintf("%s: %s", e.Event.String(), e.Err.Error()) } // Unwrap implements the errors.Unwrap method. -func (e Error) Unwrap() error { +func (e *Error) Unwrap() error { return e.Err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e Error) Cause() error { +func (e *Error) Cause() error { return e.Unwrap() } diff --git a/mocks/mocks.go b/mocks/mocks.go index 58b5f7ec..92deea0d 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -364,8 +364,8 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event } // Errors implements the Errors method of the eventhorizon.EventBus interface. -func (b *EventBus) Errors() <-chan eh.EventBusError { - return make(chan eh.EventBusError) +func (b *EventBus) Errors() <-chan *eh.EventBusError { + return make(chan *eh.EventBusError) } // Close implements the Close method of the eventhorizon.EventBus interface. diff --git a/namespace/error.go b/namespace/error.go index 3e022bcc..6271b91b 100644 --- a/namespace/error.go +++ b/namespace/error.go @@ -26,16 +26,16 @@ type Error struct { } // Error implements the Error method of the errors.Error interface. -func (e Error) Error() string { +func (e *Error) Error() string { return fmt.Sprintf("%s (%s)", e.Err, e.Namespace) } // Unwrap implements the errors.Unwrap method. -func (e Error) Unwrap() error { +func (e *Error) Unwrap() error { return e.Err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e Error) Cause() error { +func (e *Error) Cause() error { return e.Unwrap() } diff --git a/repo.go b/repo.go index d6ce26e7..0d327b84 100644 --- a/repo.go +++ b/repo.go @@ -101,7 +101,7 @@ type RepoError struct { } // Error implements the Error method of the errors.Error interface. -func (e RepoError) Error() string { +func (e *RepoError) Error() string { str := "repo: " if e.Op != "" { @@ -122,11 +122,11 @@ func (e RepoError) Error() string { } // Unwrap implements the errors.Unwrap method. -func (e RepoError) Unwrap() error { +func (e *RepoError) Unwrap() error { return e.Err } // Cause implements the github.com/pkg/errors Unwrap method. -func (e RepoError) Cause() error { +func (e *RepoError) Cause() error { return e.Unwrap() } diff --git a/repo/acceptance_testing.go b/repo/acceptance_testing.go index e4b6addc..8b5a01d1 100644 --- a/repo/acceptance_testing.go +++ b/repo/acceptance_testing.go @@ -60,7 +60,7 @@ func AcceptanceTest(t *testing.T, repo eh.ReadWriteRepo, ctx context.Context) { CreatedAt: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), } err = repo.Save(ctx, entityMissingID) - repoErr := eh.RepoError{} + repoErr := &eh.RepoError{} if !errors.As(err, &repoErr) || repoErr.Err.Error() != "missing entity ID" { t.Error("there should be a repo error:", err) } diff --git a/repo/memory/repo.go b/repo/memory/repo.go index a2065d56..7c88e5f0 100644 --- a/repo/memory/repo.go +++ b/repo/memory/repo.go @@ -66,7 +66,7 @@ func IntoRepo(ctx context.Context, repo eh.ReadRepo) *Repo { // Find implements the Find method of the eventhorizon.ReadRepo interface. func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { if r.factoryFn == nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: ErrModelNotSet, Op: eh.RepoOpFind, EntityID: id, @@ -79,7 +79,7 @@ func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { // Fetch entity. b, ok := r.db[id] if !ok { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: eh.ErrEntityNotFound, Op: eh.RepoOpFind, EntityID: id, @@ -89,7 +89,7 @@ func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { // Unmarshal. entity := r.factoryFn() if err := json.Unmarshal(b, &entity); err != nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: fmt.Errorf("could not unmarshal: %w", err), Op: eh.RepoOpFind, EntityID: id, @@ -102,7 +102,7 @@ func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { // FindAll implements the FindAll method of the eventhorizon.ReadRepo interface. func (r *Repo) FindAll(ctx context.Context) ([]eh.Entity, error) { if r.factoryFn == nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: ErrModelNotSet, Op: eh.RepoOpFindAll, } @@ -115,7 +115,7 @@ func (r *Repo) FindAll(ctx context.Context) ([]eh.Entity, error) { if b, ok := r.db[id]; ok { entity := r.factoryFn() if err := json.Unmarshal(b, &entity); err != nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: fmt.Errorf("could not unmarshal: %w", err), Op: eh.RepoOpFindAll, } @@ -130,7 +130,7 @@ func (r *Repo) FindAll(ctx context.Context) ([]eh.Entity, error) { // Save implements the Save method of the eventhorizon.WriteRepo interface. func (r *Repo) Save(ctx context.Context, entity eh.Entity) error { if r.factoryFn == nil { - return eh.RepoError{ + return &eh.RepoError{ Err: ErrModelNotSet, Op: eh.RepoOpSave, } @@ -138,7 +138,7 @@ func (r *Repo) Save(ctx context.Context, entity eh.Entity) error { id := entity.EntityID() if id == uuid.Nil { - return eh.RepoError{ + return &eh.RepoError{ Err: fmt.Errorf("missing entity ID"), Op: eh.RepoOpSave, } @@ -150,7 +150,7 @@ func (r *Repo) Save(ctx context.Context, entity eh.Entity) error { // Insert entity. b, err := json.Marshal(entity) if err != nil { - return eh.RepoError{ + return &eh.RepoError{ Err: fmt.Errorf("could not marshal: %w", err), Op: eh.RepoOpSave, EntityID: id, @@ -185,7 +185,7 @@ func (r *Repo) Remove(ctx context.Context, id uuid.UUID) error { return nil } - return eh.RepoError{ + return &eh.RepoError{ Err: eh.ErrEntityNotFound, Op: eh.RepoOpRemove, EntityID: id, diff --git a/repo/mongodb/repo.go b/repo/mongodb/repo.go index 95e9669b..05d46151 100644 --- a/repo/mongodb/repo.go +++ b/repo/mongodb/repo.go @@ -105,7 +105,7 @@ func IntoRepo(ctx context.Context, repo eh.ReadRepo) *Repo { // Find implements the Find method of the eventhorizon.ReadRepo interface. func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { if r.newEntity == nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: ErrModelNotSet, Op: eh.RepoOpFind, EntityID: id, @@ -117,7 +117,7 @@ func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { if err == mongo.ErrNoDocuments { err = eh.ErrEntityNotFound } - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: err, Op: eh.RepoOpFind, EntityID: id, @@ -130,7 +130,7 @@ func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { // FindAll implements the FindAll method of the eventhorizon.ReadRepo interface. func (r *Repo) FindAll(ctx context.Context) ([]eh.Entity, error) { if r.newEntity == nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: ErrModelNotSet, Op: eh.RepoOpFindAll, } @@ -138,7 +138,7 @@ func (r *Repo) FindAll(ctx context.Context) ([]eh.Entity, error) { cursor, err := r.entities.Find(ctx, bson.M{}) if err != nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: fmt.Errorf("could not find: %w", err), Op: eh.RepoOpFindAll, } @@ -148,7 +148,7 @@ func (r *Repo) FindAll(ctx context.Context) ([]eh.Entity, error) { for cursor.Next(ctx) { entity := r.newEntity() if err := cursor.Decode(entity); err != nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: fmt.Errorf("could not unmarshal: %w", err), Op: eh.RepoOpFindAll, } @@ -157,7 +157,7 @@ func (r *Repo) FindAll(ctx context.Context) ([]eh.Entity, error) { } if err := cursor.Close(ctx); err != nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: fmt.Errorf("could not close cursor: %w", err), Op: eh.RepoOpFindAll, } @@ -199,7 +199,7 @@ func (i *iter) Close(ctx context.Context) error { // FindCustomIter returns a mgo cursor you can use to stream results of very large datasets func (r *Repo) FindCustomIter(ctx context.Context, f func(context.Context, *mongo.Collection) (*mongo.Cursor, error)) (eh.Iter, error) { if r.newEntity == nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: ErrModelNotSet, Op: eh.RepoOpFindQuery, } @@ -207,13 +207,13 @@ func (r *Repo) FindCustomIter(ctx context.Context, f func(context.Context, *mong cursor, err := f(ctx, r.entities) if err != nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: fmt.Errorf("could not find: %w", err), Op: eh.RepoOpFindQuery, } } if cursor == nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: fmt.Errorf("no cursor"), Op: eh.RepoOpFindQuery, } @@ -232,7 +232,7 @@ func (r *Repo) FindCustomIter(ctx context.Context, f func(context.Context, *mong // query from the callback. func (r *Repo) FindCustom(ctx context.Context, f func(context.Context, *mongo.Collection) (*mongo.Cursor, error)) ([]interface{}, error) { if r.newEntity == nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: ErrModelNotSet, Op: eh.RepoOpFindQuery, } @@ -240,13 +240,13 @@ func (r *Repo) FindCustom(ctx context.Context, f func(context.Context, *mongo.Co cursor, err := f(ctx, r.entities) if err != nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: fmt.Errorf("could not find: %w", err), Op: eh.RepoOpFindQuery, } } if cursor == nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: fmt.Errorf("no cursor"), Op: eh.RepoOpFindQuery, } @@ -256,7 +256,7 @@ func (r *Repo) FindCustom(ctx context.Context, f func(context.Context, *mongo.Co entity := r.newEntity() for cursor.Next(ctx) { if err := cursor.Decode(entity); err != nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: fmt.Errorf("could not unmarshal: %w", err), Op: eh.RepoOpFindQuery, } @@ -265,7 +265,7 @@ func (r *Repo) FindCustom(ctx context.Context, f func(context.Context, *mongo.Co entity = r.newEntity() } if err := cursor.Close(ctx); err != nil { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: fmt.Errorf("could not close cursor: %w", err), Op: eh.RepoOpFindQuery, } @@ -278,7 +278,7 @@ func (r *Repo) FindCustom(ctx context.Context, f func(context.Context, *mongo.Co func (r *Repo) Save(ctx context.Context, entity eh.Entity) error { id := entity.EntityID() if id == uuid.Nil { - return eh.RepoError{ + return &eh.RepoError{ Err: fmt.Errorf("missing entity ID"), Op: eh.RepoOpSave, } @@ -293,7 +293,7 @@ func (r *Repo) Save(ctx context.Context, entity eh.Entity) error { }, options.Update().SetUpsert(true), ); err != nil { - return eh.RepoError{ + return &eh.RepoError{ Err: fmt.Errorf("could not save/update: %w", err), Op: eh.RepoOpSave, EntityID: id, @@ -305,13 +305,13 @@ func (r *Repo) Save(ctx context.Context, entity eh.Entity) error { // Remove implements the Remove method of the eventhorizon.WriteRepo interface. func (r *Repo) Remove(ctx context.Context, id uuid.UUID) error { if r, err := r.entities.DeleteOne(ctx, bson.M{"_id": id.String()}); err != nil { - return eh.RepoError{ + return &eh.RepoError{ Err: err, Op: eh.RepoOpRemove, EntityID: id, } } else if r.DeletedCount == 0 { - return eh.RepoError{ + return &eh.RepoError{ Err: eh.ErrEntityNotFound, Op: eh.RepoOpRemove, EntityID: id, @@ -324,7 +324,7 @@ func (r *Repo) Remove(ctx context.Context, id uuid.UUID) error { // Collection lets the function do custom actions on the collection. func (r *Repo) Collection(ctx context.Context, f func(context.Context, *mongo.Collection) error) error { if err := f(ctx, r.entities); err != nil { - return eh.RepoError{ + return &eh.RepoError{ Err: err, } } @@ -349,7 +349,7 @@ func (r *Repo) SetEntityFactory(f func() eh.Entity) { // Clear clears the read model database. func (r *Repo) Clear(ctx context.Context) error { if err := r.entities.Drop(ctx); err != nil { - return eh.RepoError{ + return &eh.RepoError{ Err: fmt.Errorf("could not drop collection: %w", err), Op: eh.RepoOpClear, } diff --git a/repo/mongodb/repo_test.go b/repo/mongodb/repo_test.go index 99e0efb4..3a69b38f 100644 --- a/repo/mongodb/repo_test.go +++ b/repo/mongodb/repo_test.go @@ -101,7 +101,7 @@ func extraRepoTests(t *testing.T, r *Repo) { result, err = r.FindCustom(ctx, func(ctx context.Context, c *mongo.Collection) (*mongo.Cursor, error) { return nil, nil }) - repoErr := eh.RepoError{} + repoErr := &eh.RepoError{} if !errors.As(err, &repoErr) || repoErr.Err.Error() != "no cursor" { t.Error("there should be a invalid query error:", err) } @@ -116,7 +116,7 @@ func extraRepoTests(t *testing.T, r *Repo) { // Be sure to return nil to not execute the query again in FindCustom. return nil, nil }) - repoErr = eh.RepoError{} + repoErr = &eh.RepoError{} if !errors.As(err, &repoErr) || repoErr.Err.Error() != "no cursor" { t.Error("there should be a invalid query error:", err) } diff --git a/repo/tracing/repo.go b/repo/tracing/repo.go index 00a30b1c..4727dc74 100644 --- a/repo/tracing/repo.go +++ b/repo/tracing/repo.go @@ -16,6 +16,7 @@ package tracing import ( "context" + "errors" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" @@ -60,8 +61,7 @@ func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { entity, err := r.ReadWriteRepo.Find(ctx, id) sp.SetTag("eh.aggregate_id", id) - if rrErr, ok := err.(eh.RepoError); err != nil && - !(ok && rrErr.Err == eh.ErrEntityNotFound) { + if err != nil && !errors.Is(err, eh.ErrEntityNotFound) { ext.LogError(sp, err) } sp.Finish() diff --git a/repo/version/repo.go b/repo/version/repo.go index dbace72c..bb6631d4 100644 --- a/repo/version/repo.go +++ b/repo/version/repo.go @@ -16,6 +16,7 @@ package version import ( "context" + "errors" "time" "github.com/jpillora/backoff" @@ -75,8 +76,7 @@ func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { _, hasDeadline := ctx.Deadline() for { entity, err := r.findMinVersion(ctx, id, minVersion) - if rrErr, ok := err.(eh.RepoError); ok && - (rrErr.Err == eh.ErrIncorrectEntityVersion || rrErr.Err == eh.ErrEntityNotFound) { + if errors.Is(err, eh.ErrIncorrectEntityVersion) || errors.Is(err, eh.ErrEntityNotFound) { // Try again for incorrect version or if the entity was not found. } else if err != nil { // Return any real error. @@ -109,13 +109,13 @@ func (r *Repo) findMinVersion(ctx context.Context, id uuid.UUID, minVersion int) versionable, ok := entity.(eh.Versionable) if !ok { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: eh.ErrEntityHasNoVersion, } } if versionable.AggregateVersion() < minVersion { - return nil, eh.RepoError{ + return nil, &eh.RepoError{ Err: eh.ErrIncorrectEntityVersion, } } diff --git a/repo/version/repo_test.go b/repo/version/repo_test.go index 0c0f33c3..f6415378 100644 --- a/repo/version/repo_test.go +++ b/repo/version/repo_test.go @@ -67,7 +67,7 @@ func extraRepoTests(t *testing.T, r *Repo, baseRepo *memory.Repo) { // Find with min version without version. ctxVersion := NewContextWithMinVersion(ctx, 1) _, err := r.Find(ctxVersion, simpleModel.ID) - repoErr := eh.RepoError{} + repoErr := &eh.RepoError{} if !errors.As(err, &repoErr) || !errors.Is(err, eh.ErrEntityHasNoVersion) { t.Error("there should be a model has no version error:", err) } @@ -90,7 +90,7 @@ func extraRepoTests(t *testing.T, r *Repo, baseRepo *memory.Repo) { // Find with min version, too low. ctxVersion = NewContextWithMinVersion(ctx, 2) _, err = r.Find(ctxVersion, m1.ID) - repoErr = eh.RepoError{} + repoErr = &eh.RepoError{} if !errors.As(err, &repoErr) || !errors.Is(err, eh.ErrIncorrectEntityVersion) { t.Error("there should be a incorrect model version error:", err) }