Skip to content

Commit

Permalink
Atomically update cluster state with decommission status and correspo…
Browse files Browse the repository at this point in the history
…nding action (#5093)

* Atomically update the cluster state with decommission status and its corresponding action in the same execute call

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN authored and ryanbogan committed Dec 14, 2022
1 parent e64d065 commit d9fb1b0
Show file tree
Hide file tree
Showing 13 changed files with 668 additions and 495 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionService;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.decommission.DecommissioningFailedException;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
Expand Down Expand Up @@ -824,24 +823,11 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception {
// and hence due to which the leader won't get abdicated and decommission request should eventually fail.
// And in this case, to ensure decommission request doesn't leave mutating change in the cluster, we ensure
// that no exclusion is set to the cluster and state for decommission is marked as FAILED
Logger clusterLogger = LogManager.getLogger(DecommissionService.class);
MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(clusterLogger);
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test",
DecommissionService.class.getCanonicalName(),
Level.ERROR,
"failure in removing to-be-decommissioned cluster manager eligible nodes"
)
OpenSearchTimeoutException ex = expectThrows(
OpenSearchTimeoutException.class,
() -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet()
);

assertBusy(() -> {
OpenSearchTimeoutException ex = expectThrows(
OpenSearchTimeoutException.class,
() -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet()
);
assertTrue(ex.getMessage().contains("timed out waiting for voting config exclusions"));
});
assertTrue(ex.getMessage().contains("while removing to-be-decommissioned cluster manager eligible nodes"));

ClusterService leaderClusterService = internalCluster().getInstance(
ClusterService.class,
Expand Down Expand Up @@ -877,7 +863,6 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception {

// if the below condition is passed, then we are sure current decommission status is marked FAILED
assertTrue(expectedStateLatch.await(30, TimeUnit.SECONDS));
mockLogAppender.assertAllExpectationsMatched();

// ensure all nodes are part of cluster
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
Expand All @@ -66,6 +64,9 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum;
import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.addExclusionAndGetState;

/**
* Transport endpoint action for adding exclusions to voting config
*
Expand Down Expand Up @@ -144,13 +145,7 @@ public ClusterState execute(ClusterState currentState) {
assert resolvedExclusions == null : resolvedExclusions;
final int finalMaxVotingConfigExclusions = TransportAddVotingConfigExclusionsAction.this.maxVotingConfigExclusions;
resolvedExclusions = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, finalMaxVotingConfigExclusions);

final CoordinationMetadata.Builder builder = CoordinationMetadata.builder(currentState.coordinationMetadata());
resolvedExclusions.forEach(builder::addVotingConfigExclusion);
final Metadata newMetadata = Metadata.builder(currentState.metadata()).coordinationMetadata(builder.build()).build();
final ClusterState newState = ClusterState.builder(currentState).metadata(newMetadata).build();
assert newState.getVotingConfigExclusions().size() <= finalMaxVotingConfigExclusions;
return newState;
return addExclusionAndGetState(currentState, resolvedExclusions, finalMaxVotingConfigExclusions);
}

@Override
Expand Down Expand Up @@ -213,18 +208,6 @@ public void onTimeout(TimeValue timeout) {
});
}

private static Set<VotingConfigExclusion> resolveVotingConfigExclusionsAndCheckMaximum(
AddVotingConfigExclusionsRequest request,
ClusterState state,
int maxVotingConfigExclusions
) {
return request.resolveVotingConfigExclusionsAndCheckMaximum(
state,
maxVotingConfigExclusions,
MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING.getKey()
);
}

@Override
protected ClusterBlockException checkBlock(AddVotingConfigExclusionsRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
Expand All @@ -60,6 +58,8 @@
import java.io.IOException;
import java.util.function.Predicate;

import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState;

/**
* Transport endpoint action for clearing exclusions to voting config
*
Expand Down Expand Up @@ -166,13 +166,7 @@ private void submitClearVotingConfigExclusionsTask(
clusterService.submitStateUpdateTask("clear-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
final CoordinationMetadata newCoordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata())
.clearVotingConfigExclusions()
.build();
final Metadata newMetadata = Metadata.builder(currentState.metadata())
.coordinationMetadata(newCoordinationMetadata)
.build();
return ClusterState.builder(currentState).metadata(newMetadata).build();
return clearExclusionsAndGetState(currentState);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.configuration;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
import org.opensearch.cluster.metadata.Metadata;

import java.util.Set;

import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING;

/**
* Static helper utilities for voting config exclusions cluster state updates
*
* @opensearch.internal
*/
public class VotingConfigExclusionsHelper {

/**
* Static helper to update current state with given resolved exclusions
*
* @param currentState current cluster state
* @param resolvedExclusions resolved exclusions from the request
* @param finalMaxVotingConfigExclusions max exclusions that be added
* @return newly formed cluster state
*/
public static ClusterState addExclusionAndGetState(
ClusterState currentState,
Set<VotingConfigExclusion> resolvedExclusions,
int finalMaxVotingConfigExclusions
) {
final CoordinationMetadata.Builder builder = CoordinationMetadata.builder(currentState.coordinationMetadata());
resolvedExclusions.forEach(builder::addVotingConfigExclusion);
final Metadata newMetadata = Metadata.builder(currentState.metadata()).coordinationMetadata(builder.build()).build();
final ClusterState newState = ClusterState.builder(currentState).metadata(newMetadata).build();
assert newState.getVotingConfigExclusions().size() <= finalMaxVotingConfigExclusions;
return newState;
}

/**
* Resolves the exclusion from the request and throws IAE if no nodes matched or maximum exceeded
*
* @param request AddVotingConfigExclusionsRequest request
* @param state current cluster state
* @param maxVotingConfigExclusions max number of exclusion acceptable
* @return set of VotingConfigExclusion
*/
public static Set<VotingConfigExclusion> resolveVotingConfigExclusionsAndCheckMaximum(
AddVotingConfigExclusionsRequest request,
ClusterState state,
int maxVotingConfigExclusions
) {
return request.resolveVotingConfigExclusionsAndCheckMaximum(
state,
maxVotingConfigExclusions,
MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING.getKey()
);
}

/**
* Clears Voting config exclusion from the given cluster state
*
* @param currentState current cluster state
* @return newly formed cluster state after clearing voting config exclusions
*/
public static ClusterState clearExclusionsAndGetState(ClusterState currentState) {
final CoordinationMetadata newCoordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata())
.clearVotingConfigExclusions()
.build();
final Metadata newMetadata = Metadata.builder(currentState.metadata()).coordinationMetadata(newCoordinationMetadata).build();
return ClusterState.builder(currentState).metadata(newMetadata).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID;
import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned;
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned;
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
Expand All @@ -33,7 +27,6 @@
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.http.HttpStats;
Expand All @@ -52,6 +45,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState;

/**
* Helper controller class to remove list of nodes from the cluster and update status
*
Expand Down Expand Up @@ -79,83 +74,6 @@ public class DecommissionController {
this.threadPool = threadPool;
}

/**
* Transport call to add nodes to voting config exclusion
*
* @param nodes set of nodes Ids to be added to voting config exclusion list
* @param listener callback for response or failure
*/
public void excludeDecommissionedNodesFromVotingConfig(Set<String> nodes, ActionListener<Void> listener) {
transportService.sendRequest(
transportService.getLocalNode(),
AddVotingConfigExclusionsAction.NAME,
new AddVotingConfigExclusionsRequest(
Strings.EMPTY_ARRAY,
nodes.toArray(String[]::new),
Strings.EMPTY_ARRAY,
TimeValue.timeValueSeconds(120) // giving a larger timeout of 120 sec as cluster might already be in stress when
// decommission is triggered
),
new TransportResponseHandler<AddVotingConfigExclusionsResponse>() {
@Override
public void handleResponse(AddVotingConfigExclusionsResponse response) {
listener.onResponse(null);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public AddVotingConfigExclusionsResponse read(StreamInput in) throws IOException {
return new AddVotingConfigExclusionsResponse(in);
}
}
);
}

/**
* Transport call to clear voting config exclusion
*
* @param listener callback for response or failure
*/
public void clearVotingConfigExclusion(ActionListener<Void> listener, boolean waitForRemoval) {
final ClearVotingConfigExclusionsRequest clearVotingConfigExclusionsRequest = new ClearVotingConfigExclusionsRequest();
clearVotingConfigExclusionsRequest.setWaitForRemoval(waitForRemoval);
transportService.sendRequest(
transportService.getLocalNode(),
ClearVotingConfigExclusionsAction.NAME,
clearVotingConfigExclusionsRequest,
new TransportResponseHandler<ClearVotingConfigExclusionsResponse>() {
@Override
public void handleResponse(ClearVotingConfigExclusionsResponse response) {
listener.onResponse(null);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOException {
return new ClearVotingConfigExclusionsResponse(in);
}
}
);
}

/**
* This method triggers batch of tasks for nodes to be decommissioned using executor {@link NodeRemovalClusterStateTaskExecutor}
* Once the tasks are submitted, it waits for an expected cluster state to guarantee
Expand Down Expand Up @@ -259,9 +177,15 @@ public ClusterState execute(ClusterState currentState) {
decommissionAttributeMetadata.decommissionAttribute(),
decommissionStatus
);
return ClusterState.builder(currentState)
ClusterState newState = ClusterState.builder(currentState)
.metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata))
.build();

// For terminal status we will go ahead and clear any exclusion that was added as part of decommission action
if (decommissionStatus.equals(DecommissionStatus.SUCCESSFUL) || decommissionStatus.equals(DecommissionStatus.FAILED)) {
newState = clearExclusionsAndGetState(newState);
}
return newState;
}

@Override
Expand Down
Loading

0 comments on commit d9fb1b0

Please sign in to comment.