Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

General Updates and Stability Improvements #2131

Merged
merged 19 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (

const (
// VERSION is the current version for the server.
VERSION = "2.2.2-beta.5"
VERSION = "2.2.2-beta.8"

// PROTO is the currently supported protocol.
// 0 was the original
Expand Down
8 changes: 6 additions & 2 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2735,10 +2735,14 @@ func (o *consumer) stopWithFlags(dflag, doSignal, advisory bool) error {
o.active = false
o.unsubscribe(o.ackSub)
o.unsubscribe(o.reqSub)
o.unsubscribe(o.infoSub)
o.unsubscribe(o.fcSub)
o.ackSub = nil
o.reqSub = nil
o.infoSub = nil
o.fcSub = nil
if o.infoSub != nil {
o.srv.sysUnsubscribe(o.infoSub)
o.infoSub = nil
}
c := o.client
o.client = nil
sysc := o.sysc
Expand Down
36 changes: 21 additions & 15 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,16 @@ func (s *Server) JetStreamSnapshotMeta() error {
return ErrJetStreamNotEnabled
}
js.mu.RLock()
defer js.mu.RUnlock()
cc := js.cluster
if !cc.isLeader() {
isLeader := cc.isLeader()
meta := cc.meta
js.mu.RUnlock()

if !isLeader {
return errNotLeader
}

return cc.meta.InstallSnapshot(js.metaSnapshot())
return meta.InstallSnapshot(js.metaSnapshot())
}

func (s *Server) JetStreamStepdownStream(account, stream string) error {
Expand Down Expand Up @@ -1363,7 +1366,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment) {
doSnapshot()
}
} else if err == errLastSeqMismatch {
if mset.isMirror() {
mset.mu.RLock()
isLeader := mset.isLeader()
mset.mu.RUnlock()

if mset.isMirror() && isLeader {
mset.retryMirrorConsumer()
} else {
s.Warnf("Got stream sequence mismatch for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
Expand Down Expand Up @@ -1546,6 +1553,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
if mset == nil {
continue
}
s := js.srv

subject, reply, hdr, msg, lseq, ts, err := decodeStreamMsg(buf[1:])
if err != nil {
Expand All @@ -1555,8 +1563,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// We can skip if we know this is less than what we already have.
last := mset.lastSeq()
if lseq < last {
s.Debugf("Apply stream entries skipping message with sequence %d with last of %d", lseq, last)
continue
}

// Skip by hand here since first msg special case.
// Reason is sequence is unsigned and for lseq being 0
// the lseq under stream would have be -1.
Expand All @@ -1567,8 +1577,6 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// Check for flowcontrol here.
mset.checkForFlowControl(lseq + 1)

s := js.srv

// Messages to be skipped have no subject or timestamp.
if subject == _EMPTY_ && ts == 0 {
// Skip and update our lseq.
Expand Down Expand Up @@ -4159,6 +4167,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
s, js, jsa, st, rf, outq := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.outq
maxMsgSize := int(mset.cfg.MaxMsgSize)
msetName := mset.cfg.Name
lseq := mset.lseq
mset.mu.RUnlock()

// Check here pre-emptively if we have exceeded this server limits.
Expand Down Expand Up @@ -4222,7 +4231,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// We only use mset.clseq for clustering and in case we run ahead of actual commits.
// Check if we need to set initial value here
mset.clMu.Lock()
if mset.clseq == 0 {
if mset.clseq == 0 || mset.clseq < lseq {
mset.mu.RLock()
mset.clseq = mset.lseq
mset.mu.RUnlock()
Expand All @@ -4234,25 +4243,22 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [

// Do proposal.
err := mset.node.Propose(esm)
if err != nil {
mset.clseq--
}
mset.clMu.Unlock()

if err != nil {
seq = 0
mset.mu.Lock()
mset.clseq--
mset.mu.Unlock()
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: mset.cfg.Name}}
resp.Error = &ApiError{Code: 503, Description: err.Error()}
response, _ = json.Marshal(resp)
// If we errored out respond here.
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}
}

// If we errored out respond here.
if err != nil && canRespond {
outq.send(&jsPubMsg{reply, _EMPTY_, _EMPTY_, nil, response, nil, 0, nil})
}

if err != nil && isOutOfSpaceErr(err) {
s.handleOutOfSpace(msetName)
}
Expand Down
Loading