Skip to content

Commit 9619844

Browse files
[FIXED] Stream desync after out-of-order SkipMsg (#7400)
2 parents 20a9c65 + 332bf2c commit 9619844

File tree

9 files changed

+108
-27
lines changed

9 files changed

+108
-27
lines changed

server/filestore.go

Lines changed: 14 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -4541,19 +4541,27 @@ func (mb *msgBlock) skipMsg(seq uint64, now int64) {
45414541
}
45424542

45434543
// SkipMsg will use the next sequence number but not store anything.
4544-
func (fs *fileStore) SkipMsg() uint64 {
4544+
func (fs *fileStore) SkipMsg(seq uint64) (uint64, error) {
4545+
// Grab time.
4546+
now := ats.AccessTime()
4547+
45454548
fs.mu.Lock()
45464549
defer fs.mu.Unlock()
45474550

4551+
// Check sequence matches our last sequence.
4552+
if seq != fs.state.LastSeq+1 {
4553+
if seq > 0 {
4554+
return 0, ErrSequenceMismatch
4555+
}
4556+
seq = fs.state.LastSeq + 1
4557+
}
4558+
45484559
// Grab our current last message block.
45494560
mb, err := fs.checkLastBlock(emptyRecordLen)
45504561
if err != nil {
4551-
return 0
4562+
return 0, err
45524563
}
45534564

4554-
// Grab time and last seq.
4555-
now, seq := ats.AccessTime(), fs.state.LastSeq+1
4556-
45574565
// Write skip msg.
45584566
mb.skipMsg(seq, now)
45594567

@@ -4568,7 +4576,7 @@ func (fs *fileStore) SkipMsg() uint64 {
45684576
// Mark as dirty for stream state.
45694577
fs.dirty++
45704578

4571-
return seq
4579+
return seq, nil
45724580
}
45734581

45744582
// Skip multiple msgs. We will determine if we can fit into current lmb or we need to create a new block.

server/filestore_test.go

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -345,7 +345,7 @@ func TestFileStoreSkipMsg(t *testing.T) {
345345

346346
numSkips := 10
347347
for i := 0; i < numSkips; i++ {
348-
fs.SkipMsg()
348+
fs.SkipMsg(0)
349349
}
350350
state := fs.State()
351351
if state.Msgs != 0 {
@@ -356,10 +356,10 @@ func TestFileStoreSkipMsg(t *testing.T) {
356356
}
357357

358358
fs.StoreMsg("zzz", nil, []byte("Hello World!"), 0)
359-
fs.SkipMsg()
360-
fs.SkipMsg()
359+
fs.SkipMsg(0)
360+
fs.SkipMsg(0)
361361
fs.StoreMsg("zzz", nil, []byte("Hello World!"), 0)
362-
fs.SkipMsg()
362+
fs.SkipMsg(0)
363363

364364
state = fs.State()
365365
if state.Msgs != 2 {
@@ -393,7 +393,7 @@ func TestFileStoreSkipMsg(t *testing.T) {
393393
t.Fatalf("Message did not match")
394394
}
395395

396-
fs.SkipMsg()
396+
fs.SkipMsg(0)
397397
nseq, _, err := fs.StoreMsg("AAA", nil, []byte("Skip?"), 0)
398398
if err != nil {
399399
t.Fatalf("Unexpected error looking up seq 11: %v", err)
@@ -5080,7 +5080,7 @@ func TestFileStoreSkipMsgAndNumBlocks(t *testing.T) {
50805080

50815081
fs.StoreMsg(subj, nil, msg, 0)
50825082
for i := 0; i < numMsgs; i++ {
5083-
fs.SkipMsg()
5083+
fs.SkipMsg(0)
50845084
}
50855085
fs.StoreMsg(subj, nil, msg, 0)
50865086
require_Equal(t, fs.numMsgBlocks(), 3)
@@ -6824,7 +6824,7 @@ func TestFileStoreEraseMsgWithDbitSlots(t *testing.T) {
68246824

68256825
fs.StoreMsg("foo", nil, []byte("abd"), 0)
68266826
for i := 0; i < 10; i++ {
6827-
fs.SkipMsg()
6827+
fs.SkipMsg(0)
68286828
}
68296829
fs.StoreMsg("foo", nil, []byte("abd"), 0)
68306830
// Now grab that first block and compact away the skips which will
@@ -6853,7 +6853,7 @@ func TestFileStoreEraseMsgWithAllTrailingDbitSlots(t *testing.T) {
68536853
fs.StoreMsg("foo", nil, []byte("abcdefg"), 0)
68546854

68556855
for i := 0; i < 10; i++ {
6856-
fs.SkipMsg()
6856+
fs.SkipMsg(0)
68576857
}
68586858
// Now grab that first block and compact away the skips which will
68596859
// introduce dbits into our idx.
@@ -7200,7 +7200,7 @@ func TestFileStoreReloadAndLoseLastSequence(t *testing.T) {
72007200
defer fs.Stop()
72017201

72027202
for i := 0; i < 22; i++ {
7203-
fs.SkipMsg()
7203+
fs.SkipMsg(0)
72047204
}
72057205

72067206
// Restart 5 times.
@@ -9053,7 +9053,7 @@ func TestFileStoreLeftoverSkipMsgInDmap(t *testing.T) {
90539053
}
90549054

90559055
// Only skip a message.
9056-
fs.SkipMsg()
9056+
fs.SkipMsg(0)
90579057

90589058
// Confirm state.
90599059
state := fs.State()

server/jetstream_cluster.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3630,7 +3630,7 @@ func (js *jetStream) applyStreamMsgOp(mset *stream, op entryOp, mbuf []byte, isR
36303630
// Messages to be skipped have no subject or timestamp or msg or hdr.
36313631
if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 {
36323632
// Skip and update our lseq.
3633-
last := mset.store.SkipMsg()
3633+
last, _ := mset.store.SkipMsg(0)
36343634
if needLock {
36353635
mset.mu.Lock()
36363636
}
@@ -9478,7 +9478,7 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
94789478
// Messages to be skipped have no subject or timestamp.
94799479
// TODO(dlc) - formalize with skipMsgOp
94809480
if subj == _EMPTY_ && ts == 0 {
9481-
if lseq := mset.store.SkipMsg(); lseq != seq {
9481+
if _, err = mset.store.SkipMsg(seq); err != nil {
94829482
return 0, errCatchupWrongSeqForSkip
94839483
}
94849484
} else if err := mset.store.StoreRawMsg(subj, hdr, msg, seq, ts, ttl); err != nil {

server/jetstream_cluster_1_test.go

Lines changed: 65 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10330,6 +10330,71 @@ func TestJetStreamClusterDeleteMsgEOF(t *testing.T) {
1033010330
}
1033110331
}
1033210332

10333+
func TestJetStreamClusterCatchupSkipMsgDesync(t *testing.T) {
10334+
for _, storage := range []nats.StorageType{nats.FileStorage, nats.MemoryStorage} {
10335+
t.Run(storage.String(), func(t *testing.T) {
10336+
c := createJetStreamClusterExplicit(t, "R3S", 3)
10337+
defer c.shutdown()
10338+
10339+
nc, js := jsClientConnect(t, c.randomServer())
10340+
defer nc.Close()
10341+
10342+
_, err := js.AddStream(&nats.StreamConfig{
10343+
Name: "TEST",
10344+
Subjects: []string{"foo"},
10345+
Storage: storage,
10346+
Replicas: 3,
10347+
})
10348+
require_NoError(t, err)
10349+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
10350+
return checkState(t, c, globalAccountName, "TEST")
10351+
})
10352+
10353+
rs := c.randomNonStreamLeader(globalAccountName, "TEST")
10354+
mset, err := rs.globalAccount().lookupStream("TEST")
10355+
require_NoError(t, err)
10356+
sa := mset.streamAssignment()
10357+
10358+
// Make sure this server can't become the leader.
10359+
n := mset.raftNode().(*raft)
10360+
n.SetObserver(true)
10361+
10362+
sysNc, err := nats.Connect(rs.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
10363+
require_NoError(t, err)
10364+
defer sysNc.Close()
10365+
10366+
sjs := rs.getJetStream()
10367+
sjs.mu.RLock()
10368+
syncSubj := sa.Sync
10369+
sjs.mu.RUnlock()
10370+
10371+
// Respond to the catchup with an out-of-order SkipMsg.
10372+
var eof bool
10373+
sub, err := sysNc.Subscribe(syncSubj, func(msg *nats.Msg) {
10374+
if !eof {
10375+
msg.Respond(encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, 10, 0, false))
10376+
eof = true
10377+
}
10378+
msg.Respond(nil)
10379+
})
10380+
require_NoError(t, err)
10381+
defer sub.Drain()
10382+
require_NoError(t, sysNc.Flush()) // Must flush, otherwise our subscription could be too late.
10383+
10384+
err = mset.processSnapshot(&StreamReplicatedState{FirstSeq: 1, LastSeq: 1}, 1)
10385+
require_Error(t, err, errCatchupTooManyRetries)
10386+
c.waitOnStreamLeader(globalAccountName, "TEST")
10387+
10388+
pubAck, err := js.Publish("foo", nil)
10389+
require_NoError(t, err)
10390+
require_Equal(t, pubAck.Sequence, 1)
10391+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
10392+
return checkState(t, c, globalAccountName, "TEST")
10393+
})
10394+
})
10395+
}
10396+
}
10397+
1033310398
//
1033410399
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
1033510400
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.

server/memstore.go

Lines changed: 12 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -359,12 +359,21 @@ func (ms *memStore) StoreMsg(subj string, hdr, msg []byte, ttl int64) (uint64, i
359359
}
360360

361361
// SkipMsg will use the next sequence number but not store anything.
362-
func (ms *memStore) SkipMsg() uint64 {
362+
func (ms *memStore) SkipMsg(seq uint64) (uint64, error) {
363363
// Grab time.
364364
now := time.Unix(0, ats.AccessTime()).UTC()
365365

366366
ms.mu.Lock()
367-
seq := ms.state.LastSeq + 1
367+
defer ms.mu.Unlock()
368+
369+
// Check sequence matches our last sequence.
370+
if seq != ms.state.LastSeq+1 {
371+
if seq > 0 {
372+
return 0, ErrSequenceMismatch
373+
}
374+
seq = ms.state.LastSeq + 1
375+
}
376+
368377
ms.state.LastSeq = seq
369378
ms.state.LastTime = now
370379
if ms.state.Msgs == 0 {
@@ -373,8 +382,7 @@ func (ms *memStore) SkipMsg() uint64 {
373382
} else {
374383
ms.dmap.Insert(seq)
375384
}
376-
ms.mu.Unlock()
377-
return seq
385+
return seq, nil
378386
}
379387

380388
// Skip multiple msgs.

server/memstore_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1379,7 +1379,7 @@ func Benchmark_MemStoreNumPendingWithLargeInteriorDeletesScan(b *testing.B) {
13791379
msg := []byte("abc")
13801380
ms.StoreMsg("foo.bar.baz", nil, msg, 0)
13811381
for i := 1; i <= 1_000_000; i++ {
1382-
ms.SkipMsg()
1382+
ms.SkipMsg(0)
13831383
}
13841384
ms.StoreMsg("foo.bar.baz", nil, msg, 0)
13851385

@@ -1406,7 +1406,7 @@ func Benchmark_MemStoreNumPendingWithLargeInteriorDeletesExclude(b *testing.B) {
14061406
msg := []byte("abc")
14071407
ms.StoreMsg("foo.bar.baz", nil, msg, 0)
14081408
for i := 1; i <= 1_000_000; i++ {
1409-
ms.SkipMsg()
1409+
ms.SkipMsg(0)
14101410
}
14111411
ms.StoreMsg("foo.bar.baz", nil, msg, 0)
14121412

server/store.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -91,7 +91,7 @@ type ProcessJetStreamMsgHandler func(*inMsg)
9191
type StreamStore interface {
9292
StoreMsg(subject string, hdr, msg []byte, ttl int64) (uint64, int64, error)
9393
StoreRawMsg(subject string, hdr, msg []byte, seq uint64, ts int64, ttl int64) error
94-
SkipMsg() uint64
94+
SkipMsg(seq uint64) (uint64, error)
9595
SkipMsgs(seq uint64, num uint64) error
9696
FlushAllPending()
9797
LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error)

server/store_test.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -639,7 +639,7 @@ func TestStoreStreamInteriorDeleteAccounting(t *testing.T) {
639639
{
640640
title: "SkipMsg",
641641
action: func(s StreamStore, lseq uint64) {
642-
s.SkipMsg()
642+
s.SkipMsg(0)
643643
},
644644
},
645645
{

server/stream.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5935,7 +5935,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
59355935

59365936
// Skip msg here.
59375937
if noInterest {
5938-
mset.lseq = store.SkipMsg()
5938+
mset.lseq, _ = store.SkipMsg(0)
59395939
mset.lmsgId = msgId
59405940
// If we have a msgId make sure to save.
59415941
if msgId != _EMPTY_ {

0 commit comments

Comments
 (0)