Backports from v2.10 for v2.9.23 release (#4647)
Backports the following PRs from v2.10:

- #4510 
- #4556 
- #4578 
- #4605 

Signed-off-by: Your Name <wally@nats.io>
wallyqs authored Oct 11, 2023
2 parents 770cf2e + 28eb7c0 commit 8b981a2
Showing 11 changed files with 398 additions and 15 deletions.
6 changes: 6 additions & 0 deletions server/client.go
@@ -4375,6 +4375,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}

// If we are a spoke leaf node make sure to not forward across routes.
// This mimics same behavior for normal subs above.
if c.kind == LEAF && c.isSpokeLeafNode() && sub.client.kind == ROUTER {
continue
}

// We have taken care of preferring local subs for a message from a route above.
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
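For readers skimming the client.go hunk above: the added guard keeps a spoke leaf node from forwarding a message to a route, mirroring (per the diff's own comment) the rule already applied to normal subscriptions. A compact sketch of that decision in isolation, using simplified stand-in types rather than the server's actual client kinds:

package main

import "fmt"

type kind int

const (
	CLIENT kind = iota
	ROUTER
	LEAF
)

// shouldForward mirrors the shape of the new guard: a message arriving on a
// spoke leaf connection is not forwarded to a route-bound subscription.
func shouldForward(src kind, srcIsSpoke bool, dst kind) bool {
	if src == LEAF && srcIsSpoke && dst == ROUTER {
		return false
	}
	return true
}

func main() {
	fmt.Println(shouldForward(LEAF, true, ROUTER))    // false: spoke leaf -> route is skipped
	fmt.Println(shouldForward(LEAF, false, ROUTER))   // true: hub side may still forward
	fmt.Println(shouldForward(CLIENT, false, ROUTER)) // true: unaffected
}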
11 changes: 9 additions & 2 deletions server/consumer.go
@@ -1816,8 +1816,12 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
return
}

forwardProposals := func() {
forwardProposals := func() error {
o.mu.Lock()
if o.node != node || node.State() != Leader {
o.mu.Unlock()
return errors.New("no longer leader")
}
proposal := o.phead
o.phead, o.ptail = nil, nil
o.mu.Unlock()
@@ -1839,6 +1843,7 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
if len(entries) > 0 {
node.ProposeDirect(entries)
}
return nil
}

// In case we have anything pending on entry.
@@ -1850,7 +1855,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) {
forwardProposals()
return
case <-pch:
forwardProposals()
if err := forwardProposals(); err != nil {
return
}
}
}
}
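Net effect of the consumer.go hunks: loopAndForwardProposals now bails out as soon as the consumer's node is no longer the Raft leader, instead of continuing to forward proposals from a stale node. A rough, self-contained sketch of that pattern; the channel names and helpers here are illustrative assumptions, not the server's API:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// forwardLoop drains pending work each time it is signaled and exits once
// leadership is lost, mirroring the shape of the change above: the forward
// closure reports loss of leadership via an error so the loop can return.
func forwardLoop(signal, quit <-chan struct{}, isLeader *atomic.Bool, drain func()) {
	forward := func() error {
		if !isLeader.Load() {
			return errors.New("no longer leader")
		}
		drain()
		return nil
	}

	// In case anything is pending on entry.
	forward()

	for {
		select {
		case <-quit:
			// Best-effort final flush, then exit.
			forward()
			return
		case <-signal:
			if err := forward(); err != nil {
				return
			}
		}
	}
}

func main() {
	signal := make(chan struct{}, 1)
	quit := make(chan struct{})
	var isLeader atomic.Bool
	isLeader.Store(true)

	done := make(chan struct{})
	go func() {
		forwardLoop(signal, quit, &isLeader, func() { fmt.Println("forwarded pending proposals") })
		close(done)
	}()

	signal <- struct{}{}
	time.Sleep(10 * time.Millisecond)

	isLeader.Store(false) // leadership moves elsewhere
	signal <- struct{}{}  // the next signal makes the loop exit
	<-done
	close(quit)
}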
73 changes: 72 additions & 1 deletion server/jetstream_cluster.go
@@ -1159,6 +1159,65 @@ func (js *jetStream) checkForOrphans() {
}
}

// Check and delete any orphans we may come across.
func (s *Server) checkForNRGOrphans() {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil || js.isMetaRecovering() {
// No cluster means no NRGs. Also return if still recovering.
return
}

// Track which assets R>1 should be on this server.
nrgMap := make(map[string]struct{})
trackGroup := func(rg *raftGroup) {
// If R>1 track this as a legit NRG.
if rg.node != nil {
nrgMap[rg.Name] = struct{}{}
}
}
// Register our meta.
js.mu.RLock()
meta := cc.meta
if meta == nil {
js.mu.RUnlock()
// Bail with no meta node.
return
}

ourID := meta.ID()
nrgMap[meta.Group()] = struct{}{}

// Collect all valid groups from our assignments.
for _, asa := range cc.streams {
for _, sa := range asa {
if sa.Group.isMember(ourID) && sa.Restore == nil {
trackGroup(sa.Group)
for _, ca := range sa.consumers {
if ca.Group.isMember(ourID) {
trackGroup(ca.Group)
}
}
}
}
}
js.mu.RUnlock()

// Check NRGs that are running.
var needDelete []RaftNode
s.rnMu.RLock()
for name, n := range s.raftNodes {
if _, ok := nrgMap[name]; !ok {
needDelete = append(needDelete, n)
}
}
s.rnMu.RUnlock()

for _, n := range needDelete {
s.Warnf("Detected orphaned NRG %q, will cleanup", n.Group())
n.Delete()
}
}

func (js *jetStream) monitorCluster() {
s, n := js.server(), js.getMetaGroup()
qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ()
@@ -1189,6 +1248,8 @@ func (js *jetStream) monitorCluster() {
if hs := s.healthz(nil); hs.Error != _EMPTY_ {
s.Warnf("%v", hs.Error)
}
// Also check for orphaned NRGs.
s.checkForNRGOrphans()
}

var (
@@ -1269,7 +1330,6 @@ func (js *jetStream) monitorCluster() {
go checkHealth()
continue
}
// FIXME(dlc) - Deal with errors.
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil {
_, nb := n.Applied(ce.Index)
if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) {
@@ -1280,6 +1340,8 @@ func (js *jetStream) monitorCluster() {
doSnapshot()
}
ce.ReturnToPool()
} else {
s.Warnf("Error applying JetStream cluster entries: %v", err)
}
}
aq.recycle(&ces)
@@ -2028,6 +2090,15 @@ func (mset *stream) removeNode() {
}
}

func (mset *stream) clearRaftNode() {
if mset == nil {
return
}
mset.mu.Lock()
defer mset.mu.Unlock()
mset.node = nil
}

// Helper function to generate peer info.
// lists and sets for old and new.
func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPeerSet, oldPeerSet map[string]bool) {
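The checkForNRGOrphans function added above boils down to a set-difference sweep: collect the Raft group names this server should be running (the meta group plus its current stream and consumer assignments), then delete any running group that is not in that set. A minimal illustration of that idea with simplified types and hypothetical group names; the real code works against the server's RaftNode instances:

package main

import "fmt"

// findOrphans returns the names of running groups with no matching
// assignment, the same set-difference step checkForNRGOrphans performs
// before calling Delete on the corresponding nodes.
func findOrphans(expected map[string]struct{}, running []string) []string {
	var orphans []string
	for _, name := range running {
		if _, ok := expected[name]; !ok {
			orphans = append(orphans, name)
		}
	}
	return orphans
}

func main() {
	// Hypothetical group names for illustration only.
	expected := map[string]struct{}{
		"meta":    {},
		"S-R3F-1": {},
	}
	running := []string{"meta", "S-R3F-1", "C-R3F-9"} // C-R3F-9 has no assignment
	fmt.Println(findOrphans(expected, running))       // [C-R3F-9]
}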
177 changes: 176 additions & 1 deletion server/jetstream_cluster_3_test.go
@@ -18,6 +18,7 @@ package server

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@@ -5305,7 +5306,7 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Nowmal Stream
// Normal Stream
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
@@ -5384,3 +5385,177 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) {
require_True(t, blkSize(fs) == defaultMediumBlockSize)
}
}

func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

// Normal Stream
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"*"},
Replicas: 3,
})
require_NoError(t, err)

_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: "DC",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

// We will force an orphan for a certain server.
s := c.randomNonStreamLeader(globalAccountName, "TEST")

mset, err := s.GlobalAccount().lookupStream("TEST")
require_NoError(t, err)
sgn := mset.raftNode().Group()
mset.clearRaftNode()

o := mset.lookupConsumer("DC")
require_True(t, o != nil)
ogn := o.raftNode().Group()
o.clearRaftNode()

require_NoError(t, js.DeleteStream("TEST"))

// Check that we do in fact have orphans.
require_True(t, s.numRaftNodes() > 1)

// This function will detect orphans and clean them up.
s.checkForNRGOrphans()

// Should only be meta NRG left.
require_True(t, s.numRaftNodes() == 1)
require_True(t, s.lookupRaftNode(sgn) == nil)
require_True(t, s.lookupRaftNode(ogn) == nil)
}

func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) {
t.Skip("This test takes too long, need to make shorter")

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

s := c.randomNonLeader()
nc, js := jsClientConnect(t, s)
defer nc.Close()

nc2, producer := jsClientConnect(t, s)
defer nc2.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Replicas: 3,
})
require_NoError(t, err)
c.waitOnStreamLeader(globalAccountName, "TEST")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

end := time.Now().Add(2 * time.Second)
for time.Now().Before(end) {
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
time.Sleep(time.Millisecond)
}

var wg sync.WaitGroup
for i := 0; i < 5; i++ {
sub, err := js.PullSubscribe("foo", fmt.Sprintf("C-%d", i))
require_NoError(t, err)

wg.Add(1)
go func() {
defer wg.Done()
for range time.NewTicker(10 * time.Millisecond).C {
select {
case <-ctx.Done():
return
default:
}

msgs, err := sub.Fetch(1)
if err != nil && !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, nats.ErrConnectionClosed) {
t.Logf("Pull Error: %v", err)
}
for _, msg := range msgs {
msg.Ack()
}
}
}()
}
c.lameDuckRestartAll()
c.waitOnStreamLeader(globalAccountName, "TEST")

// Swap the logger to try to detect the condition after the restart.
loggers := make([]*captureDebugLogger, 3)
for i, srv := range c.servers {
l := &captureDebugLogger{dbgCh: make(chan string, 10)}
loggers[i] = l
srv.SetLogger(l, true, false)
}
condition := `Direct proposal ignored, not leader (state: CLOSED)`
errCh := make(chan error, 10)

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case dl := <-loggers[0].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
}
case dl := <-loggers[1].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
}
case dl := <-loggers[2].dbgCh:
if strings.Contains(dl, condition) {
errCh <- fmt.Errorf(condition)
}
case <-ctx.Done():
return
}
}
}()

// Start publishing again for a while.
end = time.Now().Add(2 * time.Second)
for time.Now().Before(end) {
producer.Publish("foo", []byte(strings.Repeat("A", 128)))
time.Sleep(time.Millisecond)
}

// Try to do a stream edit back to R=1 after doing all the upgrade.
info, _ := js.StreamInfo("TEST")
sconfig := info.Config
sconfig.Replicas = 1
_, err = js.UpdateStream(&sconfig)
require_NoError(t, err)

// Leave running for some time after the update.
time.Sleep(2 * time.Second)

info, _ = js.StreamInfo("TEST")
sconfig = info.Config
sconfig.Replicas = 3
_, err = js.UpdateStream(&sconfig)
require_NoError(t, err)

select {
case e := <-errCh:
t.Fatalf("Bad condition on raft node: %v", e)
case <-time.After(2 * time.Second):
// Done
}

// Stop goroutines and wait for them to exit.
cancel()
wg.Wait()
}
15 changes: 15 additions & 0 deletions server/jetstream_helpers_test.go
@@ -1521,6 +1521,21 @@ func (c *cluster) restartAll() {
c.waitOnClusterReady()
}

func (c *cluster) lameDuckRestartAll() {
c.t.Helper()
for i, s := range c.servers {
s.lameDuckMode()
s.WaitForShutdown()
if !s.Running() {
opts := c.opts[i]
s, o := RunServerWithConfig(opts.ConfigFile)
c.servers[i] = s
c.opts[i] = o
}
}
c.waitOnClusterReady()
}

func (c *cluster) restartAllSamePorts() {
c.t.Helper()
for i, s := range c.servers {