Skip to content

Commit

Permalink
[FIXED] Preferred stream leader responds without initialized RAFT node
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen authored and neilalexander committed Feb 10, 2025
1 parent b78f461 commit b10f99b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
11 changes: 6 additions & 5 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1873,13 +1873,14 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
if cc.meta != nil {
ourID = cc.meta.ID()
}
// We have seen cases where rg or rg.node is nil at this point,
// so check explicitly on those conditions and bail if that is
// the case.
bail := rg == nil || rg.node == nil || !rg.isMember(ourID)
// We have seen cases where rg is nil at this point,
// so check explicitly and bail if that is the case.
bail := rg == nil || !rg.isMember(ourID)
if !bail {
// We know we are a member here, if this group is new and we are preferred allow us to answer.
bail = rg.Preferred != ourID || time.Since(rg.node.Created()) > lostQuorumIntervalDefault
// Also, we have seen cases where rg.node is nil at this point,
// so check for that explicitly; a nil node alone does not make us bail.
bail = rg.Preferred != ourID || (rg.node != nil && time.Since(rg.node.Created()) > lostQuorumIntervalDefault)
}
js.mu.RUnlock()
if bail {
Expand Down
34 changes: 34 additions & 0 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,40 @@ func TestJetStreamClusterNoPanicOnStreamInfoWhenNoLeaderYet(t *testing.T) {
wg.Wait()
}

// TestJetStreamClusterNoTimeoutOnStreamInfoOnPreferredLeader verifies that a
// stream info request keeps succeeding after the raft node on the stream
// leader's raft group has been cleared, i.e. the request must not fail or
// time out while the leader is in that state.
func TestJetStreamClusterNoTimeoutOnStreamInfoOnPreferredLeader(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	// Create a replicated stream and confirm that info works beforehand.
	cfg := &nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	}
	_, err := js.AddStream(cfg)
	require_NoError(t, err)

	_, err = js.StreamInfo("TEST")
	require_NoError(t, err)

	// Find the stream on the server that currently leads it.
	leader := c.streamLeader(globalAccountName, "TEST")
	account, err := leader.lookupAccount(globalAccountName)
	require_NoError(t, err)
	stream, err := account.lookupStream("TEST")
	require_NoError(t, err)

	// Clear the raft node while holding the JetStream lock, mimicking a
	// preferred stream leader whose raft node has not been initialized yet.
	leaderJS := leader.getJetStream()
	group := stream.raftGroup()
	leaderJS.mu.Lock()
	group.node = nil
	leaderJS.mu.Unlock()

	// The info request must still be answered in this condition.
	_, err = js.StreamInfo("TEST")
	require_NoError(t, err)
}

// Issue https://github.com/nats-io/nats-server/issues/3630
func TestJetStreamClusterPullConsumerAcksExtendInactivityThreshold(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
Expand Down

0 comments on commit b10f99b

Please sign in to comment.