diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index c528a311e0761..0e964fdf14109 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -53,6 +53,13 @@ protected boolean addMockInternalEngine() { return false; } + @Override + protected Settings.Builder randomRepositorySettings() { + final Settings.Builder settings = Settings.builder(); + settings.put("location", randomRepoPath()).put("compress", randomBoolean()); + return settings; + } + public void testCreateSearchableSnapshot() throws Exception { final int numReplicasIndex1 = randomIntBetween(1, 4); final int numReplicasIndex2 = randomIntBetween(0, 2); @@ -92,6 +99,8 @@ public void testCreateSearchableSnapshot() throws Exception { assertTrue(client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get().isAcknowledged()); + internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1); + logger.info("--> restore indices as 'remote_snapshot'"); client.admin() .cluster() diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index f8ba520e465e2..ec15bf16a5544 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -72,6 +72,7 @@ import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider; import 
org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.ParseField; @@ -83,6 +84,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.set.Sets; import org.opensearch.common.xcontent.NamedXContentRegistry; @@ -337,6 +339,9 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings)); + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + addAllocationDecider(deciders, new TargetPoolAllocationDecider()); + } clusterPlugins.stream() .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream()) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 986df494917c0..17267d5474738 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -702,6 +702,23 @@ assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == + " was matched but wasn't removed"; } + public void swapPrimaryWithReplica( + Logger logger, + ShardRouting primaryShard, + ShardRouting replicaShard, + RoutingChangesObserver changes + ) { + assert primaryShard.primary() : "Invalid primary shard provided"; + assert !replicaShard.primary() : "Invalid Replica shard provided"; + + ShardRouting newPrimary = primaryShard.moveActivePrimaryToReplica(); 
+ ShardRouting newReplica = replicaShard.moveActiveReplicaToPrimary(); + updateAssigned(primaryShard, newPrimary); + updateAssigned(replicaShard, newReplica); + logger.info("Swap relocation performed for shard [{}]", newPrimary.shortSummary()); + changes.replicaPromoted(newPrimary); + } + private void unassignPrimaryAndPromoteActiveReplicaIfExists( ShardRouting failedShard, UnassignedInfo unassignedInfo, @@ -1127,6 +1144,18 @@ public ShardRouting[] drain() { primaries = 0; return mutableShardRoutings; } + + /** + * Drains all ignored shards and returns it. + * This method will not drain unassigned shards. + */ + public ShardRouting[] drainIgnored() { + nodes.ensureMutable(); + ShardRouting[] mutableShardRoutings = ignored.toArray(new ShardRouting[ignored.size()]); + ignored.clear(); + ignoredPrimaries = 0; + return mutableShardRoutings; + } } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java new file mode 100644 index 0000000000000..1a3c366694221 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; + +/** + * {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods + * help decide the capabilities of a specific node as well as an index or shard based on the index configuration. 
+ * These methods help with allocation decisions and determining shard classification with the allocation process. + * + * @opensearch.internal + */ +public enum RoutingPool { + LOCAL_ONLY, + REMOTE_CAPABLE; + + /** + * Helps to determine the appropriate {@link RoutingPool} for a given node from the {@link RoutingNode} + */ + public static RoutingPool getNodePool(RoutingNode node) { + return getNodePool(node.node()); + } + + /** + * Helps to determine the appropriate {@link RoutingPool} for a given node from the {@link DiscoveryNode} + */ + public static RoutingPool getNodePool(DiscoveryNode node) { + if (node.isSearchNode()) { + return REMOTE_CAPABLE; + } + return LOCAL_ONLY; + } + + /** + * Can determine the appropriate {@link RoutingPool} for a given shard using the {@link IndexMetadata} for the + * index using the {@link RoutingAllocation}. + * @param shard the shard routing for which {@link RoutingPool} has to be determined. + * @param allocation the current allocation of the cluster + * @return {@link RoutingPool} for the given shard. + */ + public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation allocation) { + IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index()); + return getIndexPool(indexMetadata); + } + + /** + * Can determine the appropriate {@link RoutingPool} for a given index using the {@link IndexMetadata}. + * @param indexMetadata the index metadata object for which {@link RoutingPool} has to be determined. + * @return {@link RoutingPool} for the given index. 
+ */ + public static RoutingPool getIndexPool(IndexMetadata indexMetadata) { + Settings indexSettings = indexMetadata.getSettings(); + if (IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey().equals(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) { + return REMOTE_CAPABLE; + } + return LOCAL_ONLY; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java index e3aa2a666d454..5945c7d17a69a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java @@ -534,6 +534,29 @@ public ShardRouting moveToStarted() { ); } + /** + * Make the active primary shard as replica + * + * @throws IllegalShardRoutingStateException if shard is already a replica + */ + public ShardRouting moveActivePrimaryToReplica() { + assert active() : "expected an active shard " + this; + if (!primary) { + throw new IllegalShardRoutingStateException(this, "Not a primary shard, can't move to replica"); + } + return new ShardRouting( + shardId, + currentNodeId, + relocatingNodeId, + false, + state, + recoverySource, + unassignedInfo, + allocationId, + expectedShardSize + ); + } + /** * Make the active shard primary unless it's not primary * diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 42c8f7987bf3d..d8761e9b1a78e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -50,6 +50,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import 
org.opensearch.common.util.FeatureFlags; import java.util.HashMap; import java.util.HashSet; @@ -145,6 +146,13 @@ public void allocate(RoutingAllocation allocation) { localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); localShardsBalancer.balance(); + + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation); + remoteShardsBalancer.allocateUnassigned(); + remoteShardsBalancer.moveShards(); + remoteShardsBalancer.balance(); + } } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 53d7c827392d5..3c5e4013748af 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; @@ -27,9 +28,11 @@ import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.gateway.PriorityComparator; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -38,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import 
java.util.stream.StreamSupport; import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -102,6 +106,10 @@ public float avgShardsPerNode(String index) { */ @Override public float avgShardsPerNode() { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + float totalShards = nodes.values().stream().map(BalancedShardsAllocator.ModelNode::numShards).reduce(0, Integer::sum); + return totalShards / nodes.size(); + } return avgShardsPerNode; } @@ -172,6 +180,11 @@ void balance() { */ @Override MoveDecision decideRebalance(final ShardRouting shard) { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) + && RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) { + return MoveDecision.NOT_TAKEN; + } + if (shard.started() == false) { // we can only rebalance started shards return MoveDecision.NOT_TAKEN; @@ -441,7 +454,19 @@ private void balanceByWeights() { * to the nodes we relocated them from. */ private String[] buildWeightOrderedIndices() { - final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class); + final String[] indices; + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + final List localIndices = new ArrayList<>(); + for (String index : allocation.routingTable().indicesRouting().keys().toArray(String.class)) { + if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getIndexPool(metadata.index(index)))) { + localIndices.add(index); + } + } + indices = localIndices.toArray(new String[0]); + } else { + indices = allocation.routingTable().indicesRouting().keys().toArray(String.class); + } + final float[] deltas = new float[indices.length]; for (int i = 0; i < deltas.length; i++) { sorter.reset(indices[i]); @@ -507,7 +532,7 @@ void moveShards() { // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are // offloading the shards. 
- // Trying to eliminate target nodes so that we donot unnecessarily iterate over source nodes + // Trying to eliminate target nodes so that we do not unnecessarily iterate over source nodes // when no target is eligible for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) { checkAndAddInEligibleTargetNode(currentNode.getRoutingNode()); @@ -533,6 +558,11 @@ void moveShards() { ShardRouting shardRouting = it.next(); + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) + && RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { + continue; + } + // Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) { logger.info( @@ -593,6 +623,11 @@ void moveShards() { */ @Override MoveDecision decideMove(final ShardRouting shardRouting) { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) + && RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { + return MoveDecision.NOT_TAKEN; + } + if (shardRouting.started() == false) { // we can only move started shards return MoveDecision.NOT_TAKEN; @@ -680,7 +715,7 @@ private Map buildModelFromAssigned() for (ShardRouting shard : rn) { assert rn.nodeId().equals(shard.currentNodeId()); /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ - if (shard.state() != RELOCATING) { + if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) { node.addShard(shard); if (logger.isTraceEnabled()) { logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId()); @@ -735,7 +770,17 @@ void allocateUnassigned() { * if we allocate for instance (0, R, IDX1) we move the second replica to the secondary array and proceed with * the next replica. 
If we could not find a node to allocate (0,R,IDX1) we move all it's replicas to ignoreUnassigned. */ - ShardRouting[] primary = unassigned.drain(); + ShardRouting[] unassignedShards = unassigned.drain(); + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + List allUnassignedShards = Arrays.stream(unassignedShards).collect(Collectors.toList()); + List localUnassignedShards = allUnassignedShards.stream() + .filter(shard -> RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation))) + .collect(Collectors.toList()); + allUnassignedShards.removeAll(localUnassignedShards); + allUnassignedShards.forEach(shard -> routingNodes.unassigned().add(shard)); + unassignedShards = localUnassignedShards.toArray(new ShardRouting[localUnassignedShards.size()]); + } + ShardRouting[] primary = unassignedShards; ShardRouting[] secondary = new ShardRouting[primary.length]; int secondaryLength = 0; int primaryLength = primary.length; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java new file mode 100644 index 0000000000000..efb2c5a27a12f --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java @@ -0,0 +1,646 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing.allocation.allocator; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.MoveDecision; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.opensearch.common.Randomness; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +/** + * A {@link RemoteShardsBalancer} used by the {@link BalancedShardsAllocator} to perform allocation operations + * for remote shards within the cluster. 
+ * + * @opensearch.internal + */ +public final class RemoteShardsBalancer extends ShardsBalancer { + private final Logger logger; + private final RoutingAllocation allocation; + private final RoutingNodes routingNodes; + + public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) { + this.logger = logger; + this.allocation = allocation; + this.routingNodes = allocation.routingNodes(); + } + + /** + * Allocates unassigned remote shards on the routing node which are filtered using + * {@link #groupUnassignedShardsByIndex} + */ + @Override + void allocateUnassigned() { + if (routingNodes.unassigned().isEmpty()) { + logger.debug("No unassigned remote shards found."); + return; + } + + Queue nodeQueue = getShuffledRemoteNodes(); + if (nodeQueue.isEmpty()) { + logger.debug("No remote searcher nodes available for unassigned remote shards."); + failUnattemptedShards(); + return; + } + + Map unassignedShardMap = groupUnassignedShardsByIndex(); + allocateUnassignedPrimaries(nodeQueue, unassignedShardMap); + allocateUnassignedReplicas(nodeQueue, unassignedShardMap); + ignoreRemainingShards(unassignedShardMap); + } + + /** + * Performs shard movement for incompatible remote shards + */ + @Override + void moveShards() { + Queue eligibleNodes = new ArrayDeque<>(); + Queue excludedNodes = new ArrayDeque<>(); + classifyNodesForShardMovement(eligibleNodes, excludedNodes); + + if (excludedNodes.isEmpty()) { + logger.debug("No excluded nodes found. 
Returning..."); + return; + } + + while (!eligibleNodes.isEmpty() && !excludedNodes.isEmpty()) { + RoutingNode sourceNode = excludedNodes.poll(); + for (ShardRouting ineligibleShard : sourceNode) { + if (ineligibleShard.started() == false) { + continue; + } + + if (!RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(ineligibleShard, allocation))) { + continue; + } + + if (eligibleNodes.isEmpty()) { + break; + } + + tryShardMovementToEligibleNode(eligibleNodes, ineligibleShard); + } + } + } + + /** + * Classifies the nodes into eligible and excluded depending on whether node is able or unable for shard assignment + * @param eligibleNodes contains the list of classified nodes eligible to accept shards + * @param excludedNodes contains the list of classified nodes that are unable for assigning shards + */ + private void classifyNodesForShardMovement(Queue eligibleNodes, Queue excludedNodes) { + List remoteRoutingNodes = getRemoteRoutingNodes(); + int throttledNodeCount = 0; + for (RoutingNode node : remoteRoutingNodes) { + Decision nodeDecision = allocation.deciders().canAllocateAnyShardToNode(node, allocation); + /* canAllocateAnyShardToNode decision can be THROTTLE for throttled nodes. To classify + * as excluded nodes, we look for Decision.Type.NO + */ + if (nodeDecision.type() == Decision.Type.NO) { + excludedNodes.add(node); + } else if (nodeDecision.type() == Decision.Type.YES) { + eligibleNodes.add(node); + } else { + throttledNodeCount++; + } + logger.debug( + "Excluded Node Count: [{}], Eligible Node Count: [{}], Throttled Node Count: [{}]", + excludedNodes.size(), + eligibleNodes.size(), + throttledNodeCount + ); + } + } + + /** + * Tries to move a shard assigned to an excluded node to an eligible node. 
+ * + * @param eligibleNodes set of nodes that are still accepting shards + * @param shard the ineligible shard to be moved + */ + private void tryShardMovementToEligibleNode(Queue eligibleNodes, ShardRouting shard) { + Set nodesCheckedForShard = new HashSet<>(); + while (!eligibleNodes.isEmpty()) { + RoutingNode targetNode = eligibleNodes.poll(); + Decision currentShardDecision = allocation.deciders().canAllocate(shard, targetNode, allocation); + + if (currentShardDecision.type() == Decision.Type.YES) { + if (logger.isDebugEnabled()) { + logger.debug( + "Moving shard: {} from node: [{}] to node: [{}]", + shardShortSummary(shard), + shard.currentNodeId(), + targetNode.nodeId() + ); + } + routingNodes.relocateShard( + shard, + targetNode.nodeId(), + allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + allocation.changes() + ); + eligibleNodes.offer(targetNode); + break; + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "Cannot move shard: {} to node: [{}]. Decisions: [{}]", + shardShortSummary(shard), + targetNode.nodeId(), + currentShardDecision.getDecisions() + ); + } + + Decision nodeLevelDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation); + if (nodeLevelDecision.type() == Decision.Type.YES) { + logger.debug("Node: [{}] can still accept shards. Adding it back to the queue.", targetNode.nodeId()); + eligibleNodes.offer(targetNode); + nodesCheckedForShard.add(targetNode.nodeId()); + } else { + logger.debug("Node: [{}] cannot accept any more shards. Removing it from queue.", targetNode.nodeId()); + } + + // Break out if all nodes in the queue have been checked for this shard + if (eligibleNodes.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) { + break; + } + } + } + } + + /** + * Performs heuristic, naive balancing for remote shards within the cluster by using the average number of primary + * shards per remote node (rounded up) as the threshold for shard distribution. Remote nodes holding more primaries + * than the threshold act as sources and remote nodes holding fewer act as targets. 
+ * It does so without accounting for the local shards located on any nodes within the cluster. + */ + @Override + void balance() { + List remoteRoutingNodes = getRemoteRoutingNodes(); + logger.trace("Performing balancing for remote shards."); + + if (remoteRoutingNodes.isEmpty()) { + logger.info("No eligible remote nodes found to perform balancing"); + return; + } + + ObjectIntHashMap nodePrimaryShardCount = calculateNodePrimaryShardCount(remoteRoutingNodes); + int totalPrimaryShardCount = Arrays.stream(nodePrimaryShardCount.values).sum(); + + totalPrimaryShardCount += routingNodes.unassigned().getNumPrimaries(); + // Ceiling division over the remote nodes only: the primaries counted above and the source/target candidates below + // are all taken from remoteRoutingNodes, so dividing by every routing node would understate the threshold. + // remoteRoutingNodes is known to be non-empty here because of the early return above. + int avgPrimaryPerNode = (totalPrimaryShardCount + remoteRoutingNodes.size() - 1) / remoteRoutingNodes.size(); + + ArrayDeque sourceNodes = new ArrayDeque<>(); + ArrayDeque targetNodes = new ArrayDeque<>(); + for (RoutingNode node : remoteRoutingNodes) { + if (nodePrimaryShardCount.get(node.nodeId()) > avgPrimaryPerNode) { + sourceNodes.add(node); + } else if (nodePrimaryShardCount.get(node.nodeId()) < avgPrimaryPerNode) { + targetNodes.add(node); + } + } + + while (!sourceNodes.isEmpty() && !targetNodes.isEmpty()) { + RoutingNode sourceNode = sourceNodes.poll(); + tryRebalanceNode(sourceNode, targetNodes, avgPrimaryPerNode, nodePrimaryShardCount); + } + } + + /** + * Calculates the total number of primary shards per node. 
+ * @param remoteRoutingNodes routing nodes for which the aggregation needs to be performed + * @return map of node id to primary shard count + */ + private ObjectIntHashMap calculateNodePrimaryShardCount(List remoteRoutingNodes) { + ObjectIntHashMap primaryShardCount = new ObjectIntHashMap<>(); + for (RoutingNode node : remoteRoutingNodes) { + int totalPrimaryShardsPerNode = 0; + for (ShardRouting shard : node) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) + && shard.primary() + && (shard.initializing() || shard.started())) { + totalPrimaryShardsPerNode++; + } + } + primaryShardCount.put(node.nodeId(), totalPrimaryShardsPerNode); + } + return primaryShardCount; + } + + @Override + AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting) { + throw new UnsupportedOperationException("remote shards balancer does not support decision operations"); + } + + @Override + MoveDecision decideMove(ShardRouting shardRouting) { + throw new UnsupportedOperationException("remote shards balancer does not support decision operations"); + } + + @Override + MoveDecision decideRebalance(ShardRouting shardRouting) { + throw new UnsupportedOperationException("remote shards balancer does not support decision operations"); + } + + /** + * Groups unassigned shards within the allocation based on the index. 
+ * @return {@link UnassignedIndexShards} grouped by index name + */ + public Map groupUnassignedShardsByIndex() { + HashMap unassignedShardMap = new HashMap<>(); + unassignIgnoredRemoteShards(allocation); + for (ShardRouting shard : routingNodes.unassigned().drain()) { + String index = shard.getIndexName(); + if (!RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) { + routingNodes.unassigned().add(shard); + continue; + } + if (!unassignedShardMap.containsKey(index)) { + unassignedShardMap.put(index, new UnassignedIndexShards()); + } + unassignedShardMap.get(index).addShard(shard); + } + return unassignedShardMap; + } + + /** + * Unassigned shards from {@link LocalShardsBalancer} are ignored since the balancer cannot allocate remote shards. + * Prior to allocation operations done by {@link RemoteShardsBalancer}, the ignored remote shards are moved back to + * unassigned status. + */ + private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) { + RoutingNodes.UnassignedShards unassignedShards = routingAllocation.routingNodes().unassigned(); + for (ShardRouting shard : unassignedShards.drainIgnored()) { + RoutingPool pool = RoutingPool.getShardPool(shard, routingAllocation); + if (pool == RoutingPool.REMOTE_CAPABLE && shard.unassigned() && (shard.primary() || !shard.unassignedInfo().isDelayed())) { + unassignedShards.add(shard); + } else { + unassignedShards.ignoreShard(shard, shard.unassignedInfo().getLastAllocationStatus(), routingAllocation.changes()); + } + } + } + + private void allocateUnassignedPrimaries(Queue nodeQueue, Map unassignedShardMap) { + allocateUnassignedShards(true, nodeQueue, unassignedShardMap); + } + + private void allocateUnassignedReplicas(Queue nodeQueue, Map unassignedShardMap) { + allocateUnassignedShards(false, nodeQueue, unassignedShardMap); + } + + private void ignoreRemainingShards(Map unassignedShardMap) { + for (UnassignedIndexShards indexShards : unassignedShardMap.values()) { + for 
(ShardRouting shard : indexShards.getPrimaries()) { + routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + } + for (ShardRouting shard : indexShards.getReplicas()) { + routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + } + } + } + + private void allocateUnassignedShards( + boolean primaries, + Queue nodeQueue, + Map unassignedShardMap + ) { + logger.debug("Allocating unassigned {}. Nodes available in queue: [{}]", (primaries ? "primaries" : "replicas"), nodeQueue.size()); + + // Iterate through all shards index by index and allocate them + for (String index : unassignedShardMap.keySet()) { + if (nodeQueue.isEmpty()) { + break; + } + + UnassignedIndexShards indexShards = unassignedShardMap.get(index); + Queue shardsToAllocate = primaries ? indexShards.getPrimaries() : indexShards.getReplicas(); + if (shardsToAllocate.isEmpty()) { + continue; + } + logger.debug("Allocating shards for index: [{}]", index); + + while (!shardsToAllocate.isEmpty() && !nodeQueue.isEmpty()) { + ShardRouting shard = shardsToAllocate.poll(); + if (shard.assignedToNode()) { + if (logger.isDebugEnabled()) { + logger.debug("Shard: {} already assigned to node: [{}]", shardShortSummary(shard), shard.currentNodeId()); + } + continue; + } + + Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation); + if (shardLevelDecision.type() == Decision.Type.NO) { + if (logger.isDebugEnabled()) { + logger.debug( + "Ignoring shard: [{}] as is cannot be allocated to any node. 
Shard level decisions: [{}][{}].", + shardShortSummary(shard), + shardLevelDecision.getDecisions(), + shardLevelDecision.getExplanation() + ); + } + routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + continue; + } + + tryAllocateUnassignedShard(nodeQueue, shard); + } + } + } + + /** + * Tries to allocate an unassigned shard to one of the nodes within the node queue. + * @param nodeQueue ordered list of nodes to try allocation + * @param shard the unassigned shard which needs to be allocated + */ + private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouting shard) { + boolean allocated = false; + boolean throttled = false; + Set nodesCheckedForShard = new HashSet<>(); + while (!nodeQueue.isEmpty()) { + RoutingNode node = nodeQueue.poll(); + Decision allocateDecision = allocation.deciders().canAllocate(shard, node, allocation); + nodesCheckedForShard.add(node.nodeId()); + if (allocateDecision.type() == Decision.Type.YES) { + if (logger.isTraceEnabled()) { + logger.trace("Assigned shard [{}] to [{}]", shardShortSummary(shard), node.nodeId()); + } + final long shardSize = DiskThresholdDecider.getExpectedShardSize( + shard, + ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, + allocation.clusterInfo(), + allocation.snapshotShardSizeInfo(), + allocation.metadata(), + allocation.routingTable() + ); + ShardRouting initShard = routingNodes.initializeShard(shard, node.nodeId(), null, shardSize, allocation.changes()); + nodeQueue.offer(node); + allocated = true; + break; + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "Cannot allocate shard: {} on node [{}]. 
Decisions: [{}]", + shardShortSummary(shard), + node.nodeId(), + allocateDecision.getDecisions() + ); + } + throttled = throttled || allocateDecision.type() == Decision.Type.THROTTLE; + + Decision nodeLevelDecision = allocation.deciders().canAllocateAnyShardToNode(node, allocation); + if (nodeLevelDecision.type() == Decision.Type.YES) { + if (logger.isTraceEnabled()) { + logger.trace( + "Node: [{}] can still accept shards, retaining it in queue - [{}]", + node.nodeId(), + nodeLevelDecision.getDecisions() + ); + } + nodeQueue.offer(node); + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]", + node.nodeId(), + nodeLevelDecision.getDecisions(), + nodeLevelDecision.getExplanation() + ); + } + } + + // Break out if all nodes in the queue have been checked for this shard + if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) { + throttled = true; + break; + } + } + } + + if (!allocated) { + UnassignedInfo.AllocationStatus status = throttled + ? 
UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED + : UnassignedInfo.AllocationStatus.DECIDERS_NO; + routingNodes.unassigned().ignoreShard(shard, status, allocation.changes()); + } + } + + private void tryRebalanceNode( + RoutingNode sourceNode, + ArrayDeque targetNodes, + int avgPrimary, + ObjectIntHashMap primaryCount + ) { + long shardsToBalance = primaryCount.get(sourceNode.nodeId()) - avgPrimary; + assert shardsToBalance >= 0 : "Shards to balance should be greater than 0, but found negative"; + Iterator shardIterator = sourceNode.copyShards().iterator(); + Set nodesCheckedForRelocation = new HashSet<>(); + + // Try to relocate the valid shards on the sourceNode, one at a time; + // until either sourceNode is balanced OR no more active primary shard available OR all the target nodes are exhausted + while (shardsToBalance > 0 && shardIterator.hasNext() && !targetNodes.isEmpty()) { + // Find an active primary shard to relocate + ShardRouting shard = shardIterator.next(); + if (!shard.started() || !shard.primary() || !RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) { + continue; + } + + while (!targetNodes.isEmpty()) { + // Find a valid target node that can accommodate the current shard relocation + RoutingNode targetNode = targetNodes.poll(); + if (primaryCount.get(targetNode.nodeId()) >= avgPrimary) { + logger.trace("Avg shard limit reached for node: [{}]. 
Removing from queue.", targetNode.nodeId()); + continue; + } + + // Try relocate the shard on the target node + Decision rebalanceDecision = tryRelocateShard(shard, targetNode); + + if (rebalanceDecision.type() == Decision.Type.YES) { + shardsToBalance--; + primaryCount.addTo(targetNode.nodeId(), 1); + targetNodes.offer(targetNode); + break; + + // If the relocation attempt failed for the shard, check if the target node can accommodate any other shard; else remove + // the target node from the target list + } else { + Decision nodeDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation); + if (nodeDecision.type() == Decision.Type.YES) { + targetNodes.offer(targetNode); + nodesCheckedForRelocation.add(targetNode.nodeId()); + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]", + targetNode.nodeId(), + nodeDecision.getDecisions(), + nodeDecision.toString() + ); + } + } + } + + // If all the target nodes are exhausted for the current shard; skip to next shard + if (targetNodes.stream().allMatch(node -> nodesCheckedForRelocation.contains(node.nodeId()))) { + break; + } + } + } + } + + /** + * For every primary shard for which this method is invoked, + * swap is attempted with the destination node in case replica shard is present. + * In case replica is not present, relocation of the shard id performed. + */ + private Decision tryRelocateShard(ShardRouting shard, RoutingNode destinationNode) { + // Check if there is already a replica for the shard on the destination node. + // Then we can directly swap the replica with the primary shards. + // Invariant: We only allow swap relocation on remote shards. 
+ ShardRouting replicaShard = destinationNode.getByShardId(shard.shardId()); + if (replicaShard != null) { + assert !replicaShard.primary() : "Primary Shard found while expected Replica during shard rebalance"; + return executeSwapShard(shard, replicaShard, allocation); + } + + // Since no replica present on the destinationNode; try relocating the shard to the destination node + Decision allocationDecision = allocation.deciders().canAllocate(shard, destinationNode, allocation); + Decision rebalanceDecision = allocation.deciders().canRebalance(shard, allocation); + logger.trace( + "Relocating shard [{}] from node [{}] to node [{}]. AllocationDecision: [{}]. AllocationExplanation: [{}]. " + + "RebalanceDecision: [{}]. RebalanceExplanation: [{}]", + shard.id(), + shard.currentNodeId(), + destinationNode.nodeId(), + allocationDecision.type(), + allocationDecision.toString(), + rebalanceDecision.type(), + rebalanceDecision.toString() + ); + + // Perform the relocation of allocation and rebalance decisions are YES + if ((allocationDecision.type() == Decision.Type.YES) && (rebalanceDecision.type() == Decision.Type.YES)) { + final long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + ShardRouting targetShard = routingNodes.relocateShard(shard, destinationNode.nodeId(), shardSize, allocation.changes()).v2(); + logger.info("Relocated shard [{}] to node [{}] during primary Rebalance", shard, targetShard.currentNodeId()); + return Decision.YES; + } + + if ((allocationDecision.type() == Decision.Type.THROTTLE) || (rebalanceDecision.type() == Decision.Type.THROTTLE)) { + return Decision.THROTTLE; + } + + return Decision.NO; + } + + private Decision executeSwapShard(ShardRouting primaryShard, ShardRouting replicaShard, RoutingAllocation allocation) { + if (!replicaShard.started()) { + return new Decision.Single(Decision.Type.NO); + } + + allocation.routingNodes().swapPrimaryWithReplica(logger, primaryShard, replicaShard, 
allocation.changes()); + return new Decision.Single(Decision.Type.YES); + } + + private void failUnattemptedShards() { + RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator(); + while (unassignedIterator.hasNext()) { + ShardRouting shard = unassignedIterator.next(); + UnassignedInfo unassignedInfo = shard.unassignedInfo(); + if (shard.primary() && unassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.NO_ATTEMPT) { + unassignedIterator.updateUnassigned( + new UnassignedInfo( + unassignedInfo.getReason(), + unassignedInfo.getMessage(), + unassignedInfo.getFailure(), + unassignedInfo.getNumFailedAllocations(), + unassignedInfo.getUnassignedTimeInNanos(), + unassignedInfo.getUnassignedTimeInMillis(), + unassignedInfo.isDelayed(), + UnassignedInfo.AllocationStatus.DECIDERS_NO, + Collections.emptySet() + ), + shard.recoverySource(), + allocation.changes() + ); + } + } + } + + private Queue getShuffledRemoteNodes() { + List nodeList = getRemoteRoutingNodes(); + Randomness.shuffle(nodeList); + return new ArrayDeque<>(nodeList); + } + + /** + * Filters out and returns the list of {@link RoutingPool#REMOTE_CAPABLE} nodes from the routing nodes in cluster. + * @return list of {@link RoutingPool#REMOTE_CAPABLE} routing nodes. + */ + private List getRemoteRoutingNodes() { + List nodeList = new ArrayList<>(); + for (RoutingNode rNode : routingNodes) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(rNode))) { + nodeList.add(rNode); + } + } + return nodeList; + } + + /** + * {@link UnassignedIndexShards} maintains a queue of unassigned remote shards for allocation operations within + * the cluster. 
+ * + * @opensearch.internal + */ + public static class UnassignedIndexShards { + private final Queue primaries = new ArrayDeque<>(); + private final Queue replicas = new ArrayDeque<>(); + + public void addShard(ShardRouting shard) { + if (shard.primary()) { + primaries.add(shard); + } else { + replicas.add(shard); + } + } + + public Queue getPrimaries() { + return primaries; + } + + public Queue getReplicas() { + return replicas; + } + } + + private String shardShortSummary(ShardRouting shard) { + return "[" + shard.getIndexName() + "]" + "[" + shard.getId() + "]" + "[" + (shard.primary() ? "p" : "r") + "]"; + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java new file mode 100644 index 0000000000000..c87f7d16079e9 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java @@ -0,0 +1,104 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; + +/** + * {@link TargetPoolAllocationDecider} ensures that the different shard types are assigned to the nodes with + * appropriate capabilities. The node pools with respective capabilities are defined within {@link RoutingPool}. 
+ * + * @opensearch.internal + */ +public class TargetPoolAllocationDecider extends AllocationDecider { + private static final Logger logger = LogManager.getLogger(TargetPoolAllocationDecider.class); + + public static final String NAME = "target_pool"; + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + RoutingPool shardPool = RoutingPool.getShardPool(shardRouting, allocation); + RoutingPool targetNodePool = RoutingPool.getNodePool(node); + if (RoutingPool.REMOTE_CAPABLE.equals(shardPool) && RoutingPool.LOCAL_ONLY.equals(targetNodePool)) { + logger.debug( + "Shard: [{}] has target pool: [{}]. Cannot allocate on node: [{}] with target pool: [{}]", + shardRouting.shortSummary(), + shardPool, + node.node(), + targetNodePool + ); + return allocation.decision( + Decision.NO, + NAME, + "Routing pools are incompatible. Shard pool: [%s], Node Pool: [%s]", + shardPool, + targetNodePool + ); + } + return allocation.decision( + Decision.YES, + NAME, + "Routing pools are compatible. 
Shard pool: [%s], Node Pool: [%s]", + shardPool, + targetNodePool + ); + } + + @Override + public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) { + return canAllocateInTargetPool(indexMetadata, node.node(), allocation); + } + + @Override + public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + logger.debug("Evaluating force allocation for primary shard."); + return canAllocate(shardRouting, node, allocation); + } + + public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { + logger.debug("Evaluating node: {} for autoExpandReplica eligibility of index: {}", node, indexMetadata.getIndex()); + return canAllocateInTargetPool(indexMetadata, node, allocation); + } + + private Decision canAllocateInTargetPool(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { + RoutingPool indexPool = RoutingPool.getIndexPool(indexMetadata); + RoutingPool targetNodePool = RoutingPool.getNodePool(node); + if (RoutingPool.REMOTE_CAPABLE.equals(indexPool) && RoutingPool.LOCAL_ONLY.equals(targetNodePool)) { + logger.debug( + "Index: [{}] has target pool: [{}]. Cannot allocate on node: [{}] with target pool: [{}]", + indexMetadata.getIndex().getName(), + indexPool, + node, + targetNodePool + ); + return allocation.decision( + Decision.NO, + NAME, + "Routing pools are incompatible. Index pool: [%s], Node Pool: [%s]", + indexPool, + targetNodePool + ); + } + return allocation.decision( + Decision.YES, + NAME, + "Routing pools are compatible. 
Index pool: [%s], Node Pool: [%s]", + indexPool, + targetNodePool + ); + } + +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/AllocationIdTests.java b/server/src/test/java/org/opensearch/cluster/routing/AllocationIdTests.java index e266eacdc0320..fc008762edd35 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/AllocationIdTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/AllocationIdTests.java @@ -142,6 +142,24 @@ public void testMoveToUnassigned() { assertThat(shard.allocationId(), nullValue()); } + public void testMovePrimaryToReplica() { + logger.info("-- build started shard"); + ShardRouting shard = ShardRouting.newUnassigned( + new ShardId("test", "_na_", 0), + true, + ExistingStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null) + ); + shard = shard.initialize("node1", null, -1); + shard = shard.moveToStarted(); + AllocationId originalAllocationId = shard.allocationId(); + + logger.info("-- move to replica"); + shard = shard.moveActivePrimaryToReplica(); + assertNotNull(shard.allocationId()); + assertEquals(originalAllocationId, shard.allocationId()); + } + public void testSerialization() throws IOException { AllocationId allocationId = AllocationId.newInitializing(); if (randomBoolean()) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java index 3e9088d63cfb4..73136a71bc12a 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java @@ -33,6 +33,7 @@ package org.opensearch.cluster.routing; import org.junit.Before; +import org.mockito.Mockito; import org.opensearch.Version; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.OpenSearchAllocationTestCase; @@ -160,4 +161,40 @@ public void testInterleavedShardIterator() { } assert 
shardCount == this.totalNumberOfShards; } + + public void testSwapPrimaryWithReplica() { + // Initialize all the shards for test index 1 and 2 + initPrimaries(); + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_2); + startInitializingShards(TEST_INDEX_2); + + // Create primary shard count imbalance between two nodes + final RoutingNodes routingNodes = this.clusterState.getRoutingNodes(); + final RoutingNode node0 = routingNodes.node("node0"); + final RoutingNode node1 = routingNodes.node("node1"); + final List shardRoutingList = node0.shardsWithState(TEST_INDEX_1, ShardRoutingState.STARTED); + final RoutingChangesObserver routingChangesObserver = Mockito.mock(RoutingChangesObserver.class); + int swaps = 0; + + for (ShardRouting routing : shardRoutingList) { + if (routing.primary()) { + ShardRouting swap = node1.getByShardId(routing.shardId()); + routingNodes.swapPrimaryWithReplica(logger, routing, swap, routingChangesObserver); + swaps++; + } + } + Mockito.verify(routingChangesObserver, Mockito.times(swaps)).replicaPromoted(Mockito.any()); + + final List shards = node1.shardsWithState(TEST_INDEX_1, ShardRoutingState.STARTED); + int shardCount = 0; + for (ShardRouting shard : shards) { + if (shard.primary()) { + shardCount++; + } + } + + assertTrue(shardCount >= swaps); + } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java index 1dd27cb706c64..a914ef5da31ca 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java @@ -301,6 +301,18 @@ public void testEqualsIgnoringVersion() { } } + public void testSwapPrimaryWithReplica() { + final ShardRouting unassignedShard0 = TestShardRouting.newShardRouting("test", 0, null, false, ShardRoutingState.UNASSIGNED); + 
assertThrows(AssertionError.class, unassignedShard0::moveActivePrimaryToReplica); + + final ShardRouting activeShard0 = TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED); + assertThrows(IllegalShardRoutingStateException.class, activeShard0::moveActivePrimaryToReplica); + + final ShardRouting activeShard1 = TestShardRouting.newShardRouting("test", 0, "node-1", true, ShardRoutingState.STARTED); + final ShardRouting activeReplicaShard1 = activeShard1.moveActivePrimaryToReplica(); + assertFalse(activeReplicaShard1.primary()); + } + public void testExpectedSize() throws IOException { final int iters = randomIntBetween(10, 100); for (int i = 0; i < iters; i++) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java new file mode 100644 index 0000000000000..7c45b20ecee1f --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.allocator.RemoteShardsBalancer; + +import java.util.Map; + +public class RemoteShardsAllocateUnassignedTests extends RemoteShardsBalancerBaseTestCase { + + /** + * Test Remote Shards Balancer initialization. 
+ */ + public void testInit() { + int localOnlyNodes = 7; + int remoteCapableNodes = 3; + int localIndices = 10; + int remoteIndices = 15; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + RoutingNodes routingNodes = new RoutingNodes(clusterState, false); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + + RemoteShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation); + Map unassignedShardMap = remoteShardsBalancer.groupUnassignedShardsByIndex(); + + assertEquals(remoteIndices, unassignedShardMap.size()); + for (String index : unassignedShardMap.keySet()) { + assertTrue(index.startsWith(REMOTE_IDX_PREFIX)); + RemoteShardsBalancer.UnassignedIndexShards indexShards = unassignedShardMap.get(index); + assertEquals(5, indexShards.getPrimaries().size()); + for (ShardRouting shard : indexShards.getPrimaries()) { + assertTrue(shard.primary()); + assertEquals(shard.getIndexName(), index); + } + assertEquals(5, indexShards.getReplicas().size()); + for (ShardRouting shard : indexShards.getReplicas()) { + assertFalse(shard.primary()); + assertEquals(shard.getIndexName(), index); + } + } + } + + /** + * Test remote unassigned shard allocation for standard new cluster setup. 
+ */ + public void testPrimaryAllocation() { + int localOnlyNodes = 7; + int remoteCapableNodes = 3; + int localIndices = 10; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + + assertEquals(0, routingNodes.unassigned().size()); + + ObjectIntHashMap nodePrimariesCounter = new ObjectIntHashMap<>(); + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + assertFalse(shard.unassigned()); + RoutingNode node = routingNodes.node(shard.currentNodeId()); + RoutingPool nodePool = RoutingPool.getNodePool(node); + RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation); + if (RoutingPool.REMOTE_CAPABLE.equals(shardPool)) { + assertEquals(nodePool, shardPool); + } + if (RoutingPool.getNodePool(node) == RoutingPool.REMOTE_CAPABLE + && RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) + && shard.primary()) { + nodePrimariesCounter.putOrAdd(node.nodeId(), 1, 1); + } + } + final int indexShardLimit = (int) Math.ceil(totalPrimaries(remoteIndices) / (float) remoteCapableNodes); + for (int primaries : nodePrimariesCounter.values) { + assertTrue(primaries <= indexShardLimit); + } + } + + /** + * Test remote unassigned shard allocation when remote capable nodes fail to come up. 
+ */ + public void testAllocationRemoteCapableNodesUnavailable() { + int localOnlyNodes = 7; + int remoteCapableNodes = 0; + int localIndices = 10; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + + assertEquals(totalShards(remoteIndices), routingNodes.unassigned().size()); + + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + if (RoutingPool.getShardPool(shard, allocation) == RoutingPool.REMOTE_CAPABLE) { + assertTrue(shard.unassigned()); + } else { + assertFalse(shard.unassigned()); + RoutingNode node = routingNodes.node(shard.currentNodeId()); + assertEquals(RoutingPool.LOCAL_ONLY, RoutingPool.getNodePool(node)); + } + } + for (RoutingNode node : routingNodes) { + if (RoutingPool.getNodePool(node) == RoutingPool.REMOTE_CAPABLE) { + assertEquals(0, node.size()); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java new file mode 100644 index 0000000000000..789de474d8ce5 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java @@ -0,0 +1,302 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing.allocation; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiskUsage; +import org.opensearch.cluster.EmptyClusterInfoService; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.allocator.ShardsAllocator; +import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.collect.ImmutableOpenMap; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.transport.TransportAddress; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; +import org.opensearch.test.gateway.TestGatewayAllocator; + +import java.net.Inet4Address; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static 
org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; + +@SuppressForbidden(reason = "feature flag overrides") +public abstract class RemoteShardsBalancerBaseTestCase extends OpenSearchAllocationTestCase { + protected static final String LOCAL_NODE_PREFIX = "local-only-node"; + protected static final String REMOTE_NODE_PREFIX = "remote-capable-node"; + protected static final String LOCAL_IDX_PREFIX = "local-idx"; + protected static final String REMOTE_IDX_PREFIX = "remote-idx"; + protected static final Set MANAGER_DATA_ROLES = Set.of( + DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, + DiscoveryNodeRole.DATA_ROLE + ); + protected static final Set SEARCH_DATA_ROLES = Set.of( + DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, + DiscoveryNodeRole.DATA_ROLE, + DiscoveryNodeRole.SEARCH_ROLE + ); + + protected static final int PRIMARIES = 5; + protected static final int REPLICAS = 1; + private static final int MAX_REROUTE_ITERATIONS = 1000; + + protected ClusterSettings EMPTY_CLUSTER_SETTINGS = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + @BeforeClass + public static void setup() { + System.setProperty(FeatureFlags.SEARCHABLE_SNAPSHOT, "true"); + } + + @AfterClass + public static void teardown() { + System.setProperty(FeatureFlags.SEARCHABLE_SNAPSHOT, "false"); + } + + public String getNodeId(int id, boolean isRemote, String prefix) { + if (isRemote) { + return REMOTE_NODE_PREFIX + "-" + prefix + "-" + id; + } + return LOCAL_NODE_PREFIX + "-" + prefix + "-" + id; + } + + public String getNodeId(int id, boolean isRemote) { + return getNodeId(id, isRemote, ""); + } + + public String getIndexName(int id, boolean isRemote, String prefix) { + if (isRemote) { + return REMOTE_IDX_PREFIX + "-" + prefix + "-" + id; + } + return LOCAL_IDX_PREFIX + "-" + prefix + "-" + id; + } + + public String getIndexName(int id, boolean isRemote) { + return getIndexName(id, isRemote, ""); + } + + public RoutingAllocation getRoutingAllocation(ClusterState 
clusterState, RoutingNodes routingNodes) { + return new RoutingAllocation( + randomAllocationDeciders(Settings.Builder.EMPTY_SETTINGS, EMPTY_CLUSTER_SETTINGS, random()), + routingNodes, + clusterState, + EmptyClusterInfoService.INSTANCE.getClusterInfo(), + null, + System.nanoTime() + ); + } + + private Map createNodeAttributes(String nodeId) { + Map attr = new HashMap<>(); + attr.put("name", nodeId); + attr.put("node_id", nodeId); + return attr; + } + + public ClusterState addNodes(ClusterState clusterState, int nodeCount, boolean isRemote) { + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()); + for (int i = 0; i < nodeCount; i++) { + String id = getNodeId(i, isRemote, "new"); + nb.add(newNode(id, id, isRemote ? SEARCH_DATA_ROLES : MANAGER_DATA_ROLES)); + } + return ClusterState.builder(clusterState).nodes(nb.build()).build(); + } + + public ClusterState addNodeWithIP(ClusterState clusterState, int nodeId, boolean isRemote, String IP) throws UnknownHostException { + TransportAddress ipAddress = new TransportAddress(Inet4Address.getByName(IP), 9200); + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()); + String id = getNodeId(nodeId, isRemote, "new"); + nb.add( + new DiscoveryNode( + id, + id, + ipAddress, + createNodeAttributes(id), + isRemote ? 
SEARCH_DATA_ROLES : MANAGER_DATA_ROLES, + Version.CURRENT + ) + ); + return ClusterState.builder(clusterState).nodes(nb.build()).build(); + } + + public ClusterState terminateNodes(ClusterState clusterState, AllocationService service, List nodesToTerminate) { + if (nodesToTerminate.isEmpty()) { + return clusterState; + } + logger.info("Terminating following nodes from cluster: [{}]", nodesToTerminate); + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()); + nodesToTerminate.forEach(nb::remove); + clusterState = ClusterState.builder(clusterState).nodes(nb.build()).build(); + clusterState = service.disassociateDeadNodes(clusterState, false, "nodes-terminated"); + return clusterState; + } + + public ClusterState createInitialCluster(int localOnlyNodes, int remoteCapableNodes, int localIndices, int remoteIndices) { + Metadata.Builder mb = Metadata.builder(); + for (int i = 0; i < localIndices; i++) { + mb.put( + IndexMetadata.builder(getIndexName(i, false)) + .settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0")) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + ); + } + + for (int i = 0; i < remoteIndices; i++) { + mb.put( + IndexMetadata.builder(getIndexName(i, true)) + .settings( + settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0") + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + ) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + ); + } + Metadata metadata = mb.build(); + + RoutingTable.Builder rb = RoutingTable.builder(); + for (int i = 0; i < localIndices; i++) { + rb.addAsNew(metadata.index(getIndexName(i, false))); + } + for (int i = 0; i < remoteIndices; i++) { + rb.addAsNew(metadata.index(getIndexName(i, true))); + } + RoutingTable routingTable = rb.build(); + + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(); + for (int i = 0; i < localOnlyNodes; 
i++) { + String name = getNodeId(i, false); + nb.add(newNode(name, name, MANAGER_DATA_ROLES)); + } + for (int i = 0; i < remoteCapableNodes; i++) { + String name = getNodeId(i, true); + nb.add(newNode(name, name, SEARCH_DATA_ROLES)); + } + DiscoveryNodes nodes = nb.build(); + return ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(routingTable).nodes(nodes).build(); + } + + protected ClusterState createRemoteIndex(ClusterState state, String indexName) { + Metadata metadata = Metadata.builder(state.metadata()) + .put( + IndexMetadata.builder(indexName) + .settings( + settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "5m") + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + ) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + ) + .build(); + RoutingTable routingTable = RoutingTable.builder(state.routingTable()).addAsNew(metadata.index(indexName)).build(); + return ClusterState.builder(state).metadata(metadata).routingTable(routingTable).build(); + } + + private AllocationDeciders remoteAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { + List deciders = new ArrayList<>( + ClusterModule.createAllocationDeciders(settings, clusterSettings, Collections.emptyList()) + ); + Collections.shuffle(deciders, random()); + return new AllocationDeciders(deciders); + } + + public AllocationService createRemoteCapableAllocationService() { + Settings settings = Settings.Builder.EMPTY_SETTINGS; + return new OpenSearchAllocationTestCase.MockAllocationService( + randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()), + new TestGatewayAllocator(), + createShardAllocator(settings), + EmptyClusterInfoService.INSTANCE, + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES + ); + } + + public AllocationService createRemoteCapableAllocationService(String excludeNodes) { + Settings settings = 
Settings.builder().put("cluster.routing.allocation.exclude.node_id", excludeNodes).build(); + return new MockAllocationService( + randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()), + new TestGatewayAllocator(), + createShardAllocator(settings), + EmptyClusterInfoService.INSTANCE, + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES + ); + } + + public AllocationDeciders createAllocationDeciders() { + Settings settings = Settings.Builder.EMPTY_SETTINGS; + return randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()); + + } + + public ClusterState allocateShardsAndBalance(ClusterState clusterState, AllocationService service) { + int iterations = 0; + do { + clusterState = service.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + clusterState = service.reroute(clusterState, "reroute"); + iterations++; + } while (!clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() && iterations < MAX_REROUTE_ITERATIONS); + return clusterState; + } + + public int totalShards(int indices) { + return indices * PRIMARIES * (REPLICAS + 1); + } + + public int totalPrimaries(int indices) { + return indices * PRIMARIES; + } + + public ShardsAllocator createShardAllocator(Settings settings) { + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + return new BalancedShardsAllocator(settings, clusterSettings); + } + + /** + * ClusterInfo that always reports /dev/null for the shards' data paths. 
+ */ + public static class DevNullClusterInfo extends ClusterInfo { + public DevNullClusterInfo( + ImmutableOpenMap leastAvailableSpaceUsage, + ImmutableOpenMap mostAvailableSpaceUsage, + ImmutableOpenMap shardSizes + ) { + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, ImmutableOpenMap.of()); + } + + @Override + public String getDataPath(ShardRouting shardRouting) { + return "/dev/null"; + } + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsMoveShardsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsMoveShardsTests.java new file mode 100644 index 0000000000000..f2e79b319d0dd --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsMoveShardsTests.java @@ -0,0 +1,109 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.health.ClusterStateHealth; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.index.IndexModule; + +public class RemoteShardsMoveShardsTests extends RemoteShardsBalancerBaseTestCase { + + /** + * Test reroute terminates gracefully if shards cannot move out of the excluded node + */ + public void testExcludeNodeIdMoveBlocked() { + int localOnlyNodes = 7; + int remoteCapableNodes = 2; + int localIndices = 10; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + assertEquals(ClusterHealthStatus.GREEN, (new ClusterStateHealth(clusterState)).getStatus()); + + // Exclude a node + final String excludedNodeID = getNodeId(0, true); + service = createRemoteCapableAllocationService(excludedNodeID); + clusterState = allocateShardsAndBalance(clusterState, service); + + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + assertEquals(ClusterHealthStatus.GREEN, (new ClusterStateHealth(clusterState)).getStatus()); + assertEquals(0, routingNodes.unassigned().size()); + assertTrue(routingNodes.node(excludedNodeID).size() > 0); + } + + /** + * Test move operations for index level allocation settings. + * Supported for local indices, not supported for remote indices. 
+ */ + public void testIndexLevelExclusions() throws InterruptedException { + int localOnlyNodes = 7; + int remoteCapableNodes = 3; + int localIndices = 10; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + + assertEquals(ClusterHealthStatus.GREEN, (new ClusterStateHealth(clusterState)).getStatus()); + assertEquals(0, routingNodes.unassigned().size()); + + final String excludedLocalOnlyNode = getNodeId(0, false); + final String excludedRemoteCapableNode = getNodeId(0, true); + final String localIndex = routingNodes.node(excludedLocalOnlyNode).shardsWithState(ShardRoutingState.STARTED).get(0).getIndexName(); + final String remoteIndex = routingNodes.node(excludedRemoteCapableNode) + .shardsWithState(ShardRoutingState.STARTED) + .stream() + .filter(shardRouting -> shardRouting.getIndexName().startsWith(REMOTE_IDX_PREFIX)) + .findFirst() + .get() + .getIndexName(); + + Metadata.Builder mb = Metadata.builder(clusterState.metadata()); + mb.put( + IndexMetadata.builder(clusterState.metadata().index(localIndex)) + .settings( + settings(Version.CURRENT).put("index.number_of_shards", PRIMARIES) + .put("index.number_of_replicas", REPLICAS) + .put("index.routing.allocation.exclude._name", excludedLocalOnlyNode) + .build() + ) + ); + mb.put( + IndexMetadata.builder(clusterState.metadata().index(remoteIndex)) + .settings( + settings(Version.CURRENT).put("index.number_of_shards", PRIMARIES) + .put("index.number_of_replicas", REPLICAS) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + .put("index.routing.allocation.exclude._name", excludedRemoteCapableNode) + .build() + ) + ); + clusterState = 
ClusterState.builder(clusterState).metadata(mb.build()).build(); + + clusterState = allocateShardsAndBalance(clusterState, service); + assertEquals(ClusterHealthStatus.GREEN, (new ClusterStateHealth(clusterState)).getStatus()); + RoutingTable routingTable = clusterState.routingTable(); + + // No shard of the updated local index should remain on the excluded local-only node + assertTrue(routingTable.allShards(localIndex).stream().noneMatch(shard -> shard.currentNodeId().equals(excludedLocalOnlyNode))); + + // Since remote index shards are untouched, at least one shard should + // continue to stay on the excluded remote capable node + assertTrue(routingTable.allShards(remoteIndex).stream().anyMatch(shard -> shard.currentNodeId().equals(excludedRemoteCapableNode))); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java new file mode 100644 index 0000000000000..c4358aaf12ac2 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; + +public class RemoteShardsRebalanceShardsTests extends RemoteShardsBalancerBaseTestCase { + + /** + * Test remote shard allocation and balancing for standard new cluster setup.
+ * + * Post rebalance primaries should be balanced across all the nodes. + */ + public void testShardAllocationAndRebalance() { + int localOnlyNodes = 20; + int remoteCapableNodes = 40; + int localIndices = 40; + int remoteIndices = 80; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + + ObjectIntHashMap nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true); + ObjectIntHashMap nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false); + int avgPrimariesPerNode = getTotalShardCountAcrossNodes(nodePrimariesCounter) / remoteCapableNodes; + + // Primary and replica are balanced post first reroute + for (RoutingNode node : routingNodes) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(node))) { + assertInRange(nodePrimariesCounter.get(node.nodeId()), avgPrimariesPerNode, remoteCapableNodes - 1); + assertTrue(nodeReplicaCounter.get(node.nodeId()) >= 0); + } + } + } + + private ObjectIntHashMap getShardCounterPerNodeForRemoteCapablePool( + ClusterState clusterState, + RoutingAllocation allocation, + boolean primary + ) { + ObjectIntHashMap nodePrimariesCounter = new ObjectIntHashMap<>(); + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) && shard.primary() == primary) { + nodePrimariesCounter.putOrAdd(shard.currentNodeId(), 1, 1); + } + } + return nodePrimariesCounter; + } + + private int getTotalShardCountAcrossNodes(ObjectIntHashMap nodePrimariesCounter) { + int totalShardCount = 0; + for (int value : 
nodePrimariesCounter.values) { + totalShardCount += value; + } + return totalShardCount; + } + + /** + * Asserts that the actual value lies within {@code variance} of the expected mean. + * + * Used to check per-node shard counts against the average number of shards per node. + * A tolerance is required because the mean is not always a whole number; the variance + * can be derived from, for example, the total number of remote capable nodes in a cluster. + */ + private void assertInRange(int actual, int expectedMean, int variance) { + assertTrue(actual >= expectedMean - variance); + assertTrue(actual <= expectedMean + variance); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java new file mode 100644 index 0000000000000..9a415ed0b339b --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java @@ -0,0 +1,114 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.RemoteShardsBalancerBaseTestCase; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; + +import java.util.Collections; +import java.util.stream.Collectors; + +public class TargetPoolAllocationDeciderTests extends RemoteShardsBalancerBaseTestCase { + public void testTargetPoolAllocationDecisions() { + ClusterState clusterState = createInitialCluster(3, 3, 2, 2); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + + // Add an unassigned primary shard for force allocation checks + Metadata metadata = Metadata.builder(clusterState.metadata()) + .put(IndexMetadata.builder("test_local_unassigned").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + RoutingTable routingTable = RoutingTable.builder(clusterState.routingTable()) + .addAsNew(metadata.index("test_local_unassigned")) + .build(); + clusterState = ClusterState.builder(clusterState).metadata(metadata).routingTable(routingTable).build(); + + // Add remote index unassigned primary + clusterState = createRemoteIndex(clusterState, "test_remote_unassigned"); + + RoutingNodes defaultRoutingNodes = clusterState.getRoutingNodes(); + RoutingAllocation globalAllocation = getRoutingAllocation(clusterState, defaultRoutingNodes); + + ShardRouting localShard = clusterState.routingTable() + .allShards(getIndexName(0, 
false)) + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .get(0); + ShardRouting remoteShard = clusterState.routingTable() + .allShards(getIndexName(0, true)) + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .get(0); + ShardRouting unassignedLocalShard = clusterState.routingTable() + .allShards("test_local_unassigned") + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .get(0); + ShardRouting unassignedRemoteShard = clusterState.routingTable() + .allShards("test_remote_unassigned") + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .get(0); + IndexMetadata localIdx = globalAllocation.metadata().getIndexSafe(localShard.index()); + IndexMetadata remoteIdx = globalAllocation.metadata().getIndexSafe(remoteShard.index()); + String localNodeId = LOCAL_NODE_PREFIX; + for (RoutingNode routingNode : globalAllocation.routingNodes()) { + if (routingNode.nodeId().startsWith(LOCAL_NODE_PREFIX)) { + localNodeId = routingNode.nodeId(); + break; + } + } + String remoteNodeId = remoteShard.currentNodeId(); + RoutingNode localOnlyNode = defaultRoutingNodes.node(localNodeId); + RoutingNode remoteCapableNode = defaultRoutingNodes.node(remoteNodeId); + + AllocationDeciders deciders = new AllocationDeciders(Collections.singletonList(new TargetPoolAllocationDecider())); + + // Incompatible Pools + assertEquals(Decision.NO.type(), deciders.canAllocate(remoteShard, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.canAllocate(remoteIdx, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.canForceAllocatePrimary(unassignedRemoteShard, localOnlyNode, globalAllocation).type()); + + // Compatible Pools + assertEquals(Decision.YES.type(), deciders.canAllocate(localShard, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(localIdx, remoteCapableNode, 
globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(remoteShard, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(remoteIdx, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(localShard, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(localIdx, localOnlyNode, globalAllocation).type()); + assertEquals( + Decision.YES.type(), + deciders.canForceAllocatePrimary(unassignedRemoteShard, remoteCapableNode, globalAllocation).type() + ); + assertEquals(Decision.YES.type(), deciders.canForceAllocatePrimary(unassignedLocalShard, localOnlyNode, globalAllocation).type()); + assertEquals( + Decision.YES.type(), + deciders.canForceAllocatePrimary(unassignedLocalShard, remoteCapableNode, globalAllocation).type() + ); + + // Verify only remote nodes are used for auto expand replica decision for remote index + assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, remoteCapableNode.node(), globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.shouldAutoExpandToNode(remoteIdx, localOnlyNode.node(), globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, localOnlyNode.node(), globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(remoteIdx, remoteCapableNode.node(), globalAllocation).type()); + } +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java index a13d337fa4d26..1d527140dc038 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java @@ -168,6 +168,10 @@ protected static DiscoveryNode newNode(String 
nodeId, Set rol return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT); } + protected static DiscoveryNode newNode(String nodeName, String nodeId, Set roles) { + return new DiscoveryNode(nodeName, nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT); + } + protected static DiscoveryNode newNode(String nodeId, Version version) { return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), CLUSTER_MANAGER_DATA_ROLES, version); } diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 0a20654d1e441..122dadeb152bb 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -199,6 +199,11 @@ public final class InternalTestCluster extends TestCluster { nodeAndClient.node.settings() ); + private static final Predicate SEARCH_NODE_PREDICATE = nodeAndClient -> DiscoveryNode.hasRole( + nodeAndClient.node.settings(), + DiscoveryNodeRole.SEARCH_ROLE + ); + private static final Predicate NO_DATA_NO_CLUSTER_MANAGER_PREDICATE = nodeAndClient -> DiscoveryNode .isClusterManagerNode(nodeAndClient.node.settings()) == false && DiscoveryNode.isDataNode(nodeAndClient.node.settings()) == false; @@ -664,6 +669,24 @@ public synchronized void ensureAtLeastNumDataNodes(int n) { } } + /** + * Ensures that at least n search nodes are present in the cluster. + * if more nodes than n are present this method will not + * stop any of the running nodes. 
+ */ + public synchronized void ensureAtLeastNumSearchNodes(int n) { + int size = numSearchNodes(); + if (size < n) { + logger.info("increasing cluster size from {} to {}", size, n); + if (numSharedDedicatedClusterManagerNodes > 0) { + startSearchOnlyNodes(n - size); + } else { + startNodes(n - size, Settings.builder().put(onlyRole(Settings.EMPTY, DiscoveryNodeRole.SEARCH_ROLE)).build()); + } + validateClusterFormed(); + } + } + /** * Ensures that at most n are up and running. * If less nodes that n are running this method @@ -2291,6 +2314,14 @@ public List startDataOnlyNodes(int numNodes, Settings settings) { return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build()); } + public List startSearchOnlyNodes(int numNodes) { + return startSearchOnlyNodes(numNodes, Settings.EMPTY); + } + + public List startSearchOnlyNodes(int numNodes, Settings settings) { + return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.SEARCH_ROLE)).build()); + } + /** calculates a min cluster-manager nodes value based on the given number of cluster-manager nodes */ private static int getMinClusterManagerNodes(int eligibleClusterManagerNodes) { return eligibleClusterManagerNodes / 2 + 1; @@ -2347,6 +2378,10 @@ public int numDataNodes() { return dataNodeAndClients().size(); } + public int numSearchNodes() { + return searchNodeAndClients().size(); + } + @Override public int numDataAndClusterManagerNodes() { return filterNodes(nodes, DATA_NODE_PREDICATE.or(CLUSTER_MANAGER_NODE_PREDICATE)).size(); @@ -2406,6 +2441,10 @@ private Collection dataNodeAndClients() { return filterNodes(nodes, DATA_NODE_PREDICATE); } + private Collection searchNodeAndClients() { + return filterNodes(nodes, SEARCH_NODE_PREDICATE); + } + private static Collection filterNodes( Map map, Predicate predicate