Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully handle concurrent zone decommission action #5542

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
767803e
Control concurrency and handle retries
imRishN Dec 13, 2022
743ca01
Fix spotless check
imRishN Dec 13, 2022
dbb404e
Add changelog
imRishN Dec 13, 2022
ee90987
Add request timeout param
imRishN Dec 14, 2022
5870737
Changes
imRishN Dec 16, 2022
8dd5899
Merge remote-tracking branch 'upstream/main' into decommission/handle…
imRishN Dec 16, 2022
4946914
Fix spotless check
imRishN Dec 16, 2022
4f5e73c
Fix
imRishN Dec 16, 2022
378e3d2
Refactor
imRishN Dec 27, 2022
dd1b19f
Refactor
imRishN Dec 27, 2022
e4107d5
Add test for request
imRishN Jan 5, 2023
7954fb2
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
imRishN Jan 5, 2023
3caed7b
Add tests for controller
imRishN Jan 5, 2023
8e4ab4d
Test for retry
imRishN Jan 5, 2023
a5c4e08
Fix spotless check
imRishN Jan 5, 2023
51743ce
Move check at rest layer
imRishN Jan 5, 2023
b63baf7
Fix spotless check
imRishN Jan 5, 2023
b8b1434
Minor fix
imRishN Jan 5, 2023
ad6207f
Fix spotless check
imRishN Jan 5, 2023
4801501
Fix
imRishN Jan 5, 2023
3a88fb1
Merge remote-tracking branch 'upstream/main' into decommission/handle…
imRishN Jan 9, 2023
eb5c393
Remove retry flag and use original flag
imRishN Jan 9, 2023
ed6bffb
Fix spotless check
imRishN Jan 9, 2023
c68cdc7
Refactor code
imRishN Jan 9, 2023
1368ff4
Clean up
imRishN Jan 9, 2023
8ddd128
Empty-Commit
imRishN Jan 9, 2023
aa5c4b4
Empty-Commit
imRishN Jan 9, 2023
3993de9
Empty-Commit
imRishN Jan 9, 2023
afe5f46
Cleanup
imRishN Jan 10, 2023
61c9e5d
Fix
imRishN Jan 10, 2023
0e8b76c
Throw exception in line
imRishN Jan 10, 2023
7395dff
Fixes
imRishN Jan 10, 2023
319b4b1
Add IT for concurrency
imRishN Jan 10, 2023
c46992c
Add request id to decommission request
imRishN Jan 10, 2023
d4670b7
Fix
imRishN Jan 10, 2023
7ecbe6e
Fix
imRishN Jan 10, 2023
88ec79d
Fix
imRishN Jan 10, 2023
2b40e12
Resolve comments
imRishN Jan 10, 2023
a6d0c3b
Fix test
imRishN Jan 10, 2023
927d0cc
Test fix
imRishN Jan 10, 2023
539d77a
Merge remote-tracking branch 'upstream/main' into decommission/handle…
imRishN Jan 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955))
- Gracefully handle concurrent zone decommission action ([#5542](https://github.com/opensearch-project/OpenSearch/pull/5542))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
Expand All @@ -59,6 +61,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static org.opensearch.test.NodeRoles.onlyRole;
Expand Down Expand Up @@ -961,6 +964,114 @@ public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throw
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}

public void testConcurrentDecommissionAction() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'");
internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build()
);
logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'");
internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build()
);

ensureStableCluster(6);
ClusterHealthResponse health = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes(Integer.toString(6))
.execute()
.actionGet();
assertFalse(health.isTimedOut());

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 0.0, "b", 1.0, "c", 1.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

AtomicInteger numRequestAcknowledged = new AtomicInteger();
AtomicInteger numRequestUnAcknowledged = new AtomicInteger();
AtomicInteger numRequestFailed = new AtomicInteger();
int concurrentRuns = randomIntBetween(5, 10);
TestThreadPool testThreadPool = null;
logger.info("--> starting {} concurrent decommission action in zone {}", concurrentRuns, 'a');
try {
testThreadPool = new TestThreadPool(AwarenessAttributeDecommissionIT.class.getName());
List<Runnable> operationThreads = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns);
for (int i = 0; i < concurrentRuns; i++) {
Runnable thread = () -> {
logger.info("Triggering decommission action");
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "a");
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
decommissionRequest.setNoDelay(true);
try {
DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest)
.get();
if (decommissionResponse.isAcknowledged()) {
numRequestAcknowledged.incrementAndGet();
} else {
numRequestUnAcknowledged.incrementAndGet();
}
} catch (Exception e) {
numRequestFailed.incrementAndGet();
}
countDownLatch.countDown();
};
operationThreads.add(thread);
}
TestThreadPool finalTestThreadPool = testThreadPool;
operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable));
countDownLatch.await();
} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
assertEquals(concurrentRuns, numRequestAcknowledged.get() + numRequestUnAcknowledged.get() + numRequestFailed.get());
assertEquals(concurrentRuns - 1, numRequestFailed.get());
assertEquals(1, numRequestAcknowledged.get() + numRequestUnAcknowledged.get());
}

private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener {

final CountDownLatch doneLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Register decommission request.
* <p>
* Registers a decommission request with decommission attribute and timeout
*
* @opensearch.internal
Expand All @@ -32,7 +30,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionR
public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120);

private DecommissionAttribute decommissionAttribute;

private String requestID;
private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT;

// holder for no_delay param. To avoid draining time timeout.
Expand All @@ -49,6 +47,7 @@ public DecommissionRequest(StreamInput in) throws IOException {
decommissionAttribute = new DecommissionAttribute(in);
this.delayTimeout = in.readTimeValue();
this.noDelay = in.readBoolean();
this.requestID = in.readOptionalString();
}

@Override
Expand All @@ -57,6 +56,7 @@ public void writeTo(StreamOutput out) throws IOException {
decommissionAttribute.writeTo(out);
out.writeTimeValue(delayTimeout);
out.writeBoolean(noDelay);
out.writeOptionalString(requestID);
}

/**
Expand All @@ -77,25 +77,45 @@ public DecommissionAttribute getDecommissionAttribute() {
return this.decommissionAttribute;
}

public void setDelayTimeout(TimeValue delayTimeout) {
public DecommissionRequest setDelayTimeout(TimeValue delayTimeout) {
this.delayTimeout = delayTimeout;
return this;
}

public TimeValue getDelayTimeout() {
return this.delayTimeout;
}

public void setNoDelay(boolean noDelay) {
public DecommissionRequest setNoDelay(boolean noDelay) {
if (noDelay) {
this.delayTimeout = TimeValue.ZERO;
}
this.noDelay = noDelay;
return this;
}

public boolean isNoDelay() {
return noDelay;
}

/**
* Sets id for decommission request
*
* @param requestID uuid for request
* @return this request
*/
public DecommissionRequest setRequestID(String requestID) {
this.requestID = requestID;
return this;
}

/**
* @return Returns id of decommission request
*/
public String requestID() {
return requestID;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand All @@ -122,6 +142,13 @@ public ActionRequestValidationException validate() {

@Override
public String toString() {
return "DecommissionRequest{" + "decommissionAttribute=" + decommissionAttribute + '}';
return "DecommissionRequest{"
+ "decommissionAttribute="
+ decommissionAttribute
+ ", delayTimeout="
+ delayTimeout
+ ", noDelay="
+ noDelay
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,15 @@ public DecommissionRequestBuilder setNoDelay(boolean noDelay) {
request.setNoDelay(noDelay);
return this;
}

/**
* Sets request id for decommission request
*
* @param requestID for decommission request
* @return current object
*/
public DecommissionRequestBuilder requestID(String requestID) {
request.setRequestID(requestID);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DecommissionAttributeMetadata extends AbstractNamedDiffable<Custom>

private final DecommissionAttribute decommissionAttribute;
private DecommissionStatus status;
private String requestID;
public static final String attributeType = "awareness";

/**
Expand All @@ -45,18 +46,19 @@ public class DecommissionAttributeMetadata extends AbstractNamedDiffable<Custom>
* @param decommissionAttribute attribute details
* @param status current status of the attribute decommission
*/
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status) {
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status, String requestId) {
this.decommissionAttribute = decommissionAttribute;
this.status = status;
this.requestID = requestId;
}

/**
* Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT}
* Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT} and request id
*
* @param decommissionAttribute attribute details
*/
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute) {
this(decommissionAttribute, DecommissionStatus.INIT);
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, String requestID) {
this(decommissionAttribute, DecommissionStatus.INIT, requestID);
}

/**
Expand All @@ -77,6 +79,15 @@ public DecommissionStatus status() {
return this.status;
}

/**
* Returns the request id of the decommission
*
* @return request id
*/
public String requestID() {
return this.requestID;
}

/**
* Returns instance of the metadata with updated status
* @param newStatus status to be updated with
Expand Down Expand Up @@ -128,12 +139,13 @@ public boolean equals(Object o) {
DecommissionAttributeMetadata that = (DecommissionAttributeMetadata) o;

if (!status.equals(that.status)) return false;
if (!requestID.equals(that.requestID)) return false;
return decommissionAttribute.equals(that.decommissionAttribute);
}

@Override
public int hashCode() {
return Objects.hash(attributeType, decommissionAttribute, status);
return Objects.hash(attributeType, decommissionAttribute, status, requestID);
}

/**
Expand All @@ -152,6 +164,7 @@ public Version getMinimalSupportedVersion() {
public DecommissionAttributeMetadata(StreamInput in) throws IOException {
this.decommissionAttribute = new DecommissionAttribute(in);
this.status = DecommissionStatus.fromString(in.readString());
this.requestID = in.readString();
imRishN marked this conversation as resolved.
Show resolved Hide resolved
}

public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
Expand All @@ -165,12 +178,14 @@ public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException
public void writeTo(StreamOutput out) throws IOException {
decommissionAttribute.writeTo(out);
out.writeString(status.status());
out.writeString(requestID);
}

public static DecommissionAttributeMetadata fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token;
DecommissionAttribute decommissionAttribute = null;
DecommissionStatus status = null;
String requestID = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
Expand Down Expand Up @@ -210,6 +225,13 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser)
);
}
status = DecommissionStatus.fromString(parser.text());
} else if ("requestID".equals(currentFieldName)) {
if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
throw new OpenSearchParseException(
"failed to parse status of decommissioning, expected string but found unknown type"
);
}
requestID = parser.text();
} else {
throw new OpenSearchParseException(
"unknown field found [{}], failed to parse the decommission attribute",
Expand All @@ -218,15 +240,15 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser)
}
}
}
return new DecommissionAttributeMetadata(decommissionAttribute, status);
return new DecommissionAttributeMetadata(decommissionAttribute, status, requestID);
}

/**
* {@inheritDoc}
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
toXContent(decommissionAttribute, status, attributeType, builder, params);
toXContent(decommissionAttribute, status, requestID, attributeType, builder, params);
return builder;
}

Expand All @@ -245,6 +267,7 @@ public EnumSet<Metadata.XContentContext> context() {
public static void toXContent(
DecommissionAttribute decommissionAttribute,
DecommissionStatus status,
String requestID,
String attributeType,
XContentBuilder builder,
ToXContent.Params params
Expand All @@ -253,6 +276,7 @@ public static void toXContent(
builder.field(decommissionAttribute.attributeName(), decommissionAttribute.attributeValue());
builder.endObject();
builder.field("status", status.status());
builder.field("requestID", requestID);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ public ClusterState execute(ClusterState currentState) {
decommissionAttributeMetadata.validateNewStatus(decommissionStatus);
decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttributeMetadata.decommissionAttribute(),
decommissionStatus
decommissionStatus,
decommissionAttributeMetadata.requestID()
);
ClusterState newState = ClusterState.builder(currentState)
.metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata))
Expand Down
Loading