Skip to content

[Zen2] Introduce PreVoteCollector #32847

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Aug 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ public boolean electionWon() {
}

/**
 * Tells whether the given votes are sufficient to win an election.
 *
 * NOTE(review): the two return statements below look like the removed/added pair of a diff hunk shown without
 * its +/- markers; only one of them can be live code in a compilable file - confirm against the real source.
 *
 * @param votes the votes collected so far
 * @return true if the votes form an election quorum
 */
public boolean isElectionQuorum(VoteCollection votes) {
// Checks the votes against both the last-committed and the last-accepted configuration of this object.
return votes.isQuorum(getLastCommittedConfiguration()) && votes.isQuorum(getLastAcceptedConfiguration());
// Delegates to the static overload, handing it the last-accepted state.
return isElectionQuorum(votes, getLastAcceptedState());
}

/**
 * Tells whether the given votes are a quorum in both voting configurations carried by the given cluster state.
 *
 * @param votes             the votes collected so far
 * @param lastAcceptedState the cluster state whose last-committed and last-accepted configurations are consulted
 * @return true only if the votes are a quorum of the last-committed configuration and also of the last-accepted one
 */
static boolean isElectionQuorum(VoteCollection votes, ClusterState lastAcceptedState) {
    // The last-accepted configuration is only consulted once the last-committed one is satisfied.
    if (votes.isQuorum(lastAcceptedState.getLastCommittedConfiguration()) == false) {
        return false;
    }
    return votes.isQuorum(lastAcceptedState.getLastAcceptedConfiguration());
}

public boolean isPublishQuorum(VoteCollection votes) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationState.VoteCollection;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.cluster.coordination.CoordinationState.isElectionQuorum;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;

public class PreVoteCollector extends AbstractComponent {

public static final String REQUEST_PRE_VOTE_ACTION_NAME = "internal:cluster/request_pre_vote";

private final TransportService transportService;
private final Runnable startElection;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure yet if it's sufficient to have a unique runnable for all voting rounds or whether we want specific ones, different for each voting round. We can check these though once we integrate these things.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will become clear later...


// Tuple for simple atomic updates
private volatile Tuple<DiscoveryNode, PreVoteResponse> state; // DiscoveryNode component is null if there is currently no known leader

PreVoteCollector(final Settings settings, final PreVoteResponse preVoteResponse,
final TransportService transportService, final Runnable startElection) {
super(settings);
state = new Tuple<>(null, preVoteResponse);
this.transportService = transportService;
this.startElection = startElection;

// TODO does this need to be on the generic threadpool or can it use SAME?
transportService.registerRequestHandler(REQUEST_PRE_VOTE_ACTION_NAME, Names.GENERIC, false, false,
PreVoteRequest::new,
(request, channel, task) -> channel.sendResponse(handlePreVoteRequest(request)));
}

/**
 * Begins a new round of pre-voting. The round's request carries the current term taken from the response
 * most recently stored in this collector.
 *
 * @param clusterState   the last-accepted cluster state
 * @param broadcastNodes the nodes that are asked for a pre-vote
 * @return the newly started round; closing it ends the round early
 */
public Releasable start(final ClusterState clusterState, final Iterable<DiscoveryNode> broadcastNodes) {
    final long currentTerm = state.v2().getCurrentTerm();
    final PreVotingRound round = new PreVotingRound(clusterState, currentTerm);
    round.start(broadcastNodes);
    return round;
}

/**
 * Replaces, in one step, both the response handed out to pre-vote requesters and the currently known leader.
 *
 * @param preVoteResponse the response to return to subsequent pre-vote requests
 * @param leader          the currently known leader, or null if there is none
 */
public void update(final PreVoteResponse preVoteResponse, final DiscoveryNode leader) {
    logger.trace("updating with preVoteResponse={}, leader={}", preVoteResponse, leader);
    // Both values live in one tuple so that they are swapped with a single volatile write.
    final Tuple<DiscoveryNode, PreVoteResponse> replacement = new Tuple<>(leader, preVoteResponse);
    state = replacement;
}

/**
 * Decides how to answer an incoming pre-vote request.
 *
 * @param request the received request
 * @return the currently stored response, when no leader is known or when the requester is the known leader
 * @throws CoordinationStateRejectedException if a leader is known and it is not the requesting node
 */
private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
    // TODO if we are a leader and the max term seen exceeds our term then we need to bump our term

    // Read the volatile field once so that the leader and the response belong to the same update.
    final Tuple<DiscoveryNode, PreVoteResponse> snapshot = this.state;
    final DiscoveryNode knownLeader = snapshot.v1();

    if (knownLeader != null && knownLeader.equals(request.getSourceNode()) == false) {
        throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
    }

    // Either no leader is known, or the request comes from our own leader. The latter is a _rare_ case: the leader has
    // detected a failure and stepped down while we are still its follower. It may have lost its quorum, but as long as we
    // remain a follower we offer joins to no other node, so offering one to the old leader has no major drawback. The benefit
    // is that the leader is slightly more likely to stay the same, and that it is re-elected sooner than if it had to wait
    // for a quorum of followers to notice its failure too.
    return snapshot.v2();
}

/**
 * @return a description of this collector containing its current (leader, response) tuple
 */
@Override
public String toString() {
    return "PreVoteCollector{state=" + state + '}';
}

private class PreVotingRound implements Releasable {
private final Set<DiscoveryNode> preVotesReceived = newConcurrentSet();
private final AtomicBoolean electionStarted = new AtomicBoolean();
private final PreVoteRequest preVoteRequest;
private final ClusterState clusterState;
private final AtomicBoolean isClosed = new AtomicBoolean();

PreVotingRound(final ClusterState clusterState, final long currentTerm) {
this.clusterState = clusterState;
preVoteRequest = new PreVoteRequest(transportService.getLocalNode(), currentTerm);
}

/**
 * Sends this round's pre-vote request to every given node.
 *
 * @param broadcastNodes the nodes that are asked for a pre-vote
 */
void start(final Iterable<DiscoveryNode> broadcastNodes) {
    logger.debug("{} requesting pre-votes from {}", this, broadcastNodes);
    broadcastNodes.forEach(node ->
        transportService.sendRequest(node, REQUEST_PRE_VOTE_ACTION_NAME, preVoteRequest, responseHandlerFor(node)));
}

/**
 * Builds the handler for the reply of a single node: responses are passed to handlePreVoteResponse together with
 * the node, failures are only logged.
 *
 * NOTE(review): the handler does not say how a PreVoteResponse is read from the wire; confirm that the defaults of
 * TransportResponseHandler are able to construct one.
 *
 * @param node the node the request is sent to
 * @return a handler that runs on the generic thread pool
 */
private TransportResponseHandler<PreVoteResponse> responseHandlerFor(final DiscoveryNode node) {
    return new TransportResponseHandler<PreVoteResponse>() {
        @Override
        public void handleResponse(PreVoteResponse response) {
            handlePreVoteResponse(response, node);
        }

        @Override
        public void handleException(TransportException exp) {
            final Throwable rootCause = exp.getRootCause();
            if (rootCause instanceof CoordinationStateRejectedException) {
                // A rejection is reported by its message only, without a stack trace.
                logger.debug("{} failed: {}", this, rootCause.getMessage());
            } else {
                logger.debug(new ParameterizedMessage("{} failed", this), exp);
            }
        }

        @Override
        public String executor() {
            return Names.GENERIC;
        }

        @Override
        public String toString() {
            return "TransportResponseHandler{" + PreVoteCollector.this + ", node=" + node + '}';
        }
    };
}

/**
 * Processes one pre-vote response: records the sender's pre-vote and runs startElection the first time the
 * collected pre-votes form an election quorum for this round's cluster state.
 *
 * @param response the response that was received
 * @param sender   the node that sent the response
 */
private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
// Responses that arrive after the round was closed are dropped.
if (isClosed.get()) {
logger.debug("{} is closed, ignoring {} from {}", this, response, sender);
return;
}

// TODO the response carries the sender's current term. If an election starts then it should be in a higher term.

// A sender whose last-accepted (term, version) is greater than that of this round's cluster state does not count as a pre-vote.
if (response.getLastAcceptedTerm() > clusterState.term()
|| (response.getLastAcceptedTerm() == clusterState.term()
&& response.getLastAcceptedVersion() > clusterState.version())) {
logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why debug instead of trace logging everywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to keep trace logging for things that are expected to happen in a stable cluster, and anything unexpected to appear at debug or higher. In a stable cluster, any election activity is unexpected.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok makes sense

return;
}

// Record the sender, then rebuild a vote collection from every pre-vote received so far in this round.
preVotesReceived.add(sender);
final VoteCollection voteCollection = new VoteCollection();
preVotesReceived.forEach(voteCollection::addVote);

if (isElectionQuorum(voteCollection, clusterState) == false) {
logger.debug("{} added {} from {}, no quorum yet", this, response, sender);
return;
}

// Only the caller that flips the flag from false to true goes on to run startElection.
if (electionStarted.compareAndSet(false, true) == false) {
logger.debug("{} added {} from {} but election has already started", this, response, sender);
return;
}

logger.debug("{} added {} from {}, starting election", this, response, sender);
startElection.run();
}

/**
 * @return a description of this round listing the pre-votes received, both flags and the request being sent
 */
@Override
public String toString() {
    final StringBuilder description = new StringBuilder("PreVotingRound{");
    description.append("preVotesReceived=").append(preVotesReceived);
    description.append(", electionStarted=").append(electionStarted);
    description.append(", preVoteRequest=").append(preVoteRequest);
    description.append(", isClosed=").append(isClosed);
    description.append('}');
    return description.toString();
}

/**
 * Marks this round as closed, after which incoming responses are ignored. Closing the same round a second time
 * trips an assertion.
 */
@Override
public void close() {
    // The flag is updated outside the assert statement so that it is also set when assertions are disabled.
    final boolean wasAlreadyClosed = isClosed.getAndSet(true);
    assert wasAlreadyClosed == false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.coordination;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Objects;

/**
 * Request sent to another node to ask for a pre-vote. It carries the node that sent it and a term.
 */
public class PreVoteRequest extends TransportRequest {

    private final DiscoveryNode sourceNode;
    private final long currentTerm;

    /**
     * @param sourceNode  the node sending the request
     * @param currentTerm the term to send along with the request
     */
    public PreVoteRequest(DiscoveryNode sourceNode, long currentTerm) {
        this.sourceNode = sourceNode;
        this.currentTerm = currentTerm;
    }

    /**
     * Reads a request from a stream: first the superclass data, then the source node, then the term.
     *
     * @param in the stream to read from
     * @throws IOException if reading from the stream fails
     */
    public PreVoteRequest(StreamInput in) throws IOException {
        super(in);
        this.sourceNode = new DiscoveryNode(in);
        this.currentTerm = in.readLong();
    }

    /**
     * Writes the request in the same order in which the stream constructor reads it.
     *
     * @param out the stream to write to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        sourceNode.writeTo(out);
        out.writeLong(currentTerm);
    }

    /**
     * @return the node that sent this request
     */
    public DiscoveryNode getSourceNode() {
        return sourceNode;
    }

    /**
     * @return the term carried by this request
     */
    public long getCurrentTerm() {
        return currentTerm;
    }

    @Override
    public String toString() {
        return "PreVoteRequest{sourceNode=" + sourceNode + ", currentTerm=" + currentTerm + '}';
    }

    /**
     * Two requests are equal when they are of the same class and agree on source node and term.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final PreVoteRequest other = (PreVoteRequest) o;
        if (currentTerm != other.currentTerm) {
            return false;
        }
        return Objects.equals(sourceNode, other.sourceNode);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sourceNode, currentTerm);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.coordination;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Objects;

/**
 * Response to a pre-vote request. It carries three numbers describing the responding node: its current term and
 * the term and version of its last-accepted state. Both constructors assert that the last-accepted term does not
 * exceed the current term.
 */
public class PreVoteResponse extends TransportResponse {

    private final long currentTerm;
    private final long lastAcceptedTerm;
    private final long lastAcceptedVersion;

    /**
     * @param currentTerm         the current term
     * @param lastAcceptedTerm    the last-accepted term, asserted to be no greater than the current term
     * @param lastAcceptedVersion the last-accepted version
     */
    public PreVoteResponse(long currentTerm, long lastAcceptedTerm, long lastAcceptedVersion) {
        this.currentTerm = currentTerm;
        this.lastAcceptedTerm = lastAcceptedTerm;
        this.lastAcceptedVersion = lastAcceptedVersion;
        assert lastAcceptedTerm <= currentTerm : currentTerm + " < " + lastAcceptedTerm;
    }

    /**
     * Reads a response from a stream: current term, last-accepted term, last-accepted version, in that order.
     *
     * NOTE(review): writeTo delegates to the superclass before writing, whereas this constructor reads only the
     * three longs; confirm that the superclass writes nothing to the stream.
     *
     * @param in the stream to read from
     * @throws IOException if reading from the stream fails
     */
    public PreVoteResponse(StreamInput in) throws IOException {
        this.currentTerm = in.readLong();
        this.lastAcceptedTerm = in.readLong();
        this.lastAcceptedVersion = in.readLong();
        assert lastAcceptedTerm <= currentTerm : currentTerm + " < " + lastAcceptedTerm;
    }

    /**
     * Writes the superclass data followed by the three longs, in the order the stream constructor reads them.
     *
     * @param out the stream to write to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeLong(currentTerm);
        out.writeLong(lastAcceptedTerm);
        out.writeLong(lastAcceptedVersion);
    }

    /**
     * @return the current term carried by this response
     */
    public long getCurrentTerm() {
        return currentTerm;
    }

    /**
     * @return the last-accepted term carried by this response
     */
    public long getLastAcceptedTerm() {
        return lastAcceptedTerm;
    }

    /**
     * @return the last-accepted version carried by this response
     */
    public long getLastAcceptedVersion() {
        return lastAcceptedVersion;
    }

    @Override
    public String toString() {
        return "PreVoteResponse{currentTerm=" + currentTerm
            + ", lastAcceptedTerm=" + lastAcceptedTerm
            + ", lastAcceptedVersion=" + lastAcceptedVersion
            + '}';
    }

    /**
     * Two responses are equal when they are of the same class and all three numbers match.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final PreVoteResponse other = (PreVoteResponse) o;
        if (currentTerm != other.currentTerm) {
            return false;
        }
        if (lastAcceptedTerm != other.lastAcceptedTerm) {
            return false;
        }
        return lastAcceptedVersion == other.lastAcceptedVersion;
    }

    @Override
    public int hashCode() {
        return Objects.hash(currentTerm, lastAcceptedTerm, lastAcceptedVersion);
    }
}
Loading