Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mirrors failed when upstream messages had expired. #2110

Merged
merged 1 commit into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,9 +1363,13 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
doSnapshot()
}
} else if err == errLastSeqMismatch {
s.Warnf("Got stream sequence mismatch for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
if mset.resetClusteredState() {
return
if mset.isMirror() {
mset.retryMirrorConsumer()
} else {
s.Warnf("Got stream sequence mismatch for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
if mset.resetClusteredState() {
return
}
}
} else {
s.Warnf("Error applying entries to '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
Expand Down Expand Up @@ -1564,6 +1568,14 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
mset.checkForFlowControl(lseq + 1)

s := js.srv

// Messages to be skipped have no subject or timestamp.
if subject == _EMPTY_ && ts == 0 {
// Skip and update our lseq.
mset.setLastSeq(mset.store.SkipMsg())
continue
}

if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
if !isRecovering {
if err == errLastSeqMismatch {
Expand Down
164 changes: 164 additions & 0 deletions server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5640,6 +5640,170 @@ func TestJetStreamClusterStreamInfoDeletedDetails(t *testing.T) {
}
}

// TestJetStreamClusterMirrorExpirationAndMissingSequences checks that a mirror
// keeps working when the origin stream expires messages while the server
// hosting the mirror leader is offline. After the restart the mirror is
// expected to reach the same message count as if nothing had been missed.
//
// The scenario is run for mirrors with 1 and 2 replicas.
func TestJetStreamClusterMirrorExpirationAndMissingSequences(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "MMS", 5)
	defer c.shutdown()

	// Client for API requests.
	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Origin. The short MaxAge lets messages expire while the mirror is down.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:   "TEST",
		MaxAge: 100 * time.Millisecond,
	})
	// Fail right away if the origin could not be created; otherwise the test
	// would only break later with a misleading leader or publish error.
	if err != nil {
		t.Fatalf("Unexpected error: %v", err)
	}

	// Server currently leading the origin stream.
	ts := c.streamLeader("$G", "TEST")

	// sendBatch publishes n messages to the origin stream.
	sendBatch := func(n int) {
		t.Helper()
		// Send a batch to a given subject.
		for i := 0; i < n; i++ {
			if _, err := js.Publish("TEST", []byte("OK")); err != nil {
				t.Fatalf("Unexpected publish error: %v", err)
			}
		}
	}

	// checkStream polls the named stream until it reports exactly num messages.
	checkStream := func(stream string, num uint64) {
		t.Helper()
		checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
			si, err := js.StreamInfo(stream)
			if err != nil {
				return err
			}
			if si.State.Msgs != num {
				return fmt.Errorf("Expected %d msgs, got %d", num, si.State.Msgs)
			}
			return nil
		})
	}

	checkMirror := func(num uint64) { t.Helper(); checkStream("M", num) }
	checkTest := func(num uint64) { t.Helper(); checkStream("TEST", num) }

	for _, test := range []struct {
		name     string
		replicas int
	}{
		{"R-1", 1},
		{"R-2", 2},
	} {
		t.Run(test.name, func(t *testing.T) {
			// Create mirror now. Keep recreating it until its leader is on a
			// different server than the origin's leader, so that shutting the
			// mirror leader down below does not take the origin with it.
			for ms := ts; ms == ts; {
				_, err = js.AddStream(&nats.StreamConfig{
					Name:     "M",
					Mirror:   &nats.StreamSource{Name: "TEST"},
					Replicas: test.replicas,
				})
				if err != nil {
					t.Fatalf("Unexpected error: %v", err)
				}
				ms = c.streamLeader("$G", "M")
				if ms == ts {
					// Delete and retry.
					js.DeleteStream("M")
				}
			}
			// The loop only exits once a suitable mirror exists; remove it
			// when this subtest finishes so the next one starts clean.
			defer js.DeleteStream("M")

			sendBatch(10)
			checkMirror(10)

			// Now shutdown the server with the mirror.
			ms := c.streamLeader("$G", "M")
			ms.Shutdown()

			// Send more messages but let them expire.
			sendBatch(10)
			checkTest(0)

			c.restartServer(ms)
			c.waitOnStreamLeader("$G", "M")

			sendBatch(10)
			checkMirror(20)
		})
	}
}

// TestJetStreamClusterMirrorAndSourceExpiration verifies that a mirror ("M")
// and a sourced stream ("S"), each configured with its own MaxAge, first
// receive every message published to the origin stream "TEST" and then
// drop back to zero messages.
func TestJetStreamClusterMirrorAndSourceExpiration(t *testing.T) {
	cluster := createJetStreamClusterExplicit(t, "MSE", 3)
	defer cluster.shutdown()

	// Client used for all JetStream API requests.
	nc, js := jsClientConnect(t, cluster.randomServer())
	defer nc.Close()

	// mustAddStream creates a stream and aborts the test on any error.
	mustAddStream := func(cfg *nats.StreamConfig) {
		t.Helper()
		if _, err := js.AddStream(cfg); err != nil {
			t.Fatalf("Unexpected error: %v", err)
		}
	}

	// publish sends count messages to the origin stream's subject.
	publish := func(count int) {
		t.Helper()
		payload := []byte("OK")
		for sent := 0; sent < count; sent++ {
			_, err := js.Publish("TEST", payload)
			if err != nil {
				t.Fatalf("Unexpected publish error: %v", err)
			}
		}
	}

	// expectMsgs polls the named stream until it holds exactly want messages.
	expectMsgs := func(stream string, want uint64) {
		t.Helper()
		checkFor(t, 5*time.Second, 50*time.Millisecond, func() error {
			info, err := js.StreamInfo(stream)
			switch {
			case err != nil:
				return err
			case info.State.Msgs != want:
				return fmt.Errorf("Expected %d msgs, got %d", want, info.State.Msgs)
			}
			return nil
		})
	}

	expectTest := func(want uint64) { t.Helper(); expectMsgs("TEST", want) }
	expectMirror := func(want uint64) { t.Helper(); expectMsgs("M", want) }
	expectSource := func(want uint64) { t.Helper(); expectMsgs("S", want) }

	// Origin stream, without any age limit.
	mustAddStream(&nats.StreamConfig{
		Name: "TEST",
	})

	// Mirror of the origin with a short MaxAge.
	mustAddStream(&nats.StreamConfig{
		Name:     "M",
		Mirror:   &nats.StreamSource{Name: "TEST"},
		Replicas: 2,
		MaxAge:   100 * time.Millisecond,
	})

	// Stream sourcing from the origin with the same short MaxAge.
	mustAddStream(&nats.StreamConfig{
		Name:     "S",
		Sources:  []*nats.StreamSource{{Name: "TEST"}},
		Replicas: 2,
		MaxAge:   100 * time.Millisecond,
	})

	publish(20)
	expectTest(20)
	expectMirror(20)
	expectSource(20)

	// Both derived streams must eventually report zero messages.
	expectMirror(0)
	expectSource(0)
}

// Support functions

// Used to setup superclusters for tests.
Expand Down
19 changes: 17 additions & 2 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,7 +1332,8 @@ func (mset *stream) setupMirrorConsumer() error {
if mset.mirror.sub != nil {
mset.unsubscribe(mset.mirror.sub)
mset.mirror.sub = nil
mset.mirror.dseq = 1
mset.mirror.dseq = 0
mset.mirror.sseq = mset.lseq
}
// Make sure to delete any prior consumers if we know about them.
mset.removeInternalConsumer(mset.mirror)
Expand Down Expand Up @@ -1431,6 +1432,19 @@ func (mset *stream) setupMirrorConsumer() error {
return
}

// When an upstream stream expires messages or in general has messages that we want
// that are no longer available we need to adjust here.
mset.store.FastState(&state)
if state.LastSeq != ccr.ConsumerInfo.Delivered.Stream {
for seq := state.LastSeq + 1; seq <= ccr.ConsumerInfo.Delivered.Stream; seq++ {
if mset.node != nil {
mset.node.Propose(encodeStreamMsg(_EMPTY_, _EMPTY_, nil, nil, seq-1, 0))
} else {
mset.lseq = mset.store.SkipMsg()
}
}
}

// Capture consumer name.
mset.mirror.cname = ccr.ConsumerInfo.Name
msgs := mset.mirror.msgs
Expand All @@ -1448,7 +1462,7 @@ func (mset *stream) setupMirrorConsumer() error {
mset.mirror.err = nil
mset.mirror.sub = sub
mset.mirror.last = time.Now()
mset.mirror.dseq = 1
mset.mirror.dseq = 0
}
mset.mu.Unlock()
}
Expand Down Expand Up @@ -2344,6 +2358,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
isMisMatch = false
}
}

if isMisMatch {
outq := mset.outq
mset.mu.Unlock()
Expand Down