Skip to content

Add core coordination algorithm for cluster state publishing #32171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jul 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -1024,8 +1025,10 @@ private enum ElasticsearchExceptionHandle {
UNKNOWN_NAMED_OBJECT_EXCEPTION(org.elasticsearch.common.xcontent.UnknownNamedObjectException.class,
org.elasticsearch.common.xcontent.UnknownNamedObjectException::new, 148, Version.V_5_2_0),
TOO_MANY_BUCKETS_EXCEPTION(MultiBucketConsumerService.TooManyBucketsException.class,
MultiBucketConsumerService.TooManyBucketsException::new, 149,
Version.V_7_0_0_alpha1);
MultiBucketConsumerService.TooManyBucketsException::new, 149, Version.V_7_0_0_alpha1),
COORDINATION_STATE_REJECTED_EXCEPTION(CoordinationStateRejectedException.class,
CoordinationStateRejectedException::new, 150, Version.V_7_0_0_alpha1);


final Class<? extends ElasticsearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* A master node sends this request to its peers to inform them that it could commit the
* cluster state with the given term and version. Peers that have accepted the given cluster
* state will then consider it as committed and proceed to apply the state locally.
*/
public class ApplyCommitRequest extends TermVersionRequest {

public ApplyCommitRequest(DiscoveryNode sourceNode, long term, long version) {
super(sourceNode, term, version);
}

public ApplyCommitRequest(StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}

@Override
public String toString() {
return "ApplyCommitRequest{" +
"term=" + term +
", version=" + version +
'}';
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.coordination;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* This exception is thrown when rejecting state transitions on the {@link CoordinationState} object,
* for example when receiving a publish request with the wrong term or version.
* Occurrences of this exception don't always signal failures, but can often be just caused by the
* asynchronous, distributed nature of the system. They will, for example, naturally happen during
* leader election, if multiple nodes are trying to become leader at the same time.
*/
public class CoordinationStateRejectedException extends ElasticsearchException {
public CoordinationStateRejectedException(String msg, Object... args) {
super(msg, args);
}

public CoordinationStateRejectedException(StreamInput in) throws IOException {
super(in);
}
}
127 changes: 127 additions & 0 deletions server/src/main/java/org/elasticsearch/cluster/coordination/Join.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;

/**
* Triggered by a {@link StartJoinRequest}, instances of this class represent join votes,
* and have a source and target node. The source node is the node that provides the vote,
* and the target node is the node for which this vote is cast. A node will only cast
* a single vote per term, and this for a unique target node. The vote also carries
* information about the current state of the node that provided the vote, so that
* the receiver of the vote can determine if it has a more up-to-date state than the
* source node.
*/
public class Join implements Writeable {
private final DiscoveryNode sourceNode;
private final DiscoveryNode targetNode;
private final long term;
private final long lastAcceptedTerm;
private final long lastAcceptedVersion;

public Join(DiscoveryNode sourceNode, DiscoveryNode targetNode, long term, long lastAcceptedTerm, long lastAcceptedVersion) {
assert term >= 0;
assert lastAcceptedTerm >= 0;
assert lastAcceptedVersion >= 0;

this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.term = term;
this.lastAcceptedTerm = lastAcceptedTerm;
this.lastAcceptedVersion = lastAcceptedVersion;
}

public Join(StreamInput in) throws IOException {
sourceNode = new DiscoveryNode(in);
targetNode = new DiscoveryNode(in);
term = in.readLong();
lastAcceptedTerm = in.readLong();
lastAcceptedVersion = in.readLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
sourceNode.writeTo(out);
targetNode.writeTo(out);
out.writeLong(term);
out.writeLong(lastAcceptedTerm);
out.writeLong(lastAcceptedVersion);
}

public DiscoveryNode getSourceNode() {
return sourceNode;
}

public DiscoveryNode getTargetNode() {
return targetNode;
}

public long getLastAcceptedVersion() {
return lastAcceptedVersion;
}

public long getTerm() {
return term;
}

public long getLastAcceptedTerm() {
return lastAcceptedTerm;
}

@Override
public String toString() {
return "Join{" +
"term=" + term +
", lastAcceptedTerm=" + lastAcceptedTerm +
", lastAcceptedVersion=" + lastAcceptedVersion +
", sourceNode=" + sourceNode +
", targetNode=" + targetNode +
'}';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

Join join = (Join) o;

if (sourceNode.equals(join.sourceNode) == false) return false;
if (targetNode.equals(join.targetNode) == false) return false;
if (lastAcceptedVersion != join.lastAcceptedVersion) return false;
if (term != join.term) return false;
return lastAcceptedTerm == join.lastAcceptedTerm;
}

@Override
public int hashCode() {
int result = (int) (lastAcceptedVersion ^ (lastAcceptedVersion >>> 32));
result = 31 * result + sourceNode.hashCode();
result = 31 * result + targetNode.hashCode();
result = 31 * result + (int) (term ^ (term >>> 32));
result = 31 * result + (int) (lastAcceptedTerm ^ (lastAcceptedTerm >>> 32));
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.Objects;

/**
* Request which is used by the master node to publish cluster state changes.
*/
public class PublishRequest extends TransportRequest {

private final ClusterState acceptedState;

public PublishRequest(ClusterState acceptedState) {
this.acceptedState = acceptedState;
}

public PublishRequest(StreamInput in, DiscoveryNode localNode) throws IOException {
super(in);
acceptedState = ClusterState.readFrom(in, localNode);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
acceptedState.writeTo(out);
}

public ClusterState getAcceptedState() {
return acceptedState;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof PublishRequest)) return false;

PublishRequest that = (PublishRequest) o;

return acceptedState.term() == that.acceptedState.term() &&
acceptedState.version() == that.acceptedState.version();
}

@Override
public int hashCode() {
return Objects.hash(acceptedState.term(), acceptedState.version());
}

@Override
public String toString() {
return "PublishRequest{term=" + acceptedState.term()
+ ", version=" + acceptedState.version()
+ ", state=" + acceptedState + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Response to a {@link PublishRequest}, carrying the term and version of the request.
*/
public class PublishResponse extends TermVersionResponse {

public PublishResponse(long term, long version) {
super(term, version);
}

public PublishResponse(StreamInput in) throws IOException {
super(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}

@Override
public String toString() {
return "PublishResponse{" +
"term=" + term +
", version=" + version +
'}';
}
}
Loading