Skip to content

Zen2: Deterministic MasterService #32493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -1006,8 +1005,8 @@ private enum ElasticsearchExceptionHandle {
UNKNOWN_VERSION_ADDED),
TYPE_MISSING_EXCEPTION(org.elasticsearch.indices.TypeMissingException.class,
org.elasticsearch.indices.TypeMissingException::new, 137, UNKNOWN_VERSION_ADDED),
FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class,
org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException::new, 140, UNKNOWN_VERSION_ADDED),
FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException.class,
org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException::new, 140, UNKNOWN_VERSION_ADDED),
QUERY_SHARD_EXCEPTION(org.elasticsearch.index.query.QueryShardException.class,
org.elasticsearch.index.query.QueryShardException::new, 141, UNKNOWN_VERSION_ADDED),
NO_LONGER_PRIMARY_SHARD_EXCEPTION(ShardStateAction.NoLongerPrimaryShardException.class,
Expand All @@ -1026,8 +1025,8 @@ private enum ElasticsearchExceptionHandle {
org.elasticsearch.common.xcontent.UnknownNamedObjectException::new, 148, Version.V_5_2_0),
TOO_MANY_BUCKETS_EXCEPTION(MultiBucketConsumerService.TooManyBucketsException.class,
MultiBucketConsumerService.TooManyBucketsException::new, 149, Version.V_7_0_0_alpha1),
COORDINATION_STATE_REJECTED_EXCEPTION(CoordinationStateRejectedException.class,
CoordinationStateRejectedException::new, 150, Version.V_7_0_0_alpha1);
COORDINATION_STATE_REJECTED_EXCEPTION(org.elasticsearch.cluster.coordination.CoordinationStateRejectedException.class,
org.elasticsearch.cluster.coordination.CoordinationStateRejectedException::new, 150, Version.V_7_0_0_alpha1);


final Class<? extends ElasticsearchException> exceptionClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -174,7 +174,7 @@ public void onResponse(Response response) {

@Override
public void onFailure(Exception t) {
if (t instanceof Discovery.FailedToCommitClusterStateException
if (t instanceof FailedToCommitClusterStateException
|| (t instanceof NotMasterException)) {
logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or stepped down before publishing action [{}], scheduling a retry", actionName), t);
retry(t, masterChangePredicate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -132,7 +132,7 @@ public void handleException(TransportException exp) {
private static Class[] MASTER_CHANNEL_EXCEPTIONS = new Class[]{
NotMasterException.class,
ConnectTransportException.class,
Discovery.FailedToCommitClusterStateException.class
FailedToCommitClusterStateException.class
};

private static boolean isMasterChannelException(TransportException exp) {
Expand Down Expand Up @@ -625,7 +625,7 @@ default void onSuccess() {
* are:
* - {@link NotMasterException}
* - {@link NodeDisconnectedException}
* - {@link Discovery.FailedToCommitClusterStateException}
* - {@link FailedToCommitClusterStateException}
*
* Any other exception is communicated to the requester via
* this notification.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;

/**
* Publishes cluster state changes from the master to the cluster and reports the outcome
* through an {@link ActionListener} and an {@link AckListener}.
*/
public interface ClusterStatePublisher {
/**
* Publishes all the changes to the cluster from the master (may only be called by the master). The publish
* process should apply this state to the master as well.
*
* The publishListener makes it possible to wait for the publication to complete, which means either successful
* completion, timing out or failing.
* The method is guaranteed to pass back a {@link FailedToCommitClusterStateException} to the publishListener if the change is not
* committed and should be rejected. Any other exception signals that something bad happened but the change is committed.
*
* The {@link AckListener} makes it possible to keep track of the acks received from nodes, and to verify whether
* they updated their own cluster state or not.
*
* @param clusterChangedEvent the cluster change to publish
* @param publishListener notified when the publication completes, times out or fails
* @param ackListener notified of the commit and of each node's acknowledgement
*/
void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener);

/**
* Callback through which the cluster coordination layer reports the commit of a cluster state and the
* per-node acknowledgements of its application.
*/
interface AckListener {
/**
* Should be called when the cluster coordination layer has committed the cluster state (i.e. even if this publication fails,
* it is guaranteed to appear in future publications).
* @param commitTime the time it took to commit the cluster state
*/
void onCommit(TimeValue commitTime);

/**
* Should be called whenever the cluster coordination layer receives confirmation from a node that it has successfully applied
* the cluster state. In case of failures, an exception should be provided as parameter.
* @param node the node that acknowledged (or failed to apply) the cluster state
* @param e the failure, or {@code null} when the node applied the cluster state successfully
*/
void onNodeAck(DiscoveryNode node, @Nullable Exception e);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* Thrown when failing to publish a cluster state. See {@link ClusterStatePublisher} for more details.
*
* Per the {@link ClusterStatePublisher#publish} contract, this is the exception handed to the publish listener
* when the change was not committed and should be rejected; any other exception there means the change was
* committed despite the failure.
*/
public class FailedToCommitClusterStateException extends ElasticsearchException {

/**
* Creates the exception from a stream by delegating to the {@link ElasticsearchException} stream constructor.
*
* @param in the stream to read the exception from
* @throws IOException declared by the superclass stream constructor
*/
public FailedToCommitClusterStateException(StreamInput in) throws IOException {
super(in);
}

/**
* Creates the exception with a message; both arguments are passed unchanged to the superclass.
*
* @param msg the detail message
* @param args forwarded to the superclass constructor (presumably placeholder values for {@code msg} -
*             confirm against {@link ElasticsearchException})
*/
public FailedToCommitClusterStateException(String msg, Object... args) {
super(msg, args);
}

/**
* Creates the exception with a message and a cause; all arguments are passed unchanged to the superclass.
*
* @param msg the detail message
* @param cause the underlying failure
* @param args forwarded to the superclass constructor (presumably placeholder values for {@code msg} -
*             confirm against {@link ElasticsearchException})
*/
public FailedToCommitClusterStateException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher.AckListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.Discovery.AckListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;

Expand Down Expand Up @@ -145,7 +144,7 @@ private void onPossibleCommitFailure() {
if (isPublishQuorum(possiblySuccessfulNodes) == false) {
logger.debug("onPossibleCommitFailure: non-failed nodes {} do not form a quorum, so {} cannot succeed",
possiblySuccessfulNodes, this);
Exception e = new Discovery.FailedToCommitClusterStateException("non-failed nodes do not form a quorum");
Exception e = new FailedToCommitClusterStateException("non-failed nodes do not form a quorum");
publicationTargets.stream().filter(PublicationTarget::isActive).forEach(pt -> pt.setFailed(e));
onPossibleCompletion();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,31 @@ public class ClusterService extends AbstractLifecycleComponent {
private final OperationRouting operationRouting;

private final ClusterSettings clusterSettings;
private final Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
this(settings, clusterSettings, new MasterService(settings, threadPool),
new ClusterApplierService(settings, clusterSettings, threadPool,
() -> ClusterService.newClusterStateBuilder(settings, initialClusterStateCustoms)));
}

public ClusterService(Settings settings, ClusterSettings clusterSettings,
MasterService masterService, ClusterApplierService clusterApplierService) {
super(settings);
this.masterService = new MasterService(settings, threadPool);
this.masterService = masterService;
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold);
this.initialClusterStateCustoms = initialClusterStateCustoms;
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder);
this.clusterApplierService = clusterApplierService;
}

/**
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
*/
public ClusterState.Builder newClusterStateBuilder() {
private static ClusterState.Builder newClusterStateBuilder(Settings settings,
Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
for (Map.Entry<String, Supplier<ClusterState.Custom>> entry : initialClusterStateCustoms.entrySet()) {
builder.putCustom(entry.getKey(), entry.getValue().get());
Expand Down
Loading