Skip to content

Commit

Permalink
[FIXED] Invalid checkPending logic (#1516)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio authored Jan 11, 2024
1 parent 44fac59 commit 2d90f1f
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 5 deletions.
182 changes: 182 additions & 0 deletions jetstream/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,185 @@ func TestRetryWithBackoff(t *testing.T) {
})
}
}

func TestPullConsumer_checkPending(t *testing.T) {
tests := []struct {
name string
givenSub *pullSubscription
shouldSend bool
expectedPullRequest *pullRequest
}{
{
name: "msgs threshold not reached, bytes not set, no pull request",
givenSub: &pullSubscription{
pending: pendingMsgs{
msgCount: 10,
},
consumeOpts: &consumeOpts{
ThresholdMessages: 5,
MaxMessages: 10,
},
fetchInProgress: 0,
},
shouldSend: false,
},
{
name: "pending msgs below threshold, send pull request",
givenSub: &pullSubscription{
pending: pendingMsgs{
msgCount: 4,
byteCount: 400, // byte count should be ignored
},
consumeOpts: &consumeOpts{
ThresholdMessages: 5,
MaxMessages: 10,
},
fetchInProgress: 0,
},
shouldSend: true,
expectedPullRequest: &pullRequest{
Batch: 6,
MaxBytes: 0,
},
},
{
name: "pending msgs below threshold but PR in progress",
givenSub: &pullSubscription{
pending: pendingMsgs{
msgCount: 4,
},
consumeOpts: &consumeOpts{
ThresholdMessages: 5,
MaxMessages: 10,
},
fetchInProgress: 1,
},
shouldSend: false,
},
{
name: "pending bytes below threshold, send pull request",
givenSub: &pullSubscription{
pending: pendingMsgs{
byteCount: 400,
msgCount: 1000000, // msgs count should be ignored
},
consumeOpts: &consumeOpts{
MaxMessages: 1000000,
ThresholdBytes: 500,
MaxBytes: 1000,
},
fetchInProgress: 0,
},
shouldSend: true,
expectedPullRequest: &pullRequest{
Batch: 1000000,
MaxBytes: 600,
},
},
{
name: "pending bytes above threshold, no pull request",
givenSub: &pullSubscription{
pending: pendingMsgs{
byteCount: 600,
},
consumeOpts: &consumeOpts{
ThresholdBytes: 500,
MaxBytes: 1000,
},
fetchInProgress: 0,
},
shouldSend: false,
},
{
name: "pending bytes below threshold, fetch in progress, no pull request",
givenSub: &pullSubscription{
pending: pendingMsgs{
byteCount: 400,
},
consumeOpts: &consumeOpts{
ThresholdBytes: 500,
MaxBytes: 1000,
},
fetchInProgress: 1,
},
shouldSend: false,
},
{
name: "StopAfter set, pending msgs below StopAfter, send pull request",
givenSub: &pullSubscription{
pending: pendingMsgs{
msgCount: 4,
},
consumeOpts: &consumeOpts{
ThresholdMessages: 5,
MaxMessages: 10,
StopAfter: 8,
},
fetchInProgress: 0,
delivered: 2,
},
shouldSend: true,
expectedPullRequest: &pullRequest{
Batch: 2, // StopAfter (8) - delivered (2) - pending (4)
MaxBytes: 0,
},
},
{
name: "StopAfter set, pending msgs equal to StopAfter, no pull request",
givenSub: &pullSubscription{
pending: pendingMsgs{
msgCount: 6,
},
consumeOpts: &consumeOpts{
ThresholdMessages: 5,
MaxMessages: 10,
StopAfter: 6,
},
fetchInProgress: 0,
delivered: 0,
},
shouldSend: false,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
prChan := make(chan *pullRequest, 1)
test.givenSub.fetchNext = prChan
errs := make(chan error, 1)
ok := make(chan struct{}, 1)
go func() {
if test.shouldSend {
select {
case pr := <-prChan:
if *pr != *test.expectedPullRequest {
errs <- fmt.Errorf("Invalid pull request; want: %#v; got: %#v", test.expectedPullRequest, pr)
return
}
ok <- struct{}{}
case <-time.After(1 * time.Second):
errs <- fmt.Errorf("Timeout")
return
}
} else {
select {
case <-prChan:
errs <- fmt.Errorf("Unexpected pull request")
case <-time.After(100 * time.Millisecond):
ok <- struct{}{}
return
}
}
}()

test.givenSub.checkPending()
select {
case <-ok:
// ok
case err := <-errs:
t.Fatal(err)
}

})
}
}
19 changes: 14 additions & 5 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,18 +417,27 @@ func (s *pullSubscription) incrementDeliveredMsgs() {
// the buffer to trigger a new pull request.
// lock should be held before calling this method
func (s *pullSubscription) checkPending() {
if s.pending.msgCount < s.consumeOpts.ThresholdMessages ||
(s.pending.byteCount < s.consumeOpts.ThresholdBytes && s.consumeOpts.MaxBytes != 0) &&
atomic.LoadUint32(&s.fetchInProgress) == 1 {
batchSize := s.consumeOpts.MaxMessages - s.pending.msgCount
if (s.pending.msgCount < s.consumeOpts.ThresholdMessages ||
(s.pending.byteCount < s.consumeOpts.ThresholdBytes && s.consumeOpts.MaxBytes != 0)) &&
atomic.LoadUint32(&s.fetchInProgress) == 0 {

var batchSize, maxBytes int
if s.consumeOpts.MaxBytes == 0 {
// if using messages, calculate appropriate batch size
batchSize = s.consumeOpts.MaxMessages - s.pending.msgCount
} else {
// if using bytes, use the max value
batchSize = s.consumeOpts.MaxMessages
maxBytes = s.consumeOpts.MaxBytes - s.pending.byteCount
}
if s.consumeOpts.StopAfter > 0 {
batchSize = min(batchSize, s.consumeOpts.StopAfter-s.delivered-s.pending.msgCount)
}
if batchSize > 0 {
s.fetchNext <- &pullRequest{
Expires: s.consumeOpts.Expires,
Batch: batchSize,
MaxBytes: s.consumeOpts.MaxBytes - s.pending.byteCount,
MaxBytes: maxBytes,
Heartbeat: s.consumeOpts.Heartbeat,
}

Expand Down

0 comments on commit 2d90f1f

Please sign in to comment.