From b10f99ba3b9e5642cc4673cbaee834152bb6e883 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Sun, 9 Feb 2025 18:09:44 +0100 Subject: [PATCH] [FIXED] Preferred stream leader responds without initialized RAFT node Signed-off-by: Maurice van Veen --- server/jetstream_api.go | 11 +++++----- server/jetstream_cluster_3_test.go | 34 ++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 1b82aa4d60..4edc99bbd3 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -1873,13 +1873,14 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s if cc.meta != nil { ourID = cc.meta.ID() } - // We have seen cases where rg or rg.node is nil at this point, - // so check explicitly on those conditions and bail if that is - // the case. - bail := rg == nil || rg.node == nil || !rg.isMember(ourID) + // We have seen cases where rg is nil at this point, + // so check explicitly and bail if that is the case. + bail := rg == nil || !rg.isMember(ourID) if !bail { // We know we are a member here, if this group is new and we are preferred allow us to answer. - bail = rg.Preferred != ourID || time.Since(rg.node.Created()) > lostQuorumIntervalDefault + // Also, we have seen cases where rg.node is nil at this point, + // so check explicitly and bail if that is the case. + bail = rg.Preferred != ourID || (rg.node != nil && time.Since(rg.node.Created()) > lostQuorumIntervalDefault) } js.mu.RUnlock() if bail { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index d2aed87836..13ca180bb0 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -1319,6 +1319,40 @@ func TestJetStreamClusterNoPanicOnStreamInfoWhenNoLeaderYet(t *testing.T) { wg.Wait() } +func TestJetStreamClusterNoTimeoutOnStreamInfoOnPreferredLeader(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.StreamInfo("TEST") + require_NoError(t, err) + + // Simulate the preferred stream leader to not have initialized the raft node yet. + sl := c.streamLeader(globalAccountName, "TEST") + acc, err := sl.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + sjs := sl.getJetStream() + rg := mset.raftGroup() + sjs.mu.Lock() + rg.node = nil + sjs.mu.Unlock() + + // Should not time out on the stream info during this condition. + _, err = js.StreamInfo("TEST") + require_NoError(t, err) +} + // Issue https://github.com/nats-io/nats-server/issues/3630 func TestJetStreamClusterPullConsumerAcksExtendInactivityThreshold(t *testing.T) { c := createJetStreamClusterExplicit(t, "R3S", 3)