diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 6279447485348..ee3ef0cbc4d26 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -74,6 +74,7 @@ import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.ParseField; @@ -85,6 +86,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.set.Sets; import org.opensearch.common.xcontent.NamedXContentRegistry; @@ -359,6 +361,9 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings)); + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + addAllocationDecider(deciders, new TargetPoolAllocationDecider()); + } clusterPlugins.stream() .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream()) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 986df494917c0..3034664f3ea13 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -702,6 +702,23 @@ assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == + " was matched but wasn't removed"; } + public void swapPrimaryWithReplica( + Logger logger, + ShardRouting primaryShard, + ShardRouting replicaShard, + RoutingChangesObserver changes + ) { + assert primaryShard.primary() : "Invalid primary shard provided"; + assert !replicaShard.primary() : "Invalid Replica shard provided"; + + ShardRouting newPrimary = primaryShard.moveActivePrimaryToReplica(); + ShardRouting newReplica = replicaShard.moveActiveReplicaToPrimary(); + updateAssigned(primaryShard, newPrimary); + updateAssigned(replicaShard, newReplica); + logger.info("Swap relocation performed for shard [{}]", newPrimary.shortSummary()); + changes.replicaPromoted(newPrimary); + } + private void unassignPrimaryAndPromoteActiveReplicaIfExists( ShardRouting failedShard, UnassignedInfo unassignedInfo, diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java new file mode 100644 index 0000000000000..8b70c3435e419 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; + +public enum RoutingPool { + LOCAL_ONLY, + REMOTE_CAPABLE; + + public static RoutingPool getNodePool(RoutingNode node) { + return getNodePool(node.node()); + } + + public static RoutingPool getNodePool(DiscoveryNode node) { + if (node.isSearchNode()) { + return REMOTE_CAPABLE; + } + return LOCAL_ONLY; + } + + public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation allocation) { + IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index()); + return getIndexPool(indexMetadata); + } + + public static RoutingPool getIndexPool(IndexMetadata indexMetadata) { + Settings indexSettings = indexMetadata.getSettings(); + if (IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey().equals(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) { + return REMOTE_CAPABLE; + } + return LOCAL_ONLY; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java index e3aa2a666d454..5945c7d17a69a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java @@ -534,6 +534,29 @@ public ShardRouting moveToStarted() { ); } + /** + * Make the active primary shard as replica + * + * @throws IllegalShardRoutingStateException if shard is already a replica + */ + public ShardRouting moveActivePrimaryToReplica() { + assert active() : "expected an active shard " + this; + if (!primary) { + throw new IllegalShardRoutingStateException(this, "Not a primary shard, can't move to replica"); + } + return new ShardRouting( + shardId, + currentNodeId, + relocatingNodeId, + false, + state, + recoverySource, + unassignedInfo, + allocationId, + expectedShardSize + ); + } + /** * Make the active shard primary unless it's not primary * diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 42c8f7987bf3d..d8761e9b1a78e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -50,6 +50,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import java.util.HashMap; import java.util.HashSet; @@ -145,6 +146,13 @@ public void allocate(RoutingAllocation allocation) { localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); localShardsBalancer.balance(); + + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation); + remoteShardsBalancer.allocateUnassigned(); + remoteShardsBalancer.moveShards(); + remoteShardsBalancer.balance(); + } } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 53d7c827392d5..7b6f362281632 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; @@ -27,9 +28,11 @@ import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.gateway.PriorityComparator; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -38,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.StreamSupport; import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -102,6 +106,12 @@ public float avgShardsPerNode(String index) { */ @Override public float avgShardsPerNode() { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + float totalShards = nodes.values().stream() + .map(BalancedShardsAllocator.ModelNode::numShards) + .reduce(0, Integer::sum); + return totalShards / nodes.size(); + } return avgShardsPerNode; } @@ -172,6 +182,11 @@ void balance() { */ @Override MoveDecision decideRebalance(final ShardRouting shard) { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && + RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) { + return MoveDecision.NOT_TAKEN; + } + if (shard.started() == false) { // we can only rebalance started shards return MoveDecision.NOT_TAKEN; @@ -441,7 +456,19 @@ private void balanceByWeights() { * to the nodes we relocated them from. */ private String[] buildWeightOrderedIndices() { - final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class); + final String[] indices; + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + final List localIndices = new ArrayList<>(); + for (String index: allocation.routingTable().indicesRouting().keys().toArray(String.class)) { + if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getIndexPool(metadata.index(index)))) { + localIndices.add(index); + } + } + indices = localIndices.toArray(new String[0]); + } else { + indices = allocation.routingTable().indicesRouting().keys().toArray(String.class); + } + final float[] deltas = new float[indices.length]; for (int i = 0; i < deltas.length; i++) { sorter.reset(indices[i]); @@ -507,7 +534,7 @@ void moveShards() { // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are // offloading the shards. - // Trying to eliminate target nodes so that we donot unnecessarily iterate over source nodes + // Trying to eliminate target nodes so that we do not unnecessarily iterate over source nodes // when no target is eligible for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) { checkAndAddInEligibleTargetNode(currentNode.getRoutingNode()); @@ -533,6 +560,11 @@ void moveShards() { ShardRouting shardRouting = it.next(); + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && + RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { + continue; + } + // Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) { logger.info( @@ -593,6 +625,11 @@ void moveShards() { */ @Override MoveDecision decideMove(final ShardRouting shardRouting) { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && + RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { + return MoveDecision.NOT_TAKEN; + } + if (shardRouting.started() == false) { // we can only move started shards return MoveDecision.NOT_TAKEN; @@ -680,7 +717,8 @@ private Map buildModelFromAssigned() for (ShardRouting shard : rn) { assert rn.nodeId().equals(shard.currentNodeId()); /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ - if (shard.state() != RELOCATING) { + if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && + shard.state() != RELOCATING) { node.addShard(shard); if (logger.isTraceEnabled()) { logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId()); @@ -735,7 +773,17 @@ void allocateUnassigned() { * if we allocate for instance (0, R, IDX1) we move the second replica to the secondary array and proceed with * the next replica. If we could not find a node to allocate (0,R,IDX1) we move all it's replicas to ignoreUnassigned. */ - ShardRouting[] primary = unassigned.drain(); + ShardRouting[] unassignedShards = unassigned.drain(); + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + List allUnassignedShards = Arrays.stream(unassignedShards).collect(Collectors.toList()); + List localUnassignedShards = allUnassignedShards.stream() + .filter(shard -> RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation))) + .collect(Collectors.toList()); + allUnassignedShards.removeAll(localUnassignedShards); + allUnassignedShards.forEach(shard -> routingNodes.unassigned().add(shard)); + unassignedShards = localUnassignedShards.toArray(new ShardRouting[localUnassignedShards.size()]); + } + ShardRouting[] primary = unassignedShards; ShardRouting[] secondary = new ShardRouting[primary.length]; int secondaryLength = 0; int primaryLength = primary.length; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java new file mode 100644 index 0000000000000..d35e4f4c5c659 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java @@ -0,0 +1,579 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.allocator; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.MoveDecision; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.opensearch.common.Randomness; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +/** + * A {@link RemoteShardsBalancer} used by the {@link BalancedShardsAllocator} to perform allocation operations + * for remote shards within the cluster. + * + * @opensearch.internal + */ +public final class RemoteShardsBalancer extends ShardsBalancer { + private final Logger logger; + private final RoutingAllocation allocation; + private final RoutingNodes routingNodes; + + public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) { + this.logger = logger; + this.allocation = allocation; + this.routingNodes = allocation.routingNodes(); + } + + /** + * Allocates unassigned remote shards on the routing node which are filtered using + * {@link #groupUnassignedShardsByIndex} + */ + @Override + void allocateUnassigned() { + if (routingNodes.unassigned().isEmpty()) { + logger.debug("No unassigned remote shards found."); + return; + } + + Queue nodeQueue = getShuffledRemoteNodes(); + if (nodeQueue.isEmpty()) { + logger.debug("No remote searcher nodes available for unassigned remote shards."); + failUnattemptedShards(); + return; + } + + Map unassignedShardMap = groupUnassignedShardsByIndex(); + allocateUnassignedPrimaries(nodeQueue, unassignedShardMap); + allocateUnassignedReplicas(nodeQueue, unassignedShardMap); + ignoreRemainingShards(unassignedShardMap); + } + + /** + * Performs shard movement for incompatible remote shards + */ + @Override + void moveShards() { + Queue eligibleNodes = new ArrayDeque<>(); + Queue excludedNodes = new ArrayDeque<>(); + + Iterator routingNodesIter = routingNodes.iterator(); + int throttledNodeCount = 0; + while (routingNodesIter.hasNext()) { + RoutingNode node = routingNodesIter.next(); + Decision nodeDecision = allocation.deciders().canAllocateAnyShardToNode(node, allocation); + /* canAllocateAnyShardToNode decision can be THROTTLE for throttled nodes. To classify + * as excluded nodes, we look for Decision.Type.NO + */ + if (nodeDecision.type() == Decision.Type.NO) { + excludedNodes.add(node); + } else if (nodeDecision.type() == Decision.Type.YES) { + eligibleNodes.add(node); + } else { + throttledNodeCount++; + } + logger.debug( + "Excluded Node Count: [{}], Eligible Node Count: [{}], Throttled Node Count: [{}]", + excludedNodes.size(), + eligibleNodes.size(), + throttledNodeCount + ); + } + + if (excludedNodes.isEmpty()) { + logger.debug("No excluded nodes found. Returning..."); + return; + } + + while (!eligibleNodes.isEmpty() && !excludedNodes.isEmpty()) { + RoutingNode sourceNode = excludedNodes.poll(); + for (ShardRouting shard : sourceNode) { + if (shard.started() == false) { + continue; + } + + if (!RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) { + continue; + } + + if (eligibleNodes.isEmpty()) { + break; + } + + Set nodesCheckedForShard = new HashSet<>(); + while (!eligibleNodes.isEmpty()) { + RoutingNode targetNode = eligibleNodes.poll(); + Decision currentShardDecision = allocation.deciders().canAllocate(shard, targetNode, allocation); + + if (currentShardDecision.type() == Decision.Type.YES) { + if (logger.isDebugEnabled()) { + logger.debug( + "Moving shard: {} from node: [{}] to node: [{}]", + shardShortSummary(shard), + shard.currentNodeId(), + targetNode.nodeId() + ); + } + routingNodes.relocateShard( + shard, + targetNode.nodeId(), + allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + allocation.changes() + ); + eligibleNodes.offer(targetNode); + break; + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "Cannot move shard: {} to node: [{}]. Decisions: [{}]", + shardShortSummary(shard), + targetNode.nodeId(), + currentShardDecision.getDecisions() + ); + } + + Decision nodeLevelDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation); + if (nodeLevelDecision.type() == Decision.Type.YES) { + logger.debug("Node: [{}] can still accept shards. Adding it back to the queue.", targetNode.nodeId()); + eligibleNodes.offer(targetNode); + nodesCheckedForShard.add(targetNode.nodeId()); + } else { + logger.debug("Node: [{}] cannot accept any more shards. Removing it from queue.", targetNode.nodeId()); + } + + // Break out if all nodes in the queue have been checked for this shard + if (eligibleNodes.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) { + break; + } + } + } + } + } + } + + /** + * Performs heuristic based balancing for remote shards within the cluster. + * It does so without accounting for the local shards located on any nodes within the cluster. + */ + @Override + void balance() { + Iterator nodesIter = routingNodes.iterator(); + List nodeList = new ArrayList<>(); + while (nodesIter.hasNext()) { + RoutingNode rNode = nodesIter.next(); + nodeList.add(rNode); + } + logger.trace("Performing balancing for remote shards."); + + if (nodeList.isEmpty()) { + logger.info("No eligible remote nodes found to perform balancing"); + return; + } + + ObjectIntHashMap primaryShardCount = new ObjectIntHashMap<>(); + int totalPrimaryShard = 0; + for (RoutingNode node : nodeList) { + int totalPrimaryPerNode = 0; + for (ShardRouting shard : node) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) + && shard.primary() + && (shard.initializing() || shard.started())) { + totalPrimaryShard++; + totalPrimaryPerNode++; + } + } + primaryShardCount.put(node.nodeId(), totalPrimaryPerNode); + } + + totalPrimaryShard += routingNodes.unassigned().getNumPrimaries(); + int avgPrimaryPerNode = (totalPrimaryShard + routingNodes.size() - 1) / routingNodes.size(); + + ArrayDeque sourceNodes = new ArrayDeque<>(); + ArrayDeque targetNodes = new ArrayDeque<>(); + for (RoutingNode node : nodeList) { + if (primaryShardCount.get(node.nodeId()) > avgPrimaryPerNode) { + sourceNodes.add(node); + } else if (primaryShardCount.get(node.nodeId()) < avgPrimaryPerNode) { + targetNodes.add(node); + } + } + + while (!sourceNodes.isEmpty() && !targetNodes.isEmpty()) { + RoutingNode sourceNode = sourceNodes.poll(); + tryRebalanceNode(sourceNode, targetNodes, avgPrimaryPerNode, primaryShardCount); + } + } + + @Override + AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting) { + throw new UnsupportedOperationException("remote shards balancer does not support decision operations"); + } + + @Override + MoveDecision decideMove(ShardRouting shardRouting) { + throw new UnsupportedOperationException("remote shards balancer does not support decision operations"); + } + + @Override + MoveDecision decideRebalance(ShardRouting shardRouting) { + throw new UnsupportedOperationException("remote shards balancer does not support decision operations"); + } + + public Map groupUnassignedShardsByIndex() { + HashMap unassignedShardMap = new HashMap<>(); + for (ShardRouting shard : routingNodes.unassigned().drain()) { + String index = shard.getIndexName(); + if (!RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) { + routingNodes.unassigned().add(shard); + continue; + } + if (!unassignedShardMap.containsKey(index)) { + unassignedShardMap.put(index, new UnassignedIndexShards()); + } + UnassignedIndexShards indexShards = unassignedShardMap.get(index); + indexShards.addShard(shard); + } + return unassignedShardMap; + } + + private void allocateUnassignedPrimaries(Queue nodeQueue, Map unassignedShardMap) { + allocateUnassigned(true, nodeQueue, unassignedShardMap); + } + + private void allocateUnassignedReplicas(Queue nodeQueue, Map unassignedShardMap) { + allocateUnassigned(false, nodeQueue, unassignedShardMap); + } + + private void ignoreRemainingShards(Map unassignedShardMap) { + for (UnassignedIndexShards indexShards : unassignedShardMap.values()) { + for (ShardRouting shard : indexShards.getPrimaries()) { + routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + } + for (ShardRouting shard : indexShards.getReplicas()) { + routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + } + } + } + + private void allocateUnassigned( + boolean primaries, + Queue nodeQueue, + Map unassignedShardMap + ) { + logger.debug("Allocating unassigned {}. Nodes available in queue: [{}]", (primaries ? "primaries" : "replicas"), nodeQueue.size()); + + // Iterate through all shards index by index and allocate them + for (String index : unassignedShardMap.keySet()) { + if (nodeQueue.isEmpty()) { + break; + } + + UnassignedIndexShards indexShards = unassignedShardMap.get(index); + Queue shardsToAllocate = primaries ? indexShards.getPrimaries() : indexShards.getReplicas(); + if (shardsToAllocate.isEmpty()) { + continue; + } + logger.debug("Allocating shards for index: [{}]", index); + + while (!shardsToAllocate.isEmpty() && !nodeQueue.isEmpty()) { + ShardRouting shard = shardsToAllocate.poll(); + if (shard.assignedToNode()) { + if (logger.isDebugEnabled()) { + logger.debug("Shard: {} already assigned to node: [{}]", shardShortSummary(shard), shard.currentNodeId()); + } + continue; + } + + Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation); + if (shardLevelDecision.type() == Decision.Type.NO) { + if (logger.isDebugEnabled()) { + logger.debug( + "Ignoring shard: [{}] as is cannot be allocated to any node. Shard level decisions: [{}][{}].", + shardShortSummary(shard), + shardLevelDecision.getDecisions(), + shardLevelDecision.getExplanation() + ); + } + routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + continue; + } + + boolean allocated = false; + boolean throttled = false; + Set nodesCheckedForShard = new HashSet<>(); + while (!nodeQueue.isEmpty()) { + RoutingNode node = nodeQueue.poll(); + Decision allocateDecision = allocation.deciders().canAllocate(shard, node, allocation); + nodesCheckedForShard.add(node.nodeId()); + if (allocateDecision.type() == Decision.Type.YES) { + if (logger.isTraceEnabled()) { + logger.trace("Assigned shard [{}] to [{}]", shardShortSummary(shard), node.nodeId()); + } + final long shardSize = DiskThresholdDecider.getExpectedShardSize( + shard, + ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, + allocation.clusterInfo(), + allocation.snapshotShardSizeInfo(), + allocation.metadata(), + allocation.routingTable() + ); + ShardRouting initShard = routingNodes.initializeShard(shard, node.nodeId(), null, shardSize, allocation.changes()); + nodeQueue.offer(node); + allocated = true; + break; + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "Cannot allocate shard: {} on node [{}]. Decisions: [{}]", + shardShortSummary(shard), + node.nodeId(), + allocateDecision.getDecisions() + ); + } + throttled = throttled || allocateDecision.type() == Decision.Type.THROTTLE; + + Decision nodeLevelDecision = allocation.deciders().canAllocateAnyShardToNode(node, allocation); + if (nodeLevelDecision.type() == Decision.Type.YES) { + if (logger.isTraceEnabled()) { + logger.trace( + "Node: [{}] can still accept shards, retaining it in queue - [{}]", + node.nodeId(), + nodeLevelDecision.getDecisions() + ); + } + nodeQueue.offer(node); + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]", + node.nodeId(), + nodeLevelDecision.getDecisions(), + nodeLevelDecision.getExplanation() + ); + } + } + + // Break out if all nodes in the queue have been checked for this shard + if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) { + throttled = true; + break; + } + } + } + + if (!allocated) { + UnassignedInfo.AllocationStatus status = throttled + ? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED + : UnassignedInfo.AllocationStatus.DECIDERS_NO; + routingNodes.unassigned().ignoreShard(shard, status, allocation.changes()); + } + } + } + } + + private void tryRebalanceNode( + RoutingNode sourceNode, + ArrayDeque targetNodes, + int avgPrimary, + ObjectIntHashMap primaryCount + ) { + long shardsToBalance = primaryCount.get(sourceNode.nodeId()) - avgPrimary; + assert shardsToBalance >= 0 : "Shards to balance should be greater than 0, but found negative"; + Iterator shardIterator = sourceNode.copyShards().iterator(); + Set nodesCheckedForRelocation = new HashSet<>(); + + // Try to relocate the valid shards on the sourceNode, one at a time; + // until either sourceNode is balanced OR no more active primary shard available OR all the target nodes are exhausted + while (shardsToBalance > 0 && shardIterator.hasNext() && !targetNodes.isEmpty()) { + // Find an active primary shard to relocate + ShardRouting shard = shardIterator.next(); + if (!shard.started() || !shard.primary() || + !RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) { + continue; + } + + while (!targetNodes.isEmpty()) { + // Find a valid target node that can accommodate the current shard relocation + RoutingNode targetNode = targetNodes.poll(); + if (primaryCount.get(targetNode.nodeId()) >= avgPrimary) { + logger.trace("Avg shard limit reached for node: [{}]. Removing from queue.", targetNode.nodeId()); + continue; + } + + // Try relocate the shard on the target node + Decision rebalanceDecision = tryRelocateShard(shard, targetNode); + + if (rebalanceDecision.type() == Decision.Type.YES) { + shardsToBalance--; + primaryCount.addTo(targetNode.nodeId(), 1); + targetNodes.offer(targetNode); + break; + + // If the relocation attempt failed for the shard, check if the target node can accommodate any other shard; else remove + // the target node from the target list + } else { + Decision nodeDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation); + if (nodeDecision.type() == Decision.Type.YES) { + targetNodes.offer(targetNode); + nodesCheckedForRelocation.add(targetNode.nodeId()); + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]", + targetNode.nodeId(), + nodeDecision.getDecisions(), + nodeDecision.toString() + ); + } + } + } + + // If all the target nodes are exhausted for the current shard; skip to next shard + if (targetNodes.stream().allMatch(node -> nodesCheckedForRelocation.contains(node.nodeId()))) { + break; + } + } + } + } + + /** + * For every primary shard for which this method is invoked, + * swap is attempted with the destination node in case replica shard is present. + * In case replica is not present, relocation of the shard id performed. + */ + private Decision tryRelocateShard(ShardRouting shard, RoutingNode destinationNode) { + // Check if there is already a replica for the shard on the destination node. + // Then we can directly swap the replica with the primary shards. + // Invariant: We only allow swap relocation on remote shards. + ShardRouting replicaShard = destinationNode.getByShardId(shard.shardId()); + if (replicaShard != null) { + assert !replicaShard.primary() : "Primary Shard found while expected Replica during shard rebalance"; + return executeSwapShard(shard, replicaShard, allocation); + } + + // Since no replica present on the destinationNode; try relocating the shard to the destination node + Decision allocationDecision = allocation.deciders().canAllocate(shard, destinationNode, allocation); + Decision rebalanceDecision = allocation.deciders().canRebalance(shard, allocation); + logger.trace( + "Relocating shard [{}] from node [{}] to node [{}]. AllocationDecision: [{}]. AllocationExplanation: [{}]. " + + "RebalanceDecision: [{}]. RebalanceExplanation: [{}]", + shard.id(), + shard.currentNodeId(), + destinationNode.nodeId(), + allocationDecision.type(), + allocationDecision.toString(), + rebalanceDecision.type(), + rebalanceDecision.toString() + ); + + // Perform the relocation of allocation and rebalance decisions are YES + if ((allocationDecision.type() == Decision.Type.YES) && (rebalanceDecision.type() == Decision.Type.YES)) { + final long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + ShardRouting targetShard = routingNodes.relocateShard(shard, destinationNode.nodeId(), shardSize, allocation.changes()).v2(); + logger.info("Relocated shard [{}] to node [{}] during primary Rebalance", shard, targetShard.currentNodeId()); + return Decision.YES; + } + + if ((allocationDecision.type() == Decision.Type.THROTTLE) || (rebalanceDecision.type() == Decision.Type.THROTTLE)) { + return Decision.THROTTLE; + } + + return Decision.NO; + } + + private Decision executeSwapShard(ShardRouting primaryShard, ShardRouting replicaShard, RoutingAllocation allocation) { + if (!replicaShard.started()) { + return new Decision.Single(Decision.Type.NO); + } + + allocation.routingNodes().swapPrimaryWithReplica(logger, primaryShard, replicaShard, allocation.changes()); + return new Decision.Single(Decision.Type.YES); + } + + private void failUnattemptedShards() { + RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator(); + while (unassignedIterator.hasNext()) { + ShardRouting shard = unassignedIterator.next(); + UnassignedInfo unassignedInfo = shard.unassignedInfo(); + if (shard.primary() && unassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.NO_ATTEMPT) { + unassignedIterator.updateUnassigned( + new UnassignedInfo( + unassignedInfo.getReason(), + unassignedInfo.getMessage(), + unassignedInfo.getFailure(), + unassignedInfo.getNumFailedAllocations(), + unassignedInfo.getUnassignedTimeInNanos(), + unassignedInfo.getUnassignedTimeInMillis(), + unassignedInfo.isDelayed(), + UnassignedInfo.AllocationStatus.DECIDERS_NO, + Collections.emptySet() + ), + shard.recoverySource(), + allocation.changes() + ); + } + } + } + + private Queue getShuffledRemoteNodes() { + Iterator nodesIter = routingNodes.mutableIterator(); + List nodeList = new ArrayList<>(); + while (nodesIter.hasNext()) { + RoutingNode rNode = nodesIter.next(); + nodeList.add(rNode); + } + Randomness.shuffle(nodeList); + return new ArrayDeque<>(nodeList); + } + + public static class UnassignedIndexShards { + private final Queue primaries = new ArrayDeque<>(); + private final Queue replicas = new ArrayDeque<>(); + + public void addShard(ShardRouting shard) { + if (shard.primary()) { + primaries.add(shard); + } else { + replicas.add(shard); + } + } + + public Queue getPrimaries() { + return primaries; + } + + public Queue getReplicas() { + return replicas; + } + } + + public static String shardShortSummary(ShardRouting shard) { + return "[" + shard.getIndexName() + "]" + "[" + shard.getId() + "]" + "[" + (shard.primary() ? "p" : "r") + "]"; + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java new file mode 100644 index 0000000000000..09deb79c836ed --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java @@ -0,0 +1,68 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; + +public class TargetPoolAllocationDecider extends AllocationDecider { + private static final Logger logger = LogManager.getLogger(TargetPoolAllocationDecider.class); + + public static final String NAME = "target_pool"; + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + RoutingPool shardPool = RoutingPool.getShardPool(shardRouting, allocation); + RoutingPool targetNodePool = RoutingPool.getNodePool(node); + if (RoutingPool.REMOTE_CAPABLE.equals(shardPool) && RoutingPool.LOCAL_ONLY.equals(targetNodePool)) { + logger.debug("Shard: [{}] has target pool: [{}]. Cannot allocate on node: [{}] with target pool: [{}]", + shardRouting.shortSummary(), shardPool, node.node(), targetNodePool); + return allocation.decision(Decision.NO, NAME, + "Routing pools are incompatible. Shard pool: [%s], Node Pool: [%s]", shardPool, targetNodePool); + } + return allocation.decision(Decision.YES, NAME, + "Routing pools are compatible. Shard pool: [%s], Node Pool: [%s]", shardPool, targetNodePool); + } + + @Override + public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) { + return canAllocateInTargetPool(indexMetadata, node.node(), allocation); + } + + @Override + public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + logger.debug("Evaluating force allocation for primary shard."); + return canAllocate(shardRouting, node, allocation); + } + + public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { + logger.debug("Evaluating node: {} for autoExpandReplica eligibility of index: {}", node, indexMetadata.getIndex()); + return canAllocateInTargetPool(indexMetadata, node, allocation); + } + + private Decision canAllocateInTargetPool(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { + RoutingPool indexPool = RoutingPool.getIndexPool(indexMetadata); + RoutingPool targetNodePool = RoutingPool.getNodePool(node); + if (RoutingPool.REMOTE_CAPABLE.equals(indexPool) && RoutingPool.LOCAL_ONLY.equals(targetNodePool)) { + logger.debug("Index: [{}] has target pool: [{}]. Cannot allocate on node: [{}] with target pool: [{}]", + indexMetadata.getIndex().getName(), indexPool, node, targetNodePool); + return allocation.decision(Decision.NO, NAME, + "Routing pools are incompatible. Index pool: [%s], Node Pool: [%s]", indexPool, targetNodePool); + } + return allocation.decision(Decision.YES, NAME, + "Routing pools are compatible. Index pool: [%s], Node Pool: [%s]", indexPool, targetNodePool); + } + +} diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 7297479776da9..c7d2a3a23f239 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -43,6 +43,9 @@ public class FeatureFlags { * and false otherwise. */ public static boolean isEnabled(String featureFlagName) { + if (SEARCHABLE_SNAPSHOT.equals(featureFlagName)) { + return true; + } return "true".equalsIgnoreCase(System.getProperty(featureFlagName)); } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/AllocationIdTests.java b/server/src/test/java/org/opensearch/cluster/routing/AllocationIdTests.java index e266eacdc0320..fc008762edd35 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/AllocationIdTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/AllocationIdTests.java @@ -142,6 +142,24 @@ public void testMoveToUnassigned() { assertThat(shard.allocationId(), nullValue()); } + public void testMovePrimaryToReplica() { + logger.info("-- build started shard"); + ShardRouting shard = ShardRouting.newUnassigned( + new ShardId("test", "_na_", 0), + true, + ExistingStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null) + ); + shard = shard.initialize("node1", null, -1); + shard = shard.moveToStarted(); + AllocationId originalAllocationId = shard.allocationId(); + + logger.info("-- move to replica"); + shard = shard.moveActivePrimaryToReplica(); + assertNotNull(shard.allocationId()); + assertEquals(originalAllocationId, shard.allocationId()); + } + public void testSerialization() throws IOException { AllocationId allocationId = AllocationId.newInitializing(); if (randomBoolean()) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java index 3e9088d63cfb4..39910a27eafe6 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java @@ -33,6 +33,7 @@ package org.opensearch.cluster.routing; import org.junit.Before; +import org.mockito.Mockito; import org.opensearch.Version; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.OpenSearchAllocationTestCase; @@ -160,4 +161,40 @@ public void testInterleavedShardIterator() { } assert shardCount == this.totalNumberOfShards; } + + public void testSwapPrimaryWithReplica() { + // Initialize all the shards for test index 1 and 2 + initPrimaries(); + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_2); + startInitializingShards(TEST_INDEX_2); + + // Create primary shard count imbalance between two nodes + final RoutingNodes routingNodes = this.clusterState.getRoutingNodes(); + final RoutingNode node0 = routingNodes.node("node0"); + final RoutingNode node1 = routingNodes.node("node1"); + final List shardRoutingList = node0.shardsWithState(TEST_INDEX_1, ShardRoutingState.STARTED); + final RoutingChangesObserver routingChangesObserver = Mockito.mock(RoutingChangesObserver.class); + int swaps = 0; + + for (ShardRouting routing : shardRoutingList) { + if (routing.primary()) { + ShardRouting swap = node1.getByShardId(routing.shardId()); + routingNodes.swapPrimaryWithReplica(logger, routing, swap, routingChangesObserver); + swaps++; + } + } + Mockito.verify(routingChangesObserver, Mockito.times(swaps)).replicaPromoted(Mockito.any()); + + final List shards = node1.shardsWithState(TEST_INDEX_1, ShardRoutingState.STARTED); + int shardCount = 0; + for (ShardRouting shard: shards) { + if (shard.primary()) { + shardCount++; + } + } + + assertTrue(shardCount >= swaps); + } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java index 1dd27cb706c64..a914ef5da31ca 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java @@ -301,6 +301,18 @@ public void testEqualsIgnoringVersion() { } } + public void testSwapPrimaryWithReplica() { + final ShardRouting unassignedShard0 = TestShardRouting.newShardRouting("test", 0, null, false, ShardRoutingState.UNASSIGNED); + assertThrows(AssertionError.class, unassignedShard0::moveActivePrimaryToReplica); + + final ShardRouting activeShard0 = TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED); + assertThrows(IllegalShardRoutingStateException.class, activeShard0::moveActivePrimaryToReplica); + + final ShardRouting activeShard1 = TestShardRouting.newShardRouting("test", 0, "node-1", true, ShardRoutingState.STARTED); + final ShardRouting activeReplicaShard1 = activeShard1.moveActivePrimaryToReplica(); + assertFalse(activeReplicaShard1.primary()); + } + public void testExpectedSize() throws IOException { final int iters = randomIntBetween(10, 100); for (int i = 0; i < iters; i++) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java new file mode 100644 index 0000000000000..cdafcbfd428da --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.allocator.RemoteShardsBalancer; + +import java.util.Map; + +public class RemoteShardsAllocateUnassignedTests extends RemoteShardsBalancerBaseTestCase { + + /** + * Test Remote Shards Balancer initialization. + */ + public void testInit() { + int localOnlyNodes = 7; + int remoteCapableNodes = 3; + int localIndices = 10; + int remoteIndices = 15; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + RoutingNodes routingNodes = new RoutingNodes(clusterState, false); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + + RemoteShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation); + Map unassignedShardMap = remoteShardsBalancer.groupUnassignedShardsByIndex(); + + assertEquals(remoteIndices, unassignedShardMap.size()); + for (String index : unassignedShardMap.keySet()) { + assertTrue(index.startsWith(REMOTE_IDX_PREFIX)); + RemoteShardsBalancer.UnassignedIndexShards indexShards = unassignedShardMap.get(index); + assertEquals(5, indexShards.getPrimaries().size()); + for (ShardRouting shard : indexShards.getPrimaries()) { + assertTrue(shard.primary()); + assertEquals(shard.getIndexName(), index); + } + assertEquals(5, indexShards.getReplicas().size()); + for (ShardRouting shard : indexShards.getReplicas()) { + assertFalse(shard.primary()); + assertEquals(shard.getIndexName(), index); + } + } + } + + /** + * Test remote unassigned shard allocation for standard new cluster setup. + */ + public void testPrimaryAllocation() { + int localOnlyNodes = 7; + int remoteCapableNodes = 3; + int localIndices = 10; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + + assertEquals(0, routingNodes.unassigned().size()); + + ObjectIntHashMap nodePrimariesCounter = new ObjectIntHashMap<>(); + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + assertFalse(shard.unassigned()); + RoutingNode node = routingNodes.node(shard.currentNodeId()); + RoutingPool nodePool = RoutingPool.getNodePool(node); + RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation); + if (RoutingPool.REMOTE_CAPABLE.equals(shardPool)) { + assertEquals(nodePool, shardPool); + } + if (RoutingPool.getNodePool(node) == RoutingPool.REMOTE_CAPABLE && shard.primary()) { + nodePrimariesCounter.putOrAdd(node.nodeId(), 1, 1); + } + } + final int indexShardLimit = (int) Math.ceil(totalPrimaries(remoteIndices) / (float) remoteCapableNodes); + for (int primaries : nodePrimariesCounter.values) { + assertTrue(primaries <= indexShardLimit); + } + } + + /** + * Test remote unassigned shard allocation when remote capable nodes fail to come up. + */ + public void testAllocationRemoteCapableNodesUnavailable() { + int localOnlyNodes = 7; + int remoteCapableNodes = 0; + int localIndices = 10; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + + assertEquals(totalShards(remoteIndices), routingNodes.unassigned().size()); + + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + if (RoutingPool.getShardPool(shard, allocation) == RoutingPool.REMOTE_CAPABLE) { + assertTrue(shard.unassigned()); + } else { + assertFalse(shard.unassigned()); + RoutingNode node = routingNodes.node(shard.currentNodeId()); + assertEquals(RoutingPool.LOCAL_ONLY, RoutingPool.getNodePool(node)); + } + } + for (RoutingNode node : routingNodes) { + if (RoutingPool.getNodePool(node) == RoutingPool.REMOTE_CAPABLE) { + assertEquals(0, node.size()); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java new file mode 100644 index 0000000000000..e589ce4d629b0 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java @@ -0,0 +1,249 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiskUsage; +import org.opensearch.cluster.EmptyClusterInfoService; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.allocator.ShardsAllocator; +import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.common.collect.ImmutableOpenMap; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.transport.TransportAddress; +import org.opensearch.index.IndexModule; +import org.opensearch.test.gateway.TestGatewayAllocator; + +import java.net.Inet4Address; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; + +public class RemoteShardsBalancerBaseTestCase extends OpenSearchAllocationTestCase { + protected static final String LOCAL_NODE_PREFIX = "local-only-node"; + protected static final String REMOTE_NODE_PREFIX = "remote-capable-node"; + protected static final String LOCAL_IDX_PREFIX = "local-idx"; + protected static final String REMOTE_IDX_PREFIX = "remote-idx"; + protected static final Set MANAGER_DATA_ROLES = + Set.of(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); + protected static final Set SEARCH_DATA_ROLES = + Set.of(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.SEARCH_ROLE); + + protected static final int PRIMARIES = 5; + protected static final int REPLICAS = 1; + private static final int MAX_REROUTE_ITERATIONS = 1000; + + protected ClusterSettings EMPTY_CLUSTER_SETTINGS = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + public String getNodeId(int id, boolean isRemote, String prefix) { + if (isRemote) { + return REMOTE_NODE_PREFIX + "-" + prefix + "-" + id; + } + return LOCAL_NODE_PREFIX + "-" + prefix + "-" + id; + } + + public String getNodeId(int id, boolean isRemote) { + return getNodeId(id, isRemote, ""); + } + + public String getIndexName(int id, boolean isRemote, String prefix) { + if (isRemote) { + return REMOTE_IDX_PREFIX + "-" + prefix + "-" + id; + } + return LOCAL_IDX_PREFIX + "-" + prefix + "-" + id; + } + + public String getIndexName(int id, boolean isRemote) { + return getIndexName(id, isRemote, ""); + } + + public RoutingAllocation getRoutingAllocation(ClusterState clusterState, RoutingNodes routingNodes) { + return new RoutingAllocation(randomAllocationDeciders(Settings.Builder.EMPTY_SETTINGS, EMPTY_CLUSTER_SETTINGS, random()), + routingNodes, clusterState, EmptyClusterInfoService.INSTANCE.getClusterInfo(), null, System.nanoTime()); + } + + private Map createNodeAttributes(String nodeId) { + Map attr = new HashMap<>(); + attr.put("name", nodeId); + attr.put("node_id", nodeId); + return attr; + } + + public ClusterState addNodes(ClusterState clusterState, int nodeCount, boolean isRemote) { + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()); + for (int i=0; i < nodeCount; i++) { + String id = getNodeId(i, isRemote, "new"); + nb.add(newNode(id, id, isRemote ? SEARCH_DATA_ROLES : MANAGER_DATA_ROLES)); + } + return ClusterState.builder(clusterState).nodes(nb.build()).build(); + } + + public ClusterState addNodeWithIP(ClusterState clusterState, int nodeId, boolean isRemote, String IP) throws UnknownHostException { + TransportAddress ipAddress = new TransportAddress(Inet4Address.getByName(IP), 9200); + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()); + String id = getNodeId(nodeId, isRemote, "new"); + nb.add(new DiscoveryNode(id, id, ipAddress, createNodeAttributes(id), isRemote ? SEARCH_DATA_ROLES : MANAGER_DATA_ROLES, Version.CURRENT)); + return ClusterState.builder(clusterState).nodes(nb.build()).build(); + } + + public ClusterState terminateNodes(ClusterState clusterState, AllocationService service, List nodesToTerminate) { + if (nodesToTerminate.isEmpty()) { + return clusterState; + } + logger.info("Terminating following nodes from cluster: [{}]", nodesToTerminate); + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()); + nodesToTerminate.forEach(nb::remove); + clusterState = ClusterState.builder(clusterState).nodes(nb.build()).build(); + clusterState = service.disassociateDeadNodes(clusterState, false, "nodes-terminated"); + return clusterState; + } + + public ClusterState createInitialCluster(int localOnlyNodes, int remoteCapableNodes, int localIndices, int remoteIndices) { + Metadata.Builder mb = Metadata.builder(); + for (int i=0; i < localIndices; i++) { + mb.put(IndexMetadata.builder(getIndexName(i, false)) + .settings(settings(Version.CURRENT) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0")) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + ); + } + + for (int i=0; i < remoteIndices; i++) { + mb.put(IndexMetadata.builder(getIndexName(i, true)) + .settings(settings(Version.CURRENT) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0") + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey())) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + ); + } + Metadata metadata = mb.build(); + + RoutingTable.Builder rb = RoutingTable.builder(); + for (int i=0; i < localIndices; i++) { + rb.addAsNew(metadata.index(getIndexName(i, false))); + } + for (int i=0; i < remoteIndices; i++) { + rb.addAsNew(metadata.index(getIndexName(i, true))); + } + RoutingTable routingTable = rb.build(); + + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(); + for (int i=0; i < localOnlyNodes; i++) { + String name = getNodeId(i, false); + nb.add(newNode(name, name, MANAGER_DATA_ROLES)); + } + for (int i=0; i < remoteCapableNodes; i++) { + String name = getNodeId(i, true); + nb.add(newNode(name, name, SEARCH_DATA_ROLES)); + } + DiscoveryNodes nodes = nb.build(); + return ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(routingTable).nodes(nodes).build(); + } + + protected ClusterState createRemoteIndex(ClusterState state, String indexName) { + Metadata metadata = Metadata.builder(state.metadata()) + .put(IndexMetadata.builder(indexName) + .settings(settings(Version.CURRENT) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "5m") + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey())) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + ).build(); + RoutingTable routingTable = RoutingTable.builder(state.routingTable()).addAsNew(metadata.index(indexName)).build(); + return ClusterState.builder(state).metadata(metadata).routingTable(routingTable).build(); + } + + private AllocationDeciders remoteAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { + List deciders = new ArrayList<>( + ClusterModule.createAllocationDeciders(settings, clusterSettings, Collections.emptyList())); + Collections.shuffle(deciders, random()); + return new AllocationDeciders(deciders); + } + + public AllocationService createRemoteCapableAllocationService() { + Settings settings = Settings.Builder.EMPTY_SETTINGS; + return new OpenSearchAllocationTestCase.MockAllocationService(randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()), + new TestGatewayAllocator(), createShardAllocator(settings), EmptyClusterInfoService.INSTANCE, SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES); + } + + public AllocationService createRemoteCapableAllocationService(String excludeNodes) { + Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.node_id", excludeNodes).build(); + return new MockAllocationService(randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()), + new TestGatewayAllocator(), createShardAllocator(settings), EmptyClusterInfoService.INSTANCE, SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES); + } + + public AllocationDeciders createAllocationDeciders() { + Settings settings = Settings.Builder.EMPTY_SETTINGS; + return randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()); + + } + + public ClusterState allocateShardsAndBalance(ClusterState clusterState, AllocationService service) { + int iterations = 0; + do { + clusterState = service.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + clusterState = service.reroute(clusterState, "reroute"); + iterations++; + } while(!clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() && iterations < MAX_REROUTE_ITERATIONS); + return clusterState; + } + + public int totalShards(int indices) { + return indices * PRIMARIES * (REPLICAS + 1); + } + + public int totalPrimaries(int indices) { + return indices * PRIMARIES; + } + + public ShardsAllocator createShardAllocator(Settings settings){ + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + return new BalancedShardsAllocator(settings, clusterSettings); + } + + /** + * ClusterInfo that always reports /dev/null for the shards' data paths. + */ + public static class DevNullClusterInfo extends ClusterInfo { + public DevNullClusterInfo(ImmutableOpenMap leastAvailableSpaceUsage, + ImmutableOpenMap mostAvailableSpaceUsage, + ImmutableOpenMap shardSizes) { + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, ImmutableOpenMap.of()); + } + + @Override + public String getDataPath(ShardRouting shardRouting) { + return "/dev/null"; + } + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsMoveShardsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsMoveShardsTests.java new file mode 100644 index 0000000000000..b8a9539f021f0 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsMoveShardsTests.java @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.health.ClusterStateHealth; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.index.IndexModule; + +public class RemoteShardsMoveShardsTests extends RemoteShardsBalancerBaseTestCase { + + /** + * Test reroute terminates gracefully if shards cannot move out of the excluded node + */ + public void testExcludeNodeIdMoveBlocked() { + int localOnlyNodes = 7; + int remoteCapableNodes = 2; + int localIndices = 10; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + assertEquals(ClusterHealthStatus.GREEN, (new ClusterStateHealth(clusterState)).getStatus()); + + // Exclude a node + final String excludedNodeID = getNodeId(0, true); + service = createRemoteCapableAllocationService(excludedNodeID); + clusterState = allocateShardsAndBalance(clusterState, service); + + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + assertEquals(ClusterHealthStatus.GREEN, (new ClusterStateHealth(clusterState)).getStatus()); + assertEquals(0, routingNodes.unassigned().size()); + assertTrue(routingNodes.node(excludedNodeID).size() > 0); + } + + /** + * Test move operations for index level allocation settings. + * Supported for local indices, not supported for remote indices. + */ + public void testIndexLevelExclusions() { + int localOnlyNodes = 7; + int remoteCapableNodes = 3; + int localIndices = 10; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + + assertEquals(ClusterHealthStatus.GREEN, (new ClusterStateHealth(clusterState)).getStatus()); + assertEquals(0, routingNodes.unassigned().size()); + + final String excludedLocalOnlyNode = getNodeId(0, false); + final String excludedRemoteCapableNode = getNodeId(0, true); + final String localIndex = routingNodes.node(excludedLocalOnlyNode).shardsWithState(ShardRoutingState.STARTED).get(0).getIndexName(); + final String remoteIndex = routingNodes.node(excludedRemoteCapableNode).shardsWithState(ShardRoutingState.STARTED).get(0).getIndexName(); + + Metadata.Builder mb = Metadata.builder(clusterState.metadata()); + mb.put(IndexMetadata.builder(clusterState.metadata().index(localIndex)) + .settings(settings(Version.CURRENT) + .put("index.number_of_shards", PRIMARIES) + .put("index.number_of_replicas", REPLICAS) + .put("index.routing.allocation.exclude._name", excludedLocalOnlyNode).build())) + .put(IndexMetadata.builder(clusterState.metadata().index(remoteIndex)) + .settings(settings(Version.CURRENT) + .put("index.number_of_shards", PRIMARIES) + .put("index.number_of_replicas", REPLICAS) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + .put("index.routing.allocation.exclude._name", excludedRemoteCapableNode).build())); + clusterState = ClusterState.builder(clusterState).metadata(mb.build()).build(); + + clusterState = allocateShardsAndBalance(clusterState, service); + assertEquals(ClusterHealthStatus.GREEN, (new ClusterStateHealth(clusterState)).getStatus()); + RoutingTable routingTable = clusterState.routingTable(); + + // No shard of updated local index should be on excluded local capable node + assertTrue(routingTable.allShards(localIndex).stream().noneMatch(shard -> shard.currentNodeId().equals(excludedLocalOnlyNode))); + + // Since remote index shards are untouched, at least one shard should + // continue to stay on the excluded remote capable node + assertTrue(routingTable.allShards(remoteIndex).stream().anyMatch(shard -> shard.currentNodeId().equals(excludedRemoteCapableNode))); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java new file mode 100644 index 0000000000000..e82f392e407f2 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; + +public class RemoteShardsRebalanceShardsTests extends RemoteShardsBalancerBaseTestCase { + + /** + * Test remote shard allocation and balancing for standard new cluster setup. + * + * Post rebalance primaries should be balanced across all the nodes. + */ + public void testShardAllocationAndRebalance() { + int localOnlyNodes = 20; + int remoteCapableNodes = 40; + int localIndices = 40; + int remoteIndices = 80; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + + ObjectIntHashMap nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true); + ObjectIntHashMap nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false); + int avgPrimariesPerNode = getTotalShardCountAcrossNodes(nodePrimariesCounter) / remoteCapableNodes; + + //Primary and replica are balanced post first reroute + for (RoutingNode node: routingNodes) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(node))) { + assertInRange(nodePrimariesCounter.get(node.nodeId()), avgPrimariesPerNode, remoteCapableNodes - 1); + assertTrue(nodeReplicaCounter.get(node.nodeId()) >= 0); + } + } + } + + private ObjectIntHashMap getShardCounterPerNodeForRemoteCapablePool(ClusterState clusterState, RoutingAllocation allocation, boolean primary) { + ObjectIntHashMap nodePrimariesCounter = new ObjectIntHashMap<>(); + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) && shard.primary() == primary) { + nodePrimariesCounter.putOrAdd(shard.currentNodeId(), 1, 1); + } + } + return nodePrimariesCounter; + } + + private int getTotalShardCountAcrossNodes(ObjectIntHashMap nodePrimariesCounter) { + int totalShardCount = 0; + for (int value: nodePrimariesCounter.values) { + totalShardCount += value; + } + return totalShardCount; + } + + /** + * Asserts that the expected value is within the variance range. + * + * Being used to assert the average number of shards per node. + * Variance is required in case of non-absolute mean values; + * for example, total number of remote capable nodes in a cluster. + */ + private void assertInRange(int actual, int expectedMean, int variance) { + assertTrue(actual >= expectedMean - variance); + assertTrue(actual <= expectedMean + variance); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java new file mode 100644 index 0000000000000..3f615aa8f7e41 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java @@ -0,0 +1,75 @@ +package org.opensearch.cluster.routing.allocation.decider; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.RemoteShardsBalancerBaseTestCase; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; + +import java.util.Collections; +import java.util.stream.Collectors; + +public class TargetPoolAllocationDeciderTests extends RemoteShardsBalancerBaseTestCase { + public void testTargetPoolAllocationDecisions() { + ClusterState clusterState = createInitialCluster(3, 3, 2, 2); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + + // Add an unassigned primary shard for force allocation checks + Metadata metadata = Metadata.builder(clusterState.metadata()) + .put(IndexMetadata.builder("test_local_unassigned").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)).build(); + RoutingTable routingTable = RoutingTable.builder(clusterState.routingTable()).addAsNew(metadata.index("test_local_unassigned")).build(); + clusterState = ClusterState.builder(clusterState).metadata(metadata).routingTable(routingTable).build(); + + // Add remote index unassigned primary + clusterState = createRemoteIndex(clusterState, "test_remote_unassigned"); + + RoutingNodes defaultRoutingNodes = clusterState.getRoutingNodes(); + RoutingAllocation globalAllocation = getRoutingAllocation(clusterState, defaultRoutingNodes); + + ShardRouting localShard = clusterState.routingTable().allShards(getIndexName(0, false)) + .stream().filter(ShardRouting::primary).collect(Collectors.toList()).get(0); + ShardRouting remoteShard = clusterState.routingTable().allShards(getIndexName(0, true)) + .stream().filter(ShardRouting::primary).collect(Collectors.toList()).get(0); + ShardRouting unassignedLocalShard = clusterState.routingTable().allShards("test_local_unassigned") + .stream().filter(ShardRouting::primary).collect(Collectors.toList()).get(0); + ShardRouting unassignedRemoteShard = clusterState.routingTable().allShards("test_remote_unassigned") + .stream().filter(ShardRouting::primary).collect(Collectors.toList()).get(0); + IndexMetadata localIdx = globalAllocation.metadata().getIndexSafe(localShard.index()); + IndexMetadata remoteIdx = globalAllocation.metadata().getIndexSafe(remoteShard.index()); + String localNodeId = localShard.currentNodeId(); + String remoteNodeId = remoteShard.currentNodeId(); + RoutingNode localOnlyNode = defaultRoutingNodes.node(localNodeId); + RoutingNode remoteCapableNode = defaultRoutingNodes.node(remoteNodeId); + + AllocationDeciders deciders = new AllocationDeciders(Collections.singletonList(new TargetPoolAllocationDecider())); + + // Incompatible Pools + assertEquals(Decision.NO.type(), deciders.canAllocate(remoteShard, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.canAllocate(remoteIdx, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.canForceAllocatePrimary(unassignedRemoteShard, localOnlyNode, globalAllocation).type()); + + // Compatible Pools + assertEquals(Decision.YES.type(), deciders.canAllocate(localShard, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(localIdx, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(remoteShard, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(remoteIdx, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(localShard, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(localIdx, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canForceAllocatePrimary(unassignedRemoteShard, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canForceAllocatePrimary(unassignedLocalShard, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canForceAllocatePrimary(unassignedLocalShard, remoteCapableNode, globalAllocation).type()); + + // Verify only remote nodes are used for auto expand replica decision for remote index + assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, remoteCapableNode.node(), globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.shouldAutoExpandToNode(remoteIdx, localOnlyNode.node(), globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, localOnlyNode.node(), globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(remoteIdx, remoteCapableNode.node(), globalAllocation).type()); + } +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java index a13d337fa4d26..96917593076f7 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java @@ -168,6 +168,9 @@ protected static DiscoveryNode newNode(String nodeId, Set rol return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT); } + protected static DiscoveryNode newNode(String nodeName, String nodeId, Set roles) { + return new DiscoveryNode(nodeName, nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT); + } protected static DiscoveryNode newNode(String nodeId, Version version) { return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), CLUSTER_MANAGER_DATA_ROLES, version); }