Skip to content

Commit

Permalink
Reduce contentions when checking group leaderless state, minor tweak …
Browse files Browse the repository at this point in the history
…in create consumer

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander authored and wallyqs committed Jan 8, 2025
1 parent 1739300 commit 5812645
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,11 +906,12 @@ func (js *jetStream) isLeaderless() bool {
js.mu.RUnlock()
return false
}
meta := cc.meta
js.mu.RUnlock()

// If we don't have a leader.
// Make sure we have been running for enough time.
if cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault {
if meta.GroupLeader() == _EMPTY_ && time.Since(meta.Created()) > lostQuorumIntervalDefault {
return true
}
return false
Expand All @@ -922,29 +923,32 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool {
return false
}
js.mu.RLock()
defer js.mu.RUnlock()

cc := js.cluster
started := js.started

// If we are not a member we can not say..
if cc.meta == nil {
js.mu.RUnlock()
return false
}
if !rg.isMember(cc.meta.ID()) {
js.mu.RUnlock()
return false
}
// Single peer groups always have a leader if we are here.
if rg.node == nil {
js.mu.RUnlock()
return false
}
js.mu.RUnlock()
// If we don't have a leader.
if rg.node.GroupLeader() == _EMPTY_ {
// Threshold for jetstream startup.
const startupThreshold = 10 * time.Second

if rg.node.HadPreviousLeader() {
// Make sure we have been running long enough to intelligently determine this.
if time.Since(js.started) > startupThreshold {
if time.Since(started) > startupThreshold {
return true
}
}
Expand Down Expand Up @@ -4450,10 +4454,11 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
} else {
// If we are clustered update the known peers.
js.mu.RLock()
if node := rg.node; node != nil {
node := rg.node
js.mu.RUnlock()
if node != nil {
node.UpdateKnownPeers(ca.Group.Peers)
}
js.mu.RUnlock()
}

// Check if we already have this consumer running.
Expand Down

0 comments on commit 5812645

Please sign in to comment.