-
Notifications
You must be signed in to change notification settings - Fork 25.3k
[Zen2] Introduce PreVoteCollector #32847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
84ee0fd
3e04a7f
470fd62
8047028
4eb932d
87f3da6
d252130
e893b81
431ea65
0395ebf
3b37b40
68c4b84
63b221c
232b4e9
4f61eff
4ec40e1
7858c6e
5e5965f
b6e1a02
156ddd2
825a031
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,202 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.cluster.coordination; | ||
|
||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.common.collect.Tuple; | ||
import org.elasticsearch.common.component.AbstractComponent; | ||
import org.elasticsearch.common.lease.Releasable; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.threadpool.ThreadPool.Names; | ||
import org.elasticsearch.transport.TransportException; | ||
import org.elasticsearch.transport.TransportResponseHandler; | ||
import org.elasticsearch.transport.TransportService; | ||
|
||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import static org.elasticsearch.cluster.coordination.CoordinationState.isElectionQuorum; | ||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet; | ||
|
||
public class PreVoteCollector extends AbstractComponent { | ||
|
||
public static final String REQUEST_PRE_VOTE_ACTION_NAME = "internal:cluster/request_pre_vote"; | ||
|
||
private final TransportService transportService; | ||
private final Runnable startElection; | ||
|
||
// Tuple for simple atomic updates | ||
private volatile Tuple<DiscoveryNode, PreVoteResponse> state; // DiscoveryNode component is null if there is currently no known leader | ||
|
||
PreVoteCollector(final Settings settings, final PreVoteResponse preVoteResponse, | ||
final TransportService transportService, final Runnable startElection) { | ||
super(settings); | ||
state = new Tuple<>(null, preVoteResponse); | ||
this.transportService = transportService; | ||
this.startElection = startElection; | ||
|
||
// TODO does this need to be on the generic threadpool or can it use SAME? | ||
transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false, | ||
PreVoteRequest::new, | ||
(request, channel, task) -> channel.sendResponse(handlePreVoteRequest(request))); | ||
} | ||
|
||
/** | ||
* Start a new pre-voting round. | ||
* | ||
* @param clusterState the last-accepted cluster state | ||
* @param broadcastNodes the nodes from whom to request pre-votes | ||
* @return the pre-voting round, which can be closed to end the round early. | ||
*/ | ||
public Releasable start(final ClusterState clusterState, final Iterable<DiscoveryNode> broadcastNodes) { | ||
PreVotingRound preVotingRound = new PreVotingRound(clusterState, state.v2().getCurrentTerm()); | ||
preVotingRound.start(broadcastNodes); | ||
return preVotingRound; | ||
} | ||
|
||
public void update(final PreVoteResponse preVoteResponse, final DiscoveryNode leader) { | ||
logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader); | ||
state = new Tuple<>(leader, preVoteResponse); | ||
} | ||
|
||
private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) { | ||
// TODO if we are a leader and the max term seen exceeds our term then we need to bump our term | ||
|
||
Tuple<DiscoveryNode, PreVoteResponse> state = this.state; | ||
final DiscoveryNode leader = state.v1(); | ||
|
||
if (leader == null) { | ||
return state.v2(); | ||
} | ||
|
||
if (leader.equals(request.getSourceNode())) { | ||
// This is a _rare_ case where our leader has detected a failure and stepped down, but we are still a follower. It's possible | ||
// that the leader lost its quorum, but while we're still a follower we will not offer joins to any other node so there is no | ||
// major drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the | ||
// leader won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers | ||
// to also detect its failure. | ||
return state.v2(); | ||
} | ||
|
||
throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader"); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "PreVoteCollector{" + | ||
"state=" + state + | ||
'}'; | ||
} | ||
|
||
private class PreVotingRound implements Releasable { | ||
private final Set<DiscoveryNode> preVotesReceived = newConcurrentSet(); | ||
private final AtomicBoolean electionStarted = new AtomicBoolean(); | ||
private final PreVoteRequest preVoteRequest; | ||
private final ClusterState clusterState; | ||
private final AtomicBoolean isClosed = new AtomicBoolean(); | ||
|
||
PreVotingRound(final ClusterState clusterState, final long currentTerm) { | ||
this.clusterState = clusterState; | ||
preVoteRequest = new PreVoteRequest(transportService.getLocalNode(), currentTerm); | ||
} | ||
|
||
void start(final Iterable<DiscoveryNode> broadcastNodes) { | ||
logger.debug("{} requesting pre-votes from {}", this, broadcastNodes); | ||
broadcastNodes.forEach(n -> transportService.sendRequest(n, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest, | ||
new TransportResponseHandler<PreVoteResponse>() { | ||
@Override | ||
public void handleResponse(PreVoteResponse response) { | ||
handlePreVoteResponse(response, n); | ||
} | ||
|
||
@Override | ||
public void handleException(TransportException exp) { | ||
if (exp.getRootCause() instanceof CoordinationStateRejectedException) { | ||
logger.debug("{} failed: {}", this, exp.getRootCause().getMessage()); | ||
} else { | ||
logger.debug(new ParameterizedMessage("{} failed", this), exp); | ||
} | ||
} | ||
|
||
@Override | ||
public String executor() { | ||
return Names.GENERIC; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "TransportResponseHandler{" + PreVoteCollector.this + ", node=" + n + '}'; | ||
} | ||
})); | ||
} | ||
|
||
private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) { | ||
if (isClosed.get()) { | ||
logger.debug("{} is closed, ignoring {} from {}", this, response, sender); | ||
return; | ||
} | ||
|
||
// TODO the response carries the sender's current term. If an election starts then it should be in a higher term. | ||
|
||
if (response.getLastAcceptedTerm() > clusterState.term() | ||
|| (response.getLastAcceptedTerm() == clusterState.term() | ||
&& response.getLastAcceptedVersion() > clusterState.version())) { | ||
logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why debug instead of trace logging everywhere? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer to keep trace logging for things that are expected to happen in a stable cluster, and anything unexpected to appear at debug or higher. In a stable cluster, any election activity is unexpected. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok makes sense |
||
return; | ||
} | ||
|
||
preVotesReceived.add(sender); | ||
final VoteCollection voteCollection = new VoteCollection(); | ||
preVotesReceived.forEach(voteCollection::addVote); | ||
|
||
if (isElectionQuorum(voteCollection, clusterState) == false) { | ||
logger.debug("{} added {} from {}, no quorum yet", this, response, sender); | ||
return; | ||
} | ||
|
||
if (electionStarted.compareAndSet(false, true) == false) { | ||
logger.debug("{} added {} from {} but election has already started", this, response, sender); | ||
return; | ||
} | ||
|
||
logger.debug("{} added {} from {}, starting election", this, response, sender); | ||
startElection.run(); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "PreVotingRound{" + | ||
"preVotesReceived=" + preVotesReceived + | ||
", electionStarted=" + electionStarted + | ||
", preVoteRequest=" + preVoteRequest + | ||
", isClosed=" + isClosed + | ||
'}'; | ||
} | ||
|
||
@Override | ||
public void close() { | ||
final boolean isNotAlreadyClosed = isClosed.compareAndSet(false, true); | ||
assert isNotAlreadyClosed; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.cluster.coordination; | ||
|
||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.transport.TransportRequest; | ||
|
||
import java.io.IOException; | ||
import java.util.Objects; | ||
|
||
public class PreVoteRequest extends TransportRequest { | ||
|
||
private final DiscoveryNode sourceNode; | ||
private final long currentTerm; | ||
|
||
public PreVoteRequest(DiscoveryNode sourceNode, long currentTerm) { | ||
this.sourceNode = sourceNode; | ||
this.currentTerm = currentTerm; | ||
} | ||
|
||
public PreVoteRequest(StreamInput in) throws IOException { | ||
super(in); | ||
sourceNode = new DiscoveryNode(in); | ||
currentTerm = in.readLong(); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
sourceNode.writeTo(out); | ||
out.writeLong(currentTerm); | ||
} | ||
|
||
public DiscoveryNode getSourceNode() { | ||
return sourceNode; | ||
} | ||
|
||
public long getCurrentTerm() { | ||
return currentTerm; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "PreVoteRequest{" + | ||
"sourceNode=" + sourceNode + | ||
", currentTerm=" + currentTerm + | ||
'}'; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (o == null || getClass() != o.getClass()) return false; | ||
PreVoteRequest that = (PreVoteRequest) o; | ||
return currentTerm == that.currentTerm && | ||
Objects.equals(sourceNode, that.sourceNode); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(sourceNode, currentTerm); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.cluster.coordination; | ||
|
||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.transport.TransportResponse; | ||
|
||
import java.io.IOException; | ||
import java.util.Objects; | ||
|
||
public class PreVoteResponse extends TransportResponse { | ||
private final long currentTerm; | ||
private final long lastAcceptedTerm; | ||
private final long lastAcceptedVersion; | ||
|
||
public PreVoteResponse(long currentTerm, long lastAcceptedTerm, long lastAcceptedVersion) { | ||
this.currentTerm = currentTerm; | ||
this.lastAcceptedTerm = lastAcceptedTerm; | ||
this.lastAcceptedVersion = lastAcceptedVersion; | ||
assert lastAcceptedTerm <= currentTerm : currentTerm + " < " + lastAcceptedTerm; | ||
} | ||
|
||
public PreVoteResponse(StreamInput in) throws IOException { | ||
currentTerm = in.readLong(); | ||
lastAcceptedTerm = in.readLong(); | ||
lastAcceptedVersion = in.readLong(); | ||
assert lastAcceptedTerm <= currentTerm : currentTerm + " < " + lastAcceptedTerm; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeLong(currentTerm); | ||
out.writeLong(lastAcceptedTerm); | ||
out.writeLong(lastAcceptedVersion); | ||
} | ||
|
||
public long getCurrentTerm() { | ||
return currentTerm; | ||
} | ||
|
||
public long getLastAcceptedTerm() { | ||
return lastAcceptedTerm; | ||
} | ||
|
||
public long getLastAcceptedVersion() { | ||
return lastAcceptedVersion; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "PreVoteResponse{" + | ||
"currentTerm=" + currentTerm + | ||
", lastAcceptedTerm=" + lastAcceptedTerm + | ||
", lastAcceptedVersion=" + lastAcceptedVersion + | ||
'}'; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) return true; | ||
if (o == null || getClass() != o.getClass()) return false; | ||
PreVoteResponse that = (PreVoteResponse) o; | ||
return currentTerm == that.currentTerm && | ||
lastAcceptedTerm == that.lastAcceptedTerm && | ||
lastAcceptedVersion == that.lastAcceptedVersion; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(currentTerm, lastAcceptedTerm, lastAcceptedVersion); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure yet whether it's sufficient to have a single runnable shared by all voting rounds, or whether we want a specific one for each voting round. We can check this once we integrate these things.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will become clear later...