Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not limit expansion of new peers #2119

Merged
merged 2 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5519,7 +5519,7 @@ func TestJetStreamClusterMixedMode(t *testing.T) {
}

func TestJetStreamClusterLeafnodeSpokes(t *testing.T) {
c := createJetStreamClusterExplicit(t, "HUB", 3)
c := createJetStreamCluster(t, jsClusterTempl, "HUB", _EMPTY_, 3, 22020, false)
defer c.shutdown()

lnc1 := c.createLeafNodesWithStartPort("R1", 3, 22110)
Expand Down Expand Up @@ -6367,7 +6367,7 @@ func (c *cluster) waitOnPeerCount(n int) {
c.t.Helper()
c.waitOnLeader()
leader := c.leader()
expires := time.Now().Add(20 * time.Second)
expires := time.Now().Add(10 * time.Second)
for time.Now().Before(expires) {
peers := leader.JetStreamClusterPeers()
if len(peers) == n {
Expand Down
63 changes: 39 additions & 24 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ type raft struct {
csz int
qn int
peers map[string]*lps
removed map[string]string
acks map[uint64]map[string]struct{}
pae map[uint64]*appendEntry
elect *time.Timer
Expand Down Expand Up @@ -238,7 +239,6 @@ var (
errNotLeader = errors.New("raft: not leader")
errAlreadyLeader = errors.New("raft: already leader")
errNilCfg = errors.New("raft: no config given")
errUnknownPeer = errors.New("raft: unknown peer")
errCorruptPeers = errors.New("raft: corrupt peer state")
errStepdownFailed = errors.New("raft: stepdown failed")
errEntryLoadFailed = errors.New("raft: could not load entry from WAL")
Expand Down Expand Up @@ -660,7 +660,7 @@ func (n *raft) ProposeAddPeer(peer string) error {
func (n *raft) ProposeRemovePeer(peer string) error {
n.RLock()
propc, subj := n.propc, n.rpsubj
isUs, isLeader := peer == n.id, n.state == Leader
isLeader, isUs := n.state == Leader, n.id == peer
werr := n.werr
n.RUnlock()

Expand All @@ -670,14 +670,13 @@ func (n *raft) ProposeRemovePeer(peer string) error {
}

if isLeader {
if isUs {
n.StepDown()
} else {
select {
case propc <- &Entry{EntryRemovePeer, []byte(peer)}:
default:
return errProposalFailed
select {
case propc <- &Entry{EntryRemovePeer, []byte(peer)}:
if isUs {
n.attemptStepDown(noLeader)
}
default:
return errProposalFailed
}
return nil
}
Expand Down Expand Up @@ -2065,6 +2064,12 @@ func (n *raft) applyCommit(index uint64) error {
case EntryAddPeer:
newPeer := string(e.Data)
n.debug("Added peer %q", newPeer)

// If we were on the removed list reverse that here.
if n.removed != nil {
delete(n.removed, newPeer)
}

if _, ok := n.peers[newPeer]; !ok {
// We are not tracking this one automatically so we need to bump cluster size.
n.peers[newPeer] = &lps{time.Now().UnixNano(), 0}
Expand All @@ -2076,19 +2081,25 @@ func (n *raft) applyCommit(index uint64) error {
}
n.writePeerState(&peerState{n.peerNames(), n.csz})
case EntryRemovePeer:
oldPeer := string(e.Data)
n.debug("Removing peer %q", oldPeer)
peer := string(e.Data)
n.debug("Removing peer %q", peer)

// FIXME(dlc) - Check if this is us??
if _, ok := n.peers[oldPeer]; ok {
// Make sure we have our removed map.
if n.removed == nil {
n.removed = make(map[string]string)
}
n.removed[peer] = peer

if _, ok := n.peers[peer]; ok {
// We should decrease our cluster size since we are tracking this peer.
delete(n.peers, oldPeer)
delete(n.peers, peer)
if n.csz != len(n.peers) {
n.debug("Decreasing our clustersize: %d -> %d", n.csz, len(n.peers))
n.csz = len(n.peers)
n.qn = n.csz/2 + 1
}
}

n.writePeerState(&peerState{n.peerNames(), n.csz})
// We pass these up as well.
committed = append(committed, e)
Expand Down Expand Up @@ -2164,21 +2175,21 @@ func (n *raft) trackResponse(ar *appendEntryResponse) {
// Track interactions with this peer.
func (n *raft) trackPeer(peer string) error {
n.Lock()
var needPeerUpdate bool
var needPeerUpdate, isRemoved bool
if n.removed != nil {
_, isRemoved = n.removed[peer]
}
if n.state == Leader {
if _, ok := n.peers[peer]; !ok {
// This is someone new, if we have registered all of the peers already
// this is an error.
if len(n.peers) >= n.csz {
n.Unlock()
return errUnknownPeer
// Check if this peer had been removed previously.
if !isRemoved {
needPeerUpdate = true
}
needPeerUpdate = true
}
}
if ps := n.peers[peer]; ps != nil {
ps.ts = time.Now().UnixNano()
} else {
} else if !isRemoved {
n.peers[peer] = &lps{time.Now().UnixNano(), 0}
}
n.Unlock()
Expand Down Expand Up @@ -3113,11 +3124,15 @@ func (n *raft) requestVote() {
}

// sendRPC hands msg to n.sq.send for delivery on subject, passing reply as the
// reply subject and nil for the send call's third argument.
//
// The message is sent exactly once, and only when n.sq is non-nil; with a nil
// n.sq the call is a silent no-op rather than a nil dereference.
// NOTE(review): n.sq is presumably unset before startup or after shutdown —
// confirm against the code that assigns it.
func (n *raft) sendRPC(subject, reply string, msg []byte) {
	if n.sq == nil {
		return
	}
	n.sq.send(subject, reply, nil, msg)
}

// sendReply hands msg to n.sq.send for delivery on subject, with _EMPTY_ as the
// reply subject and nil for the send call's third argument.
//
// The message is sent exactly once, and only when n.sq is non-nil; with a nil
// n.sq the call is a silent no-op rather than a nil dereference.
// NOTE(review): n.sq is presumably unset before startup or after shutdown —
// confirm against the code that assigns it.
func (n *raft) sendReply(subject string, msg []byte) {
	if n.sq == nil {
		return
	}
	n.sq.send(subject, _EMPTY_, nil, msg)
}

func (n *raft) wonElection(votes int) bool {
Expand Down