Skip to content

Commit

Permalink
De-flake TestJetStreamClusterAPILimitAdvisory
Browse files Browse the repository at this point in the history
This should hopefully de-flake this test by ensuring we send enough
requests that we can't possibly interleave with a listening worker.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Feb 10, 2025
1 parent a1c16e1 commit a5e8e76
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3383,21 +3383,24 @@ func TestJetStreamClusterAPILimitAdvisory(t *testing.T) {
sub, err := snc.SubscribeSync(JSAdvisoryAPILimitReached)
require_NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

require_NoError(t, nc.PublishMsg(&nats.Msg{
Subject: fmt.Sprintf(JSApiConsumerListT, "TEST"),
Reply: nc.NewInbox(),
}))
// There's a very slim chance that a worker could pick up a request between
// pushing to and draining the queue, so make sure we've sent enough of them
// to reliably trigger a drain and advisory.
inbox := nc.NewRespInbox()
for i := 0; i < runtime.GOMAXPROCS(-1)*2; i++ {
require_NoError(t, nc.PublishMsg(&nats.Msg{
Subject: fmt.Sprintf(JSApiConsumerListT, "TEST"),
Reply: inbox,
}))
}

// Wait for the advisory to come in.
msg, err := sub.NextMsgWithContext(ctx)
msg, err := sub.NextMsg(time.Second * 5)
require_NoError(t, err)
var advisory JSAPILimitReachedAdvisory
require_NoError(t, json.Unmarshal(msg.Data, &advisory))
require_Equal(t, advisory.Domain, _EMPTY_) // No JetStream domain was set.
require_Equal(t, advisory.Dropped, queueLimit) // Configured queue limit.
require_Equal(t, advisory.Domain, _EMPTY_) // No JetStream domain was set.
require_True(t, advisory.Dropped >= 1) // We dropped at least something.
}

func TestJetStreamPendingRequestsInJsz(t *testing.T) {
Expand Down

0 comments on commit a5e8e76

Please sign in to comment.