Skip to content

Commit

Permalink
fix: Return errors as pointers
Browse files Browse the repository at this point in the history
  • Loading branch information
maxekman committed Oct 22, 2021
1 parent 2312aba commit 220c3de
Show file tree
Hide file tree
Showing 44 changed files with 222 additions and 235 deletions.
12 changes: 6 additions & 6 deletions aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type AggregateStoreError struct {
}

// Error implements the Error method of the error interface.
func (e AggregateStoreError) Error() string {
func (e *AggregateStoreError) Error() string {
str := "aggregate store: "

if e.Op != "" {
Expand All @@ -113,12 +113,12 @@ func (e AggregateStoreError) Error() string {
}

// Unwrap implements the errors.Unwrap method.
func (e AggregateStoreError) Unwrap() error {
func (e *AggregateStoreError) Unwrap() error {
return e.Err
}

// Cause implements the github.com/pkg/errors Unwrap method.
func (e AggregateStoreError) Cause() error {
func (e *AggregateStoreError) Cause() error {
return e.Unwrap()
}

Expand All @@ -129,17 +129,17 @@ type AggregateError struct {
}

// Error implements the Error method of the errors.Error interface.
func (e AggregateError) Error() string {
func (e *AggregateError) Error() string {
return "aggregate error: " + e.Err.Error()
}

// Unwrap implements the errors.Unwrap method.
func (e AggregateError) Unwrap() error {
func (e *AggregateError) Unwrap() error {
return e.Err
}

// Cause implements the github.com/pkg/errors Unwrap method.
func (e AggregateError) Cause() error {
func (e *AggregateError) Cause() error {
return e.Unwrap()
}

Expand Down
14 changes: 7 additions & 7 deletions aggregatestore/events/aggregatestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewAggregateStore(store eh.EventStore) (*AggregateStore, error) {
func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateType, id uuid.UUID) (eh.Aggregate, error) {
agg, err := eh.CreateAggregate(aggregateType, id)
if err != nil {
return nil, eh.AggregateStoreError{
return nil, &eh.AggregateStoreError{
Err: err,
Op: eh.AggregateStoreOpLoad,
AggregateType: aggregateType,
Expand All @@ -71,7 +71,7 @@ func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateTyp

a, ok := agg.(VersionedAggregate)
if !ok {
return nil, eh.AggregateStoreError{
return nil, &eh.AggregateStoreError{
Err: ErrAggregateNotVersioned,
Op: eh.AggregateStoreOpLoad,
AggregateType: aggregateType,
Expand All @@ -81,7 +81,7 @@ func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateTyp

events, err := r.store.Load(ctx, a.EntityID())
if err != nil && !errors.Is(err, eh.ErrAggregateNotFound) {
return nil, eh.AggregateStoreError{
return nil, &eh.AggregateStoreError{
Err: err,
Op: eh.AggregateStoreOpLoad,
AggregateType: aggregateType,
Expand All @@ -90,7 +90,7 @@ func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateTyp
}

if err := r.applyEvents(ctx, a, events); err != nil {
return nil, eh.AggregateStoreError{
return nil, &eh.AggregateStoreError{
Err: err,
Op: eh.AggregateStoreOpLoad,
AggregateType: aggregateType,
Expand All @@ -106,7 +106,7 @@ func (r *AggregateStore) Load(ctx context.Context, aggregateType eh.AggregateTyp
func (r *AggregateStore) Save(ctx context.Context, agg eh.Aggregate) error {
a, ok := agg.(VersionedAggregate)
if !ok {
return eh.AggregateStoreError{
return &eh.AggregateStoreError{
Err: ErrAggregateNotVersioned,
Op: eh.AggregateStoreOpSave,
AggregateType: agg.AggregateType(),
Expand All @@ -121,7 +121,7 @@ func (r *AggregateStore) Save(ctx context.Context, agg eh.Aggregate) error {
}

if err := r.store.Save(ctx, events, a.AggregateVersion()); err != nil {
return eh.AggregateStoreError{
return &eh.AggregateStoreError{
Err: err,
Op: eh.AggregateStoreOpSave,
AggregateType: agg.AggregateType(),
Expand All @@ -134,7 +134,7 @@ func (r *AggregateStore) Save(ctx context.Context, agg eh.Aggregate) error {
// Apply the events in case the aggregate needs to be further used
// after this save. Currently it is not reused.
if err := r.applyEvents(ctx, a, events); err != nil {
return eh.AggregateStoreError{
return &eh.AggregateStoreError{
Err: err,
Op: eh.AggregateStoreOpSave,
AggregateType: agg.AggregateType(),
Expand Down
4 changes: 2 additions & 2 deletions aggregatestore/events/aggregatestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func TestAggregateStore_SaveEvents(t *testing.T) {
storeErr := errors.New("store error")
eventStore.Err = storeErr
err = store.Save(ctx, agg)
aggStoreErr := eh.AggregateStoreError{}
aggStoreErr := &eh.AggregateStoreError{}
if !errors.As(err, &aggStoreErr) || !errors.Is(err, storeErr) {
t.Error("there should be an aggregate store error:", err)
}
Expand All @@ -214,7 +214,7 @@ func TestAggregateStore_SaveEvents(t *testing.T) {
aggErr := errors.New("aggregate error")
agg.err = aggErr
err = store.Save(ctx, agg)
aggStoreErr = eh.AggregateStoreError{}
aggStoreErr = &eh.AggregateStoreError{}
if !errors.As(err, &aggStoreErr) || !errors.Is(err, aggErr) {
t.Error("there should be an aggregate store error:", err)
}
Expand Down
2 changes: 1 addition & 1 deletion aggregatestore/model/aggregatestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestAggregateStore_LoadNotFound(t *testing.T) {
ctx := context.Background()

id := uuid.New()
repo.LoadErr = eh.RepoError{Err: eh.ErrEntityNotFound}
repo.LoadErr = &eh.RepoError{Err: eh.ErrEntityNotFound}
agg, err := store.Load(ctx, AggregateType, id)
if err != nil {
t.Fatal("there should be no error:", err)
Expand Down
4 changes: 2 additions & 2 deletions command_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type CommandFieldError struct {
}

// Error implements the Error method of the error interface.
func (c CommandFieldError) Error() string {
func (c *CommandFieldError) Error() string {
return "missing field: " + c.Field
}

Expand Down Expand Up @@ -62,7 +62,7 @@ func CheckCommand(cmd Command) error {
}

if zero {
return CommandFieldError{field.Name}
return &CommandFieldError{field.Name}
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion commandhandler/aggregate/commandhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (h *CommandHandler) HandleCommand(ctx context.Context, cmd eh.Command) erro
}

if err = a.HandleCommand(ctx, cmd); err != nil {
return eh.AggregateError{Err: err}
return &eh.AggregateError{Err: err}
}

return h.store.Save(ctx, a)
Expand Down
2 changes: 1 addition & 1 deletion commandhandler/aggregate/commandhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestCommandHandler_ErrorInHandler(t *testing.T) {
Content: "command1",
}
err := h.HandleCommand(context.Background(), cmd)
var aggregateErr eh.AggregateError
aggregateErr := &eh.AggregateError{}
if !errors.As(err, &aggregateErr) || !errors.Is(err, commandErr) {
t.Error("there should be a command error:", err)
}
Expand Down
8 changes: 4 additions & 4 deletions eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type EventBus interface {
AddHandler(context.Context, EventMatcher, EventHandler) error

// Errors returns an error channel where async handling errors are sent.
Errors() <-chan EventBusError
Errors() <-chan *EventBusError

// Close closes the EventBus and waits for all handlers to finish.
Close() error
Expand All @@ -58,7 +58,7 @@ type EventBusError struct {
}

// Error implements the Error method of the error interface.
func (e EventBusError) Error() string {
func (e *EventBusError) Error() string {
str := "event bus: "

if e.Err != nil {
Expand All @@ -75,11 +75,11 @@ func (e EventBusError) Error() string {
}

// Unwrap implements the errors.Unwrap method.
func (e EventBusError) Unwrap() error {
func (e *EventBusError) Unwrap() error {
return e.Err
}

// Cause implements the github.com/pkg/errors Unwrap method.
func (e EventBusError) Cause() error {
func (e *EventBusError) Cause() error {
return e.Unwrap()
}
12 changes: 6 additions & 6 deletions eventbus/gcp/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type EventBus struct {
topic *pubsub.Topic
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
errCh chan eh.EventBusError
errCh chan *eh.EventBusError
cctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
Expand All @@ -52,7 +52,7 @@ func NewEventBus(projectID, appID string, options ...Option) (*EventBus, error)
b := &EventBus{
appID: appID,
registered: map[eh.EventHandlerType]struct{}{},
errCh: make(chan eh.EventBusError, 100),
errCh: make(chan *eh.EventBusError, 100),
cctx: ctx,
cancel: cancel,
codec: &json.EventCodec{},
Expand Down Expand Up @@ -206,7 +206,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event
}

// Errors implements the Errors method of the eventhorizon.EventBus interface.
func (b *EventBus) Errors() <-chan eh.EventBusError {
func (b *EventBus) Errors() <-chan *eh.EventBusError {
return b.errCh
}

Expand All @@ -231,7 +231,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, sub *pubsub.Subs
if err := sub.Receive(b.cctx, b.handler(m, h)); err != nil {
err = fmt.Errorf("could not receive: %w", err)
select {
case b.errCh <- eh.EventBusError{Err: err}:
case b.errCh <- &eh.EventBusError{Err: err}:
default:
log.Printf("eventhorizon: missed error in GCP event bus: %s", err)
}
Expand All @@ -250,7 +250,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex
if err != nil {
err = fmt.Errorf("could not unmarshal event: %w", err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}:
default:
log.Printf("eventhorizon: missed error in GCP event bus: %s", err)
}
Expand All @@ -268,7 +268,7 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex
if err := h.HandleEvent(ctx, event); err != nil {
err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
log.Printf("eventhorizon: missed error in GCP event bus: %s", err)
}
Expand Down
25 changes: 12 additions & 13 deletions eventbus/kafka/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type EventBus struct {
writer *kafka.Writer
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
errCh chan eh.EventBusError
errCh chan *eh.EventBusError
cctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
Expand All @@ -55,7 +55,7 @@ func NewEventBus(addr, appID string, options ...Option) (*EventBus, error) {
appID: appID,
topic: appID + "_events",
registered: map[eh.EventHandlerType]struct{}{},
errCh: make(chan eh.EventBusError, 100),
errCh: make(chan *eh.EventBusError, 100),
cctx: ctx,
cancel: cancel,
codec: &json.EventCodec{},
Expand Down Expand Up @@ -217,7 +217,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event
}

// Errors implements the Errors method of the eventhorizon.EventBus interface.
func (b *EventBus) Errors() <-chan eh.EventBusError {
func (b *EventBus) Errors() <-chan *eh.EventBusError {
return b.errCh
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader)
} else if err != nil {
err = fmt.Errorf("could not fetch message: %w", err)
select {
case b.errCh <- eh.EventBusError{Err: err}:
case b.errCh <- &eh.EventBusError{Err: err}:
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}
Expand All @@ -258,8 +258,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader)
continue
}

var noBusError eh.EventBusError
if err := handler(b.cctx, msg); err != noBusError {
if err := handler(b.cctx, msg); err != nil {
select {
case b.errCh <- err:
default:
Expand All @@ -272,7 +271,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader)
if err := r.CommitMessages(context.Background(), msg); err != nil {
err = fmt.Errorf("could not commit message: %w", err)
select {
case b.errCh <- eh.EventBusError{Err: err}:
case b.errCh <- &eh.EventBusError{Err: err}:
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}
Expand All @@ -284,30 +283,30 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader)
}
}

func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) func(ctx context.Context, msg kafka.Message) eh.EventBusError {
return func(ctx context.Context, msg kafka.Message) eh.EventBusError {
func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader) func(ctx context.Context, msg kafka.Message) *eh.EventBusError {
return func(ctx context.Context, msg kafka.Message) *eh.EventBusError {
event, ctx, err := b.codec.UnmarshalEvent(ctx, msg.Value)
if err != nil {
return eh.EventBusError{
return &eh.EventBusError{
Err: fmt.Errorf("could not unmarshal event: %w", err),
Ctx: ctx,
}
}

// Ignore non-matching events.
if !m.Match(event) {
return eh.EventBusError{}
return nil
}

// Handle the event if it did match.
if err := h.HandleEvent(ctx, event); err != nil {
return eh.EventBusError{
return &eh.EventBusError{
Err: fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err),
Ctx: ctx,
Event: event,
}
}

return eh.EventBusError{}
return nil
}
}
10 changes: 5 additions & 5 deletions eventbus/local/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type EventBus struct {
group *Group
registered map[eh.EventHandlerType]struct{}
registeredMu sync.RWMutex
errCh chan eh.EventBusError
errCh chan *eh.EventBusError
cctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
Expand All @@ -48,7 +48,7 @@ func NewEventBus(options ...Option) *EventBus {
b := &EventBus{
group: NewGroup(),
registered: map[eh.EventHandlerType]struct{}{},
errCh: make(chan eh.EventBusError, 100),
errCh: make(chan *eh.EventBusError, 100),
cctx: ctx,
cancel: cancel,
codec: &json.EventCodec{},
Expand Down Expand Up @@ -127,7 +127,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event
}

// Errors implements the Errors method of the eventhorizon.EventBus interface.
func (b *EventBus) Errors() <-chan eh.EventBusError {
func (b *EventBus) Errors() <-chan *eh.EventBusError {
return b.errCh
}

Expand Down Expand Up @@ -160,7 +160,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte
if err != nil {
err = fmt.Errorf("could not unmarshal event: %w", err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}:
default:
log.Printf("eventhorizon: missed error in local event bus: %s", err)
}
Expand All @@ -176,7 +176,7 @@ func (b *EventBus) handle(m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte
if err := h.HandleEvent(ctx, event); err != nil {
err = fmt.Errorf("could not handle event (%s): %s", h.HandlerType(), err.Error())
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
log.Printf("eventhorizon: missed error in local event bus: %s", err)
}
Expand Down
Loading

0 comments on commit 220c3de

Please sign in to comment.