Consumer pending was not correct when stream had max msgs per subject set > 1 and a consumer that filtered out part of the stream was created.

Also make sure to update stream's config on a stream restore in case of changes.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison committed May 24, 2022
1 parent 7bdd679 commit 46f7f7b
Showing 3 changed files with 84 additions and 8 deletions.
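
To make the bug concrete before the diffs: the new test below builds a stream with MaxMsgsPerSubject: 2 and publishes so that KV.plans.foo holds 1 message while KV.plans.bar and KV.plans.baz hold 2 each. A consumer filtered to KV.plans.* with DeliverLastPerSubject can deliver at most one message per subject, so its NumPending should be 3; a per-message filtered count over those subjects instead gives 1 + 2 + 2 = 5, illustrating why the last-per-subject case needs its own calculation.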
server/consumer.go (28 changes: 21 additions & 7 deletions)
@@ -224,7 +224,7 @@ type consumer struct {
 	adflr  uint64
 	asflr  uint64
 	npc    uint64
-	npsm   uint64
+	npcm   uint64
 	dsubj  string
 	qgroup string
 	lss    *lastSeqSkipList
@@ -3155,7 +3155,7 @@ func (o *consumer) setMaxPendingBytes(limit int) {
 
 // Lock should be held.
 func (o *consumer) numPending() uint64 {
-	if o.npsm == 0 {
+	if o.npcm == 0 {
 		o.streamNumPending()
 	}
 	// This can wrap based on possibly having a dec before the inc. Account for that here.
@@ -3166,11 +3166,25 @@ func (o *consumer) numPending() uint64 {
 }
 
 // Will force a set from the stream store of num pending.
+// Depends on delivery policy, for last per subject we calculate differently.
 // Lock should be held.
 func (o *consumer) streamNumPending() uint64 {
-	ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject)
-	o.npc = ss.Msgs
-	o.npsm = ss.Last
+	if o.mset == nil || o.mset.store == nil {
+		o.npc, o.npcm = 0, 0
+	} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
+		o.npc, o.npcm = 0, 0
+		for _, ss := range o.mset.store.SubjectsState(o.cfg.FilterSubject) {
+			if o.sseq <= ss.Last {
+				o.npc++
+				if ss.Last > o.npcm {
+					o.npcm = ss.Last
+				}
+			}
+		}
+	} else {
+		ss := o.mset.store.FilteredState(o.sseq, o.cfg.FilterSubject)
+		o.npc, o.npcm = ss.Msgs, ss.Last
+	}
 	return o.npc
 }
 
@@ -3183,7 +3197,7 @@ func (o *consumer) deliverMsg(dsubj string, pmsg *jsPubMsg, dc uint64, rp Retent
 	}
 
 	// Update our cached num pending.
-	if dc == 1 && o.npsm > 0 {
+	if dc == 1 && o.npcm > 0 {
 		o.npc--
 	}
 
@@ -4033,7 +4047,7 @@ func (o *consumer) requestNextMsgSubject() string {
 func (o *consumer) decStreamPending(sseq uint64, subj string) {
 	o.mu.Lock()
 	// Update our cached num pending. Only do so if we think deliverMsg has not done so.
-	if sseq > o.npsm && sseq >= o.sseq && o.isFilteredMatch(subj) {
+	if sseq > o.npcm && sseq >= o.sseq && o.isFilteredMatch(subj) {
 		o.npc--
 	}
 	// Check if this message was pending.
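
The core of the fix is the new DeliverLastPerSubject branch in streamNumPending above. Below is a minimal, self-contained sketch of that counting rule (illustrative, not server code): SimpleState is a stand-in carrying only the fields the new branch reads from the store's per-subject state, and the hard-coded sequences replay the publish order from the test that follows, assuming the consumer starts at sequence 1.

package main

import "fmt"

// SimpleState mirrors just the fields of the store's per-subject state
// that the new code path reads; it is a stand-in for illustration only.
type SimpleState struct {
	Msgs uint64 // messages stored for the subject
	Last uint64 // stream sequence of the newest message on the subject
}

// numPendingLastPerSubject applies the rule streamNumPending now uses for
// DeliverLastPerSubject: count one pending message per subject whose last
// sequence is at or ahead of the consumer's starting sequence sseq, and
// track the highest such sequence (npcm).
func numPendingLastPerSubject(subjects map[string]SimpleState, sseq uint64) (npc, npcm uint64) {
	for _, ss := range subjects {
		if sseq <= ss.Last {
			npc++
			if ss.Last > npcm {
				npcm = ss.Last
			}
		}
	}
	return npc, npcm
}

// numPendingPerMessage counts every stored matching message instead, which
// over-reports for a last-per-subject consumer once MaxMsgsPerSubject
// allows more than one message per subject.
func numPendingPerMessage(subjects map[string]SimpleState) (npc uint64) {
	for _, ss := range subjects {
		npc += ss.Msgs
	}
	return npc
}

func main() {
	// Per-subject state after the publishes in the test below
	// (stream sequences 1-8; KV.config.* is filtered out by the consumer).
	plans := map[string]SimpleState{
		"KV.plans.foo": {Msgs: 1, Last: 1},
		"KV.plans.bar": {Msgs: 2, Last: 7},
		"KV.plans.baz": {Msgs: 2, Last: 8},
	}

	npc, npcm := numPendingLastPerSubject(plans, 1)
	fmt.Println("last-per-subject pending:", npc, "npcm:", npcm) // 3, 8
	fmt.Println("per-message pending:", numPendingPerMessage(plans)) // 5
}

Counting one unit per matching subject rather than per stored message is what keeps NumPending at 3 here even though five KV.plans.* messages are stored.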
server/jetstream_test.go (54 changes: 54 additions & 0 deletions)
@@ -17612,6 +17612,60 @@ func TestJetStreamConsumerDeliverNewNotConsumingBeforeRestart(t *testing.T) {
 	checkCount(0)
 }
 
+func TestJetStreamConsumerNumPendingWithMaxPerSubjectGreaterThanOne(t *testing.T) {
+	s := RunBasicJetStreamServer()
+	if config := s.JetStreamConfig(); config != nil {
+		defer removeDir(t, config.StoreDir)
+	}
+	defer s.Shutdown()
+
+	nc, js := jsClientConnect(t, s)
+	defer nc.Close()
+
+	test := func(t *testing.T, st nats.StorageType) {
+		_, err := js.AddStream(&nats.StreamConfig{
+			Name:              "TEST",
+			Subjects:          []string{"KV.*.*"},
+			MaxMsgsPerSubject: 2,
+			Storage:           st,
+		})
+		require_NoError(t, err)
+
+		// If we allow more than one msg per subject, consumer's num pending can be off (bug in store level).
+		// This requires a filtered state, simple states work ok.
+		// Since we now rely on stream's filtered state when asked directly for consumer info in >=2.8.3.
+		js.PublishAsync("KV.plans.foo", []byte("OK"))
+		js.PublishAsync("KV.plans.bar", []byte("OK"))
+		js.PublishAsync("KV.plans.baz", []byte("OK"))
+		// These are required, the consumer needs to filter these out to see the bug.
+		js.PublishAsync("KV.config.foo", []byte("OK"))
+		js.PublishAsync("KV.config.bar", []byte("OK"))
+		js.PublishAsync("KV.config.baz", []byte("OK"))
+
+		// Double up some now.
+		js.PublishAsync("KV.plans.bar", []byte("OK"))
+		js.PublishAsync("KV.plans.baz", []byte("OK"))
+
+		ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
+			Durable:       "d",
+			AckPolicy:     nats.AckExplicitPolicy,
+			DeliverPolicy: nats.DeliverLastPerSubjectPolicy,
+			FilterSubject: "KV.plans.*",
+		})
+		require_NoError(t, err)
+
+		err = js.DeleteStream("TEST")
+		require_NoError(t, err)
+
+		if ci.NumPending != 3 {
+			t.Fatalf("Expected 3 NumPending, but got %d", ci.NumPending)
+		}
+	}
+
+	t.Run("MemoryStore", func(t *testing.T) { test(t, nats.MemoryStorage) })
+	t.Run("FileStore", func(t *testing.T) { test(t, nats.FileStorage) })
+}
+
 ///////////////////////////////////////////////////////////////////////////
 // Simple JetStream Benchmarks
 ///////////////////////////////////////////////////////////////////////////
server/stream.go (10 changes: 9 additions & 1 deletion)
@@ -3574,7 +3574,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
 	for _, o := range mset.consumers {
 		o.mu.Lock()
 		if o.isLeader() && o.isFilteredMatch(subject) {
-			if seq > o.npsm {
+			if seq > o.npcm {
 				o.npc++
 			}
 			o.signalNewMessages()
@@ -4251,6 +4251,7 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
 	if err := os.Rename(sdir, ndir); err != nil {
 		return nil, err
 	}
+
 	if cfg.Template != _EMPTY_ {
 		if err := jsa.addStreamNameToTemplate(cfg.Template, cfg.Name); err != nil {
 			return nil, err
@@ -4265,6 +4266,13 @@ func (a *Account) RestoreStream(ncfg *StreamConfig, r io.Reader) (*stream, error
 	}
 	lseq := mset.lastSeq()
 
+	// Make sure we do an update if the configs have changed.
+	if !reflect.DeepEqual(fcfg.StreamConfig, cfg) {
+		if err := mset.update(&cfg); err != nil {
+			return nil, err
+		}
+	}
+
 	// Now do consumers.
 	odir := filepath.Join(ndir, consumerDir)
 	ofis, _ := ioutil.ReadDir(odir)
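A note on the stream.go restore change, reading the diff alongside the commit message's second point: cfg appears to derive from the caller-supplied ncfg, while fcfg.StreamConfig is recovered with the snapshot itself, so without the reflect.DeepEqual check a restore would silently keep the snapshot's settings even when the request asked for different ones; applying mset.update(&cfg) reconciles the two.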
