Skip to content

Commit

Permalink
at least once with receive pending messages
Browse files Browse the repository at this point in the history
  • Loading branch information
covrom committed Nov 27, 2022
1 parent f26e03b commit 81b5252
Showing 1 changed file with 29 additions and 4 deletions.
33 changes: 29 additions & 4 deletions subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,47 @@ func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*dr
// if maxMessages > 0 {
// args.Count = int64(maxMessages)
// }
xStreamSlice, err := s.broker.XReadGroup(ctx, &args).Result()

// What will happen if we crash in the middle of processing messages,
// is that our messages will remain in the pending entries list,
// so we can access our history by giving XREADGROUP initially an ID of 0,
// and performing the same loop. Once providing an ID of 0 the reply
// is an empty set of messages, we know that we processed and acknowledged
// all the pending messages.
args0 := args
args0.Streams = []string{args.Streams[0], "0"}
if dm, err := s.receiveNextMessage(ctx, &args0); dm != nil && err == nil {
return []*driver.Message{dm}, nil
}

// We can start to use > as ID, in order to get the new messages
// and rejoin the consumers that are processing new things.
dm, err := s.receiveNextMessage(ctx, &args)
if err != nil {
return nil, err
}
return []*driver.Message{dm}, nil
}

func (s *subscription) receiveNextMessage(ctx context.Context, args *redis.XReadGroupArgs) (*driver.Message, error) {
xStreamSlice, err := s.broker.XReadGroup(ctx, args).Result()
if err != nil || ctx.Err() != nil {
if err == nil {
err = ctx.Err()
}
return nil, err
}
if len(xStreamSlice) == 0 || len(xStreamSlice[0].Messages) == 0 {
return nil, nil
}
msg := xStreamSlice[0].Messages[0]
bd := []byte(msg.Values["body"].(string))
var bm map[string]string
if err := json.Unmarshal([]byte(msg.Values["headers"].(string)), &bm); err != nil {
return nil, err
}

dm := &driver.Message{
return &driver.Message{
LoggableID: fmt.Sprintf("msg %s", msg.ID),
Body: bd,
Metadata: bm,
Expand All @@ -113,8 +139,7 @@ func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*dr
}
return false
},
}
return []*driver.Message{dm}, nil
}, nil
}

// SendAcks implements driver.Subscription.SendAcks.
Expand Down

0 comments on commit 81b5252

Please sign in to comment.