Skip to content

Commit d07a2bc

Browse files
[Segment Replication] Add global primary shard balance constraint during allocation (#6643) (#6690)
* [Segment Replication] Add global primary shard balance constraint during allocation * Add javadoc * Fix testAllConstraints test to include all constraints --------- (cherry picked from commit ad823b6) Signed-off-by: Suraj Singh <surajrider@gmail.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent add245d commit d07a2bc

File tree

11 files changed

+379
-143
lines changed

11 files changed

+379
-143
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,19 +54,50 @@ public void enablePreferPrimaryBalance() {
5454
client().admin()
5555
.cluster()
5656
.prepareUpdateSettings()
57-
.setPersistentSettings(
58-
Settings.builder().put(BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey(), "true")
59-
)
57+
.setPersistentSettings(Settings.builder().put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE.getKey(), "true"))
6058
);
6159
}
6260

61+
/**
62+
* This test verifies that the overall primary balance is attained during allocation. This test verifies primary
63+
* balance per index and across all indices is maintained.
64+
* @throws Exception
65+
*/
66+
public void testGlobalPrimaryAllocation() throws Exception {
67+
internalCluster().startClusterManagerOnlyNode();
68+
final int maxReplicaCount = 1;
69+
final int maxShardCount = 1;
70+
final int nodeCount = randomIntBetween(maxReplicaCount + 1, 10);
71+
final int numberOfIndices = randomIntBetween(5, 10);
72+
73+
final List<String> nodeNames = new ArrayList<>();
74+
logger.info("--> Creating {} nodes", nodeCount);
75+
for (int i = 0; i < nodeCount; i++) {
76+
nodeNames.add(internalCluster().startNode());
77+
}
78+
enablePreferPrimaryBalance();
79+
int shardCount, replicaCount;
80+
ClusterState state;
81+
for (int i = 0; i < numberOfIndices; i++) {
82+
shardCount = randomIntBetween(1, maxShardCount);
83+
replicaCount = randomIntBetween(0, maxReplicaCount);
84+
createIndex("test" + i, shardCount, replicaCount, i % 2 == 0);
85+
logger.info("--> Creating index {} with shard count {} and replica count {}", "test" + i, shardCount, replicaCount);
86+
ensureGreen(TimeValue.timeValueSeconds(60));
87+
}
88+
state = client().admin().cluster().prepareState().execute().actionGet().getState();
89+
logger.info(ShardAllocations.printShardDistribution(state));
90+
verifyPerIndexPrimaryBalance();
91+
verifyPrimaryBalance();
92+
}
93+
6394
/**
6495
* This test verifies the happy path where primary shard allocation is balanced when multiple indices are created.
6596
*
6697
* This test in general passes without primary shard balance as well due to nature of allocation algorithm which
6798
* assigns all primary shards first followed by replica copies.
6899
*/
69-
public void testBalancedPrimaryAllocation() throws Exception {
100+
public void testPerIndexPrimaryAllocation() throws Exception {
70101
internalCluster().startClusterManagerOnlyNode();
71102
final int maxReplicaCount = 2;
72103
final int maxShardCount = 5;
@@ -213,4 +244,24 @@ private void verifyPerIndexPrimaryBalance() throws Exception {
213244
}
214245
}, 60, TimeUnit.SECONDS);
215246
}
247+
248+
private void verifyPrimaryBalance() throws Exception {
249+
assertBusy(() -> {
250+
final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState();
251+
RoutingNodes nodes = currentState.getRoutingNodes();
252+
int totalPrimaryShards = 0;
253+
for (ObjectObjectCursor<String, IndexRoutingTable> index : currentState.getRoutingTable().indicesRouting()) {
254+
totalPrimaryShards += index.value.primaryShardsActive();
255+
}
256+
final int avgPrimaryShardsPerNode = (int) Math.ceil(totalPrimaryShards * 1f / currentState.getRoutingNodes().size());
257+
for (RoutingNode node : nodes) {
258+
final int primaryCount = node.shardsWithState(STARTED)
259+
.stream()
260+
.filter(ShardRouting::primary)
261+
.collect(Collectors.toList())
262+
.size();
263+
assertTrue(primaryCount <= avgPrimaryShardsPerNode);
264+
}
265+
}, 60, TimeUnit.SECONDS);
266+
}
216267
}

server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java

Lines changed: 12 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -10,37 +10,29 @@
1010

1111
import java.util.HashMap;
1212
import java.util.Map;
13-
import java.util.function.Predicate;
1413

15-
import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID;
16-
import static org.opensearch.cluster.routing.allocation.RebalanceConstraints.isPrimaryShardsPerIndexPerNodeBreached;
14+
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
15+
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID;
16+
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
17+
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isIndexShardsPerNodeBreached;
18+
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached;
19+
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPrimaryShardsPerNodeBreached;
1720

1821
/**
19-
* Allocation constraints specify conditions which, if breached, reduce the
20-
* priority of a node for receiving unassigned shard allocations.
22+
* Allocation constraints specify conditions which, if breached, reduce the priority of a node for receiving unassigned
23+
* shard allocations. Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by
24+
* this constraint.
2125
*
2226
* @opensearch.internal
2327
*/
2428
public class AllocationConstraints {
25-
26-
/**
27-
*
28-
* This constraint is only applied for unassigned shards to avoid overloading a newly added node.
29-
* Weight calculation in other scenarios like shard movement and re-balancing remain unaffected by this constraint.
30-
*/
31-
public final static String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.breach.constraint";
3229
private Map<String, Constraint> constraints;
3330

3431
public AllocationConstraints() {
3532
this.constraints = new HashMap<>();
36-
this.constraints.putIfAbsent(
37-
INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID,
38-
new Constraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, isIndexShardsPerNodeBreached())
39-
);
40-
this.constraints.putIfAbsent(
41-
PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID,
42-
new Constraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, isPrimaryShardsPerIndexPerNodeBreached())
43-
);
33+
this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached()));
34+
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
35+
this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached()));
4436
}
4537

4638
public void updateAllocationConstraint(String constraint, boolean enable) {
@@ -51,26 +43,4 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no
5143
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
5244
return params.weight(constraints);
5345
}
54-
55-
/**
56-
* Constraint to control number of shards of an index allocated on a single
57-
* node.
58-
*
59-
* In current weight function implementation, when a node has significantly
60-
* fewer shards than other nodes (e.g. during single new node addition or node
61-
* replacement), its weight is much less than other nodes. All shard allocations
62-
* at this time tend to land on the new node with skewed weight. This breaks
63-
* index level balance in the cluster, by creating all shards of the same index
64-
* on one node, often resulting in a hotspot on that node.
65-
*
66-
* This constraint is breached when balancer attempts to allocate more than
67-
* average shards per index per node.
68-
*/
69-
public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached() {
70-
return (params) -> {
71-
int currIndexShardsOnNode = params.getNode().numShards(params.getIndex());
72-
int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()));
73-
return (currIndexShardsOnNode >= allowedIndexShardsPerNode);
74-
};
75-
}
7646
}

server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
import org.opensearch.cluster.routing.allocation.allocator.ShardsBalancer;
1313

1414
import java.util.Map;
15-
import java.util.Objects;
1615
import java.util.function.Predicate;
1716

17+
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CONSTRAINT_WEIGHT;
18+
1819
/**
1920
* Defines a constraint useful to de-prioritize certain nodes as target of unassigned shards used in {@link AllocationConstraints} or
2021
* re-balancing target used in {@link RebalanceConstraints}
@@ -23,45 +24,22 @@
2324
*/
2425
public class Constraint implements Predicate<Constraint.ConstraintParams> {
2526

26-
public final static long CONSTRAINT_WEIGHT = 1000000L;
27-
28-
private String name;
29-
3027
private boolean enable;
3128
private Predicate<ConstraintParams> predicate;
3229

33-
public Constraint(String name, Predicate<ConstraintParams> constraintPredicate) {
34-
this.name = name;
30+
public Constraint(Predicate<ConstraintParams> constraintPredicate) {
3531
this.predicate = constraintPredicate;
36-
this.enable = false;
3732
}
3833

3934
@Override
4035
public boolean test(ConstraintParams constraintParams) {
4136
return this.enable && predicate.test(constraintParams);
4237
}
4338

44-
public String getName() {
45-
return name;
46-
}
47-
4839
public void setEnable(boolean enable) {
4940
this.enable = enable;
5041
}
5142

52-
@Override
53-
public boolean equals(Object o) {
54-
if (this == o) return true;
55-
if (o == null || getClass() != o.getClass()) return false;
56-
Constraint that = (Constraint) o;
57-
return name.equals(that.name);
58-
}
59-
60-
@Override
61-
public int hashCode() {
62-
return Objects.hash(name);
63-
}
64-
6543
static class ConstraintParams {
6644
private ShardsBalancer balancer;
6745
private BalancedShardsAllocator.ModelNode node;
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.routing.allocation;
10+
11+
import java.util.function.Predicate;
12+
13+
/**
14+
* Defines different constraints definitions
15+
*
16+
* @opensearch.internal
17+
*/
18+
public class ConstraintTypes {
19+
public final static long CONSTRAINT_WEIGHT = 1000000L;
20+
21+
/**
22+
* Defines per index constraint which is breached when a node contains more than avg number of primary shards for an index
23+
*/
24+
public final static String INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "index.primary.shard.balance.constraint";
25+
26+
/**
27+
* Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices
28+
*/
29+
public final static String CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "cluster.primary.shard.balance.constraint";
30+
31+
/**
32+
* Defines an index constraint which is breached when a node contains more than avg number of shards for an index
33+
*/
34+
public final static String INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID = "index.shard.count.constraint";
35+
36+
/**
37+
* Constraint to control number of shards of an index allocated on a single
38+
* node.
39+
*
40+
* In current weight function implementation, when a node has significantly
41+
* fewer shards than other nodes (e.g. during single new node addition or node
42+
* replacement), its weight is much less than other nodes. All shard allocations
43+
* at this time tend to land on the new node with skewed weight. This breaks
44+
* index level balance in the cluster, by creating all shards of the same index
45+
* on one node, often resulting in a hotspot on that node.
46+
*
47+
* This constraint is breached when balancer attempts to allocate more than
48+
* average shards per index per node.
49+
*/
50+
public static Predicate<Constraint.ConstraintParams> isIndexShardsPerNodeBreached() {
51+
return (params) -> {
52+
int currIndexShardsOnNode = params.getNode().numShards(params.getIndex());
53+
int allowedIndexShardsPerNode = (int) Math.ceil(params.getBalancer().avgShardsPerNode(params.getIndex()));
54+
return (currIndexShardsOnNode >= allowedIndexShardsPerNode);
55+
};
56+
}
57+
58+
/**
59+
* Defines a predicate which returns true when specific to an index, a node contains more than average number of primary
60+
* shards. This constraint is used in weight calculation during allocation and rebalancing. When breached a high weight
61+
* {@link ConstraintTypes#CONSTRAINT_WEIGHT} is assigned to node resulting in lesser chances of node being selected
62+
* as allocation or rebalancing target
63+
*/
64+
public static Predicate<Constraint.ConstraintParams> isPerIndexPrimaryShardsPerNodeBreached() {
65+
return (params) -> {
66+
int perIndexPrimaryShardCount = params.getNode().numPrimaryShards(params.getIndex());
67+
int perIndexAllowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()));
68+
return perIndexPrimaryShardCount > perIndexAllowedPrimaryShardCount;
69+
};
70+
}
71+
72+
/**
73+
* Defines a predicate which returns true when a node contains more than average number of primary shards. This
74+
* constraint is used in weight calculation during allocation only. When breached a high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT}
75+
* is assigned to node resulting in lesser chances of node being selected as allocation target
76+
*/
77+
public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreached() {
78+
return (params) -> {
79+
int primaryShardCount = params.getNode().numPrimaryShards();
80+
int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode());
81+
return primaryShardCount >= allowedPrimaryShardCount;
82+
};
83+
}
84+
}

server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313

1414
import java.util.HashMap;
1515
import java.util.Map;
16-
import java.util.function.Predicate;
1716

18-
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE;
17+
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
18+
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached;
1919

2020
/**
2121
* Constraints applied during rebalancing round; specify conditions which, if breached, reduce the
@@ -24,15 +24,12 @@
2424
* @opensearch.internal
2525
*/
2626
public class RebalanceConstraints {
27-
public final static String PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID = PREFER_PER_INDEX_PRIMARY_SHARD_BALANCE.getKey();
27+
2828
private Map<String, Constraint> constraints;
2929

3030
public RebalanceConstraints() {
3131
this.constraints = new HashMap<>();
32-
this.constraints.putIfAbsent(
33-
PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID,
34-
new Constraint(PREFER_PRIMARY_SHARD_BALANCE_NODE_BREACH_ID, isPrimaryShardsPerIndexPerNodeBreached())
35-
);
32+
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
3633
}
3734

3835
public void updateRebalanceConstraint(String constraint, boolean enable) {
@@ -43,16 +40,4 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no
4340
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index);
4441
return params.weight(constraints);
4542
}
46-
47-
/**
48-
* When primary balance is preferred, add node constraint of average primary shards per node to give the node a
49-
* higher weight resulting in lesser chances of being target of unassigned shard allocation or rebalancing target node
50-
*/
51-
public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerIndexPerNodeBreached() {
52-
return (params) -> {
53-
int currPrimaryShardsOnNode = params.getNode().numPrimaryShards(params.getIndex());
54-
int allowedPrimaryShardsPerNode = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode(params.getIndex()));
55-
return currPrimaryShardsOnNode > allowedPrimaryShardsPerNode;
56-
};
57-
}
5843
}

0 commit comments

Comments
 (0)