Skip to content

Add support for XREAD last entry #3005

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 20, 2024
Prev Previous commit
Next Next commit
Changed argument to generic ID, skip tests on Enterprise
  • Loading branch information
vladvildanov committed Jun 20, 2024
commit 48eb1ace92d486cac39bb593e1f16ff0ba25f2fd
22 changes: 11 additions & 11 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5888,11 +5888,11 @@ var _ = Describe("Commands", func() {
Expect(err).To(Equal(redis.Nil))
})

It("should XRead LastEntry", func() {
It("should XRead LastEntry", Label("NonRedisEnterprise"), func() {
res, err := client.XRead(ctx, &redis.XReadArgs{
Streams: []string{"stream"},
Count: 2, // we expect 1 message
LastEntry: true,
Streams: []string{"stream"},
Count: 2, // we expect 1 message
ID: "+",
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]redis.XStream{
Expand All @@ -5905,10 +5905,10 @@ var _ = Describe("Commands", func() {
}))
})

It("should XRead LastEntry from two streams", func() {
It("should XRead LastEntry from two streams", Label("NonRedisEnterprise"), func() {
res, err := client.XRead(ctx, &redis.XReadArgs{
Streams: []string{"stream", "stream"},
LastEntry: true,
Streams: []string{"stream", "stream"},
ID: "+",
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal([]redis.XStream{
Expand All @@ -5927,7 +5927,7 @@ var _ = Describe("Commands", func() {
}))
})

It("should XRead LastEntry blocks", func() {
It("should XRead LastEntry blocks", Label("NonRedisEnterprise"), func() {
start := time.Now()
go func() {
defer GinkgoRecover()
Expand All @@ -5943,9 +5943,9 @@ var _ = Describe("Commands", func() {
}()

res, err := client.XRead(ctx, &redis.XReadArgs{
Streams: []string{"empty"},
Block: 500 * time.Millisecond,
LastEntry: true,
Streams: []string{"empty"},
Block: 500 * time.Millisecond,
ID: "+",
}).Result()
Expect(err).NotTo(HaveOccurred())
// Ensure that the XRead call with LastEntry option blocked for at least 100ms.
Expand Down
12 changes: 6 additions & 6 deletions stream_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,10 @@ func (c cmdable) XRevRangeN(ctx context.Context, stream, start, stop string, cou
}

type XReadArgs struct {
Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2
Count int64
Block time.Duration
LastEntry bool
Streams []string // list of streams and ids, e.g. stream1 stream2 id1 id2
Count int64
Block time.Duration
ID string
}

func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
Expand All @@ -160,9 +160,9 @@ func (c cmdable) XRead(ctx context.Context, a *XReadArgs) *XStreamSliceCmd {
for _, s := range a.Streams {
args = append(args, s)
}
if a.LastEntry {
if a.ID != "" {
for range a.Streams {
args = append(args, "+")
args = append(args, a.ID)
}
}

Expand Down