diff --git a/CHANGELOG.md b/CHANGELOG.md index 1972ea21ff2d9..b0bc7d472fb18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Add dev help in gradle check CI failures ([4872](https://github.com/opensearch-project/OpenSearch/pull/4872)) - Copy `build.sh` over from opensearch-build ([#4887](https://github.com/opensearch-project/OpenSearch/pull/4887)) - Add project health badges to the README.md ([#4843](https://github.com/opensearch-project/OpenSearch/pull/4843)) +- Added changes for graceful node decommission ([#4586](https://github.com/opensearch-project/OpenSearch/pull/4586)) - Build no-jdk distributions as part of release build ([#4902](https://github.com/opensearch-project/OpenSearch/pull/4902)) ### Dependencies @@ -72,8 +73,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Exclude jettison version brought in with hadoop-minicluster. ([#4787](https://github.com/opensearch-project/OpenSearch/pull/4787)) - Bump protobuf-java to 3.21.7 in repository-gcs and repository-hdfs ([#4790](https://github.com/opensearch-project/OpenSearch/pull/4790)) - Bump reactor-netty-http to 1.0.24 in repository-azure ([#4880](https://github.com/opensearch-project/OpenSearch/pull/4880)) -- Bumps `protobuf-java` from 3.21.7 to 3.21.8 +- Bumps `protobuf-java` from 3.21.7 to 3.21.8 ([#4886](https://github.com/opensearch-project/OpenSearch/pull/4886)) - Upgrade netty to 4.1.84.Final ([#4893](https://github.com/opensearch-project/OpenSearch/pull/4893)) +- Dependency updates: asm 9.3 -> 9.4, bytebuddy 1.12.12 -> 1.12.18 ([#4889](https://github.com/opensearch-project/OpenSearch/pull/4889)) ### Changed - Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308)) diff --git a/buildSrc/version.properties b/buildSrc/version.properties index 4704e0c71880a..8ff74b40cc2eb 100644 --- 
a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -14,9 +14,10 @@ jackson_databind = 2.13.4.2 snakeyaml = 1.32 icu4j = 70.1 supercsv = 2.4.0 +# Update to 2.17.2+ is breaking OpenSearchJsonLayout (see https://issues.apache.org/jira/browse/LOG4J2-3562) log4j = 2.17.1 slf4j = 1.7.36 -asm = 9.3 +asm = 9.4 jettison = 1.5.1 # when updating the JNA version, also update the version in buildSrc/build.gradle @@ -45,9 +46,10 @@ bouncycastle=1.70 randomizedrunner = 2.7.1 junit = 4.13.2 hamcrest = 2.1 +# Update to 4.8.0 is using reflection without SecurityManager checks (fails with java.security.AccessControlException) mockito = 4.7.0 objenesis = 3.2 -bytebuddy = 1.12.12 +bytebuddy = 1.12.18 # benchmark dependencies jmh = 1.35 diff --git a/modules/lang-expression/licenses/asm-9.3.jar.sha1 b/modules/lang-expression/licenses/asm-9.3.jar.sha1 deleted file mode 100644 index 71d3966a6f6f9..0000000000000 --- a/modules/lang-expression/licenses/asm-9.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -8e6300ef51c1d801a7ed62d07cd221aca3a90640 \ No newline at end of file diff --git a/modules/lang-expression/licenses/asm-9.4.jar.sha1 b/modules/lang-expression/licenses/asm-9.4.jar.sha1 new file mode 100644 index 0000000000000..75f2b0fe9a112 --- /dev/null +++ b/modules/lang-expression/licenses/asm-9.4.jar.sha1 @@ -0,0 +1 @@ +b4e0e2d2e023aa317b7cfcfc916377ea348e07d1 \ No newline at end of file diff --git a/modules/lang-expression/licenses/asm-commons-9.3.jar.sha1 b/modules/lang-expression/licenses/asm-commons-9.3.jar.sha1 deleted file mode 100644 index fd7cd4943a57c..0000000000000 --- a/modules/lang-expression/licenses/asm-commons-9.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -1f2a432d1212f5c352ae607d7b61dcae20c20af5 \ No newline at end of file diff --git a/modules/lang-expression/licenses/asm-commons-9.4.jar.sha1 b/modules/lang-expression/licenses/asm-commons-9.4.jar.sha1 new file mode 100644 index 0000000000000..e0e2a2f4e63e9 --- /dev/null +++ 
b/modules/lang-expression/licenses/asm-commons-9.4.jar.sha1 @@ -0,0 +1 @@ +8fc2810ddbcbbec0a8bbccb3f8eda58321839912 \ No newline at end of file diff --git a/modules/lang-expression/licenses/asm-tree-9.3.jar.sha1 b/modules/lang-expression/licenses/asm-tree-9.3.jar.sha1 deleted file mode 100644 index 238f0006424d3..0000000000000 --- a/modules/lang-expression/licenses/asm-tree-9.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -78d2ecd61318b5a58cd04fb237636c0e86b77d97 \ No newline at end of file diff --git a/modules/lang-expression/licenses/asm-tree-9.4.jar.sha1 b/modules/lang-expression/licenses/asm-tree-9.4.jar.sha1 new file mode 100644 index 0000000000000..50ce6d740aab7 --- /dev/null +++ b/modules/lang-expression/licenses/asm-tree-9.4.jar.sha1 @@ -0,0 +1 @@ +a99175a17d7fdc18cbcbd0e8ea6a5d276844190a \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-9.3.jar.sha1 b/modules/lang-painless/licenses/asm-9.3.jar.sha1 deleted file mode 100644 index 71d3966a6f6f9..0000000000000 --- a/modules/lang-painless/licenses/asm-9.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -8e6300ef51c1d801a7ed62d07cd221aca3a90640 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-9.4.jar.sha1 b/modules/lang-painless/licenses/asm-9.4.jar.sha1 new file mode 100644 index 0000000000000..75f2b0fe9a112 --- /dev/null +++ b/modules/lang-painless/licenses/asm-9.4.jar.sha1 @@ -0,0 +1 @@ +b4e0e2d2e023aa317b7cfcfc916377ea348e07d1 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-analysis-9.3.jar.sha1 b/modules/lang-painless/licenses/asm-analysis-9.3.jar.sha1 deleted file mode 100644 index f5a04d0196823..0000000000000 --- a/modules/lang-painless/licenses/asm-analysis-9.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -4b071f211b37c38e0e9f5998550197c8593f6ad8 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-analysis-9.4.jar.sha1 b/modules/lang-painless/licenses/asm-analysis-9.4.jar.sha1 new file mode 100644 index 
0000000000000..850a070775e4d --- /dev/null +++ b/modules/lang-painless/licenses/asm-analysis-9.4.jar.sha1 @@ -0,0 +1 @@ +0a5fec9dfc039448d4fd098fbaffcaf55373b223 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-commons-9.3.jar.sha1 b/modules/lang-painless/licenses/asm-commons-9.3.jar.sha1 deleted file mode 100644 index fd7cd4943a57c..0000000000000 --- a/modules/lang-painless/licenses/asm-commons-9.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -1f2a432d1212f5c352ae607d7b61dcae20c20af5 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-commons-9.4.jar.sha1 b/modules/lang-painless/licenses/asm-commons-9.4.jar.sha1 new file mode 100644 index 0000000000000..e0e2a2f4e63e9 --- /dev/null +++ b/modules/lang-painless/licenses/asm-commons-9.4.jar.sha1 @@ -0,0 +1 @@ +8fc2810ddbcbbec0a8bbccb3f8eda58321839912 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-tree-9.3.jar.sha1 b/modules/lang-painless/licenses/asm-tree-9.3.jar.sha1 deleted file mode 100644 index 238f0006424d3..0000000000000 --- a/modules/lang-painless/licenses/asm-tree-9.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -78d2ecd61318b5a58cd04fb237636c0e86b77d97 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-tree-9.4.jar.sha1 b/modules/lang-painless/licenses/asm-tree-9.4.jar.sha1 new file mode 100644 index 0000000000000..50ce6d740aab7 --- /dev/null +++ b/modules/lang-painless/licenses/asm-tree-9.4.jar.sha1 @@ -0,0 +1 @@ +a99175a17d7fdc18cbcbd0e8ea6a5d276844190a \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-util-9.3.jar.sha1 b/modules/lang-painless/licenses/asm-util-9.3.jar.sha1 deleted file mode 100644 index 8859c317794ba..0000000000000 --- a/modules/lang-painless/licenses/asm-util-9.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -9595bc05510d0bd4b610188b77333fe4851a1975 \ No newline at end of file diff --git a/modules/lang-painless/licenses/asm-util-9.4.jar.sha1 
b/modules/lang-painless/licenses/asm-util-9.4.jar.sha1 new file mode 100644 index 0000000000000..8c5854f41bcda --- /dev/null +++ b/modules/lang-painless/licenses/asm-util-9.4.jar.sha1 @@ -0,0 +1 @@ +ab1e0a84b72561dbaf1ee260321e72148ebf4b19 \ No newline at end of file diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index 2dc964e3e8845..14ec041b7464b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -120,31 +120,44 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx logger.info("--> starting decommissioning nodes in zone {}", 'c'); DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c"); + // Set the timeout to 0 to do immediate Decommission DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get(); assertTrue(decommissionResponse.isAcknowledged()); + logger.info("--> Received decommissioning nodes in zone {}", 'c'); + // Keep some delay for scheduler to invoke decommission flow + Thread.sleep(500); + // Will wait for all events to complete client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + logger.info("--> Received LANGUID event"); + // assert that decommission status is successful - GetDecommissionStateResponse response = client().execute( + GetDecommissionStateResponse response = client(clusterManagerNodes.get(0)).execute( GetDecommissionStateAction.INSTANCE, new 
GetDecommissionStateRequest(decommissionAttribute.attributeName()) ).get(); assertEquals(response.getAttributeValue(), decommissionAttribute.attributeValue()); - assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL); + assertEquals(DecommissionStatus.SUCCESSFUL, response.getDecommissionStatus()); + logger.info("--> Decommission status is successful"); ClusterState clusterState = client(clusterManagerNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState(); assertEquals(4, clusterState.nodes().getSize()); + logger.info("--> Got cluster state with 4 nodes."); // assert status on nodes that are part of cluster currently Iterator discoveryNodeIterator = clusterState.nodes().getNodes().valuesIt(); + DiscoveryNode clusterManagerNodeAfterDecommission = null; while (discoveryNodeIterator.hasNext()) { // assert no node has decommissioned attribute DiscoveryNode node = discoveryNodeIterator.next(); assertNotEquals(node.getAttributes().get("zone"), "c"); - + if (node.isClusterManagerNode()) { + clusterManagerNodeAfterDecommission = node; + } // assert all the nodes has status as SUCCESSFUL ClusterService localNodeClusterService = internalCluster().getInstance(ClusterService.class, node.getName()); assertEquals( @@ -152,6 +165,8 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx DecommissionStatus.SUCCESSFUL ); } + assertNotNull("Cluster Manager not found after decommission", clusterManagerNodeAfterDecommission); + logger.info("--> Cluster Manager node found after decommission"); // assert status on decommissioned node // Here we will verify that until it got kicked out, it received appropriate status updates @@ -163,16 +178,18 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx decommissionedNodeClusterService.state().metadata().decommissionAttributeMetadata().status(), DecommissionStatus.IN_PROGRESS ); + logger.info("--> Verified the decommissioned node Has in 
progress state."); // Will wait for all events to complete - client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); - + client(clusterManagerNodeAfterDecommission.getName()).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + logger.info("--> Got LANGUID event"); // Recommissioning the zone back to gracefully succeed the test once above tests succeeds - DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodes.get(0)).execute( + DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodeAfterDecommission.getName()).execute( DeleteDecommissionStateAction.INSTANCE, new DeleteDecommissionStateRequest() ).get(); assertTrue(deleteDecommissionStateResponse.isAcknowledged()); + logger.info("--> Deleting decommission done."); // will wait for cluster to stabilise with a timeout of 2 min (findPeerInterval for decommissioned nodes) // as by then all nodes should have joined the cluster @@ -201,6 +218,7 @@ public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c"); DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute); + decommissionRequest.setNoDelay(true); assertBusy(() -> { DecommissioningFailedException ex = expectThrows( DecommissioningFailedException.class, diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java index 79a6688dc6049..e2fb353b6c749 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequest.java @@ -14,6 +14,7 @@ import org.opensearch.common.Strings; 
import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.unit.TimeValue; import java.io.IOException; @@ -28,8 +29,15 @@ */ public class DecommissionRequest extends ClusterManagerNodeRequest { + public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120); + private DecommissionAttribute decommissionAttribute; + private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT; + + // holder for no_delay param. To avoid draining time timeout. + private boolean noDelay = false; + public DecommissionRequest() {} public DecommissionRequest(DecommissionAttribute decommissionAttribute) { @@ -39,12 +47,14 @@ public DecommissionRequest(DecommissionAttribute decommissionAttribute) { public DecommissionRequest(StreamInput in) throws IOException { super(in); decommissionAttribute = new DecommissionAttribute(in); + this.delayTimeout = in.readTimeValue(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); decommissionAttribute.writeTo(out); + out.writeTimeValue(delayTimeout); } /** @@ -65,6 +75,19 @@ public DecommissionAttribute getDecommissionAttribute() { return this.decommissionAttribute; } + public TimeValue getDelayTimeout() { + return this.delayTimeout; + } + + public void setNoDelay(boolean noDelay) { + this.delayTimeout = TimeValue.ZERO; + this.noDelay = noDelay; + } + + public boolean isNoDelay() { + return noDelay; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -74,6 +97,14 @@ public ActionRequestValidationException validate() { if (decommissionAttribute.attributeValue() == null || Strings.isEmpty(decommissionAttribute.attributeValue())) { validationException = addValidationError("attribute value is missing", validationException); } + // This validation should not fail since we are not allowing delay timeout to be set externally. 
+ // Still keeping it for double check. + if (noDelay && delayTimeout.getSeconds() > 0) { + final String validationMessage = "Invalid decommission request. no_delay is true and delay_timeout is set to " + + delayTimeout.getSeconds() + + "] Seconds"; + validationException = addValidationError(validationMessage, validationException); + } return validationException; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java index 3a067d2f110b9..6f4e3cf82d2ce 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/decommission/awareness/put/TransportDecommissionAction.java @@ -76,6 +76,6 @@ protected ClusterBlockException checkBlock(DecommissionRequest request, ClusterS protected void clusterManagerOperation(DecommissionRequest request, ClusterState state, ActionListener listener) throws Exception { logger.info("starting awareness attribute [{}] decommissioning", request.getDecommissionAttribute().toString()); - decommissionService.startDecommissionAction(request.getDecommissionAttribute(), listener); + decommissionService.startDecommissionAction(request, listener); } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java index d3d508bf36451..56b8e6aea7c4a 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionAttributeMetadata.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.util.EnumSet; import java.util.Objects; +import java.util.Set; /** * Contains metadata about 
decommission attribute @@ -88,11 +89,14 @@ public synchronized void validateNewStatus(DecommissionStatus newStatus) { } // We don't expect that INIT will be new status, as it is registered only when starting the decommission action switch (newStatus) { + case DRAINING: + validateStatus(Set.of(DecommissionStatus.INIT), newStatus); + break; case IN_PROGRESS: - validateStatus(DecommissionStatus.INIT, newStatus); + validateStatus(Set.of(DecommissionStatus.DRAINING, DecommissionStatus.INIT), newStatus); break; case SUCCESSFUL: - validateStatus(DecommissionStatus.IN_PROGRESS, newStatus); + validateStatus(Set.of(DecommissionStatus.IN_PROGRESS), newStatus); break; default: throw new IllegalArgumentException( @@ -101,17 +105,17 @@ public synchronized void validateNewStatus(DecommissionStatus newStatus) { } } - private void validateStatus(DecommissionStatus expected, DecommissionStatus next) { - if (status.equals(expected) == false) { + private void validateStatus(Set expectedStatuses, DecommissionStatus next) { + if (expectedStatuses.contains(status) == false) { assert false : "can't move decommission status to [" + next + "]. current status: [" + status - + "] (expected [" - + expected + + "] (allowed statuses [" + + expectedStatuses + "])"; throw new IllegalStateException( - "can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])" + "can't move decommission status to [" + next + "]. 
current status: [" + status + "] (expected [" + expectedStatuses + "])" ); } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index b58d99a9d59db..ffb20a05b3ef7 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -18,6 +18,10 @@ import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterStateTaskConfig; @@ -32,6 +36,7 @@ import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.unit.TimeValue; +import org.opensearch.http.HttpStats; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportException; import org.opensearch.transport.TransportResponseHandler; @@ -39,7 +44,9 @@ import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Predicate; @@ -271,4 +278,61 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } }); } + + private void logActiveConnections(NodesStatsResponse nodesStatsResponse) { + if 
(nodesStatsResponse == null || nodesStatsResponse.getNodes() == null) { + logger.info("Node stats response received is null/empty."); + return; + } + + Map nodeActiveConnectionMap = new HashMap<>(); + List responseNodes = nodesStatsResponse.getNodes(); + for (int i = 0; i < responseNodes.size(); i++) { + HttpStats httpStats = responseNodes.get(i).getHttp(); + DiscoveryNode node = responseNodes.get(i).getNode(); + nodeActiveConnectionMap.put(node.getId(), httpStats.getServerOpen()); + } + logger.info("Decommissioning node with connections : [{}]", nodeActiveConnectionMap); + } + + void getActiveRequestCountOnDecommissionedNodes(Set decommissionedNodes) { + if (decommissionedNodes == null || decommissionedNodes.isEmpty()) { + return; + } + String[] nodes = decommissionedNodes.stream().map(DiscoveryNode::getId).toArray(String[]::new); + if (nodes.length == 0) { + return; + } + + final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(nodes); + nodesStatsRequest.clear(); + nodesStatsRequest.addMetric(NodesStatsRequest.Metric.HTTP.metricName()); + + transportService.sendRequest( + transportService.getLocalNode(), + NodesStatsAction.NAME, + nodesStatsRequest, + new TransportResponseHandler() { + @Override + public void handleResponse(NodesStatsResponse response) { + logActiveConnections(response); + } + + @Override + public void handleException(TransportException exp) { + logger.error("Failure occurred while dumping connection for decommission nodes - ", exp.unwrapCause()); + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + + @Override + public NodesStatsResponse read(StreamInput in) throws IOException { + return new NodesStatsResponse(in); + } + } + ); + } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index e6639ae058066..f284eb476a755 100644 --- 
a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -15,6 +15,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterStateUpdateTask; @@ -115,13 +116,14 @@ private void setForcedAwarenessAttributes(Settings forceSettings) { * Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT} * Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration * - * @param decommissionAttribute register decommission attribute in the metadata request + * @param decommissionRequest decommission request Object * @param listener register decommission listener */ public void startDecommissionAction( - final DecommissionAttribute decommissionAttribute, + final DecommissionRequest decommissionRequest, final ActionListener listener ) { + final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); // register the metadata with status as INIT as first step clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) { @Override @@ -161,15 +163,16 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS decommissionAttributeMetadata.decommissionAttribute(), decommissionAttributeMetadata.status() ); - decommissionClusterManagerNodes(decommissionAttributeMetadata.decommissionAttribute(), listener); + 
decommissionClusterManagerNodes(decommissionRequest, listener); } }); } private synchronized void decommissionClusterManagerNodes( - final DecommissionAttribute decommissionAttribute, + final DecommissionRequest decommissionRequest, ActionListener listener ) { + final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); ClusterState state = clusterService.getClusterApplierService().state(); // since here metadata is already registered with INIT, we can guarantee that no new node with decommission attribute can further // join the cluster @@ -212,7 +215,7 @@ public void onResponse(Void unused) { // and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more to-be-decommission // nodes can be part of Voting Config listener.onResponse(new DecommissionResponse(true)); - failDecommissionedNodes(clusterService.getClusterApplierService().state()); + drainNodesWithDecommissionedAttribute(decommissionRequest); } } else { // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager @@ -309,17 +312,74 @@ public void onFailure(Exception e) { } } - private void failDecommissionedNodes(ClusterState state) { - // this method ensures no matter what, we always exit from this function after clearing the voting config exclusion + void drainNodesWithDecommissionedAttribute(DecommissionRequest decommissionRequest) { + ClusterState state = clusterService.getClusterApplierService().state(); + Set decommissionedNodes = filterNodesWithDecommissionAttribute( + state, + decommissionRequest.getDecommissionAttribute(), + false + ); + + if (decommissionRequest.isNoDelay()) { + // Call to fail the decommission nodes + failDecommissionedNodes(decommissionedNodes, decommissionRequest.getDecommissionAttribute()); + } else { + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.DRAINING, new ActionListener<>() { + @Override + public void 
onResponse(DecommissionStatus status) { + logger.info("updated the decommission status to [{}]", status); + // set the weights + scheduleNodesDecommissionOnTimeout(decommissionedNodes, decommissionRequest.getDelayTimeout()); + } + + @Override + public void onFailure(Exception e) { + logger.error( + () -> new ParameterizedMessage( + "failed to update decommission status for attribute [{}] to [{}]", + decommissionRequest.getDecommissionAttribute().toString(), + DecommissionStatus.DRAINING + ), + e + ); + // since we are not able to update the status, we will clear the voting config exclusion we have set earlier + clearVotingConfigExclusionAndUpdateStatus(false, false); + } + }); + } + } + + void scheduleNodesDecommissionOnTimeout(Set decommissionedNodes, TimeValue timeoutForNodeDraining) { + ClusterState state = clusterService.getClusterApplierService().state(); DecommissionAttributeMetadata decommissionAttributeMetadata = state.metadata().decommissionAttributeMetadata(); + if (decommissionAttributeMetadata == null) { + return; + } + assert decommissionAttributeMetadata.status().equals(DecommissionStatus.DRAINING) + : "Unexpected status encountered while decommissioning nodes."; + + // This method ensures no matter what, we always exit from this function after clearing the voting config exclusion DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + + // Wait for timeout to happen. Log the active connection before decommissioning of nodes. + transportService.getThreadPool().schedule(() -> { + // Log active connections. 
+ decommissionController.getActiveRequestCountOnDecommissionedNodes(decommissionedNodes); + // Call to fail the decommission nodes + failDecommissionedNodes(decommissionedNodes, decommissionAttribute); + }, timeoutForNodeDraining, ThreadPool.Names.GENERIC); + } + + private void failDecommissionedNodes(Set decommissionedNodes, DecommissionAttribute decommissionAttribute) { + + // Weighing away is complete. We have allowed the nodes to be drained. Let's move decommission status to IN_PROGRESS. decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.IN_PROGRESS, new ActionListener<>() { @Override public void onResponse(DecommissionStatus status) { logger.info("updated the decommission status to [{}]", status); // execute nodes decommissioning decommissionController.removeDecommissionedNodes( - filterNodesWithDecommissionAttribute(clusterService.getClusterApplierService().state(), decommissionAttribute, false), + decommissionedNodes, "nodes-decommissioned", TimeValue.timeValueSeconds(120L), new ActionListener() { @@ -454,6 +514,7 @@ private static void ensureEligibleRequest( case INIT: case FAILED: break; + case DRAINING: case IN_PROGRESS: case SUCCESSFUL: msg = "same request is already in status [" + decommissionAttributeMetadata.status() + "]"; @@ -471,6 +532,7 @@ private static void ensureEligibleRequest( + decommissionAttributeMetadata.decommissionAttribute().toString() + "] already successfully decommissioned, recommission before triggering another decommission"; break; + case DRAINING: case IN_PROGRESS: case INIT: // it means the decommission has been initiated or is inflight. 
In that case, will fail new request @@ -582,7 +644,9 @@ public static boolean nodeCommissioned(DiscoveryNode discoveryNode, Metadata met DecommissionStatus status = decommissionAttributeMetadata.status(); if (decommissionAttribute != null && status != null) { if (nodeHasDecommissionedAttribute(discoveryNode, decommissionAttribute) - && (status.equals(DecommissionStatus.IN_PROGRESS) || status.equals(DecommissionStatus.SUCCESSFUL))) { + && (status.equals(DecommissionStatus.IN_PROGRESS) + || status.equals(DecommissionStatus.SUCCESSFUL) + || status.equals(DecommissionStatus.DRAINING))) { return false; } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionStatus.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionStatus.java index af88b0d0f5902..4ca8c3cc4286e 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionStatus.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionStatus.java @@ -16,6 +16,11 @@ public enum DecommissionStatus { * Decommission process is initiated, and to-be-decommissioned leader is excluded from voting config */ INIT("init"), + /** + * Decommission process is initiated, and the zone is being drained. 
+ */ + DRAINING("draining"), + /** * Decommission process has started, decommissioned nodes should be removed */ @@ -56,6 +61,8 @@ public static DecommissionStatus fromString(String status) { } if (status.equals(INIT.status())) { return INIT; + } else if (status.equals(DRAINING.status())) { + return DRAINING; } else if (status.equals(IN_PROGRESS.status())) { return IN_PROGRESS; } else if (status.equals(SUCCESSFUL.status())) { diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java index 979bf05f537b7..5f1d1ba48c88b 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestDecommissionAction.java @@ -49,6 +49,11 @@ DecommissionRequest createRequest(RestRequest request) throws IOException { DecommissionRequest decommissionRequest = Requests.decommissionRequest(); String attributeName = request.param("awareness_attribute_name"); String attributeValue = request.param("awareness_attribute_value"); + // Check if we have no delay set. 
+ boolean noDelay = request.paramAsBoolean("no_delay", false); + if (noDelay) { + decommissionRequest.setNoDelay(noDelay); + } return decommissionRequest.setDecommissionAttribute(new DecommissionAttribute(attributeName, attributeValue)); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java index c189b5702dea0..112609b0cf8ec 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/decommission/awareness/put/DecommissionRequestTests.java @@ -10,6 +10,7 @@ import org.opensearch.action.ActionRequestValidationException; import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -25,6 +26,7 @@ public void testSerialization() throws IOException { final DecommissionRequest deserialized = copyWriteable(originalRequest, writableRegistry(), DecommissionRequest::new); assertEquals(deserialized.getDecommissionAttribute(), originalRequest.getDecommissionAttribute()); + assertEquals(deserialized.getDelayTimeout(), originalRequest.getDelayTimeout()); } public void testValidation() { @@ -54,8 +56,20 @@ public void testValidation() { DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); final DecommissionRequest request = new DecommissionRequest(decommissionAttribute); + request.setNoDelay(true); ActionRequestValidationException e = request.validate(); assertNull(e); + assertEquals(TimeValue.ZERO, request.getDelayTimeout()); + } + { + String attributeName = "zone"; + String attributeValue = "test"; + DecommissionAttribute decommissionAttribute = new 
DecommissionAttribute(attributeName, attributeValue); + + final DecommissionRequest request = new DecommissionRequest(decommissionAttribute); + ActionRequestValidationException e = request.validate(); + assertNull(e); + assertEquals(DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT, request.getDelayTimeout()); } } } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index 66a3b00f2979d..b5c5d30c45c47 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -233,7 +233,11 @@ public void testJoinClusterWithNoDecommission() { public void testPreventJoinClusterWithDecommission() { Settings.builder().build(); DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1"); - DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.IN_PROGRESS, DecommissionStatus.SUCCESSFUL); + DecommissionStatus decommissionStatus = randomFrom( + DecommissionStatus.IN_PROGRESS, + DecommissionStatus.SUCCESSFUL, + DecommissionStatus.DRAINING + ); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, decommissionStatus diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index 06f2de04907d6..5a76e0d5137fb 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -241,10 +241,45 @@ public void onFailure(Exception e) { } public void testSuccessfulDecommissionStatusMetadataUpdate() throws InterruptedException { + Map<DecommissionStatus, Set<DecommissionStatus>> decommissionStateTransitionMap 
= Map.of( + DecommissionStatus.INIT, + Set.of(DecommissionStatus.DRAINING, DecommissionStatus.IN_PROGRESS), + DecommissionStatus.DRAINING, + Set.of(DecommissionStatus.IN_PROGRESS), + DecommissionStatus.IN_PROGRESS, + Set.of(DecommissionStatus.SUCCESSFUL) + ); + + for (Map.Entry<DecommissionStatus, Set<DecommissionStatus>> entry : decommissionStateTransitionMap.entrySet()) { + for (DecommissionStatus val : entry.getValue()) { + verifyDecommissionStatusTransition(entry.getKey(), val); + } + } + } + + public void testSuccessfulDecommissionStatusMetadataUpdateForFailedState() throws InterruptedException { + Map<DecommissionStatus, Set<DecommissionStatus>> decommissionStateTransitionMap = Map.of( + DecommissionStatus.INIT, + Set.of(DecommissionStatus.FAILED), + DecommissionStatus.DRAINING, + Set.of(DecommissionStatus.FAILED), + DecommissionStatus.IN_PROGRESS, + Set.of(DecommissionStatus.FAILED) + ); + + for (Map.Entry<DecommissionStatus, Set<DecommissionStatus>> entry : decommissionStateTransitionMap.entrySet()) { + for (DecommissionStatus val : entry.getValue()) { + verifyDecommissionStatusTransition(entry.getKey(), val); + } + } + } + + private void verifyDecommissionStatusTransition(DecommissionStatus currentStatus, DecommissionStatus newStatus) + throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( new DecommissionAttribute("zone", "zone-1"), - currentStatus + currentStatus ); ClusterState state = clusterService.state(); Metadata metadata = state.metadata(); @@ -253,25 +288,23 @@ public void testSuccessfulDecommissionStatusMetadataUpdate() throws InterruptedE state = ClusterState.builder(state).metadata(mdBuilder).build(); setState(clusterService, state); - decommissionController.updateMetadataWithDecommissionStatus( - DecommissionStatus.SUCCESSFUL, - new ActionListener<DecommissionStatus>() { - @Override - public void onResponse(DecommissionStatus status) { - assertEquals(DecommissionStatus.SUCCESSFUL, status); - countDownLatch.countDown(); - } + 
decommissionController.updateMetadataWithDecommissionStatus(newStatus, new ActionListener<DecommissionStatus>() { + @Override + public void onResponse(DecommissionStatus status) { + assertEquals(newStatus, status); + countDownLatch.countDown(); + } - @Override - public void onFailure(Exception e) { - fail("decommission status update failed"); - } + @Override + public void onFailure(Exception e) { + fail("decommission status update failed"); + countDownLatch.countDown(); } - ); + }); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); ClusterState newState = clusterService.getClusterApplierService().state(); DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); - assertEquals(decommissionAttributeMetadata.status(), DecommissionStatus.SUCCESSFUL); + assertEquals(decommissionAttributeMetadata.status(), newStatus); } private static class AdjustConfigurationForExclusions implements ClusterStateObserver.Listener { diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index c4cf6c7cc6641..3f39d67dee765 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -16,6 +16,7 @@ import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; +import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; import org.opensearch.cluster.ClusterName; @@ -32,6 +33,7 @@ import org.opensearch.cluster.service.ClusterService; import 
org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.MockTransport; @@ -141,7 +143,7 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - decommissionService.startDecommissionAction(decommissionAttribute, listener); + decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } @@ -168,7 +170,7 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - decommissionService.startDecommissionAction(decommissionAttribute, listener); + decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } @@ -193,7 +195,7 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - decommissionService.startDecommissionAction(decommissionAttribute, listener); + decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } @@ -218,7 +220,7 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - decommissionService.startDecommissionAction(decommissionAttribute, listener); + decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute), listener); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } @@ -255,10 +257,62 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - decommissionService.startDecommissionAction(new DecommissionAttribute("zone", "zone_2"), listener); + DecommissionRequest request = new DecommissionRequest(new DecommissionAttribute("zone", "zone_2")); + decommissionService.startDecommissionAction(request, listener); 
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + public void testScheduleNodesDecommissionOnTimeout() { + TransportService mockTransportService = Mockito.mock(TransportService.class); + ThreadPool mockThreadPool = Mockito.mock(ThreadPool.class); + Mockito.when(mockTransportService.getLocalNode()).thenReturn(Mockito.mock(DiscoveryNode.class)); + Mockito.when(mockTransportService.getThreadPool()).thenReturn(mockThreadPool); + DecommissionService decommissionService = new DecommissionService( + Settings.EMPTY, + clusterSettings, + clusterService, + mockTransportService, + threadPool, + allocationService + ); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttribute, + DecommissionStatus.DRAINING + ); + Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); + ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + DiscoveryNode decommissionedNode1 = Mockito.mock(DiscoveryNode.class); + DiscoveryNode decommissionedNode2 = Mockito.mock(DiscoveryNode.class); + + setState(clusterService, state); + decommissionService.scheduleNodesDecommissionOnTimeout( + Set.of(decommissionedNode1, decommissionedNode2), + DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT + ); + + Mockito.verify(mockThreadPool).schedule(Mockito.any(Runnable.class), Mockito.any(TimeValue.class), Mockito.anyString()); + } + + public void testDrainNodesWithDecommissionedAttributeWithNoDelay() { + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttribute, + DecommissionStatus.INIT + ); + + Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, 
decommissionAttributeMetadata).build(); + ClusterState state = ClusterState.builder(new ClusterName("test")).metadata(metadata).build(); + + DecommissionRequest request = new DecommissionRequest(decommissionAttribute); + request.setNoDelay(true); + + setState(clusterService, state); + decommissionService.drainNodesWithDecommissionedAttribute(request); + + } + public void testClearClusterDecommissionState() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java index b724de0bd5cc6..bbb21ff8f816c 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestDecommissionActionTests.java @@ -32,12 +32,43 @@ public void testCreateRequest() throws IOException { Map<String, String> params = new HashMap<>(); params.put("awareness_attribute_name", "zone"); params.put("awareness_attribute_value", "zone-1"); + params.put("draining_timeout", "60s"); RestRequest deprecatedRequest = buildRestRequest(params); DecommissionRequest request = action.createRequest(deprecatedRequest); assertEquals(request.getDecommissionAttribute().attributeName(), "zone"); assertEquals(request.getDecommissionAttribute().attributeValue(), "zone-1"); + assertEquals(request.getDelayTimeout().getSeconds(), 120); + assertEquals(deprecatedRequest.getHttpRequest().method(), RestRequest.Method.PUT); + } + + public void testCreateRequestWithDefaultTimeout() throws IOException { + Map<String, String> params = new HashMap<>(); + params.put("awareness_attribute_name", "zone"); + params.put("awareness_attribute_value", "zone-1"); + + RestRequest deprecatedRequest = buildRestRequest(params); + + 
DecommissionRequest request = action.createRequest(deprecatedRequest); + assertEquals(request.getDecommissionAttribute().attributeName(), "zone"); + assertEquals(request.getDecommissionAttribute().attributeValue(), "zone-1"); + assertEquals(request.getDelayTimeout().getSeconds(), DecommissionRequest.DEFAULT_NODE_DRAINING_TIMEOUT.getSeconds()); + assertEquals(deprecatedRequest.getHttpRequest().method(), RestRequest.Method.PUT); + } + + public void testCreateRequestWithNoDelay() throws IOException { + Map<String, String> params = new HashMap<>(); + params.put("awareness_attribute_name", "zone"); + params.put("awareness_attribute_value", "zone-1"); + params.put("no_delay", "true"); + + RestRequest deprecatedRequest = buildRestRequest(params); + + DecommissionRequest request = action.createRequest(deprecatedRequest); + assertEquals(request.getDecommissionAttribute().attributeName(), "zone"); + assertEquals(request.getDecommissionAttribute().attributeValue(), "zone-1"); + assertEquals(request.getDelayTimeout().getSeconds(), 0); assertEquals(deprecatedRequest.getHttpRequest().method(), RestRequest.Method.PUT); }