From e0ddf524e705097a5e52ddbc9f7045680a38ffb2 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Mon, 9 Jan 2023 13:36:52 +0530 Subject: [PATCH] [Weighted Shard Routing] API versioning (#5255) * Support API versioning for weighted shard routing Signed-off-by: Anshu Agarwal --- CHANGELOG.md | 1 + .../AwarenessAttributeDecommissionRestIT.java | 1 + .../AwarenessAttributeDecommissionIT.java | 7 + .../cluster/routing/WeightedRoutingIT.java | 121 +++++++++++++++++- .../search/SearchWeightedRoutingIT.java | 3 +- .../ClusterDeleteWeightedRoutingRequest.java | 109 +++++++++++++++- ...erDeleteWeightedRoutingRequestBuilder.java | 11 ++ .../ClusterGetWeightedRoutingResponse.java | 82 +++++++++--- .../TransportGetWeightedRoutingAction.java | 3 +- .../put/ClusterPutWeightedRoutingRequest.java | 69 ++++++++-- ...usterPutWeightedRoutingRequestBuilder.java | 4 + .../java/org/opensearch/client/Requests.java | 4 +- .../metadata/WeightedRoutingMetadata.java | 66 +++++++--- .../cluster/routing/OperationRouting.java | 2 +- .../cluster/routing/WeightedRouting.java | 10 ++ .../routing/WeightedRoutingService.java | 67 +++++++--- ...estClusterDeleteWeightedRoutingAction.java | 20 ++- .../RestClusterPutWeightedRoutingAction.java | 2 +- ...ClusterPutWeightedRoutingRequestTests.java | 21 ++- ...lusterGetWeightedRoutingResponseTests.java | 2 +- ...ransportGetWeightedRoutingActionTests.java | 2 +- .../DecommissionServiceTests.java | 2 +- .../WeightedRoutingMetadataTests.java | 2 +- .../routing/OperationRoutingTests.java | 4 +- .../routing/WeightedRoutingServiceTests.java | 6 +- ...tClusterAddWeightedRoutingActionTests.java | 11 +- ...usterDeleteWeightedRoutingActionTests.java | 79 ++++++++++++ 27 files changed, 612 insertions(+), 99 deletions(-) create mode 100644 server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index a883658845936..f35d42e8398ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615))) - Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348)) - Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459)) +- Support versioning for Weighted routing apis([#5255](https://github.com/opensearch-project/OpenSearch/pull/5255)) - Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668)) - Add support for discovered cluster manager and remove local weights ([#5680](https://github.com/opensearch-project/OpenSearch/pull/5680)) - Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959)) diff --git a/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java b/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java index 4d9115b8962ea..b7228a75984fa 100644 --- a/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java +++ b/qa/smoke-test-http/src/test/java/org/opensearch/http/AwarenessAttributeDecommissionRestIT.java @@ -91,6 +91,7 @@ public void testRestStatusForAcknowledgedDecommission() throws IOException { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index 8dde5f3dc3a93..07580f17a67bc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -223,6 +223,7 @@ public void testInvariantsAndLogsOnDecommissionedNodes() throws Exception { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); @@ -453,6 +454,7 @@ private void assertNodesRemovedAfterZoneDecommission(boolean originalClusterMana .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); @@ -565,6 +567,7 @@ public void testDecommissionFailedWhenDifferentAttributeAlreadyDecommissioned() .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); @@ -649,6 +652,7 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); @@ -769,6 +773,7 @@ public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); @@ -809,6 +814,7 @@ public void testDecommissionFailedWithOnlyOneAttributeValueForLeader() throws Ex .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); @@ -922,6 +928,7 @@ public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throw .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertTrue(weightedRoutingResponse.isAcknowledged()); diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index e190db5b73412..d3c0fa9ae73af 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -8,11 +8,13 @@ package org.opensearch.cluster.routing; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.common.settings.Settings; +import org.opensearch.rest.RestStatus; import org.opensearch.snapshots.mockstore.MockRepository; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.NetworkDisruption; @@ -79,6 +81,7 @@ public void testPutWeightedRouting() { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -88,6 +91,7 @@ public void testPutWeightedRouting() { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(0) .get(); assertEquals(response.isAcknowledged(), true); } @@ -215,6 +219,7 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -304,6 +309,7 @@ public void testGetWeightedRouting_ClusterManagerNotDiscovered() throws Exceptio .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -361,6 +367,7 @@ public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -398,12 +405,14 @@ public void testDeleteWeightedRouting_WeightsNotSet() { assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); // delete weighted routing metadata - ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); - assertTrue(deleteResponse.isAcknowledged()); - assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + ResourceNotFoundException exception = expectThrows( + ResourceNotFoundException.class, + () -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(-1).get() + ); + assertEquals(RestStatus.NOT_FOUND, exception.status()); } - public void testDeleteWeightedRouting_WeightsAreSet() { + public void testDeleteWeightedRouting_WeightsAreSet() throws IOException { Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") @@ -430,13 +439,111 @@ public void testDeleteWeightedRouting_WeightsAreSet() { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); - assertEquals(response.isAcknowledged(), true); + assertTrue(response.isAcknowledged()); assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); // delete weighted routing metadata - ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get(); assertTrue(deleteResponse.isAcknowledged()); - assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + } + + public void testPutAndDeleteWithVersioning() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + logger.info("--> starting 6 nodes on different zones"); + int nodeCountPerAZ = 2; + + logger.info("--> starting a dedicated cluster manager node"); + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'"); + internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()); + internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()); + internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + + Map weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertTrue(response.isAcknowledged()); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // update weights api call with correct version number + weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0); + weightedRouting = new WeightedRouting("zone", weights); + response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(0).get(); + assertTrue(response.isAcknowledged()); + + // update weights api call with incorrect version number + weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0); + WeightedRouting weightedRouting1 = new WeightedRouting("zone", weights); + UnsupportedWeightedRoutingStateException exception = expectThrows( + UnsupportedWeightedRoutingStateException.class, + () -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting1).setVersion(100).get() + ); + assertEquals(exception.status(), RestStatus.CONFLICT); + + // get weights call + ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin() + .cluster() + .prepareGetWeightedRouting() + .setAwarenessAttribute("zone") + .get(); + + // update weights call using version returned by get api call + weights = Map.of("a", 1.0, "b", 2.0, "c", 5.0); + weightedRouting = new WeightedRouting("zone", weights); + response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(weightedRoutingResponse.getVersion()) + .get(); + assertTrue(response.isAcknowledged()); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // delete weights by awareness attribute + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin() + .cluster() + .prepareDeleteWeightedRouting() + .setAwarenessAttribute("zone") + .setVersion(2) + .get(); + assertTrue(deleteResponse.isAcknowledged()); + + // update weights again and make sure that version number got updated on delete + weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0); + weightedRouting = new WeightedRouting("zone", weights); + response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(3).get(); + assertTrue(response.isAcknowledged()); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // delete weights + deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(4).get(); + assertTrue(deleteResponse.isAcknowledged()); + + // delete weights call, incorrect version number + UnsupportedWeightedRoutingStateException deleteException = expectThrows( + UnsupportedWeightedRoutingStateException.class, + () -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(7).get() + ); + assertEquals(RestStatus.CONFLICT, deleteException.status()); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 097775b7ab4ac..80e7f22c47c58 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -86,6 +86,7 @@ public void testSearchWithWRRShardRouting() throws IOException { .cluster() .prepareWeightedRouting() .setWeightedRouting(weightedRouting) + .setVersion(-1) .get(); assertEquals(response.isAcknowledged(), true); @@ -129,7 +130,7 @@ public void testSearchWithWRRShardRouting() throws IOException { logger.info("--> deleted shard routing weights for weighted round robin"); - ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get(); assertEquals(deleteResponse.isAcknowledged(), true); hitNodes = new HashSet<>(); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java index 71eab8ff35a2d..5451cec1db21d 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java @@ -8,12 +8,26 @@ package org.opensearch.action.admin.cluster.shards.routing.weighted.delete; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchGenerationException; +import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.DeprecationHandler; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; import java.io.IOException; +import java.util.Map; /** * Request to delete weights for weighted round-robin shard routing policy. @@ -21,10 +35,42 @@ * @opensearch.internal */ public class ClusterDeleteWeightedRoutingRequest extends ClusterManagerNodeRequest { - public ClusterDeleteWeightedRoutingRequest() {} + private static final Logger logger = LogManager.getLogger(ClusterDeleteWeightedRoutingRequest.class); + + private long version; + private String awarenessAttribute; + + public void setVersion(long version) { + this.version = version; + } + + ClusterDeleteWeightedRoutingRequest() { + this.version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; + } public ClusterDeleteWeightedRoutingRequest(StreamInput in) throws IOException { super(in); + version = in.readLong(); + if (in.available() != 0) { + awarenessAttribute = in.readString(); + } + } + + public long getVersion() { + return version; + } + + public String getAwarenessAttribute() { + return awarenessAttribute; + } + + public void setAwarenessAttribute(String awarenessAttribute) { + this.awarenessAttribute = awarenessAttribute; + } + + public ClusterDeleteWeightedRoutingRequest(String awarenessAttribute) { + this.awarenessAttribute = awarenessAttribute; + this.version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; } @Override @@ -32,13 +78,72 @@ public ActionRequestValidationException validate() { return null; } + /** + * @param source weights definition from request body + * @return this request + */ + public ClusterDeleteWeightedRoutingRequest source(Map source) { + try { + if (source.isEmpty()) { + throw new OpenSearchParseException(("Empty request body")); + } + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.map(source); + setRequestBody(BytesReference.bytes(builder), builder.contentType()); + } catch (IOException e) { + throw new OpenSearchGenerationException("Failed to generate [" + source + "]", e); + } + return this; + } + + public void setRequestBody(BytesReference source, XContentType contentType) { + try ( + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + source, + contentType + ) + ) { + String versionAttr = null; + XContentParser.Token token; + // move to the first alias + parser.nextToken(); + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + String fieldName = parser.currentName(); + if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { + versionAttr = parser.currentName(); + } else { + throw new OpenSearchParseException( + "failed to parse delete weighted routing request body [{}], unknown type", + fieldName + ); + } + } else if (token == XContentParser.Token.VALUE_STRING) { + if (versionAttr != null && versionAttr.equals(WeightedRoutingMetadata.VERSION)) { + this.version = Long.parseLong(parser.text()); + } + } else { + throw new OpenSearchParseException("failed to parse delete weighted routing request body"); + } + } + } catch (IOException e) { + logger.error("error while parsing delete request for weighted routing request object", e); + } + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); + out.writeLong(version); + if (awarenessAttribute != null) { + out.writeString(awarenessAttribute); + } } @Override public String toString() { - return "ClusterDeleteWeightedRoutingRequest"; + return "ClusterDeleteWeightedRoutingRequest{" + "version= " + version + "awarenessAttribute=" + awarenessAttribute + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java index 19976ac6b07aa..bb34fea589534 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java @@ -24,4 +24,15 @@ public class ClusterDeleteWeightedRoutingRequestBuilder extends ClusterManagerNo public ClusterDeleteWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterDeleteWeightedRoutingAction action) { super(client, action, new ClusterDeleteWeightedRoutingRequest()); } + + public ClusterDeleteWeightedRoutingRequestBuilder setVersion(long version) { + request.setVersion(version); + return this; + } + + public ClusterDeleteWeightedRoutingRequestBuilder setAwarenessAttribute(String attribute) { + request.setAwarenessAttribute(attribute); + return this; + } + } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java index e49cc38474802..9a2858f17c53e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/ClusterGetWeightedRoutingResponse.java @@ -11,6 +11,7 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionResponse; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -31,33 +32,42 @@ * @opensearch.internal */ public class ClusterGetWeightedRoutingResponse extends ActionResponse implements ToXContentObject { + + private static final String WEIGHTS = "weights"; + private long version; + private final Boolean discoveredClusterManager; + + private static final String DISCOVERED_CLUSTER_MANAGER = "discovered_cluster_manager"; + public WeightedRouting getWeightedRouting() { return weightedRouting; } private final WeightedRouting weightedRouting; + public long getVersion() { + return version; + } + public Boolean getDiscoveredClusterManager() { return discoveredClusterManager; } - private final Boolean discoveredClusterManager; - - private static final String DISCOVERED_CLUSTER_MANAGER = "discovered_cluster_manager"; - ClusterGetWeightedRoutingResponse() { this.weightedRouting = null; this.discoveredClusterManager = null; } - public ClusterGetWeightedRoutingResponse(WeightedRouting weightedRouting, Boolean discoveredClusterManager) { + public ClusterGetWeightedRoutingResponse(WeightedRouting weightedRouting, Boolean discoveredClusterManager, long version) { this.discoveredClusterManager = discoveredClusterManager; this.weightedRouting = weightedRouting; + this.version = version; } ClusterGetWeightedRoutingResponse(StreamInput in) throws IOException { if (in.available() != 0) { this.weightedRouting = new WeightedRouting(in); + this.version = in.readLong(); this.discoveredClusterManager = in.readOptionalBoolean(); } else { this.weightedRouting = null; @@ -78,6 +88,7 @@ public WeightedRouting weights() { public void writeTo(StreamOutput out) throws IOException { if (weightedRouting != null) { weightedRouting.writeTo(out); + out.writeLong(version); } if (discoveredClusterManager != null) { out.writeOptionalBoolean(discoveredClusterManager); @@ -88,9 +99,13 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); if (this.weightedRouting != null) { + builder.startObject(WEIGHTS); for (Map.Entry entry : weightedRouting.weights().entrySet()) { builder.field(entry.getKey(), entry.getValue().toString()); } + + builder.endObject(); + builder.field(WeightedRoutingMetadata.VERSION, version); if (discoveredClusterManager != null) { builder.field(DISCOVERED_CLUSTER_MANAGER, discoveredClusterManager); } @@ -102,26 +117,59 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public static ClusterGetWeightedRoutingResponse fromXContent(XContentParser parser) throws IOException { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); XContentParser.Token token; - String attrKey = null, attrValue = null; + String attrKey = null, attrValue; Boolean discoveredClusterManager = null; Map weights = new HashMap<>(); + long version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; + String weightsAttr; + String fieldName = null; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { - attrKey = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_STRING) { - attrValue = parser.text(); - if (attrKey != null) { - weights.put(attrKey, Double.parseDouble(attrValue)); + fieldName = parser.currentName(); + if (fieldName != null + && (fieldName.equals(WeightedRoutingMetadata.VERSION) || fieldName.equals(DISCOVERED_CLUSTER_MANAGER))) { + continue; + } else if (fieldName != null && fieldName.equals(WEIGHTS)) { + weightsAttr = parser.currentName(); + } else { + throw new OpenSearchParseException("failed to parse weighted routing request object", fieldName); } - } else if (token == XContentParser.Token.VALUE_BOOLEAN && attrKey != null && attrKey.equals(DISCOVERED_CLUSTER_MANAGER)) { - discoveredClusterManager = Boolean.parseBoolean(parser.text()); - } else { - throw new OpenSearchParseException("failed to parse weighted routing response"); - } + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException( + "failed to parse weighted routing request object [{}], expected object", + weightsAttr + ); + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrKey = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + attrValue = parser.text(); + if (attrKey != null) { + weights.put(attrKey, Double.parseDouble(attrValue)); + } + } else { + throw new OpenSearchParseException("failed to parse weighted routing request attribute [{}]", attrKey); + } + } + } else if (token == XContentParser.Token.VALUE_NUMBER + && fieldName != null + && fieldName.equals(WeightedRoutingMetadata.VERSION)) { + version = parser.longValue(); + + } else if (token == XContentParser.Token.VALUE_BOOLEAN + && fieldName != null + && fieldName.equals(DISCOVERED_CLUSTER_MANAGER)) { + discoveredClusterManager = Boolean.parseBoolean(parser.text()); + } else { + throw new OpenSearchParseException("failed to parse weighted routing request"); + } } + WeightedRouting weightedRouting = new WeightedRouting("", weights); - return new ClusterGetWeightedRoutingResponse(weightedRouting, discoveredClusterManager); + return new ClusterGetWeightedRoutingResponse(weightedRouting, discoveredClusterManager, version); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java index 6c7103e45652a..280fca29944e3 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/get/TransportGetWeightedRoutingAction.java @@ -92,7 +92,8 @@ protected void clusterManagerOperation( WeightedRouting weightedRouting = weightedRoutingMetadata.getWeightedRouting(); clusterGetWeightedRoutingResponse = new ClusterGetWeightedRoutingResponse( weightedRouting, - state.nodes().getClusterManagerNodeId() != null + state.nodes().getClusterManagerNodeId() != null, + weightedRoutingMetadata.getVersion() ); } listener.onResponse(clusterGetWeightedRoutingResponse); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java index 8adbf13a000c5..cba4d0e8e796c 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequest.java @@ -14,6 +14,7 @@ import org.opensearch.OpenSearchParseException; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.routing.WeightedRouting; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.StreamInput; @@ -43,6 +44,15 @@ public class ClusterPutWeightedRoutingRequest extends ClusterManagerNodeRequest< private WeightedRouting weightedRouting; private String attributeName; + private long version; + + public void version(long version) { + this.version = version; + } + + public long getVersion() { + return this.version; + } public ClusterPutWeightedRoutingRequest() {} @@ -62,13 +72,14 @@ public void attributeName(String attributeName) { public ClusterPutWeightedRoutingRequest(StreamInput in) throws IOException { super(in); weightedRouting = new WeightedRouting(in); + version = in.readLong(); } public ClusterPutWeightedRoutingRequest(String attributeName) { this.attributeName = attributeName; } - public void setWeightedRouting(Map source) { + public void setWeightedRouting(Map source) { try { if (source.isEmpty()) { throw new OpenSearchParseException(("Empty request body")); @@ -96,22 +107,56 @@ public void setWeightedRouting(BytesReference source, XContentType contentType) XContentParser.Token token; // move to the first alias parser.nextToken(); + String versionAttr = null; + String weightsAttr; + long version = WeightedRoutingMetadata.VERSION_UNSET_VALUE; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { - attrValue = parser.currentName(); - } else if (token == XContentParser.Token.VALUE_STRING) { - attrWeight = Double.parseDouble(parser.text()); - weights.put(attrValue, attrWeight); + String fieldName = parser.currentName(); + if (fieldName != null && fieldName.equals(WeightedRoutingMetadata.VERSION)) { + versionAttr = parser.currentName(); + continue; + } else if (fieldName != null && fieldName.equals("weights")) { + weightsAttr = parser.currentName(); + } else { + throw new OpenSearchParseException("failed to parse weighted routing request object [{}]", fieldName); + } + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new OpenSearchParseException( + "failed to parse weighted routing request object [{}], expected object", + weightsAttr + ); + } + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + attrValue = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_STRING) { + attrWeight = Double.parseDouble(parser.text()); + weights.put(attrValue, attrWeight); + } else { + throw new OpenSearchParseException( + "failed to parse weighted routing request attribute [{}], " + "unknown type", + attrWeight + ); + } + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (versionAttr != null && versionAttr.equals(WeightedRoutingMetadata.VERSION)) { + version = parser.longValue(); + } } else { throw new OpenSearchParseException( - "failed to parse weighted routing request attribute [{}], " + "unknown type", - attrWeight + "failed to parse weighted routing request " + "[{}], unknown " + "type", + attributeName ); } } this.weightedRouting = new WeightedRouting(this.attributeName, weights); + this.version = version; } catch (IOException e) { - logger.error("error while parsing put for weighted routing request object", e); + logger.error("error while parsing put weighted routing request object", e); } } @@ -127,6 +172,9 @@ public ActionRequestValidationException validate() { if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) { validationException = addValidationError("Weights are missing", validationException); } + if (version == WeightedRoutingMetadata.VERSION_UNSET_VALUE) { + validationException = addValidationError("Version is missing", validationException); + } int countValueWithZeroWeights = 0; double weight; try { @@ -164,7 +212,7 @@ public ActionRequestValidationException validate() { * @param source weights definition from request body * @return this request */ - public ClusterPutWeightedRoutingRequest source(Map source) { + public ClusterPutWeightedRoutingRequest source(Map source) { setWeightedRouting(source); return this; } @@ -173,11 +221,12 @@ public ClusterPutWeightedRoutingRequest source(Map source) { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); weightedRouting.writeTo(out); + out.writeLong(version); } @Override public String toString() { - return "ClusterPutWeightedRoutingRequest{" + "weightedRouting= " + weightedRouting.toString() + "}"; + return "ClusterPutWeightedRoutingRequest{" + "weightedRouting= " + weightedRouting.toString() + "version= " + version + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java index b437f4c54d8d6..adfb2cf02f6d9 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestBuilder.java @@ -30,4 +30,8 @@ public ClusterPutWeightedRoutingRequestBuilder setWeightedRouting(WeightedRoutin return this; } + public ClusterPutWeightedRoutingRequestBuilder setVersion(long version) { + request.version(version); + return this; + } } diff --git a/server/src/main/java/org/opensearch/client/Requests.java b/server/src/main/java/org/opensearch/client/Requests.java index 21f2a2d906602..cad5bac8acf0d 100644 --- a/server/src/main/java/org/opensearch/client/Requests.java +++ b/server/src/main/java/org/opensearch/client/Requests.java @@ -578,8 +578,8 @@ public static ClusterGetWeightedRoutingRequest getWeightedRoutingRequest(String * * @return delete weight request */ - public static ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest() { - return new ClusterDeleteWeightedRoutingRequest(); + public static ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest(String attributeName) { + return new ClusterDeleteWeightedRoutingRequest(attributeName); } /** diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index 07cdc949c4529..320f75a9f2ada 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -36,6 +36,15 @@ public class WeightedRoutingMetadata extends AbstractNamedDiffable weights = new HashMap<>(); - WeightedRouting weightedRouting = null; + WeightedRouting weightedRouting; XContentParser.Token token; - String awarenessField = null; + String awarenessField; + String versionAttr = null; + long version = VERSION_UNSET_VALUE; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { - awarenessField = parser.currentName(); + String attr = parser.currentName(); + if (attr != null && attr.equals(VERSION)) { + versionAttr = parser.currentName(); + continue; + } else { + awarenessField = parser.currentName(); + } if (parser.nextToken() != XContentParser.Token.START_OBJECT) { - throw new OpenSearchParseException( - "failed to parse weighted routing metadata [{}], expected " + "object", - awarenessField - ); + throw new OpenSearchParseException("failed to parse weighted routing metadata [{}], expected object", awarenessField); } while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { attributeName = parser.currentName(); if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new OpenSearchParseException( - "failed to parse weighted routing metadata [{}], expected" + " object", + "failed to parse weighted routing metadata [{}], expected object", attributeName ); } while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { attrKey = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { - attrValue = Double.parseDouble(parser.text()); - weights.put(attrKey, attrValue); + if (attrKey != null && attrKey.equals(VERSION)) { + version = Long.parseLong(parser.text()); + } else { + attrValue = Double.parseDouble(parser.text()); + weights.put(attrKey, attrValue); + } + } else { throw new OpenSearchParseException( "failed to parse weighted routing metadata attribute " + "[{}], unknown type", @@ -123,10 +146,14 @@ public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws } } } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (versionAttr != null && versionAttr.equals(VERSION)) { + version = Long.parseLong(parser.text()); + } } } weightedRouting = new WeightedRouting(attributeName, weights); - return new WeightedRoutingMetadata(weightedRouting); + return new WeightedRoutingMetadata(weightedRouting, version); } @Override @@ -144,18 +171,21 @@ public int hashCode() { @Override public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { - toXContent(weightedRouting, builder); + toXContent(weightedRouting, builder, version); return builder; } - public static void toXContent(WeightedRouting weightedRouting, XContentBuilder builder) throws IOException { + public static void toXContent(WeightedRouting weightedRouting, XContentBuilder builder, long version) throws IOException { builder.startObject(AWARENESS); - builder.startObject(weightedRouting.attributeName()); - for (Map.Entry entry : weightedRouting.weights().entrySet()) { - builder.field(entry.getKey(), entry.getValue()); + if (weightedRouting.isSet()) { + builder.startObject(weightedRouting.attributeName()); + for (Map.Entry entry : weightedRouting.weights().entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + builder.endObject(); } builder.endObject(); - builder.endObject(); + builder.field(VERSION, version); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index 9026da667ccb0..a4b4cc961fade 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -324,7 +324,7 @@ private ShardIterator shardRoutings( @Nullable Map nodeCounts, @Nullable WeightedRoutingMetadata weightedRoutingMetadata ) { - if (weightedRoutingMetadata != null) { + if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet()) { return indexShard.activeInitializingShardsWeightedIt( weightedRoutingMetadata.getWeightedRouting(), nodes, diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java index df2d8d595eaab..bff8dd833f2de 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java @@ -13,6 +13,7 @@ import org.opensearch.common.io.stream.Writeable; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -25,6 +26,11 @@ public class WeightedRouting implements Writeable { private String attributeName; private Map weights; + public WeightedRouting() { + this.attributeName = ""; + this.weights = new HashMap<>(3); + } + public WeightedRouting(String attributeName, Map weights) { this.attributeName = attributeName; this.weights = weights; @@ -40,6 +46,10 @@ public WeightedRouting(StreamInput in) throws IOException { weights = (Map) in.readGenericValue(); } + public boolean isSet() { + return (!this.attributeName.isEmpty() && !this.weights.isEmpty()); + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(attributeName); diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index 895b790f8499a..9992930a1a7f6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; import org.opensearch.action.ActionRequestValidationException; @@ -77,13 +78,16 @@ public WeightedRoutingService( CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, this::setForcedAwarenessAttributes ); + } public void registerWeightedRoutingMetadata( final ClusterPutWeightedRoutingRequest request, final ActionListener listener ) { - final WeightedRoutingMetadata newWeightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting()); + final WeightedRouting newWeightedRouting = new WeightedRouting(request.getWeightedRouting()); + + final long requestVersion = request.getVersion(); clusterService.submitStateUpdateTask("update_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { @@ -94,19 +98,21 @@ public ClusterState execute(ClusterState currentState) { Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); + ensureNoVersionConflict(requestVersion, weightedRoutingMetadata); if (weightedRoutingMetadata == null) { - logger.info("put weighted routing weights in metadata [{}]", request.getWeightedRouting()); - weightedRoutingMetadata = new WeightedRoutingMetadata(request.getWeightedRouting()); + logger.info("add weighted routing weights in metadata [{}]", newWeightedRouting); + weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, requestVersion + 1); } else { - if (!checkIfSameWeightsInMetadata(newWeightedRoutingMetadata, weightedRoutingMetadata)) { - logger.info("updated weighted routing weights [{}] in metadata", request.getWeightedRouting()); - weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRoutingMetadata.getWeightedRouting()); + if (!newWeightedRouting.equals(weightedRoutingMetadata.getWeightedRouting())) { + logger.info("updated weighted routing weights [{}] in metadata", newWeightedRouting); + weightedRoutingMetadata = new WeightedRoutingMetadata(newWeightedRouting, requestVersion + 1); } else { + logger.info("weights are same, not updating weighted routing weights [{}] in metadata", newWeightedRouting); return currentState; } } mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); - logger.info("building cluster state with weighted routing weights [{}]", request.getWeightedRouting()); + logger.info("building cluster state with weighted routing weights [{}]", newWeightedRouting); return ClusterState.builder(currentState).metadata(mdBuilder).build(); } @@ -124,23 +130,37 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - private boolean checkIfSameWeightsInMetadata( - WeightedRoutingMetadata newWeightedRoutingMetadata, - WeightedRoutingMetadata oldWeightedRoutingMetadata - ) { - return newWeightedRoutingMetadata.getWeightedRouting().equals(oldWeightedRoutingMetadata.getWeightedRouting()); - } - public void deleteWeightedRoutingMetadata( final ClusterDeleteWeightedRoutingRequest request, final ActionListener listener ) { + final long requestVersion = request.getVersion(); + final String awarenessAttribute = request.getAwarenessAttribute(); clusterService.submitStateUpdateTask("delete_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { logger.info("Deleting weighted routing metadata from the cluster state"); + Metadata metadata = currentState.metadata(); Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); - mdBuilder.removeCustom(WeightedRoutingMetadata.TYPE); + WeightedRoutingMetadata weightedRoutingMetadata = metadata.custom(WeightedRoutingMetadata.TYPE); + ensureNoVersionConflict(requestVersion, weightedRoutingMetadata); + + if ((weightedRoutingMetadata != null && awarenessAttribute == null) + || (weightedRoutingMetadata != null + && weightedRoutingMetadata.getWeightedRouting().attributeName().equals(awarenessAttribute))) { + weightedRoutingMetadata = new WeightedRoutingMetadata(new WeightedRouting(), weightedRoutingMetadata.getVersion() + 1); + } else { + throw new ResourceNotFoundException( + String.format( + Locale.ROOT, + "weighted routing metadata does not have weights set for awareness attribute %s", + awarenessAttribute + ) + ); + } + + mdBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); + logger.info("building cluster state with weighted routing weights deleted"); return ClusterState.builder(currentState).metadata(mdBuilder).build(); } @@ -153,7 +173,6 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { logger.debug("cluster weighted routing metadata change is processed by all the nodes"); - assert newState.metadata().weightedRoutingMetadata() == null; listener.onResponse(new ClusterDeleteWeightedRoutingResponse(true)); } }); @@ -183,7 +202,7 @@ public void verifyAwarenessAttribute(String attributeName) { if (getAwarenessAttributes().contains(attributeName) == false) { ActionRequestValidationException validationException = null; validationException = addValidationError( - String.format(Locale.ROOT, "invalid awareness attribute %s requested for updating weighted routing", attributeName), + String.format(Locale.ROOT, "invalid awareness attribute %s requested for weighted routing", attributeName), validationException ); throw validationException; @@ -270,4 +289,18 @@ private void ensureDecommissionedAttributeHasZeroWeight(ClusterState state, Clus ); } } + + private void ensureNoVersionConflict(long requestedVersion, WeightedRoutingMetadata weightedRoutingMetadata) { + if ((weightedRoutingMetadata == null && requestedVersion != WeightedRoutingMetadata.INITIAL_VERSION) + || (weightedRoutingMetadata != null && requestedVersion != weightedRoutingMetadata.getVersion())) { + throw new UnsupportedWeightedRoutingStateException( + String.format( + Locale.ROOT, + "requested version is %s but cluster weighted routing metadata is at a " + "different version %s ", + requestedVersion, + weightedRoutingMetadata != null ? weightedRoutingMetadata.getVersion() : WeightedRoutingMetadata.INITIAL_VERSION + ) + ); + } + } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java index 9742cc373d520..d9dedf8d14506 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java @@ -20,7 +20,8 @@ import java.io.IOException; import java.util.List; -import static java.util.Collections.singletonList; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; import static org.opensearch.rest.RestRequest.Method.DELETE; /** @@ -35,7 +36,12 @@ public class RestClusterDeleteWeightedRoutingAction extends BaseRestHandler { @Override public List routes() { - return singletonList(new Route(DELETE, "/_cluster/routing/awareness/weights")); + return unmodifiableList( + asList( + new Route(DELETE, "/_cluster/routing/awareness/weights"), + new Route(DELETE, "/_cluster/routing/awareness/{attribute}/weights") + ) + ); } @Override @@ -45,9 +51,17 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = Requests.deleteWeightedRoutingRequest(); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = createRequest(request); return channel -> client.admin() .cluster() .deleteWeightedRouting(clusterDeleteWeightedRoutingRequest, new RestToXContentListener<>(channel)); } + + public static ClusterDeleteWeightedRoutingRequest createRequest(RestRequest request) throws IOException { + ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest = Requests.deleteWeightedRoutingRequest( + request.param("attribute") + ); + request.applyContentParser(p -> deleteWeightedRoutingRequest.source(p.mapStrings())); + return deleteWeightedRoutingRequest; + } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java index 1cf44e665cf84..5f845b7a66c1f 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterPutWeightedRoutingAction.java @@ -51,7 +51,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli public static ClusterPutWeightedRoutingRequest createRequest(RestRequest request) throws IOException { ClusterPutWeightedRoutingRequest putWeightedRoutingRequest = Requests.putWeightedRoutingRequest(request.param("attribute")); - request.applyContentParser(p -> putWeightedRoutingRequest.source(p.mapStrings())); + request.applyContentParser(p -> putWeightedRoutingRequest.source(p.mapOrdered())); return putWeightedRoutingRequest; } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java index 5e456158941b8..9d0ed8e03d7f2 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/shards/routing/weighted/put/ClusterPutWeightedRoutingRequestTests.java @@ -19,16 +19,18 @@ public class ClusterPutWeightedRoutingRequestTests extends OpenSearchTestCase { public void testSetWeightedRoutingWeight() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"1\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); Map weights = Map.of("us-east-1a", 1.0, "us-east-1b", 1.0, "us-east-1c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); - assertEquals(request.getWeightedRouting(), weightedRouting); + assertEquals(weightedRouting, request.getWeightedRouting()); + assertEquals(1, request.getVersion()); } public void testValidate_ValuesAreProper() { - String reqString = "{\"us-east-1c\" : \"1\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); @@ -45,7 +47,7 @@ public void testValidate_MissingWeights() { } public void testValidate_AttributeMissing() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"1\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\": \"1\"},\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest(); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); @@ -54,7 +56,7 @@ public void testValidate_AttributeMissing() { } public void testValidate_MoreThanHalfWithZeroWeight() { - String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}"; + String reqString = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"0\",\"us-east-1a\": \"1\"}," + "\"_version\":1}"; ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); ActionRequestValidationException actionRequestValidationException = request.validate(); @@ -63,4 +65,13 @@ public void testValidate_MoreThanHalfWithZeroWeight() { actionRequestValidationException.getMessage().contains("Maximum expected number of routing weights having zero weight is [1]") ); } + + public void testValidate_VersionMissing() { + String reqString = "{\"weights\":{\"us-east-1c\": \"0\",\"us-east-1b\": \"1\",\"us-east-1a\": \"1\"}}"; + ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone"); + request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON); + ActionRequestValidationException actionRequestValidationException = request.validate(); + assertNotNull(actionRequestValidationException); + assertTrue(actionRequestValidationException.getMessage().contains("Version is missing")); + } } diff --git a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java index e48ba6fbd62d6..6107c03e5a891 100644 --- a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/ClusterGetWeightedRoutingResponseTests.java @@ -21,7 +21,7 @@ public class ClusterGetWeightedRoutingResponseTests extends AbstractXContentTest protected ClusterGetWeightedRoutingResponse createTestInstance() { Map weights = Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 1.0); WeightedRouting weightedRouting = new WeightedRouting("", weights); - ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse(weightedRouting, true); + ClusterGetWeightedRoutingResponse response = new ClusterGetWeightedRoutingResponse(weightedRouting, true, 0); return response; } diff --git a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java index b4e883ecaacb1..df5b6566b503e 100644 --- a/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java +++ b/server/src/test/java/org/opensearch/cluster/action/shard/routing/weighted/get/TransportGetWeightedRoutingActionTests.java @@ -174,7 +174,7 @@ private ClusterState setLocalNode(ClusterState clusterState, String nodeId) { private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 81fbd7c0e332b..a942c62bd05eb 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -332,7 +332,7 @@ public void onFailure(Exception e) { private void setWeightedRoutingWeights(Map weights) { ClusterState clusterState = clusterService.state(); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java index a0a9d2bd9586b..17b682618b1a8 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java @@ -20,7 +20,7 @@ public class WeightedRoutingMetadataTests extends AbstractXContentTestCase weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1); return weightedRoutingMetadata; } diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index 014f2d237a306..d64402a74fba2 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -789,7 +789,7 @@ private ClusterState clusterStateForWeightedRouting(String[] indexNames, int num private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); @@ -1158,7 +1158,7 @@ private ClusterState updateStatetoTestWeightedRouting( // add weighted routing weights in metadata Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState.metadata(metadataBuilder); clusterState.routingTable(routingTableBuilder.build()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index 089fb453ca2c0..1f892b993d4d6 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -175,7 +175,7 @@ private ClusterState setLocalNode(ClusterState clusterState, String nodeId) { private ClusterState setWeightedRoutingWeights(ClusterState clusterState, Map weights) { WeightedRouting weightedRouting = new WeightedRouting("zone", weights); - WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); @@ -260,13 +260,13 @@ public void testDeleteWeightedRoutingMetadata() throws InterruptedException { ClusterState.Builder builder = ClusterState.builder(state); ClusterServiceUtils.setState(clusterService, builder); - ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = new ClusterDeleteWeightedRoutingRequest(); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = new ClusterDeleteWeightedRoutingRequest("zone"); + clusterDeleteWeightedRoutingRequest.setVersion(0); final CountDownLatch countDownLatch = new CountDownLatch(1); ActionListener listener = new ActionListener() { @Override public void onResponse(ClusterDeleteWeightedRoutingResponse clusterDeleteWeightedRoutingResponse) { assertTrue(clusterDeleteWeightedRoutingResponse.isAcknowledged()); - assertNull(clusterService.state().metadata().weightedRoutingMetadata()); countDownLatch.countDown(); } diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java index a4cd6224217b7..582fbfce315b2 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterAddWeightedRoutingActionTests.java @@ -34,14 +34,15 @@ public void setupAction() { } public void testCreateRequest_SupportedRequestBody() throws IOException { - String req = "{\"us-east-1c\" : \"1\", \"us-east-1d\":\"1.0\", \"us-east-1a\":\"0.0\"}"; + String req = "{\"weights\":{\"us-east-1c\":\"0\",\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; RestRequest restRequest = buildRestRequest(req); ClusterPutWeightedRoutingRequest clusterPutWeightedRoutingRequest = RestClusterPutWeightedRoutingAction.createRequest(restRequest); assertEquals("zone", clusterPutWeightedRoutingRequest.getWeightedRouting().attributeName()); assertNotNull(clusterPutWeightedRoutingRequest.getWeightedRouting().weights()); - assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1c").toString()); - assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1d").toString()); - assertEquals("0.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1a").toString()); + assertEquals("0.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1c").toString()); + assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1b").toString()); + assertEquals("1.0", clusterPutWeightedRoutingRequest.getWeightedRouting().weights().get("us-east-1a").toString()); + assertEquals(1, clusterPutWeightedRoutingRequest.getVersion()); } public void testCreateRequest_UnsupportedRequestBody() throws IOException { @@ -54,7 +55,7 @@ public void testCreateRequest_UnsupportedRequestBody() throws IOException { public void testCreateRequest_MalformedRequestBody() throws IOException { Map params = new HashMap<>(); - String req = "{\"us-east-1c\" : \1\", \"us-east-1d\":\"1\", \"us-east-1a\":\"0\"}"; + String req = "{\"weights\":{\"us-east-1c\":\"0,\"us-east-1b\":\"1\",\"us-east-1a\":\"1\"},\"_version\":1}"; RestRequest restRequest = buildRestRequest(req); assertThrows(JsonParseException.class, () -> RestClusterPutWeightedRoutingAction.createRequest(restRequest)); } diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java new file mode 100644 index 0000000000000..2589d68e4cf0b --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingActionTests.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import org.junit.Before; +import org.opensearch.OpenSearchParseException; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.rest.RestActionTestCase; + +import java.io.IOException; + +import static java.util.Collections.singletonMap; + +public class RestClusterDeleteWeightedRoutingActionTests extends RestActionTestCase { + private RestClusterPutWeightedRoutingAction action; + + @Before + public void setupAction() { + action = new RestClusterPutWeightedRoutingAction(); + controller().registerHandler(action); + } + + public void testDeleteRequest_SupportedRequestBody() throws IOException { + String req = "{\"_version\":2}"; + RestRequest restRequest = buildRestRequest(req); + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = RestClusterDeleteWeightedRoutingAction.createRequest( + restRequest + ); + assertEquals(2, clusterDeleteWeightedRoutingRequest.getVersion()); + + restRequest = buildRestRequestWithAwarenessAttribute(req); + clusterDeleteWeightedRoutingRequest = RestClusterDeleteWeightedRoutingAction.createRequest(restRequest); + assertEquals("zone", clusterDeleteWeightedRoutingRequest.getAwarenessAttribute()); + assertEquals(2, clusterDeleteWeightedRoutingRequest.getVersion()); + } + + public void testDeleteRequest_BadRequest() throws IOException { + String req = "{\"_ver\":2}"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest)); + + RestRequest restRequest2 = buildRestRequestWithAwarenessAttribute(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest2)); + } + + private RestRequest buildRestRequestWithAwarenessAttribute(String content) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) + .withPath("/_cluster/routing/awareness/zone/weights") + .withParams(singletonMap("attribute", "zone")) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + } + + private RestRequest buildRestRequest(String content) { + return new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) + .withPath("/_cluster/routing/awareness/weights") + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + } + + public void testCreateRequest_EmptyRequestBody() throws IOException { + String req = "{}"; + RestRequest restRequest = buildRestRequest(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest)); + + RestRequest restRequest2 = buildRestRequestWithAwarenessAttribute(req); + assertThrows(OpenSearchParseException.class, () -> RestClusterDeleteWeightedRoutingAction.createRequest(restRequest2)); + } +}