Cherry-picks for 2.10.22-RC.3 (#6012)
Includes:

- #5986
- #5995 
- #6000
- #5996
- #6002
- #6003
- #6007

Signed-off-by: Neil Twigg <neil@nats.io>
neilalexander authored Oct 16, 2024
2 parents 2c38cc8 + 05331c3 commit d493f0d
Showing 14 changed files with 586 additions and 291 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/cov.yaml
@@ -34,8 +34,8 @@ jobs:
- name: Convert coverage.out to coverage.lcov
# Use commit hash here to avoid a re-tagging attack, as this is a third-party action
# Commit c680c0f7c7442485f1749eb2a13e54a686e76eb5 = tag v1.0.9
uses: jandelgado/gcov2lcov-action@c680c0f7c7442485f1749eb2a13e54a686e76eb5
# Commit 69ef3d59a24cc6e062516a73d8be123e85b15cc0 = tag v1.1.0
uses: jandelgado/gcov2lcov-action@69ef3d59a24cc6e062516a73d8be123e85b15cc0
with:
infile: acc.out
working-directory: src/github.com/nats-io/nats-server
2 changes: 1 addition & 1 deletion go.mod
@@ -3,7 +3,7 @@ module github.com/nats-io/nats-server/v2
go 1.21.0

require (
github.com/klauspost/compress v1.17.10
github.com/klauspost/compress v1.17.11
github.com/minio/highwayhash v1.0.3
github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/nats.go v1.36.0
4 changes: 2 additions & 2 deletions go.sum
@@ -1,7 +1,7 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt/v2 v2.5.8 h1:uvdSzwWiEGWGXf+0Q+70qv6AQdvcvxrv9hPM0RiPamE=
51 changes: 45 additions & 6 deletions server/consumer.go
@@ -279,7 +279,9 @@ var (
AckNext = []byte("+NXT")
// Terminate delivery of the message.
AckTerm = []byte("+TERM")
)

const (
// reasons to supply when terminating messages using limits
ackTermLimitsReason = "Message deleted by stream limits"
ackTermUnackedLimitsReason = "Unacknowledged message was deleted"
@@ -1290,6 +1292,9 @@ func (o *consumer) setLeader(isLeader bool) {
pullMode := o.isPullMode()
o.mu.Unlock()

// Check if there are any pending we might need to clean up etc.
o.checkPending()

// Snapshot initial info.
o.infoWithSnap(true)

@@ -1687,12 +1692,39 @@ func (o *consumer) config() ConsumerConfig {
return o.cfg
}

// Check if we have hit max deliveries. If so do notification and cleanup.
// Return whether or not the max was hit.
// Lock should be held.
func (o *consumer) hasMaxDeliveries(seq uint64) bool {
if o.maxdc == 0 {
return false
}
if dc := o.deliveryCount(seq); dc >= o.maxdc {
// We have hit our max deliveries for this sequence.
// Only send the advisory once.
if dc == o.maxdc {
o.notifyDeliveryExceeded(seq, dc)
}
// Determine if we signal to start flow of messages again.
if o.maxp > 0 && len(o.pending) >= o.maxp {
o.signalNewMessages()
}
// Cleanup our tracking.
delete(o.pending, seq)
if o.rdc != nil {
delete(o.rdc, seq)
}
return true
}
return false
}

// Force expiration of all pending.
// Lock should be held.
func (o *consumer) forceExpirePending() {
var expired []uint64
for seq := range o.pending {
if !o.onRedeliverQueue(seq) {
if !o.onRedeliverQueue(seq) && !o.hasMaxDeliveries(seq) {
expired = append(expired, seq)
}
}
@@ -3416,6 +3448,14 @@ func trackDownAccountAndInterest(acc *Account, interest string) (*Account, strin
return acc, interest
}

// Return current delivery count for a given sequence.
func (o *consumer) deliveryCount(seq uint64) uint64 {
if o.rdc == nil {
return 1
}
return o.rdc[seq]
}

// Increase the delivery count for this message.
// ONLY used on redelivery semantics.
// Lock should be held.
@@ -3581,9 +3621,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
var pmsg = getJSPubMsgFromPool()

// Grab next message applicable to us.
// We will unlock here in case lots of contention, e.g. WQ.
filters, subjf, fseq := o.filters, o.subjf, o.sseq
o.mu.Unlock()
// Check if we are multi-filtered or not.
if filters != nil {
sm, sseq, err = store.LoadNextMsgMulti(filters, fseq, &pmsg.StoreMsg)
@@ -3598,8 +3636,6 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
pmsg.returnToPool()
pmsg = nil
}
// Re-acquire lock.
o.mu.Lock()
// Check if we should move our o.sseq.
if sseq >= o.sseq {
// If we are moving step by step then sseq == o.sseq.
@@ -4630,7 +4666,10 @@ func (o *consumer) checkPending() {
}
}
if elapsed >= deadline {
if !o.onRedeliverQueue(seq) {
// Check if we have hit our max deliveries. Previously we did this in getNextMsg(), which
// worked well for push consumers, but for pull-based consumers it required a new pull request
// to be present before processing, so redeliveries could be reported incorrectly.
if !o.onRedeliverQueue(seq) && !o.hasMaxDeliveries(seq) {
expired = append(expired, seq)
}
} else if deadline-elapsed < next {
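The checkPending() change above is where the max-delivery check now happens for pull consumers. Below is a hedged, client-side sketch of the scenario it addresses, assuming a local nats-server with JetStream enabled and using the nats.go client; stream/consumer names and timings are illustrative only, and the new TestJetStreamClusterMaxDeliveriesOnInterestStreams added later in this commit exercises the same path in-process.

// Hedged sketch: a pull consumer hits MaxDeliver while no pull request is outstanding.
package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, _ := nats.Connect(nats.DefaultURL) // error handling elided for brevity
	defer nc.Close()
	js, _ := nc.JetStream()

	// Interest-retention stream and a pull consumer limited to a single delivery.
	js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo.*"}, Retention: nats.InterestPolicy})
	sub, _ := js.PullSubscribe("foo.*", "c1", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))

	js.Publish("foo.bar", []byte("HELLO"))
	sub.Fetch(1) // delivered once, never acked

	// No further pull request is outstanding, so the ack-wait sweep (checkPending)
	// has to apply MaxDeliver on its own; the consumer should then report no
	// redeliveries and no pending acks.
	time.Sleep(250 * time.Millisecond)
	ci, _ := js.ConsumerInfo("TEST", "c1")
	fmt.Println(ci.NumRedelivered, ci.NumAckPending) // expected: 0 0
}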
50 changes: 26 additions & 24 deletions server/jetstream_api.go
@@ -2794,33 +2794,35 @@ func (s *Server) jsLeaderStepDownRequest(sub *subscription, c *client, _ *Accoun
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if len(req.Placement.Tags) > 0 {
// Tags currently not supported.
resp.Error = NewJSClusterTagsError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
cn := req.Placement.Cluster
var peers []string
ourID := cc.meta.ID()
for _, p := range cc.meta.Peers() {
if si, ok := s.nodeToInfo.Load(p.ID); ok && si != nil {
if ni := si.(nodeInfo); ni.offline || ni.cluster != cn || p.ID == ourID {
continue
if req.Placement != nil {
if len(req.Placement.Tags) > 0 {
// Tags currently not supported.
resp.Error = NewJSClusterTagsError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
cn := req.Placement.Cluster
var peers []string
ourID := cc.meta.ID()
for _, p := range cc.meta.Peers() {
if si, ok := s.nodeToInfo.Load(p.ID); ok && si != nil {
if ni := si.(nodeInfo); ni.offline || ni.cluster != cn || p.ID == ourID {
continue
}
peers = append(peers, p.ID)
}
peers = append(peers, p.ID)
}
if len(peers) == 0 {
resp.Error = NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Randomize and select.
if len(peers) > 1 {
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
}
preferredLeader = peers[0]
}
if len(peers) == 0 {
resp.Error = NewJSClusterNoPeersError(fmt.Errorf("no replacement peer connected"))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// Randomize and select.
if len(peers) > 1 {
rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
}
preferredLeader = peers[0]
}

// Call actual stepdown.
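The added nil guard matters because a meta leader stepdown request can legitimately arrive without any placement at all. A hedged sketch of such a request from a system-account client follows; the connection URL and credentials file path are assumptions for illustration.

// Hedged sketch: issuing a meta leader stepdown with an empty JSON body, i.e. no
// Placement, which the guarded handler above must tolerate.
package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connect as a system-account user (credentials path is hypothetical).
	nc, err := nats.Connect(nats.DefaultURL, nats.UserCredentials("sys.creds"))
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	// Empty request body: Placement (and Tags) are simply absent.
	resp, err := nc.Request("$JS.API.META.LEADER.STEPDOWN", []byte(`{}`), 2*time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(resp.Data))
}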
7 changes: 4 additions & 3 deletions server/jetstream_cluster.go
@@ -2266,7 +2266,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// from underneath the one that is running since it will be the same raft node.
defer func() {
// We might be closing during shutdown, don't pre-emptively stop here since we'll still want to install snapshots.
if !mset.closed.Load() {
if mset != nil && !mset.closed.Load() {
n.Stop()
}
}()
@@ -2851,7 +2851,7 @@ func (mset *stream) resetClusteredState(err error) bool {
}

if node != nil {
if err == errCatchupTooManyRetries {
if errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries {
// Don't delete all state, could've just been temporarily unable to reach the leader.
node.Stop()
} else {
@@ -8186,6 +8186,7 @@ var (
errCatchupStreamStopped = errors.New("stream has been stopped") // when a catchup is terminated due to the stream going away.
errCatchupBadMsg = errors.New("bad catchup msg")
errCatchupWrongSeqForSkip = errors.New("wrong sequence for skipped msg")
errCatchupAbortedNoLeader = errors.New("catchup aborted, no leader")
errCatchupTooManyRetries = errors.New("catchup failed, too many retries")
)

@@ -8289,7 +8290,7 @@ RETRY:
releaseSyncOutSem()

if n.GroupLeader() == _EMPTY_ {
return fmt.Errorf("catchup for stream '%s > %s' aborted, no leader", mset.account(), mset.name())
return fmt.Errorf("%w for stream '%s > %s'", errCatchupAbortedNoLeader, mset.account(), mset.name())
}

// If we have a sub clear that here.
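A minimal, self-contained illustration of why the return above now wraps with %w: the sentinel stays recoverable via errors.Is, which is what the resetClusteredState change relies on to keep state when catchup was only aborted for lack of a leader. The error name is copied from the diff; the account and stream strings are stand-ins.

package main

import (
	"errors"
	"fmt"
)

var errCatchupAbortedNoLeader = errors.New("catchup aborted, no leader")

func main() {
	// Mirrors the new return in the RETRY path, with account/stream names stubbed.
	err := fmt.Errorf("%w for stream '%s > %s'", errCatchupAbortedNoLeader, "$G", "TEST")

	fmt.Println(errors.Is(err, errCatchupAbortedNoLeader)) // true: sentinel survives wrapping
	fmt.Println(err)                                        // contextual message is preserved
}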
58 changes: 58 additions & 0 deletions server/jetstream_cluster_1_test.go
@@ -6440,6 +6440,64 @@ func TestJetStreamClusterMetaStepdownFromNonSysAccount(t *testing.T) {
require_NotEqual(t, ml, c.leader())
}

func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Retention: nats.InterestPolicy,
})
require_NoError(t, err)

sub1, err := js.PullSubscribe("foo.*", "c1", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))
require_NoError(t, err)

sub2, err := js.PullSubscribe("foo.*", "c2", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))
require_NoError(t, err)

js.Publish("foo.bar", []byte("HELLO"))

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 1)

msgs, err := sub1.Fetch(1)
require_NoError(t, err)
require_Equal(t, len(msgs), 1)

msgs, err = sub2.Fetch(1)
require_NoError(t, err)
require_Equal(t, len(msgs), 1)

// Wait for redelivery to both consumers which will do nothing.
time.Sleep(250 * time.Millisecond)

// Now check that stream and consumer infos are correct.
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
// Messages that are skipped due to max deliveries should NOT be removed from the stream.
require_Equal(t, si.State.Msgs, 1)
require_Equal(t, si.State.Consumers, 2)

for _, cname := range []string{"c1", "c2"} {
ci, err := js.ConsumerInfo("TEST", cname)
require_NoError(t, err)
require_Equal(t, ci.Delivered.Consumer, 1)
require_Equal(t, ci.Delivered.Stream, 1)
require_Equal(t, ci.AckFloor.Consumer, 1)
require_Equal(t, ci.AckFloor.Stream, 1)
require_Equal(t, ci.NumAckPending, 0)
require_Equal(t, ci.NumRedelivered, 0)
require_Equal(t, ci.NumPending, 0)
}
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.