Skip to content

Commit

Permalink
Revert "Fix consumer start sequence when sequence not yet in stream"
Browse files Browse the repository at this point in the history
This reverts commit ec54164.
See discussion #6005.
  • Loading branch information
neilalexander committed Oct 17, 2024
1 parent 97a0df3 commit 23de885
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 66 deletions.
16 changes: 6 additions & 10 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4907,16 +4907,12 @@ func (o *consumer) selectStartingSeqNo() {
o.sseq = o.cfg.OptStartSeq
}

// Only clip the sseq if the OptStartSeq is not provided, otherwise
// it's possible that the stream just doesn't contain OptStartSeq yet.
if o.cfg.OptStartSeq == 0 {
if state.FirstSeq == 0 {
o.sseq = 1
} else if o.sseq < state.FirstSeq {
o.sseq = state.FirstSeq
} else if o.sseq > state.LastSeq {
o.sseq = state.LastSeq + 1
}
if state.FirstSeq == 0 {
o.sseq = 1
} else if o.sseq < state.FirstSeq {
o.sseq = state.FirstSeq
} else if o.sseq > state.LastSeq {
o.sseq = state.LastSeq + 1
}
}

Expand Down
56 changes: 0 additions & 56 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22836,62 +22836,6 @@ func TestJetStreamConsumerInfoNumPending(t *testing.T) {
require_Equal(t, ci.NumPending, 100)
}

func TestJetStreamConsumerStartSequenceNotInStream(t *testing.T) {
// This test is checking that we still correctly set the start
// sequence of a consumer if that start sequence doesn't appear
// in the stream yet. Previously this would have been clipped
// back to between the first and last seq from the stream state.

s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
})
require_NoError(t, err)

sub, err := js.PullSubscribe("test", "test_consumer", nats.StartSequence(10))
require_NoError(t, err)

stream, err := s.gacc.lookupStream("TEST")
require_NoError(t, err)
consumer := stream.lookupConsumer("test_consumer")

func() {
consumer.mu.RLock()
defer consumer.mu.RUnlock()

require_Equal(t, consumer.dseq, 1)
require_Equal(t, consumer.sseq, 10)
}()

for i := 1; i <= 10; i++ {
_, err = js.Publish("test", []byte{byte(i)})
require_NoError(t, err)
}

msgs, err := sub.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_Equal(t, msgs[0].Data[0], 10)

require_NoError(t, msgs[0].AckSync())

func() {
consumer.mu.RLock()
defer consumer.mu.RUnlock()

require_Equal(t, consumer.dseq, 2)
require_Equal(t, consumer.adflr, 1)
require_Equal(t, consumer.sseq, 11)
require_Equal(t, consumer.asflr, 10)
}()
}

func TestJetStreamInterestStreamWithDuplicateMessages(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
Expand Down

0 comments on commit 23de885

Please sign in to comment.