Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Weighted Shard Routing] API versioning #5255

Merged
merged 28 commits into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615)))
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459))
- Support versioning for Weighted routing apis([#5255](https://github.com/opensearch-project/OpenSearch/pull/5255))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Add support for discovered cluster manager and remove local weights ([#5680](https://github.com/opensearch-project/OpenSearch/pull/5680))
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void testRestStatusForAcknowledgedDecommission() throws IOException {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public void testInvariantsAndLogsOnDecommissionedNodes() throws Exception {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down Expand Up @@ -453,6 +454,7 @@ private void assertNodesRemovedAfterZoneDecommission(boolean originalClusterMana
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down Expand Up @@ -565,6 +567,7 @@ public void testDecommissionFailedWhenDifferentAttributeAlreadyDecommissioned()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down Expand Up @@ -649,6 +652,7 @@ public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionEx
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down Expand Up @@ -769,6 +773,7 @@ public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down Expand Up @@ -809,6 +814,7 @@ public void testDecommissionFailedWithOnlyOneAttributeValueForLeader() throws Ex
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down Expand Up @@ -922,6 +928,7 @@ public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throw
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@

package org.opensearch.cluster.routing;

import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.RestStatus;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
Expand Down Expand Up @@ -79,6 +81,7 @@ public void testPutWeightedRouting() {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand All @@ -88,6 +91,7 @@ public void testPutWeightedRouting() {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(0)
.get();
assertEquals(response.isAcknowledged(), true);
}
Expand Down Expand Up @@ -215,6 +219,7 @@ public void testGetWeightedRouting_WeightsAreSet() throws IOException {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand Down Expand Up @@ -304,6 +309,7 @@ public void testGetWeightedRouting_ClusterManagerNotDiscovered() throws Exceptio
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand Down Expand Up @@ -361,6 +367,7 @@ public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand Down Expand Up @@ -398,12 +405,14 @@ public void testDeleteWeightedRouting_WeightsNotSet() {
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
ResourceNotFoundException exception = expectThrows(
ResourceNotFoundException.class,
() -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(-1).get()
);
assertEquals(RestStatus.NOT_FOUND, exception.status());
}

public void testDeleteWeightedRouting_WeightsAreSet() {
public void testDeleteWeightedRouting_WeightsAreSet() throws IOException {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
Expand All @@ -430,13 +439,111 @@ public void testDeleteWeightedRouting_WeightsAreSet() {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weighted routing metadata
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertTrue(deleteResponse.isAcknowledged());
assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());
}

public void testPutAndDeleteWithVersioning() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> starting 6 nodes on different zones");
int nodeCountPerAZ = 2;

logger.info("--> starting a dedicated cluster manager node");
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build());
internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "b").build());
internalCluster().startDataOnlyNodes(nodeCountPerAZ, Settings.builder().put(commonSettings).put("node.attr.zone", "c").build());

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");

Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// update weights api call with correct version number
weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(0).get();
assertTrue(response.isAcknowledged());

// update weights api call with incorrect version number
weights = Map.of("a", 1.0, "b", 2.0, "c", 4.0);
WeightedRouting weightedRouting1 = new WeightedRouting("zone", weights);
UnsupportedWeightedRoutingStateException exception = expectThrows(
UnsupportedWeightedRoutingStateException.class,
() -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting1).setVersion(100).get()
);
assertEquals(exception.status(), RestStatus.CONFLICT);

// get weights call
ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.get();

// update weights call using version returned by get api call
weights = Map.of("a", 1.0, "b", 2.0, "c", 5.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(weightedRoutingResponse.getVersion())
.get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weights by awareness attribute
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin()
.cluster()
.prepareDeleteWeightedRouting()
.setAwarenessAttribute("zone")
.setVersion(2)
.get();
assertTrue(deleteResponse.isAcknowledged());

// update weights again and make sure that version number got updated on delete
weights = Map.of("a", 1.0, "b", 2.0, "c", 6.0);
weightedRouting = new WeightedRouting("zone", weights);
response = client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).setVersion(3).get();
assertTrue(response.isAcknowledged());
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// delete weights
deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(4).get();
assertTrue(deleteResponse.isAcknowledged());

// delete weights call, incorrect version number
UnsupportedWeightedRoutingStateException deleteException = expectThrows(
UnsupportedWeightedRoutingStateException.class,
() -> client().admin().cluster().prepareDeleteWeightedRouting().setVersion(7).get()
);
assertEquals(RestStatus.CONFLICT, deleteException.status());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public void testSearchWithWRRShardRouting() throws IOException {
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

Expand Down Expand Up @@ -129,7 +130,7 @@ public void testSearchWithWRRShardRouting() throws IOException {

logger.info("--> deleted shard routing weights for weighted round robin");

ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get();
ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertEquals(deleteResponse.isAcknowledged(), true);

hitNodes = new HashSet<>();
Expand Down
Loading