Skip to content

Commit

Permalink
Add GET api to get shard routing weights (#4275)
Browse files Browse the repository at this point in the history
* Weighted round-robin scheduling policy for shard coordination traffic routing

Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
  • Loading branch information
anshu1106 authored Oct 6, 2022
1 parent 3186935 commit 563122e
Show file tree
Hide file tree
Showing 21 changed files with 1,152 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))
- PUT api for weighted shard routing ([#4272](https://github.com/opensearch-project/OpenSearch/pull/4272))
- GET api for weighted shard routing([#4275](https://github.com/opensearch-project/OpenSearch/pull/4275/))
- Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580))
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))
- Further simplification of the ZIP publication implementation ([#4360](https://github.com/opensearch-project/OpenSearch/pull/4360))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ public void testApiNamingConventions() throws Exception {
"search_shards",
"remote_store.restore",
"cluster.put_weighted_routing",
"cluster.get_weighted_routing",
"cluster.put_decommission_awareness",
"cluster.get_decommission_awareness", };
List<String> booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"cluster.get_weighted_routing": {
"documentation": {
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/weighted-routing/get",
"description": "Fetches weighted shard routing weights"
},
"stability": "stable",
"url": {
"paths": [
{
"path": "/_cluster/routing/awareness/{attribute}/weights",
"methods": [
"GET"
],
"parts": {
"attribute": {
"type": "string",
"description": "Awareness attribute name"
}
}
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3)
public class WeightedRoutingIT extends OpenSearchIntegTestCase {

public void testPutWeightedRouting() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> starting 6 nodes on different zones");
int nodeCountPerAZ = 2;

logger.info("--> starting a dedicated cluster manager node");
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);

// put call made on a data node in zone a
response = internalCluster().client(randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_a.get(1)))
.admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);
}

public void testPutWeightedRouting_InvalidAwarenessAttribute() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone1", weights);

assertThrows(
IllegalArgumentException.class,
() -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).get()
);
}

public void testPutWeightedRouting_MoreThanOneZoneHasZeroWeight() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 0.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone1", weights);

assertThrows(
IllegalArgumentException.class,
() -> client().admin().cluster().prepareWeightedRouting().setWeightedRouting(weightedRouting).get()
);
}

public void testGetWeightedRouting_WeightsNotSet() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startNodes(
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(),
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.get();
assertNull(weightedRoutingResponse.weights());
}

public void testGetWeightedRouting_WeightsAreSet() throws IOException {

Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

int nodeCountPerAZ = 2;

logger.info("--> starting a dedicated cluster manager node");
internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 2 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
nodeCountPerAZ,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("7").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
// put api call to set weights
ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertEquals(response.isAcknowledged(), true);

// get api call to fetch weights
ClusterGetWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());

// get api to fetch local node weight for a node in zone a
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_a.get(0), nodes_in_zone_a.get(1)))
.admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("1.0", weightedRoutingResponse.getLocalNodeWeight());

// get api to fetch local node weight for a node in zone b
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_b.get(0), nodes_in_zone_b.get(1)))
.admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("2.0", weightedRoutingResponse.getLocalNodeWeight());

// get api to fetch local node weight for a node in zone c
weightedRoutingResponse = internalCluster().client(randomFrom(nodes_in_zone_c.get(0), nodes_in_zone_c.get(1)))
.admin()
.cluster()
.prepareGetWeightedRouting()
.setAwarenessAttribute("zone")
.setRequestLocal(true)
.get();
assertEquals(weightedRouting, weightedRoutingResponse.weights());
assertEquals("3.0", weightedRoutingResponse.getLocalNodeWeight());
}
}
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.TransportGetWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.TransportAddWeightedRoutingAction;
import org.opensearch.action.admin.cluster.snapshots.clone.CloneSnapshotAction;
Expand Down Expand Up @@ -299,6 +301,7 @@
import org.opensearch.rest.action.admin.cluster.RestCloneSnapshotAction;
import org.opensearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
import org.opensearch.rest.action.admin.cluster.RestClusterGetSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestClusterGetWeightedRoutingAction;
import org.opensearch.rest.action.admin.cluster.RestClusterHealthAction;
import org.opensearch.rest.action.admin.cluster.RestClusterPutWeightedRoutingAction;
import org.opensearch.rest.action.admin.cluster.RestClusterRerouteAction;
Expand Down Expand Up @@ -573,6 +576,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class);
actions.register(ClusterGetWeightedRoutingAction.INSTANCE, TransportGetWeightedRoutingAction.class);
actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
Expand Down Expand Up @@ -759,6 +763,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestOpenIndexAction());
registerHandler.accept(new RestAddIndexBlockAction());
registerHandler.accept(new RestClusterPutWeightedRoutingAction());
registerHandler.accept(new RestClusterGetWeightedRoutingAction());

registerHandler.accept(new RestUpdateSettingsAction());
registerHandler.accept(new RestGetSettingsAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.shards.routing.weighted.get;

import org.opensearch.action.ActionType;

/**
* Action to get weights for weighted round-robin search routing policy.
*
* @opensearch.internal
*/
public class ClusterGetWeightedRoutingAction extends ActionType<ClusterGetWeightedRoutingResponse> {
public static final ClusterGetWeightedRoutingAction INSTANCE = new ClusterGetWeightedRoutingAction();
public static final String NAME = "cluster:admin/routing/awareness/weights/get";

private ClusterGetWeightedRoutingAction() {
super(NAME, ClusterGetWeightedRoutingResponse::new);
}
}
Loading

0 comments on commit 563122e

Please sign in to comment.