Skip to content

Commit 1f0f270

Browse files
Author: Gagan Singh Saini (committed)
Making Shard Limit Validator Index Tier Agnostic
Signed-off-by: Gagan Singh Saini <gagasa@amazon.com>
1 parent cb65261 commit 1f0f270

File tree

9 files changed

+362
-82
lines changed

9 files changed

+362
-82
lines changed

server/src/internalClusterTest/java/org/opensearch/cluster/shards/ClusterShardLimitIT.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -385,11 +385,11 @@ public void testIncreaseReplicasOverLimit() {
385385
} catch (IllegalArgumentException e) {
386386
String expectedError = "Validation Failed: 1: this action would add ["
387387
+ (dataNodes * firstShardCount)
388-
+ "] total shards, but this cluster currently has ["
388+
+ "] total Hot shards, but this cluster currently has ["
389389
+ firstShardCount
390390
+ "]/["
391391
+ dataNodes * shardsPerNode
392-
+ "] maximum shards open;";
392+
+ "] maximum Hot shards open;";
393393
assertEquals(expectedError, e.getMessage());
394394
}
395395
Metadata clusterState = client().admin().cluster().prepareState().get().getState().metadata();
@@ -446,11 +446,11 @@ public void testChangingMultipleIndicesOverLimit() {
446446

447447
String expectedError = "Validation Failed: 1: this action would add ["
448448
+ difference
449-
+ "] total shards, but this cluster currently has ["
449+
+ "] total Hot shards, but this cluster currently has ["
450450
+ totalShardsBefore
451451
+ "]/["
452452
+ dataNodes * shardsPerNode
453-
+ "] maximum shards open;";
453+
+ "] maximum Hot shards open;";
454454
assertEquals(expectedError, e.getMessage());
455455
}
456456
Metadata clusterState = client().admin().cluster().prepareState().get().getState().metadata();
@@ -779,22 +779,22 @@ private void verifyException(int dataNodes, ShardCounts counts, IllegalArgumentE
779779
int maxShards = counts.getShardsPerNode() * dataNodes;
780780
String expectedError = "Validation Failed: 1: this action would add ["
781781
+ totalShards
782-
+ "] total shards, but this cluster currently has ["
782+
+ "] total Hot shards, but this cluster currently has ["
783783
+ currentShards
784784
+ "]/["
785785
+ maxShards
786-
+ "] maximum shards open;";
786+
+ "] maximum Hot shards open;";
787787
assertEquals(expectedError, e.getMessage());
788788
}
789789

790790
private void verifyException(int maxShards, int currentShards, int extraShards, IllegalArgumentException e) {
791791
String expectedError = "Validation Failed: 1: this action would add ["
792792
+ extraShards
793-
+ "] total shards, but this cluster currently has ["
793+
+ "] total Hot shards, but this cluster currently has ["
794794
+ currentShards
795795
+ "]/["
796796
+ maxShards
797-
+ "] maximum shards open;";
797+
+ "] maximum Hot shards open;";
798798
assertEquals(expectedError, e.getMessage());
799799
}
800800

server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import java.io.IOException;
6565

6666
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING;
67+
import static org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider.CLUSTER_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING;
6768
import static org.opensearch.cluster.service.ClusterManagerTask.CLUSTER_UPDATE_SETTINGS;
6869
import static org.opensearch.index.remote.RemoteStoreUtils.checkAndFinalizeRemoteStoreMigration;
6970

@@ -329,11 +330,16 @@ private void validateAllNodesOfSameType(DiscoveryNodes discoveryNodes) {
329330

330331
private void validateClusterTotalPrimaryShardsPerNodeSetting(ClusterState currentState, ClusterUpdateSettingsRequest request) {
331332
if (request.transientSettings().hasValue(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey())
332-
|| request.persistentSettings().hasValue(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey())) {
333+
|| request.persistentSettings().hasValue(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey())
334+
|| request.transientSettings().hasValue(CLUSTER_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING.getKey())
335+
|| request.persistentSettings().hasValue(CLUSTER_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING.getKey())) {
333336

334337
Settings settings = Settings.builder().put(request.transientSettings()).put(request.persistentSettings()).build();
335338

336-
int newValue = CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get(settings);
339+
int newValue = Math.max(
340+
CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get(settings),
341+
CLUSTER_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING.get(settings)
342+
);
337343

338344
// If default value (-1), no validation needed
339345
if (newValue == -1) {
@@ -351,6 +357,8 @@ private void validateClusterTotalPrimaryShardsPerNodeSetting(ClusterState curren
351357
throw new IllegalArgumentException(
352358
"Setting ["
353359
+ CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey()
360+
+ "] or ["
361+
+ CLUSTER_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING.getKey()
354362
+ "] can only be used with remote store enabled clusters"
355363
);
356364
}

server/src/main/java/org/opensearch/cluster/metadata/Metadata.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.opensearch.cluster.block.ClusterBlockLevel;
4949
import org.opensearch.cluster.coordination.CoordinationMetadata;
5050
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
51+
import org.opensearch.cluster.routing.RoutingPool;
5152
import org.opensearch.common.Nullable;
5253
import org.opensearch.common.UUIDs;
5354
import org.opensearch.common.annotation.PublicApi;
@@ -270,7 +271,8 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio
270271
private final Map<String, Custom> customs;
271272

272273
private final transient int totalNumberOfShards; // Transient ? not serializable anyway?
273-
private final int totalOpenIndexShards;
274+
private final int totalOpenHotIndexShards;
275+
private final int totalOpenWarmIndexShards;
274276

275277
private final String[] allIndices;
276278
private final String[] visibleIndices;
@@ -315,15 +317,21 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio
315317
this.customs = Collections.unmodifiableMap(customs);
316318
this.templates = new TemplatesMetadata(templates);
317319
int totalNumberOfShards = 0;
318-
int totalOpenIndexShards = 0;
320+
int totalOpenHotIndexShards = 0;
321+
int totalOpenWarmIndexShards = 0;
319322
for (IndexMetadata cursor : indices.values()) {
320323
totalNumberOfShards += cursor.getTotalNumberOfShards();
321324
if (IndexMetadata.State.OPEN.equals(cursor.getState())) {
322-
totalOpenIndexShards += cursor.getTotalNumberOfShards();
325+
if (RoutingPool.getIndexPool(cursor) == RoutingPool.REMOTE_CAPABLE) {
326+
totalOpenWarmIndexShards += cursor.getTotalNumberOfShards();
327+
} else {
328+
totalOpenHotIndexShards += cursor.getTotalNumberOfShards();
329+
}
323330
}
324331
}
325332
this.totalNumberOfShards = totalNumberOfShards;
326-
this.totalOpenIndexShards = totalOpenIndexShards;
333+
this.totalOpenHotIndexShards = totalOpenHotIndexShards;
334+
this.totalOpenWarmIndexShards = totalOpenWarmIndexShards;
327335

328336
this.allIndices = allIndices;
329337
this.visibleIndices = visibleIndices;
@@ -904,8 +912,17 @@ public int getTotalNumberOfShards() {
904912
* replicas, but does not include shards that are part of closed indices.
905913
* @return The total number of open shards from all indices.
906914
*/
907-
public int getTotalOpenIndexShards() {
908-
return this.totalOpenIndexShards;
915+
public int getTotalOpenHotIndexShards() {
916+
return this.totalOpenHotIndexShards;
917+
}
918+
919+
/**
920+
* Gets the total number of open shards from all warm (remote-capable) indices. Includes
921+
* replicas, but does not include shards that are part of closed indices.
922+
* @return The total number of open warm index shards.
923+
*/
924+
public int getTotalOpenWarmIndexShards() {
925+
return this.totalOpenWarmIndexShards;
909926
}
910927

911928
/**

server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -272,15 +272,11 @@ public ClusterState execute(ClusterState currentState) {
272272
}
273273

274274
// Verify that this won't take us over the cluster shard limit.
275-
int totalNewShards = Arrays.stream(request.indices())
276-
.mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfReplicas))
277-
.sum();
278-
Optional<String> error = shardLimitValidator.checkShardLimit(totalNewShards, currentState);
279-
if (error.isPresent()) {
280-
ValidationException ex = new ValidationException();
281-
ex.addValidationError(error.get());
282-
throw ex;
283-
}
275+
shardLimitValidator.validateShardLimitForIndices(
276+
request.indices(),
277+
currentState,
278+
index -> getTotalNewShards(index, currentState, updatedNumberOfReplicas)
279+
);
284280

285281
/*
286282
* We do not update the in-sync allocation IDs as they will be removed upon the first index operation which makes
@@ -315,15 +311,12 @@ public ClusterState execute(ClusterState currentState) {
315311
}
316312

317313
// Verify that this won't take us over the cluster shard limit.
318-
int totalNewShards = Arrays.stream(request.indices())
319-
.mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfSearchReplicas))
320-
.sum();
321-
Optional<String> error = shardLimitValidator.checkShardLimit(totalNewShards, currentState);
322-
if (error.isPresent()) {
323-
ValidationException ex = new ValidationException();
324-
ex.addValidationError(error.get());
325-
throw ex;
326-
}
314+
shardLimitValidator.validateShardLimitForIndices(
315+
request.indices(),
316+
currentState,
317+
index -> getTotalNewShards(index, currentState, updatedNumberOfSearchReplicas)
318+
);
319+
327320
routingTableBuilder.updateNumberOfSearchReplicas(updatedNumberOfSearchReplicas, actualIndices);
328321
metadataBuilder.updateNumberOfSearchReplicas(updatedNumberOfSearchReplicas, actualIndices);
329322
logger.info(

server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
7474

7575
private final Map<String, DiscoveryNode> nodes;
7676
private final Map<String, DiscoveryNode> dataNodes;
77+
private final Map<String, DiscoveryNode> warmNodes;
7778
private final Map<String, DiscoveryNode> clusterManagerNodes;
7879
private final Map<String, DiscoveryNode> ingestNodes;
7980

@@ -87,6 +88,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
8788
private DiscoveryNodes(
8889
final Map<String, DiscoveryNode> nodes,
8990
final Map<String, DiscoveryNode> dataNodes,
91+
final Map<String, DiscoveryNode> warmNodes,
9092
final Map<String, DiscoveryNode> clusterManagerNodes,
9193
final Map<String, DiscoveryNode> ingestNodes,
9294
String clusterManagerNodeId,
@@ -98,6 +100,7 @@ private DiscoveryNodes(
98100
) {
99101
this.nodes = Collections.unmodifiableMap(nodes);
100102
this.dataNodes = Collections.unmodifiableMap(dataNodes);
103+
this.warmNodes = Collections.unmodifiableMap(warmNodes);
101104
this.clusterManagerNodes = Collections.unmodifiableMap(clusterManagerNodes);
102105
this.ingestNodes = Collections.unmodifiableMap(ingestNodes);
103106
this.clusterManagerNodeId = clusterManagerNodeId;
@@ -151,6 +154,15 @@ public Map<String, DiscoveryNode> getDataNodes() {
151154
return this.dataNodes;
152155
}
153156

157+
/**
158+
* Get a {@link Map} of the discovered warm nodes arranged by their ids
159+
*
160+
* @return {@link Map} of the discovered warm nodes arranged by their ids
161+
*/
162+
public Map<String, DiscoveryNode> getWarmNodes() {
163+
return this.warmNodes;
164+
}
165+
154166
/**
155167
* Get a {@link Map} of the discovered cluster-manager nodes arranged by their ids
156168
*
@@ -802,6 +814,7 @@ private String validateAdd(DiscoveryNode node) {
802814

803815
public DiscoveryNodes build() {
804816
final Map<String, DiscoveryNode> dataNodesBuilder = new HashMap<>();
817+
final Map<String, DiscoveryNode> warmNodesBuilder = new HashMap<>();
805818
final Map<String, DiscoveryNode> clusterManagerNodesBuilder = new HashMap<>();
806819
final Map<String, DiscoveryNode> ingestNodesBuilder = new HashMap<>();
807820
Version minNodeVersion = null;
@@ -812,6 +825,9 @@ public DiscoveryNodes build() {
812825
if (nodeEntry.getValue().isDataNode()) {
813826
dataNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue());
814827
}
828+
if (nodeEntry.getValue().isWarmNode()) {
829+
warmNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue());
830+
}
815831
if (nodeEntry.getValue().isClusterManagerNode()) {
816832
clusterManagerNodesBuilder.put(nodeEntry.getKey(), nodeEntry.getValue());
817833
}
@@ -835,6 +851,7 @@ public DiscoveryNodes build() {
835851
return new DiscoveryNodes(
836852
nodes,
837853
dataNodesBuilder,
854+
warmNodesBuilder,
838855
clusterManagerNodesBuilder,
839856
ingestNodesBuilder,
840857
clusterManagerNodeId,

server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.opensearch.cluster.metadata.IndexMetadata;
3636
import org.opensearch.cluster.routing.RoutingNode;
37+
import org.opensearch.cluster.routing.RoutingPool;
3738
import org.opensearch.cluster.routing.ShardRouting;
3839
import org.opensearch.cluster.routing.ShardRoutingState;
3940
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
@@ -73,6 +74,8 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
7374

7475
private volatile int clusterShardLimit;
7576
private volatile int clusterPrimaryShardLimit;
77+
private volatile int clusterRemoteCapableShardLimit;
78+
private volatile int clusterRemoteCapablePrimaryShardLimit;
7679

7780
/**
7881
* Controls the maximum number of shards per index on a single OpenSearch
@@ -122,6 +125,30 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
122125
Property.NodeScope
123126
);
124127

128+
/**
129+
* Controls the maximum number of remote capable shards per node on a cluster level.
130+
* Negative values are interpreted as unlimited.
131+
*/
132+
public static final Setting<Integer> CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING = Setting.intSetting(
133+
"cluster.routing.allocation.total_remote_capable_shards_per_node",
134+
-1,
135+
-1,
136+
Property.Dynamic,
137+
Property.NodeScope
138+
);
139+
140+
/**
141+
* Controls the maximum number of remote capable primary shards per node on a cluster level.
142+
* Negative values are interpreted as unlimited.
143+
*/
144+
public static final Setting<Integer> CLUSTER_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING = Setting.intSetting(
145+
"cluster.routing.allocation.total_remote_capable_primary_shards_per_node",
146+
-1,
147+
-1,
148+
Property.Dynamic,
149+
Property.NodeScope
150+
);
151+
125152
private final Settings settings;
126153

127154
public ShardsLimitAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
@@ -130,6 +157,14 @@ public ShardsLimitAllocationDecider(Settings settings, ClusterSettings clusterSe
130157
this.clusterPrimaryShardLimit = CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.get(settings);
131158
clusterSettings.addSettingsUpdateConsumer(CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING, this::setClusterShardLimit);
132159
clusterSettings.addSettingsUpdateConsumer(CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING, this::setClusterPrimaryShardLimit);
160+
clusterSettings.addSettingsUpdateConsumer(
161+
CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING,
162+
this::setClusterRemoteCapableShardLimit
163+
);
164+
clusterSettings.addSettingsUpdateConsumer(
165+
CLUSTER_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING,
166+
this::setClusterRemoteCapablePrimaryShardLimit
167+
);
133168
}
134169

135170
private void setClusterShardLimit(int clusterShardLimit) {
@@ -140,6 +175,14 @@ private void setClusterPrimaryShardLimit(int clusterPrimaryShardLimit) {
140175
this.clusterPrimaryShardLimit = clusterPrimaryShardLimit;
141176
}
142177

178+
private void setClusterRemoteCapableShardLimit(int clusterRemoteCapableShardLimit) {
179+
this.clusterRemoteCapableShardLimit = clusterRemoteCapableShardLimit;
180+
}
181+
182+
private void setClusterRemoteCapablePrimaryShardLimit(int clusterRemoteCapablePrimaryShardLimit) {
183+
this.clusterRemoteCapablePrimaryShardLimit = clusterRemoteCapablePrimaryShardLimit;
184+
}
185+
143186
@Override
144187
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
145188
return doDecide(shardRouting, node, allocation, (count, limit) -> count >= limit);
@@ -156,13 +199,23 @@ private Decision doDecide(
156199
RoutingAllocation allocation,
157200
BiPredicate<Integer, Integer> decider
158201
) {
202+
RoutingPool shardRoutingPool = RoutingPool.getShardPool(shardRouting, allocation);
203+
RoutingPool nodeRoutingPool = RoutingPool.getNodePool(node);
204+
if (shardRoutingPool != nodeRoutingPool) {
205+
return Decision.ALWAYS;
206+
}
207+
159208
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
160209
final int indexShardLimit = indexMetadata.getIndexTotalShardsPerNodeLimit();
161210
final int indexPrimaryShardLimit = indexMetadata.getIndexTotalPrimaryShardsPerNodeLimit();
162211
// Capture the limit here in case it changes during this method's
163212
// execution
164-
final int clusterShardLimit = this.clusterShardLimit;
165-
final int clusterPrimaryShardLimit = this.clusterPrimaryShardLimit;
213+
final int clusterShardLimit = nodeRoutingPool == RoutingPool.REMOTE_CAPABLE
214+
? this.clusterRemoteCapableShardLimit
215+
: this.clusterShardLimit;
216+
final int clusterPrimaryShardLimit = nodeRoutingPool == RoutingPool.REMOTE_CAPABLE
217+
? this.clusterRemoteCapablePrimaryShardLimit
218+
: this.clusterPrimaryShardLimit;
166219
if (indexShardLimit <= 0 && indexPrimaryShardLimit <= 0 && clusterShardLimit <= 0 && clusterPrimaryShardLimit <= 0) {
167220
return allocation.decision(
168221
Decision.YES,
@@ -183,7 +236,9 @@ private Decision doDecide(
183236
NAME,
184237
"too many shards [%d] allocated to this node, cluster setting [%s=%d]",
185238
nodeShardCount,
186-
CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(),
239+
nodeRoutingPool == RoutingPool.REMOTE_CAPABLE
240+
? CLUSTER_TOTAL_REMOTE_CAPABLE_SHARDS_PER_NODE_SETTING.getKey()
241+
: CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(),
187242
clusterShardLimit
188243
);
189244
}
@@ -195,7 +250,9 @@ private Decision doDecide(
195250
NAME,
196251
"too many primary shards [%d] allocated to this node, cluster setting [%s=%d]",
197252
nodePrimaryShardCount,
198-
CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(),
253+
nodeRoutingPool == RoutingPool.REMOTE_CAPABLE
254+
? CLUSTER_TOTAL_REMOTE_CAPABLE_PRIMARY_SHARDS_PER_NODE_SETTING.getKey()
255+
: CLUSTER_TOTAL_PRIMARY_SHARDS_PER_NODE_SETTING.getKey(),
199256
clusterPrimaryShardLimit
200257
);
201258
}

0 commit comments

Comments (0)