Skip to content

Commit

Permalink
Make allocation decisions at node level first for pending task optimi… (
Browse files Browse the repository at this point in the history
opensearch-project#534)

* Make allocation decisions at node level first for pending task optimization

Signed-off-by: Ankit Jain <akjain@amazon.com>

* Addressing review comments

Signed-off-by: Ankit Jain <akjain@amazon.com>

* Fixing benchmark and adding debug mode tests

Signed-off-by: Ankit Jain <akjain@amazon.com>

* Fixing typo in previous commit

Signed-off-by: Ankit Jain <akjain@amazon.com>

* Moving test file to correct package

Signed-off-by: Ankit Jain <akjain@amazon.com>

* Addressing review comments

Signed-off-by: Ankit Jain <akjain@amazon.com>
  • Loading branch information
jainankitk authored May 20, 2021
1 parent cbcae73 commit e0c8b7e
Show file tree
Hide file tree
Showing 13 changed files with 691 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,6 @@

package org.opensearch.benchmark.routing.allocation;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.common.settings.Settings;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand All @@ -52,8 +42,20 @@
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.common.settings.Settings;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Fork(3)
Expand All @@ -71,75 +73,103 @@ public class AllocationBenchmark {
// support to constrain the combinations of benchmark parameters and we do not want to rely on OptionsBuilder as each benchmark would
// need its own main method and we cannot execute more than one class with a main method per JAR.
@Param({
// indices| shards| replicas| nodes
" 10| 1| 0| 1",
" 10| 3| 0| 1",
" 10| 10| 0| 1",
" 100| 1| 0| 1",
" 100| 3| 0| 1",
" 100| 10| 0| 1",

" 10| 1| 0| 10",
" 10| 3| 0| 10",
" 10| 10| 0| 10",
" 100| 1| 0| 10",
" 100| 3| 0| 10",
" 100| 10| 0| 10",

" 10| 1| 1| 10",
" 10| 3| 1| 10",
" 10| 10| 1| 10",
" 100| 1| 1| 10",
" 100| 3| 1| 10",
" 100| 10| 1| 10",

" 10| 1| 2| 10",
" 10| 3| 2| 10",
" 10| 10| 2| 10",
" 100| 1| 2| 10",
" 100| 3| 2| 10",
" 100| 10| 2| 10",

" 10| 1| 0| 50",
" 10| 3| 0| 50",
" 10| 10| 0| 50",
" 100| 1| 0| 50",
" 100| 3| 0| 50",
" 100| 10| 0| 50",

" 10| 1| 1| 50",
" 10| 3| 1| 50",
" 10| 10| 1| 50",
" 100| 1| 1| 50",
" 100| 3| 1| 50",
" 100| 10| 1| 50",

" 10| 1| 2| 50",
" 10| 3| 2| 50",
" 10| 10| 2| 50",
" 100| 1| 2| 50",
" 100| 3| 2| 50",
" 100| 10| 2| 50" })
public String indicesShardsReplicasNodes = "10|1|0|1";
// indices| shards| replicas| source| target| concurrentRecoveries
" 10| 2| 0| 1| 1| 1|",
" 10| 3| 0| 1| 1| 2|",
" 10| 10| 0| 1| 1| 5|",
" 100| 1| 0| 1| 1| 10|",
" 100| 3| 0| 1| 1| 10|",
" 100| 10| 0| 1| 1| 10|",

" 10| 2| 0| 10| 10| 1|",
" 10| 3| 0| 10| 5| 2|",
" 10| 10| 0| 10| 5| 5|",
" 100| 1| 0| 5| 10| 5|",
" 100| 3| 0| 10| 5| 5|",
" 100| 10| 0| 10| 20| 6|",

" 10| 1| 1| 10| 10| 1|",
" 10| 3| 1| 10| 3| 3|",
" 10| 10| 1| 5| 12| 5|",
" 100| 1| 1| 10| 10| 6|",
" 100| 3| 1| 10| 5| 8|",
" 100| 10| 1| 8| 17| 8|",

" 10| 1| 2| 10| 10| 1|",
" 10| 3| 2| 10| 5| 3|",
" 10| 10| 2| 5| 10| 5|",
" 100| 1| 2| 10| 8| 7|",
" 100| 3| 2| 13| 17| 5|",
" 100| 10| 2| 10| 20| 8|",

" 10| 2| 1| 20| 20| 1|",
" 10| 3| 1| 20| 30| 1|",
" 10| 10| 1| 20| 10| 3|",
" 100| 1| 1| 20| 5| 5|",
" 100| 3| 1| 20| 23| 6|",
" 100| 10| 1| 40| 20| 8|",

" 10| 3| 2| 50| 30| 1|",
" 10| 3| 2| 50| 25| 1|",
" 10| 10| 1| 50| 33| 2|",
" 100| 1| 1| 40| 50| 2|",
" 100| 3| 1| 50| 70| 3|",
" 100| 10| 1| 60| 50| 3|",

" 10| 10| 2| 50| 50| 1|",
" 10| 3| 2| 50| 30| 1|",
" 10| 10| 2| 50| 40| 2|",
" 100| 1| 2| 40| 50| 2|",
" 100| 3| 2| 50| 30| 6|",
" 100| 10| 2| 33| 55| 6|",

" 500| 60| 1| 100| 100| 12|",
" 500| 60| 1| 100| 40| 12|",
" 500| 60| 1| 40| 100| 12|",

" 50| 60| 1| 100| 100| 6|",
" 50| 60| 1| 100| 40| 6|",
" 50| 60| 1| 40| 100| 6|" })
public String indicesShardsReplicasSourceTargetRecoveries = "10|1|0|1|1|1";

public int numTags = 2;
public int numZone = 3;
public int concurrentRecoveries;
public int numIndices;
public int numShards;
public int numReplicas;
public int sourceNodes;
public int targetNodes;
public int clusterConcurrentRecoveries;

private AllocationService strategy;
private AllocationService initialClusterStrategy;
private AllocationService clusterExcludeStrategy;
private AllocationService clusterZoneAwareExcludeStrategy;
private ClusterState initialClusterState;

@Setup
public void setUp() throws Exception {
final String[] params = indicesShardsReplicasNodes.split("\\|");
final String[] params = indicesShardsReplicasSourceTargetRecoveries.split("\\|");
numIndices = toInt(params[0]);
numShards = toInt(params[1]);
numReplicas = toInt(params[2]);
sourceNodes = toInt(params[3]);
targetNodes = toInt(params[4]);
concurrentRecoveries = toInt(params[5]);

int numIndices = toInt(params[0]);
int numShards = toInt(params[1]);
int numReplicas = toInt(params[2]);
int numNodes = toInt(params[3]);
int totalShardCount = (numReplicas + 1) * numShards * numIndices;

strategy = Allocators.createAllocationService(
Settings.builder().put("cluster.routing.allocation.awareness.attributes", "tag").build()
initialClusterStrategy = Allocators.createAllocationService(
Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.node_concurrent_recoveries", "20")
.put("cluster.routing.allocation.exclude.tag", "tag_0")
.build()
);

// We'll try to move shards from the tag_1 (source) nodes to the tag_0 (target) nodes
clusterConcurrentRecoveries = Math.min(sourceNodes, targetNodes) * concurrentRecoveries;

Metadata.Builder mb = Metadata.builder();
for (int i = 1; i <= numIndices; i++) {
mb.put(
Expand All @@ -155,31 +185,96 @@ public void setUp() throws Exception {
rb.addAsNew(metadata.index("test_" + i));
}
RoutingTable routingTable = rb.build();
DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
for (int i = 1; i <= numNodes; i++) {
nb.add(Allocators.newNode("node" + i, Collections.singletonMap("tag", "tag_" + (i % numTags))));
}
initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.nodes(nb)
.nodes(setUpClusterNodes(sourceNodes, targetNodes))
.build();
// Start all unassigned shards
initialClusterState = initialClusterStrategy.reroute(initialClusterState, "reroute");
while (initialClusterState.getRoutingNodes().hasUnassignedShards()) {
initialClusterState = initialClusterStrategy.applyStartedShards(
initialClusterState,
initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)
);
initialClusterState = initialClusterStrategy.reroute(initialClusterState, "reroute");
}
// Ensure all shards are started
while (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
initialClusterState = initialClusterStrategy.applyStartedShards(
initialClusterState,
initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)
);
}

assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size() == totalShardCount);
assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 0);
assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size() == 0);
// Make sure shards are only allocated on tag_1 nodes
for (ShardRouting startedShard : initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
assert (initialClusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag")).equals(
"tag_1"
);
}
}

/**
 * Parses a base-10 integer from {@code v}, ignoring leading and trailing whitespace.
 * <p>
 * {@code setUp} uses this to read the individual columns of the pipe-separated,
 * space-padded benchmark parameter string.
 *
 * @param v textual integer, optionally surrounded by whitespace
 * @return the parsed value
 * @throws NumberFormatException if the trimmed text is not a valid {@code int}
 * @throws NullPointerException if {@code v} is {@code null}
 */
private int toInt(String v) {
    // parseInt yields a primitive directly; Integer.valueOf would box and then auto-unbox.
    return Integer.parseInt(v.trim());
}

@Benchmark
public ClusterState measureAllocation() {
public ClusterState measureExclusionOnZoneAwareStartedShard() throws Exception {
ClusterState clusterState = initialClusterState;
while (clusterState.getRoutingNodes().hasUnassignedShards()) {
clusterState = strategy.applyStartedShards(
clusterZoneAwareExcludeStrategy = Allocators.createAllocationService(
Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries))
.put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(concurrentRecoveries))
.put("cluster.routing.allocation.exclude.tag", "tag_1")
.build()
);
clusterState = clusterZoneAwareExcludeStrategy.reroute(clusterState, "reroute");
return clusterState;
}

/**
 * Benchmark: starting from {@code initialClusterState}, reroutes with an allocation service
 * that excludes nodes whose {@code tag} attribute is {@code tag_1}, then keeps marking
 * INITIALIZING shards as started until none remain.
 *
 * @return the cluster state reached once no shard is INITIALIZING
 * @throws Exception declared on the signature; which call may raise it is not visible here
 */
@Benchmark
public ClusterState measureShardRelocationComplete() throws Exception {
ClusterState clusterState = initialClusterState;
// A new allocation service is created on every invocation: awareness on the "zone"
// attribute, the node/cluster concurrent-recovery limits held in the fields, and an
// exclusion on tag_1.
clusterZoneAwareExcludeStrategy = Allocators.createAllocationService(
Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(concurrentRecoveries))
.put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries))
.put("cluster.routing.allocation.exclude.tag", "tag_1")
.build()
);
// First reroute under the exclusion settings.
clusterState = clusterZoneAwareExcludeStrategy.reroute(clusterState, "reroute");
// Loop until no shard is left in the INITIALIZING state.
while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
clusterState = clusterZoneAwareExcludeStrategy.applyStartedShards(
clusterState,
clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)
);
// NOTE(review): this reroute goes through `strategy`, not `clusterZoneAwareExcludeStrategy`.
// It looks like a leftover line from the former measureAllocation() in this diff view —
// confirm whether it belongs in this method at all.
clusterState = strategy.reroute(clusterState, "reroute");
}
// Only checked when JVM assertions are enabled: every STARTED shard must sit on a tag_0 node.
for (ShardRouting startedShard : clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
assert (clusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag")).equals("tag_0");
}
return clusterState;
}

/**
 * Assembles the discovery nodes for the benchmark cluster.
 * <p>
 * Source nodes are named {@code node_s_<i>} and carry {@code tag=tag_1}; target nodes are
 * named {@code node_t_<j>} and carry {@code tag=tag_0}. Every node also gets a
 * {@code zone} attribute of {@code zone_<index % numZone>}. Source nodes are added first.
 *
 * @param sourceNodes number of source (tag_1) nodes to create
 * @param targetNodes number of target (tag_0) nodes to create
 * @return a builder holding all created nodes
 */
private DiscoveryNodes.Builder setUpClusterNodes(int sourceNodes, int targetNodes) {
    final DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
    addTaggedNodes(builder, "node_s_", 1, sourceNodes);
    addTaggedNodes(builder, "node_t_", 0, targetNodes);
    return builder;
}

/** Adds {@code count} nodes, numbered from 1, that share one tag index and cycle through the zones. */
private void addTaggedNodes(DiscoveryNodes.Builder builder, String namePrefix, int tagIndex, int count) {
    for (int ordinal = 1; ordinal <= count; ordinal++) {
        final Map<String, String> nodeAttributes = new HashMap<>();
        nodeAttributes.put("tag", "tag_" + tagIndex);
        nodeAttributes.put("zone", "zone_" + (ordinal % numZone));
        builder.add(Allocators.newNode(namePrefix + ordinal, nodeAttributes));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ConcurrentRecoveriesAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
Expand Down Expand Up @@ -244,6 +245,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(Settings se
addAllocationDecider(deciders, new RebalanceOnlyWhenActiveAllocationDecider());
addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRecoveriesAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
Expand Down
Loading

0 comments on commit e0c8b7e

Please sign in to comment.