From e59f96f448966f69902cea10f65ebac15a203f27 Mon Sep 17 00:00:00 2001
From: Derek Collison
Date: Mon, 20 May 2024 14:02:21 -0700
Subject: [PATCH] [IMPROVED] Various stream catchup improvements (#5454)

Various improvements to catchup logistics.

1. Make sure to register our sync sub earlier to avoid missing an early
   request for catchup on a scaleup.
2. Make sure we send trailing delete range blocks to avoid getting stuck
   during catchup.
3. When parallel catchups are running, make sure to periodically check
   whether we can send the next batch, since we could be blocked on others
   and not ourselves.
4. When catching up a stream that has a large number of interior deletes,
   use LoadNextMsg() to more efficiently skip over large gaps.
5. When resuming apply queues in the raft layer, do not unpause our state
   until after processing, to avoid possible elections under heavy load.
6. When syncing deleted blocks from snapshots, only consider those
   applicable to our state.

Signed-off-by: Derek Collison

---------

Signed-off-by: Derek Collison
---
 server/filestore.go              |  13 +--
 server/jetstream_cluster.go      |  75 +++++++++++----
 server/jetstream_helpers_test.go |   6 +-
 server/memstore.go               |  14 ++-
 server/norace_test.go            | 155 +++++++++++++++++++++++++++++++
 server/raft.go                   |  16 +++-
 server/stream.go                 |   6 +-
 7 files changed, 246 insertions(+), 39 deletions(-)

diff --git a/server/filestore.go b/server/filestore.go
index 9bceef35202..188de4aca51 100644
--- a/server/filestore.go
+++ b/server/filestore.go
@@ -3430,7 +3430,6 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
 	lseq := seq + num - 1
 
 	mb.mu.Lock()
-	var needsRecord bool
 	// If we are empty update meta directly.
 	if mb.msgs == 0 {
 		atomic.StoreUint64(&mb.last.seq, lseq)
@@ -3438,7 +3437,6 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
 		atomic.StoreUint64(&mb.first.seq, lseq+1)
 		mb.first.ts = nowts
 	} else {
-		needsRecord = true
 		for ; seq <= lseq; seq++ {
 			mb.dmap.Insert(seq)
 		}
@@ -3446,9 +3444,7 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
 	mb.mu.Unlock()
 
 	// Write out our placeholder.
-	if needsRecord {
-		mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true)
-	}
+	mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true)
 
 	// Now update FS accounting.
 	// Update fs state.
@@ -8169,6 +8165,7 @@ func (fs *fileStore) deleteBlocks() DeleteBlocks {
 }
 
 // SyncDeleted will make sure this stream has same deleted state as dbs.
+// This will only process deleted state within our current state.
 func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
 	if len(dbs) == 0 {
 		return
@@ -8177,18 +8174,22 @@ func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
 	fs.mu.Lock()
 	defer fs.mu.Unlock()
 
+	lseq := fs.state.LastSeq
 	var needsCheck DeleteBlocks
 
 	fs.readLockAllMsgBlocks()
 	mdbs := fs.deleteBlocks()
 	for i, db := range dbs {
+		first, last, num := db.State()
 		// If the block is same as what we have we can skip.
 		if i < len(mdbs) {
-			first, last, num := db.State()
 			eFirst, eLast, eNum := mdbs[i].State()
 			if first == eFirst && last == eLast && num == eNum {
 				continue
 			}
+		} else if first > lseq {
+			// Skip blocks not applicable to our current state.
+			continue
 		}
 		// Need to insert these.
needsCheck = append(needsCheck, db) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 880f9346f28..505fab1f9d5 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -2405,9 +2405,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // If we are interest based make sure to check consumers if interest retention policy. // This is to make sure we process any outstanding acks from all consumers. mset.checkInterestState() - // Make sure we create a new snapshot in case things have changed such that any existing - // snapshot may no longer be valid. - doSnapshot() // If we became leader during this time and we need to send a snapshot to our // followers, i.e. as a result of a scale-up from R1, do it now. if sendSnapshot && isLeader && mset != nil && n != nil { @@ -2941,6 +2938,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco if err != nil { if err == errLastSeqMismatch { + var state StreamState mset.store.FastState(&state) @@ -2952,6 +2950,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco // Retry err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts) } + // FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected + // and what we got. } // Only return in place if we are going to reset our stream or we are out of space, or we are closed. @@ -3568,9 +3568,14 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss js.mu.Unlock() } - var needsSetLeader bool if !alreadyRunning && numReplicas > 1 { if needsNode { + // Since we are scaling up we want to make sure our sync subject + // is registered before we start our raft node. + mset.mu.Lock() + mset.startClusterSubs() + mset.mu.Unlock() + js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{ "type": "stream", "account": mset.accName(), @@ -3602,16 +3607,13 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss rg.node = nil js.mu.Unlock() } + // Set the new stream assignment. + mset.setStreamAssignment(sa) + // Call update. if err = mset.updateWithAdvisory(cfg, !recovering); err != nil { s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err) } - // Set the new stream assignment. - mset.setStreamAssignment(sa) - // Make sure we are the leader now that we are R1. - if needsSetLeader { - mset.setLeader(true) - } } // If not found we must be expanding into this node since if we are here we know we are a member. @@ -7582,7 +7584,8 @@ func (mset *stream) supportsBinarySnapshotLocked() bool { // We know we support ourselves. continue } - if sir, ok := s.nodeToInfo.Load(p.ID); !ok || sir == nil || !sir.(nodeInfo).binarySnapshots { + // Since release 2.10.16 only deny if we know the other node does not support. + if sir, ok := s.nodeToInfo.Load(p.ID); ok && sir != nil && !sir.(nodeInfo).binarySnapshots { return false } } @@ -8681,7 +8684,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { return 0 } - nextBatchC := make(chan struct{}, 1) + nextBatchC := make(chan struct{}, 4) nextBatchC <- struct{}{} remoteQuitCh := make(chan struct{}) @@ -8706,19 +8709,18 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { // Kick ourselves and anyone else who might have stalled on global state. 
 		select {
 		case nextBatchC <- struct{}{}:
-			// Reset our activity
-			notActive.Reset(activityInterval)
 		default:
 		}
+		// Reset our activity
+		notActive.Reset(activityInterval)
 	})
 	defer s.sysUnsubscribe(ackSub)
 	ackReplyT := strings.ReplaceAll(ackReply, ".*", ".%d")
 
 	// Grab our state.
 	var state StreamState
-	mset.mu.RLock()
+	// mset.store never changes after being set, don't need lock.
 	mset.store.FastState(&state)
-	mset.mu.RUnlock()
 
 	// Reset notion of first if this request wants sequences before our starting sequence
 	// and we would have nothing to send. If we have partial messages still need to send skips for those.
@@ -8756,7 +8758,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 		// Wait til we can send at least 4k
 		const minBatchWait = int32(4 * 1024)
 		mw := time.NewTimer(minWait)
-		for done := false; !done; {
+		for done := maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait; !done; {
 			select {
 			case <-nextBatchC:
 				done = maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait
@@ -8811,9 +8813,33 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 			dr.First, dr.Num = 0, 0
 		}
 
+		// See if we should use LoadNextMsg instead of walking sequence by sequence if we have an order of magnitude more interior deletes.
+		// Only makes sense with delete range capabilities.
+		useLoadNext := drOk && (uint64(state.NumDeleted) > 10*state.Msgs)
+
 		var smv StoreMsg
 		for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbBelowMax(); seq++ {
-			sm, err := mset.store.LoadMsg(seq, &smv)
+			var sm *StoreMsg
+			var err error
+			// If we should use LoadNextMsg, do so here.
+			if useLoadNext {
+				var nseq uint64
+				sm, nseq, err = mset.store.LoadNextMsg(fwcs, true, seq, &smv)
+				if err == nil && nseq > seq {
+					dr.First, dr.Num = seq, nseq-seq
+					// Jump ahead
+					seq = nseq
+				} else if err == ErrStoreEOF {
+					dr.First, dr.Num = seq, state.LastSeq-seq
+					// Clear EOF here for normal processing.
+					err = nil
+					// Jump ahead
+					seq = state.LastSeq
+				}
+			} else {
+				sm, err = mset.store.LoadMsg(seq, &smv)
+			}
+
 			// if this is not a deleted msg, bail out.
 			if err != nil && err != ErrStoreMsgNotFound && err != errDeletedMsg {
 				if err == ErrStoreEOF {
@@ -8829,6 +8855,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 				if n := mset.raftNode(); n != nil {
 					n.InstallSnapshot(mset.stateSnapshot())
 				}
+				// If we allow gap markers check if we have one pending.
+				if drOk && dr.First > 0 {
+					sendDR()
+				}
 				// Signal EOF
 				s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
 				return false
@@ -8875,6 +8905,9 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 		}
 		// Recheck our exit condition.
if seq == last { + if drOk && dr.First > 0 { + sendDR() + } s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name()) // EOF s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil) @@ -8890,7 +8923,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { if drOk && dr.First > 0 { sendDR() } - return true } @@ -8930,6 +8962,11 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) { mset.clearCatchupPeer(sreq.Peer) return } + case <-time.After(500 * time.Millisecond): + if !sendNextBatchAndContinue(qch) { + mset.clearCatchupPeer(sreq.Peer) + return + } } } } diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 43072323551..f130b52482c 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -38,9 +38,9 @@ import ( func init() { // Speed up raft for tests. hbInterval = 50 * time.Millisecond - minElectionTimeout = 750 * time.Millisecond - maxElectionTimeout = 2500 * time.Millisecond - lostQuorumInterval = 1 * time.Second + minElectionTimeout = 1500 * time.Millisecond + maxElectionTimeout = 3500 * time.Millisecond + lostQuorumInterval = 2 * time.Second lostQuorumCheck = 4 * hbInterval } diff --git a/server/memstore.go b/server/memstore.go index 3da696bcc6e..19560b04dac 100644 --- a/server/memstore.go +++ b/server/memstore.go @@ -1402,19 +1402,25 @@ func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) { // SyncDeleted will make sure this stream has same deleted state as dbs. func (ms *memStore) SyncDeleted(dbs DeleteBlocks) { + ms.mu.Lock() + defer ms.mu.Unlock() + // For now we share one dmap, so if we have one entry here check if states are the same. // Note this will work for any DeleteBlock type, but we expect this to be a dmap too. if len(dbs) == 1 { - ms.mu.RLock() min, max, num := ms.dmap.State() - ms.mu.RUnlock() if pmin, pmax, pnum := dbs[0].State(); pmin == min && pmax == max && pnum == num { return } } + lseq := ms.state.LastSeq for _, db := range dbs { - db.Range(func(dseq uint64) bool { - ms.RemoveMsg(dseq) + // Skip if beyond our current state. + if first, _, _ := db.State(); first > lseq { + continue + } + db.Range(func(seq uint64) bool { + ms.removeMsg(seq, false) return true }) } diff --git a/server/norace_test.go b/server/norace_test.go index 9d61625e431..87d864314c2 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -10414,3 +10414,158 @@ func TestNoRaceFileStoreWriteFullStateUniqueSubjects(t *testing.T) { // ~500MB, could change if we tweak encodings.. require_True(t, fi.Size() > 500*1024*1024) } + +// When a catchup takes a long time and the ingest rate is high enough to cause us +// to drop append entries and move our first past out catchup window due to max bytes or max msgs, etc. +func TestNoRaceLargeStreamCatchups(t *testing.T) { + // This usually takes too long on Travis. + t.Skip() + + var jsMaxOutTempl = ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: {max_file_store: 22GB, store_dir: '%s', max_outstanding_catchup: 128KB} + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + # For access to system account. + accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" 
} ] } } +` + + c := createJetStreamClusterWithTemplate(t, jsMaxOutTempl, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + js, err := nc.JetStream(nats.PublishAsyncMaxPending(64 * 1024)) + require_NoError(t, err) + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"Z.>"}, + MaxBytes: 4 * 1024 * 1024 * 1024, + Replicas: 1, // Start at R1 + } + + _, err = js.AddStream(cfg) + require_NoError(t, err) + + // Load up to a decent size first. + num, msg := 25_000, bytes.Repeat([]byte("Z"), 256*1024) + for i := 0; i < num; i++ { + _, err := js.PublishAsync("Z.Z.Z", msg) + require_NoError(t, err) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(20 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + var sl *Server + time.AfterFunc(time.Second, func() { + cfg.Replicas = 3 + _, err = js.UpdateStream(cfg) + require_NoError(t, err) + c.waitOnStreamLeader(globalAccountName, "TEST") + sl = c.streamLeader(globalAccountName, "TEST") + }) + + // Run for 60 seconds sending new messages at a high rate. + timeout := time.Now().Add(60 * time.Second) + for time.Now().Before(timeout) { + for i := 0; i < 5_000; i++ { + // Not worried about each message, so not checking err here. + // Just generating high load of new traffic while trying to catch up. + js.PublishAsync("Z.Z.Z", msg) + } + // This will gate us waiting on a response. + js.Publish("Z.Z.Z", msg) + } + + // Make sure the leader has not changed. + require_Equal(t, sl, c.streamLeader(globalAccountName, "TEST")) + + // Grab the leader and its state. + mset, err := sl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + expected := mset.state() + + checkFor(t, 45*time.Second, time.Second, func() error { + for _, s := range c.servers { + if s == sl { + continue + } + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + state := mset.state() + if !reflect.DeepEqual(expected, state) { + return fmt.Errorf("Follower %v state does not match: %+v vs %+v", s, state, expected) + } + } + return nil + }) +} + +func TestNoRaceLargeNumDeletesStreamCatchups(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + js, err := nc.JetStream(nats.PublishAsyncMaxPending(16 * 1024)) + require_NoError(t, err) + + cfg := &nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 1, // Start at R1 + } + _, err = js.AddStream(cfg) + require_NoError(t, err) + + // We will manipulate the stream at the lower level to achieve large number of interior deletes. + // We will store only 2 msgs, but have 100M deletes in between. + sl := c.streamLeader(globalAccountName, "TEST") + mset, err := sl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + mset.mu.Lock() + mset.store.StoreMsg("foo", nil, []byte("ok")) + mset.store.SkipMsgs(2, 1_000_000_000) + mset.store.StoreMsg("foo", nil, []byte("ok")) + mset.store.SkipMsgs(1_000_000_003, 1_000_000_000) + var state StreamState + mset.store.FastState(&state) + mset.lseq = state.LastSeq + mset.mu.Unlock() + + cfg.Replicas = 3 + _, err = js.UpdateStream(cfg) + require_NoError(t, err) + c.waitOnStreamLeader(globalAccountName, "TEST") + mset, err = sl.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + expected := mset.state() + + // This should happen fast and not spin on all interior deletes. 
+ checkFor(t, 250*time.Millisecond, 50*time.Millisecond, func() error { + for _, s := range c.servers { + if s == sl { + continue + } + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + state := mset.state() + // Ignore LastTime for this test since we send delete range at end. + state.LastTime = expected.LastTime + if !reflect.DeepEqual(expected, state) { + return fmt.Errorf("Follower %v state does not match: %+v vs %+v", s, state, expected) + } + } + return nil + }) +} diff --git a/server/raft.go b/server/raft.go index 975ebd068bf..e762417754f 100644 --- a/server/raft.go +++ b/server/raft.go @@ -887,8 +887,10 @@ func (n *raft) ResumeApply() { } n.debug("Resuming our apply channel") - n.observer, n.pobserver = n.pobserver, false - n.paused = false + + // Reset before we start. + n.resetElectionTimeout() + // Run catchup.. if n.hcommit > n.commit { n.debug("Resuming %d replays", n.hcommit+1-n.commit) @@ -904,12 +906,16 @@ func (n *raft) ResumeApply() { runtime.Gosched() // Simply re-acquire n.Lock() - // Need to check if we got closed or if we were paused again. - if n.State() == Closed || n.paused { + // Need to check if we got closed. + if n.State() == Closed { return } } } + + // Clear our observer and paused state after we apply. + n.observer, n.pobserver = n.pobserver, false + n.paused = false n.hcommit = 0 // If we had been selected to be the next leader campaign here now that we have resumed. @@ -3352,7 +3358,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if l > paeWarnThreshold && l%paeWarnModulo == 0 { n.warn("%d append entries pending", len(n.pae)) } - } else { + } else if l%paeWarnModulo == 0 { n.debug("Not saving to append entries pending") } } else { diff --git a/server/stream.go b/server/stream.go index 518fb6599db..b3add51cc71 100644 --- a/server/stream.go +++ b/server/stream.go @@ -840,7 +840,9 @@ func (mset *stream) setLeader(isLeader bool) error { if isLeader { // Make sure we are listening for sync requests. // TODO(dlc) - Original design was that all in sync members of the group would do DQ. - mset.startClusterSubs() + if mset.isClustered() { + mset.startClusterSubs() + } // Setup subscriptions if we were not already the leader. if err := mset.subscribeToStream(); err != nil { @@ -875,7 +877,7 @@ func (mset *stream) setLeader(isLeader bool) error { // Lock should be held. func (mset *stream) startClusterSubs() { - if mset.isClustered() && mset.syncSub == nil { + if mset.syncSub == nil { mset.syncSub, _ = mset.srv.systemSubscribe(mset.sa.Sync, _EMPTY_, false, mset.sysc, mset.handleClusterSyncRequest) } }
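
To make improvement 4 above easier to follow outside the server code, here is a minimal, self-contained Go sketch of the idea: instead of probing every sequence across a gap of interior deletes, the sender jumps to the next stored message and emits a single delete range for the gap (plus a trailing range for deletes at the end). All names here (toyStore, loadNext, deleteRange, catchup) are hypothetical illustrations, not the server's actual API.

package main

import "fmt"

// Toy message store: only a few sequences are present; everything else in
// [1, lastSeq] is an interior delete. Purely illustrative.
type toyStore struct {
	msgs    map[uint64]string
	lastSeq uint64
}

// loadNext returns the first stored message at or after seq, its sequence,
// and ok=false when nothing remains (the EOF case). A real store would index
// blocks instead of scanning, but the contract is the same.
func (s *toyStore) loadNext(seq uint64) (string, uint64, bool) {
	for ; seq <= s.lastSeq; seq++ {
		if m, ok := s.msgs[seq]; ok {
			return m, seq, true
		}
	}
	return "", 0, false
}

// deleteRange mirrors the idea of a gap marker sent to the follower so it can
// skip the missing sequences in one shot.
type deleteRange struct{ first, num uint64 }

func catchup(s *toyStore, from uint64) (msgs []string, gaps []deleteRange) {
	for seq := from; seq <= s.lastSeq; {
		m, nseq, ok := s.loadNext(seq)
		if !ok {
			// Trailing deletes: emit one final range up to lastSeq.
			if seq <= s.lastSeq {
				gaps = append(gaps, deleteRange{seq, s.lastSeq - seq + 1})
			}
			return msgs, gaps
		}
		if nseq > seq {
			// Collapse the interior deletes [seq, nseq-1] into one range.
			gaps = append(gaps, deleteRange{seq, nseq - seq})
		}
		msgs = append(msgs, m)
		seq = nseq + 1
	}
	return msgs, gaps
}

func main() {
	st := &toyStore{msgs: map[uint64]string{1: "a", 1_000_002: "b"}, lastSeq: 2_000_000}
	msgs, gaps := catchup(st, 1)
	fmt.Println(msgs, gaps) // [a b] [{2 1000000} {1000003 999998}]
}

The follower only has to apply two range markers rather than walking two million sequences, which is the behavior the TestNoRaceLargeNumDeletesStreamCatchups test above checks for.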
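Improvement 3 above relies on a related pattern: a sender that normally waits to be kicked (by flow-control acks) also wakes up on a timer, because the global outstanding-bytes budget may have been freed by a different catchup. The sketch below shows only that generic select-with-timeout pattern; the channel names, intervals, and sendBatch callback are illustrative and not the server's implementation.

package main

import (
	"fmt"
	"time"
)

// run waits for kicks but also retries periodically so it cannot stall
// forever when the thing blocking it is released elsewhere.
func run(kick <-chan struct{}, quit <-chan struct{}, sendBatch func() bool) {
	for {
		select {
		case <-kick:
			if !sendBatch() {
				return
			}
		case <-time.After(500 * time.Millisecond):
			// Periodic fallback: re-check even without a kick.
			if !sendBatch() {
				return
			}
		case <-quit:
			return
		}
	}
}

func main() {
	kick := make(chan struct{}, 4) // small buffer so kicks are not lost
	quit := make(chan struct{})
	batches := 0
	go func() {
		kick <- struct{}{} // initial kick
		time.Sleep(1200 * time.Millisecond)
		close(quit)
	}()
	run(kick, quit, func() bool {
		batches++
		return true
	})
	fmt.Println("batches sent:", batches) // first from the kick, the rest from the timer
}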