Skip to content

Commit

Permalink
[IMPROVED] Various stream catchup improvements (#5454)
Browse files Browse the repository at this point in the history
Various improvements to catchup logistics.

1. Make sure to register our sync sub earlier to avoid missing an early
request for catchup on a scaleup.
2. Make sure we send trailing delete range blocks to avound getting
stuck during catchup.
3. When parallel catchups running make sure to periodically check to see
if we can send next batch since we could be blocked on others and not
ourselves.
4. When catching up a stream that has a large number of interior deletes
use LoadNextMsg() to more efficiently skip over large gaps.
5. When resuming apply queues in raft layer do not unpause our state
until after processing to avoid possible elections under heavy load.
6. when syncing deleted blocks from snapshots only consider those
applicable to our state.

Signed-off-by: Derek Collison <derek@nats.io>

---------

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored and wallyqs committed May 20, 2024
1 parent 57e780f commit e59f96f
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 39 deletions.
13 changes: 7 additions & 6 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3430,25 +3430,21 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
lseq := seq + num - 1

mb.mu.Lock()
var needsRecord bool
// If we are empty update meta directly.
if mb.msgs == 0 {
atomic.StoreUint64(&mb.last.seq, lseq)
mb.last.ts = nowts
atomic.StoreUint64(&mb.first.seq, lseq+1)
mb.first.ts = nowts
} else {
needsRecord = true
for ; seq <= lseq; seq++ {
mb.dmap.Insert(seq)
}
}
mb.mu.Unlock()

// Write out our placeholder.
if needsRecord {
mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true)
}
mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true)

// Now update FS accounting.
// Update fs state.
Expand Down Expand Up @@ -8169,6 +8165,7 @@ func (fs *fileStore) deleteBlocks() DeleteBlocks {
}

// SyncDeleted will make sure this stream has same deleted state as dbs.
// This will only process deleted state within our current state.
func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
if len(dbs) == 0 {
return
Expand All @@ -8177,18 +8174,22 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
fs.mu.Lock()
defer fs.mu.Unlock()

lseq := fs.state.LastSeq
var needsCheck DeleteBlocks

fs.readLockAllMsgBlocks()
mdbs := fs.deleteBlocks()
for i, db := range dbs {
first, last, num := db.State()
// If the block is same as what we have we can skip.
if i < len(mdbs) {
first, last, num := db.State()
eFirst, eLast, eNum := mdbs[i].State()
if first == eFirst && last == eLast && num == eNum {
continue
}
} else if first > lseq {
// Skip blocks not applicable to our current state.
continue
}
// Need to insert these.
needsCheck = append(needsCheck, db)
Expand Down
75 changes: 56 additions & 19 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2405,9 +2405,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// If we are interest based make sure to check consumers if interest retention policy.
// This is to make sure we process any outstanding acks from all consumers.
mset.checkInterestState()
// Make sure we create a new snapshot in case things have changed such that any existing
// snapshot may no longer be valid.
doSnapshot()
// If we became leader during this time and we need to send a snapshot to our
// followers, i.e. as a result of a scale-up from R1, do it now.
if sendSnapshot && isLeader && mset != nil && n != nil {
Expand Down Expand Up @@ -2941,6 +2938,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco

if err != nil {
if err == errLastSeqMismatch {

var state StreamState
mset.store.FastState(&state)

Expand All @@ -2952,6 +2950,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Retry
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts)
}
// FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected
// and what we got.
}

// Only return in place if we are going to reset our stream or we are out of space, or we are closed.
Expand Down Expand Up @@ -3568,9 +3568,14 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
js.mu.Unlock()
}

var needsSetLeader bool
if !alreadyRunning && numReplicas > 1 {
if needsNode {
// Since we are scaling up we want to make sure our sync subject
// is registered before we start our raft node.
mset.mu.Lock()
mset.startClusterSubs()
mset.mu.Unlock()

js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{
"type": "stream",
"account": mset.accName(),
Expand Down Expand Up @@ -3602,16 +3607,13 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
rg.node = nil
js.mu.Unlock()
}
// Set the new stream assignment.
mset.setStreamAssignment(sa)

// Call update.
if err = mset.updateWithAdvisory(cfg, !recovering); err != nil {
s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err)
}
// Set the new stream assignment.
mset.setStreamAssignment(sa)
// Make sure we are the leader now that we are R1.
if needsSetLeader {
mset.setLeader(true)
}
}

// If not found we must be expanding into this node since if we are here we know we are a member.
Expand Down Expand Up @@ -7582,7 +7584,8 @@ func (mset *stream) supportsBinarySnapshotLocked() bool {
// We know we support ourselves.
continue
}
if sir, ok := s.nodeToInfo.Load(p.ID); !ok || sir == nil || !sir.(nodeInfo).binarySnapshots {
// Since release 2.10.16 only deny if we know the other node does not support.
if sir, ok := s.nodeToInfo.Load(p.ID); ok && sir != nil && !sir.(nodeInfo).binarySnapshots {
return false
}
}
Expand Down Expand Up @@ -8681,7 +8684,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
return 0
}

nextBatchC := make(chan struct{}, 1)
nextBatchC := make(chan struct{}, 4)
nextBatchC <- struct{}{}
remoteQuitCh := make(chan struct{})

Expand All @@ -8706,19 +8709,18 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
// Kick ourselves and anyone else who might have stalled on global state.
select {
case nextBatchC <- struct{}{}:
// Reset our activity
notActive.Reset(activityInterval)
default:
}
// Reset our activity
notActive.Reset(activityInterval)
})
defer s.sysUnsubscribe(ackSub)
ackReplyT := strings.ReplaceAll(ackReply, ".*", ".%d")

// Grab our state.
var state StreamState
mset.mu.RLock()
// mset.store never changes after being set, don't need lock.
mset.store.FastState(&state)
mset.mu.RUnlock()

// Reset notion of first if this request wants sequences before our starting sequence
// and we would have nothing to send. If we have partial messages still need to send skips for those.
Expand Down Expand Up @@ -8756,7 +8758,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
// Wait til we can send at least 4k
const minBatchWait = int32(4 * 1024)
mw := time.NewTimer(minWait)
for done := false; !done; {
for done := maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait; !done; {
select {
case <-nextBatchC:
done = maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait
Expand Down Expand Up @@ -8811,9 +8813,33 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
dr.First, dr.Num = 0, 0
}

// See if we should use LoadNextMsg instead of walking sequence by sequence if we have an order magnitude more interior deletes.
// Only makes sense with delete range capabilities.
useLoadNext := drOk && (uint64(state.NumDeleted) > 10*state.Msgs)

var smv StoreMsg
for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbBelowMax(); seq++ {
sm, err := mset.store.LoadMsg(seq, &smv)
var sm *StoreMsg
var err error
// Is we should use load next do so here.
if useLoadNext {
var nseq uint64
sm, nseq, err = mset.store.LoadNextMsg(fwcs, true, seq, &smv)
if err == nil && nseq > seq {
dr.First, dr.Num = seq, nseq-seq
// Jump ahead
seq = nseq
} else if err == ErrStoreEOF {
dr.First, dr.Num = seq, state.LastSeq-seq
// Clear EOF here for normal processing.
err = nil
// Jump ahead
seq = state.LastSeq
}
} else {
sm, err = mset.store.LoadMsg(seq, &smv)
}

// if this is not a deleted msg, bail out.
if err != nil && err != ErrStoreMsgNotFound && err != errDeletedMsg {
if err == ErrStoreEOF {
Expand All @@ -8829,6 +8855,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
if n := mset.raftNode(); n != nil {
n.InstallSnapshot(mset.stateSnapshot())
}
// If we allow gap markers check if we have one pending.
if drOk && dr.First > 0 {
sendDR()
}
// Signal EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
Expand Down Expand Up @@ -8875,6 +8905,9 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
}
// Recheck our exit condition.
if seq == last {
if drOk && dr.First > 0 {
sendDR()
}
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
Expand All @@ -8890,7 +8923,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
if drOk && dr.First > 0 {
sendDR()
}

return true
}

Expand Down Expand Up @@ -8930,6 +8962,11 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
mset.clearCatchupPeer(sreq.Peer)
return
}
case <-time.After(500 * time.Millisecond):
if !sendNextBatchAndContinue(qch) {
mset.clearCatchupPeer(sreq.Peer)
return
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ import (
func init() {
// Speed up raft for tests.
hbInterval = 50 * time.Millisecond
minElectionTimeout = 750 * time.Millisecond
maxElectionTimeout = 2500 * time.Millisecond
lostQuorumInterval = 1 * time.Second
minElectionTimeout = 1500 * time.Millisecond
maxElectionTimeout = 3500 * time.Millisecond
lostQuorumInterval = 2 * time.Second
lostQuorumCheck = 4 * hbInterval
}

Expand Down
14 changes: 10 additions & 4 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1402,19 +1402,25 @@ func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) {

// SyncDeleted will make sure this stream has same deleted state as dbs.
func (ms *memStore) SyncDeleted(dbs DeleteBlocks) {
ms.mu.Lock()
defer ms.mu.Unlock()

// For now we share one dmap, so if we have one entry here check if states are the same.
// Note this will work for any DeleteBlock type, but we expect this to be a dmap too.
if len(dbs) == 1 {
ms.mu.RLock()
min, max, num := ms.dmap.State()
ms.mu.RUnlock()
if pmin, pmax, pnum := dbs[0].State(); pmin == min && pmax == max && pnum == num {
return
}
}
lseq := ms.state.LastSeq
for _, db := range dbs {
db.Range(func(dseq uint64) bool {
ms.RemoveMsg(dseq)
// Skip if beyond our current state.
if first, _, _ := db.State(); first > lseq {
continue
}
db.Range(func(seq uint64) bool {
ms.removeMsg(seq, false)
return true
})
}
Expand Down
Loading

0 comments on commit e59f96f

Please sign in to comment.