Skip to content

Commit

Permalink
[FIXED] JetStream API: Fix Next() blocking indefinitely after calli…
Browse files Browse the repository at this point in the history
…ng `Stop()` (#1344)
  • Loading branch information
mdawar authored Jul 13, 2023
1 parent 10c8eab commit 49ae579
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 0 deletions.
2 changes: 2 additions & 0 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ func (s *pullSubscription) Next() (Msg, error) {
for {
s.checkPending()
select {
case <-s.done:
return nil, ErrMsgIteratorClosed
case msg := <-s.msgs:
if hbMonitor != nil {
hbMonitor.Reset(2 * s.consumeOpts.Heartbeat)
Expand Down
67 changes: 67 additions & 0 deletions jetstream/test/pull_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,73 @@ func TestPullConsumerMessages(t *testing.T) {
t.Fatalf("Unexpected error: %s", err)
}
})

t.Run("with graceful shutdown", func(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)

nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{AckPolicy: jetstream.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

it, err := c.Messages()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

publishTestMsgs(t, nc)

errs := make(chan error)
msgs := make([]jetstream.Msg, 0)

go func() {
for {
msg, err := it.Next()
if err != nil {
errs <- err
return
}
msg.Ack()
msgs = append(msgs, msg)
}
}()

time.Sleep(10 * time.Millisecond)
it.Stop() // Next() should return ErrMsgIteratorClosed

timeout := time.NewTimer(5 * time.Second)

select {
case <-timeout.C:
t.Fatal("Timed out waiting for Next() to return after Stop()")
case err := <-errs:
if !errors.Is(err, jetstream.ErrMsgIteratorClosed) {
t.Fatalf("Unexpected error: %v", err)
}

if len(msgs) != len(testMsgs) {
t.Fatalf("Unexpected received message count; want %d; got %d", len(testMsgs), len(msgs))
}
}
})
}

func TestPullConsumerConsume(t *testing.T) {
Expand Down

0 comments on commit 49ae579

Please sign in to comment.