Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully handle concurrent zone decommission action #5542

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
767803e
Control concurrency and handle retries
imRishN Dec 13, 2022
743ca01
Fix spotless check
imRishN Dec 13, 2022
dbb404e
Add changelog
imRishN Dec 13, 2022
ee90987
Add request timeout param
imRishN Dec 14, 2022
5870737
Changes
imRishN Dec 16, 2022
8dd5899
Merge remote-tracking branch 'upstream/main' into decommission/handle…
imRishN Dec 16, 2022
4946914
Fix spotless check
imRishN Dec 16, 2022
4f5e73c
Fix
imRishN Dec 16, 2022
378e3d2
Refactor
imRishN Dec 27, 2022
dd1b19f
Refactor
imRishN Dec 27, 2022
e4107d5
Add test for request
imRishN Jan 5, 2023
7954fb2
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
imRishN Jan 5, 2023
3caed7b
Add tests for controller
imRishN Jan 5, 2023
8e4ab4d
Test for retry
imRishN Jan 5, 2023
a5c4e08
Fix spotless check
imRishN Jan 5, 2023
51743ce
Move check at rest layer
imRishN Jan 5, 2023
b63baf7
Fix spotless check
imRishN Jan 5, 2023
b8b1434
Minor fix
imRishN Jan 5, 2023
ad6207f
Fix spotless check
imRishN Jan 5, 2023
4801501
Fix
imRishN Jan 5, 2023
3a88fb1
Merge remote-tracking branch 'upstream/main' into decommission/handle…
imRishN Jan 9, 2023
eb5c393
Remove retry flag and use original flag
imRishN Jan 9, 2023
ed6bffb
Fix spotless check
imRishN Jan 9, 2023
c68cdc7
Refactor code
imRishN Jan 9, 2023
1368ff4
Clean up
imRishN Jan 9, 2023
8ddd128
Empty-Commit
imRishN Jan 9, 2023
aa5c4b4
Empty-Commit
imRishN Jan 9, 2023
3993de9
Empty-Commit
imRishN Jan 9, 2023
afe5f46
Cleanup
imRishN Jan 10, 2023
61c9e5d
Fix
imRishN Jan 10, 2023
0e8b76c
Throw exception in line
imRishN Jan 10, 2023
7395dff
Fixes
imRishN Jan 10, 2023
319b4b1
Add IT for concurrency
imRishN Jan 10, 2023
c46992c
Add request id to decommission request
imRishN Jan 10, 2023
d4670b7
Fix
imRishN Jan 10, 2023
7ecbe6e
Fix
imRishN Jan 10, 2023
88ec79d
Fix
imRishN Jan 10, 2023
2b40e12
Resolve comments
imRishN Jan 10, 2023
a6d0c3b
Fix test
imRishN Jan 10, 2023
927d0cc
Test fix
imRishN Jan 10, 2023
539d77a
Merge remote-tracking branch 'upstream/main' into decommission/handle…
imRishN Jan 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add IT for concurrency
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Jan 10, 2023
commit 319b4b107eed7d466b03a24e3605c2ea573511c7
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
Expand All @@ -59,6 +61,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static org.opensearch.test.NodeRoles.onlyRole;
Expand Down Expand Up @@ -961,6 +964,114 @@ public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throw
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}

public void testConcurrentDecommissionAction() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'");
internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build()
);
logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'");
internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build()
);

ensureStableCluster(6);
ClusterHealthResponse health = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes(Integer.toString(6))
.execute()
.actionGet();
assertFalse(health.isTimedOut());

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 0.0, "b", 1.0, "c", 1.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

AtomicInteger numRequestAcknowledged = new AtomicInteger();
AtomicInteger numRequestUnAcknowledged = new AtomicInteger();
AtomicInteger numRequestFailed = new AtomicInteger();
int concurrentRuns = randomIntBetween(5, 10);
TestThreadPool testThreadPool = null;
logger.info("--> starting {} concurrent decommission action in zone {}", concurrentRuns, 'a');
try {
testThreadPool = new TestThreadPool(AwarenessAttributeDecommissionIT.class.getName());
List<Runnable> operationThreads = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns);
for (int i = 0; i < concurrentRuns; i++) {
Runnable thread = () -> {
logger.info("Triggering decommission action");
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "a");
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
decommissionRequest.setNoDelay(true);
try {
DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest)
.get();
if (decommissionResponse.isAcknowledged()) {
numRequestAcknowledged.incrementAndGet();
} else {
numRequestUnAcknowledged.incrementAndGet();
}
} catch (Exception e) {
numRequestFailed.incrementAndGet();
}
countDownLatch.countDown();
};
operationThreads.add(thread);
}
TestThreadPool finalTestThreadPool = testThreadPool;
operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable));
countDownLatch.await();
} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
assertEquals(concurrentRuns, numRequestAcknowledged.get() + numRequestUnAcknowledged.get() + numRequestFailed.get());
assertEquals(concurrentRuns - 1, numRequestFailed.get());
assertEquals(1, numRequestAcknowledged.get() + numRequestUnAcknowledged.get());
}

private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener {

final CountDownLatch doneLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,9 +469,7 @@ private static void ensureEligibleRequest(
switch (decommissionAttributeMetadata.status()) {
// for INIT - check if it is eligible internal retry
case INIT:
msg = (decommissionRequest.originalRequest() == false)
? "same request is already in status [INIT]"
: null;
msg = (decommissionRequest.originalRequest() == false) ? "same request is already in status [INIT]" : null;
throw new DecommissioningFailedException(requestedDecommissionAttribute, msg);
// for FAILED - we are good to process it again
case FAILED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,7 @@ public void onFailure(Exception e) {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue());
MatcherAssert.assertThat(exceptionReference.get(), instanceOf(DecommissioningFailedException.class));
MatcherAssert.assertThat(
exceptionReference.get().getMessage(),
containsString("same request is already in status [INIT]")
);
MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("same request is already in status [INIT]"));
}

@SuppressWarnings("unchecked")
Expand Down