Skip to content

Commit 20a9c65

Browse files
Improve filestore partial cache logging, expire on incorrect sequence, slot check on erase (#7396)
Signed-off-by: Neil Twigg <neil@nats.io>
2 parents e680c1a + 76f24bf commit 20a9c65

File tree

2 files changed

+33
-29
lines changed

2 files changed

+33
-29
lines changed

server/filestore.go

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,6 @@ type cache struct {
270270
buf []byte
271271
wp int
272272
idx []uint32
273-
lrl uint32
274273
fseq uint64
275274
nra bool
276275
}
@@ -5071,7 +5070,10 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
50715070
// If erase but block is empty, we can simply remove the block later.
50725071
if secure && !isEmpty {
50735072
// Grab record info, but use the pre-computed record length.
5074-
ri, _, _, _ := mb.slotInfo(int(seq - mb.cache.fseq))
5073+
ri, _, _, err := mb.slotInfo(int(seq - mb.cache.fseq))
5074+
if err != nil {
5075+
return false, err
5076+
}
50755077
if err := mb.eraseMsg(seq, int(ri), int(msz), isLastBlock); err != nil {
50765078
mb.finishedWithCache()
50775079
return false, err
@@ -5316,7 +5318,14 @@ func (mb *msgBlock) compactWithFloor(floor uint64) {
53165318
// Grab info from a slot.
53175319
// Lock should be held.
53185320
func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
5319-
if slot < 0 || mb.cache == nil || slot >= len(mb.cache.idx) {
5321+
switch {
5322+
case mb.cache == nil: // Shouldn't be possible, but check it anyway.
5323+
return 0, 0, false, errNoCache
5324+
case slot < 0:
5325+
mb.fs.warn("Partial cache: offset slot index %d is less zero", slot)
5326+
return 0, 0, false, errPartialCache
5327+
case slot >= len(mb.cache.idx):
5328+
mb.fs.warn("Partial cache: offset slot index %d is greater than index len %d", slot, len(mb.cache.idx))
53205329
return 0, 0, false, errPartialCache
53215330
}
53225331

@@ -5330,24 +5339,20 @@ func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
53305339

53315340
// Determine record length
53325341
var rl uint32
5333-
if slot >= len(mb.cache.idx) {
5334-
rl = mb.cache.lrl
5335-
} else {
5336-
// Need to account for dbit markers in idx.
5337-
// So we will walk until we find valid idx slot to calculate rl.
5338-
for i := 1; slot+i < len(mb.cache.idx); i++ {
5339-
ni := mb.cache.idx[slot+i] &^ cbit
5340-
if ni == dbit {
5341-
continue
5342-
}
5343-
rl = ni - ri
5344-
break
5345-
}
5346-
// check if we had all trailing dbits.
5347-
// If so use len of cache buf minus ri.
5348-
if rl == 0 {
5349-
rl = uint32(len(mb.cache.buf)) - ri
5342+
// Need to account for dbit markers in idx.
5343+
// So we will walk until we find valid idx slot to calculate rl.
5344+
for i := 1; slot+i < len(mb.cache.idx); i++ {
5345+
ni := mb.cache.idx[slot+i] &^ cbit
5346+
if ni == dbit {
5347+
continue
53505348
}
5349+
rl = ni - ri
5350+
break
5351+
}
5352+
// check if we had all trailing dbits.
5353+
// If so use len of cache buf minus ri.
5354+
if rl == 0 {
5355+
rl = uint32(len(mb.cache.buf)) - ri
53515356
}
53525357
if rl < msgHdrSize {
53535358
return 0, 0, false, errBadMsg{mb.mfn, fmt.Sprintf("length too short for slot %d", slot)}
@@ -5772,10 +5777,10 @@ func (mb *msgBlock) tryForceExpireCache() {
57725777

57735778
// We will attempt to force expire this by temporarily clearing the last load time.
57745779
func (mb *msgBlock) tryForceExpireCacheLocked() {
5775-
llts := mb.llts
5776-
mb.llts = 0
5780+
llts, lwts := mb.llts, mb.lwts
5781+
mb.llts, mb.lwts = 0, 0
57775782
mb.expireCacheLocked()
5778-
mb.llts = llts
5783+
mb.llts, mb.lwts = llts, lwts
57795784
}
57805785

57815786
// This is for expiration of the write cache, which will be partial with fip.
@@ -5850,6 +5855,7 @@ func (mb *msgBlock) expireCacheLocked() {
58505855
recycleMsgBlockBuf(mb.cache.buf)
58515856
}
58525857
mb.cache.buf = nil
5858+
mb.cache.idx = mb.cache.idx[:0]
58535859
mb.cache.wp = 0
58545860
}
58555861

@@ -6342,7 +6348,6 @@ func (mb *msgBlock) writeMsgRecordLocked(rl, seq uint64, subj string, mhdr, msg
63426348
// Update write through cache.
63436349
// Write to msg record.
63446350
mb.cache.buf = append(mb.cache.buf, checksum...)
6345-
mb.cache.lrl = uint32(rl)
63466351

63476352
// Set cache timestamp for last store.
63486353
mb.lwts = ts
@@ -7051,7 +7056,6 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
70517056
}
70527057
// Add to our index.
70537058
idx = append(idx, index)
7054-
mb.cache.lrl = uint32(rl)
70557059
// Adjust if we guessed wrong.
70567060
if seq != 0 && seq < fseq {
70577061
fseq = seq
@@ -7599,7 +7603,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store
75997603
}
76007604
// Check partial cache status.
76017605
if seq < mb.cache.fseq {
7602-
mb.fs.warn("Cache lookup detected partial cache: seq %d vs cache fseq %d", seq, mb.cache.fseq)
7606+
mb.fs.warn("Partial cache: seq %d is less than cache fseq %d", seq, mb.cache.fseq)
76037607
return nil, errPartialCache
76047608
}
76057609

@@ -7613,6 +7617,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store
76137617

76147618
li := int(bi)
76157619
if li >= len(mb.cache.buf) {
7620+
mb.fs.warn("Partial cache: slot index %d is less than cache buffer len %d", li, len(mb.cache.buf))
76167621
return nil, errPartialCache
76177622
}
76187623
buf := mb.cache.buf[li:]
@@ -7635,8 +7640,7 @@ func (mb *msgBlock) cacheLookupEx(seq uint64, sm *StoreMsg, doCopy bool) (*Store
76357640
}
76367641

76377642
if seq != fsm.seq { // See TestFileStoreInvalidIndexesRebuilt.
7638-
recycleMsgBlockBuf(mb.cache.buf)
7639-
mb.cache.buf = nil
7643+
mb.tryForceExpireCacheLocked()
76407644
return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, fsm.seq)
76417645
}
76427646

server/filestore_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1784,7 +1784,7 @@ func TestFileStoreInvalidIndexesRebuilt(t *testing.T) {
17841784
// to discard the cache.
17851785
_, err = mb.cacheLookupEx(1, nil, false)
17861786
require_Error(t, err)
1787-
require_True(t, mb.cache.buf == nil)
1787+
require_True(t, mb.ecache.Value() == nil)
17881788

17891789
// Now fetchMsg should notice and rebuild the index with the
17901790
// correct sequence from disk.

0 commit comments

Comments
 (0)