Skip to content

Commit

Permalink
let XReadGroup skip empty message and process next message (redis#1243)
Browse files Browse the repository at this point in the history
* let XReadGroup skip empty message and process next message
  • Loading branch information
yeplato authored Feb 2, 2020
1 parent 8a0ab1a commit a8704c3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
6 changes: 5 additions & 1 deletion command.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,10 +1004,14 @@ func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
}

v, err := rd.ReadArrayReply(stringInterfaceMapParser)
if err != nil {
if err != nil && err != proto.Nil {
return nil, err
}

if v == nil || err == proto.Nil {
v = make(map[string]interface{})
}

msgs[i] = XMessage{
ID: id,
Values: v.(map[string]interface{}),
Expand Down
21 changes: 21 additions & 0 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3623,6 +3623,27 @@ var _ = Describe("Commands", func() {
Expect(n).To(Equal(int64(1)))
})

It("should XReadGroup skip empty", func() {
n, err := client.XDel("stream", "2-0").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))

res, err := client.XReadGroup(&redis.XReadGroupArgs{
Group: "group",
Consumer: "consumer",
Streams: []string{"stream", "0"},
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]redis.XStream{{
Stream: "stream",
Messages: []redis.XMessage{
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
{ID: "2-0", Values: map[string]interface{}{}},
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
}},
}))
})

It("should XGroupCreateMkStream", func() {
err := client.XGroupCreateMkStream("stream2", "group", "0").Err()
Expect(err).NotTo(HaveOccurred())
Expand Down

0 comments on commit a8704c3

Please sign in to comment.