Skip to content

Commit

Permalink
Add setting to ignore throttling nodes for allocation of unassigned r…
Browse files Browse the repository at this point in the history
…emote primaries (#14991)

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna authored Jul 31, 2024
1 parent eb306d2 commit 5c19809
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- Fix for hasInitiatedFetching to fix allocation explain and manual reroute APIs ([#14972](https://github.com/opensearch-project/OpenSearch/pull/14972))
- [Workload Management] Add queryGroupId to Task ([#14708](https://github.com/opensearch-project/OpenSearch/pull/14708))
- Add setting to ignore throttling nodes for allocation of unassigned primaries in remote restore ([#14991](https://github.com/opensearch-project/OpenSearch/pull/14991))
- Add basic aggregation support for derived fields ([#14618](https://github.com/opensearch-project/OpenSearch/pull/14618))

### Dependencies
Expand All @@ -23,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Removed

### Fixed
- Fix constraint bug which allows more primary shards than average primary shards per index ([#14908](https://github.com/opensearch-project/OpenSearch/pull/14908))
- Fix missing value of FieldSort for unsigned_long ([#14963](https://github.com/opensearch-project/OpenSearch/pull/14963))

### Security
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

/**
 * Dynamic, node-scoped boolean setting (default {@code true}) that controls whether nodes answering
 * THROTTLE are ignored when choosing a node for an unassigned primary shard whose recovery source is
 * the remote store. The value is handed to the shards balancer each time one is constructed.
 */
public static final Setting<Boolean> IGNORE_THROTTLE_FOR_REMOTE_RESTORE = Setting.boolSetting(
"cluster.routing.allocation.remote_primary.ignore_throttle",
true,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Float> PRIMARY_SHARD_REBALANCE_BUFFER = Setting.floatSetting(
"cluster.routing.allocation.rebalance.primary.buffer",
0.10f,
Expand All @@ -173,6 +180,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private volatile WeightFunction weightFunction;
private volatile float threshold;

// Current value of IGNORE_THROTTLE_FOR_REMOTE_RESTORE; written by setIgnoreThrottleInRestore, which is
// called from the constructor and registered as the settings update consumer for that setting.
private volatile boolean ignoreThrottleInRestore;

/**
 * Creates an allocator from the given settings alone.
 * Builds a {@link ClusterSettings} over {@code ClusterSettings.BUILT_IN_CLUSTER_SETTINGS} from the same
 * settings and delegates to the two-argument constructor.
 *
 * @param settings settings from which the allocator reads its initial configuration
 */
public BalancedShardsAllocator(Settings settings) {
this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
}
Expand All @@ -182,6 +191,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings));
setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings));
setPreferPrimaryShardRebalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings));
setIgnoreThrottleInRestore(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.get(settings));
updateWeightFunction();
setThreshold(THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
Expand All @@ -195,6 +205,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore);
}

/**
Expand All @@ -205,6 +216,10 @@ private void setMovePrimaryFirst(boolean movePrimaryFirst) {
setShardMovementStrategy(this.shardMovementStrategy);
}

/**
 * Stores the current value of {@code IGNORE_THROTTLE_FOR_REMOTE_RESTORE}.
 * Called once from the constructor with the initial value and again whenever the setting is updated,
 * because it is registered as that setting's update consumer.
 *
 * @param ignoreThrottleInRestore new value of the setting
 */
private void setIgnoreThrottleInRestore(boolean ignoreThrottleInRestore) {
this.ignoreThrottleInRestore = ignoreThrottleInRestore;
}

/**
* Sets the correct Shard movement strategy to use.
* If users are still using deprecated setting `move_primary_first`, we want behavior to remain unchanged.
Expand Down Expand Up @@ -282,7 +297,8 @@ public void allocate(RoutingAllocation allocation) {
weightFunction,
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance
preferPrimaryShardRebalance,
ignoreThrottleInRestore
);
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
Expand All @@ -304,7 +320,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
weightFunction,
threshold,
preferPrimaryShardBalance,
preferPrimaryShardRebalance
preferPrimaryShardRebalance,
ignoreThrottleInRestore
);
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
Expand Down Expand Up @@ -558,7 +575,7 @@ public Balancer(
float threshold,
boolean preferPrimaryBalance
) {
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false);
super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.util.IntroSorter;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingPool;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class LocalShardsBalancer extends ShardsBalancer {

private final boolean preferPrimaryBalance;
private final boolean preferPrimaryRebalance;

// Supplied through the constructor. When true, decideAllocateUnassigned does not pick nodes whose
// decision is THROTTLE for an unassigned primary whose recovery source type is REMOTE_STORE.
private final boolean ignoreThrottleInRestore;
private final BalancedShardsAllocator.WeightFunction weight;

private final float threshold;
Expand All @@ -77,7 +80,8 @@ public LocalShardsBalancer(
BalancedShardsAllocator.WeightFunction weight,
float threshold,
boolean preferPrimaryBalance,
boolean preferPrimaryRebalance
boolean preferPrimaryRebalance,
boolean ignoreThrottleInRestore
) {
this.logger = logger;
this.allocation = allocation;
Expand All @@ -94,6 +98,7 @@ public LocalShardsBalancer(
this.preferPrimaryBalance = preferPrimaryBalance;
this.preferPrimaryRebalance = preferPrimaryRebalance;
this.shardMovementStrategy = shardMovementStrategy;
this.ignoreThrottleInRestore = ignoreThrottleInRestore;
}

/**
Expand Down Expand Up @@ -918,7 +923,15 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) {
nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0));
nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight));
}
if (currentDecision.type() == Decision.Type.YES || currentDecision.type() == Decision.Type.THROTTLE) {

// For REMOTE_STORE recoveries, THROTTLE is treated as NO because we want faster recoveries.
// The side effect of this is an increase in relocations after these allocations.
boolean considerThrottleAsNo = ignoreThrottleInRestore
&& shard.recoverySource().getType() == RecoverySource.Type.REMOTE_STORE
&& shard.primary();

if (currentDecision.type() == Decision.Type.YES
|| (currentDecision.type() == Decision.Type.THROTTLE && considerThrottleAsNo == false)) {
final boolean updateMinNode;
if (currentWeight == minWeight) {
/* we have an equal weight tie breaking:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ public void apply(Settings value, Settings current, Settings previous) {
BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING,
BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING,
BalancedShardsAllocator.THRESHOLD_SETTING,
BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_TYPE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
package org.opensearch.cluster.routing.allocation;

import org.opensearch.action.support.replication.ClusterStateCreationUtils;
import org.opensearch.cluster.ClusterInfo;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.node.DiscoveryNode;
Expand All @@ -50,7 +49,6 @@
import org.opensearch.cluster.routing.allocation.decider.Decision.Type;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.snapshots.SnapshotShardSizeInfo;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -398,19 +396,6 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
return Tuple.tuple(clusterState, rebalanceDecision);
}

/**
 * Creates a {@link RoutingAllocation} for the given deciders and cluster state, using empty cluster
 * info and snapshot shard sizes, and switches debug decisions on before returning it.
 */
private RoutingAllocation newRoutingAllocation(AllocationDeciders deciders, ClusterState state) {
    final RoutingNodes nodes = new RoutingNodes(state, false);
    final long startNanos = System.nanoTime();
    final RoutingAllocation result = new RoutingAllocation(deciders, nodes, state, ClusterInfo.EMPTY, SnapshotShardSizeInfo.EMPTY, startNanos);
    result.debugDecision(true);
    return result;
}

private void assertAssignedNodeRemainsSame(
BalancedShardsAllocator allocator,
RoutingAllocation routingAllocation,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

import org.opensearch.Version;
import org.opensearch.action.support.replication.ClusterStateCreationUtils;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.AllocationId;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.decider.AllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.shard.ShardId;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE;

public class DecideAllocateUnassignedTests extends OpenSearchAllocationTestCase {
public void testAllocateUnassignedRemoteRestore_IgnoreThrottle() {
final String[] indices = { "idx1" };
// Create a cluster state with 1 indices, each with 1 started primary shard, and only
// one node initially so that all primary shards get allocated to the same node.
//
// When we add 1 more 1 index with 1 started primary shard and 1 more node , if the new node throttles the recovery
// shard should get assigned on the older node if IgnoreThrottle is set to true
ClusterState clusterState = ClusterStateCreationUtils.state(1, indices, 1);
clusterState = addNodesToClusterState(clusterState, 1);
clusterState = addRestoringIndexToClusterState(clusterState, "idx2");
List<AllocationDecider> allocationDeciders = getAllocationDecidersThrottleOnNode1();
RoutingAllocation routingAllocation = newRoutingAllocation(new AllocationDeciders(allocationDeciders), clusterState);
// allocate and get the node that is now relocating
Settings build = Settings.builder().put(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.getKey(), true).build();
BalancedShardsAllocator allocator = new BalancedShardsAllocator(build);
allocator.allocate(routingAllocation);
assertEquals(routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), "node_0");
assertEquals(routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).getIndexName(), "idx2");
assertFalse(routingAllocation.routingNodes().hasUnassignedPrimaries());
}

public void testAllocateUnassignedRemoteRestore() {
final String[] indices = { "idx1" };
// Create a cluster state with 1 indices, each with 1 started primary shard, and only
// one node initially so that all primary shards get allocated to the same node.
//
// When we add 1 more 1 index with 1 started primary shard and 1 more node , if the new node throttles the recovery
// shard should remain unassigned if IgnoreThrottle is set to false
ClusterState clusterState = ClusterStateCreationUtils.state(1, indices, 1);
clusterState = addNodesToClusterState(clusterState, 1);
clusterState = addRestoringIndexToClusterState(clusterState, "idx2");
List<AllocationDecider> allocationDeciders = getAllocationDecidersThrottleOnNode1();
RoutingAllocation routingAllocation = newRoutingAllocation(new AllocationDeciders(allocationDeciders), clusterState);
// allocate and get the node that is now relocating
Settings build = Settings.builder().put(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.getKey(), false).build();
BalancedShardsAllocator allocator = new BalancedShardsAllocator(build);
allocator.allocate(routingAllocation);
assertEquals(routingAllocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), 0);
assertTrue(routingAllocation.routingNodes().hasUnassignedPrimaries());
}

private static List<AllocationDecider> getAllocationDecidersThrottleOnNode1() {
// Allocation Deciders to throttle on `node_1`
final Set<String> throttleNodes = new HashSet<>();
throttleNodes.add("node_1");
AllocationDecider allocationDecider = new AllocationDecider() {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (throttleNodes.contains(node.nodeId())) {
return Decision.THROTTLE;
}
return Decision.YES;
}
};
List<AllocationDecider> allocationDeciders = Arrays.asList(allocationDecider);
return allocationDeciders;
}

private ClusterState addNodesToClusterState(ClusterState clusterState, int nodeId) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterState.nodes());
DiscoveryNode discoveryNode = newNode("node_" + nodeId);
nodesBuilder.add(discoveryNode);
return ClusterState.builder(clusterState).nodes(nodesBuilder).build();
}

private ClusterState addRestoringIndexToClusterState(ClusterState clusterState, String index) {
final int primaryTerm = 1 + randomInt(200);
final ShardId shardId = new ShardId(index, "_na_", 0);

IndexMetadata indexMetadata = IndexMetadata.builder(index)
.settings(
Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.put(SETTING_CREATION_DATE, System.currentTimeMillis())
)
.primaryTerm(0, primaryTerm)
.build();

IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.EXISTING_INDEX_RESTORED, null);
indexShardRoutingBuilder.addShard(
TestShardRouting.newShardRoutingRemoteRestore(index, shardId, null, null, true, ShardRoutingState.UNASSIGNED, unassignedInfo)
);
final IndexShardRoutingTable indexShardRoutingTable = indexShardRoutingBuilder.build();

IndexMetadata.Builder indexMetadataBuilder = new IndexMetadata.Builder(indexMetadata);
indexMetadataBuilder.putInSyncAllocationIds(
0,
indexShardRoutingTable.activeShards()
.stream()
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toSet())
);
ClusterState.Builder state = ClusterState.builder(clusterState);
state.metadata(Metadata.builder(clusterState.metadata()).put(indexMetadataBuilder.build(), false).generateClusterUuidIfNeeded());
state.routingTable(
RoutingTable.builder(clusterState.routingTable())
.add(IndexRoutingTable.builder(indexMetadata.getIndex()).addIndexShard(indexShardRoutingTable))
.build()
);
return state.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocationService;
Expand Down Expand Up @@ -287,6 +288,19 @@ public static ClusterState startShardsAndReroute(
return allocationService.reroute(allocationService.applyStartedShards(clusterState, initializingShards), "reroute after starting");
}

/**
 * Builds a {@link RoutingAllocation} over the given cluster state for use in allocation tests.
 * The allocation uses {@code ClusterInfo.EMPTY}, {@code SnapshotShardSizeInfo.EMPTY} and the current
 * {@link System#nanoTime()}, and has debug decisions enabled.
 *
 * @param deciders allocation deciders the allocation should consult
 * @param state cluster state from which the routing nodes are derived
 * @return the new routing allocation
 */
protected RoutingAllocation newRoutingAllocation(AllocationDeciders deciders, ClusterState state) {
    final RoutingNodes routingNodes = new RoutingNodes(state, false);
    final long currentNanoTime = System.nanoTime();
    final RoutingAllocation routingAllocation = new RoutingAllocation(
        deciders,
        routingNodes,
        state,
        ClusterInfo.EMPTY,
        SnapshotShardSizeInfo.EMPTY,
        currentNanoTime
    );
    routingAllocation.debugDecision(true);
    return routingAllocation;
}

public static class TestAllocateDecision extends AllocationDecider {

private final Decision decision;
Expand Down Expand Up @@ -465,5 +479,6 @@ public void allocateUnassigned(
unassignedAllocationHandler.removeAndIgnore(UnassignedInfo.AllocationStatus.DELAYED_ALLOCATION, allocation.changes());
}
}

}
}
Loading

0 comments on commit 5c19809

Please sign in to comment.