diff --git a/eventbus/redis/eventbus.go b/eventbus/redis/eventbus.go index 56629796..a11a739f 100644 --- a/eventbus/redis/eventbus.go +++ b/eventbus/redis/eventbus.go @@ -269,11 +269,10 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName strin // Ignore non-matching events. if !m.Match(event) { - _, err := b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result() - if err != nil { - err = fmt.Errorf("could not ack event: %w", err) + if _, err := b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result(); err != nil { + err = fmt.Errorf("could not ack non-matching event: %w", err) select { - case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}: default: log.Printf("eventhorizon: missed error in Redis event bus: %s", err) } @@ -297,9 +296,9 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName strin _, err = b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result() if err != nil { - err = fmt.Errorf("could not ack event: %w", err) + err = fmt.Errorf("could not ack handled event: %w", err) select { - case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx}: + case b.errCh <- &eh.EventBusError{Err: err, Ctx: ctx, Event: event}: default: log.Printf("eventhorizon: missed error in Redis event bus: %s", err) }