From 694c89fd70cf544345b7b370bce7d3498e8587d6 Mon Sep 17 00:00:00 2001 From: Kunal Kotwani Date: Fri, 21 Oct 2022 10:29:31 -0700 Subject: [PATCH] Add remote allocator for searchable snapshots Signed-off-by: Kunal Kotwani Part of this commit was developed from code and concepts initially implemented in Amazon OpenSearch Service as part of the UltraWarm feature. Thank you to the following developers and the entire UltraWarm team. Co-authored-by: Min Zhou Co-authored-by: Ankit Malpani Co-authored-by: Rohit Nair Co-authored-by: Sorabh Hamirwasia Co-authored-by: Ankit Jain Co-authored-by: Tianru Zhou Co-authored-by: Neetika Singhal Co-authored-by: Amit Khandelwal Co-authored-by: Vigya Sharma Co-authored-by: Prateek Sharma Co-authored-by: Venkata Jyothsna Donapati Co-authored-by: Vlad Rozov Co-authored-by: Mohit Agrawal Co-authored-by: Shweta Thareja Co-authored-by: Palash Hedau Co-authored-by: Saurabh Singh Co-authored-by: Piyush Daftary Co-authored-by: Payal Maheshwari Co-authored-by: Kunal Khatua Co-authored-by: Gulshan Kumar Co-authored-by: Rushi Agrawal Co-authored-by: Ketan Verma Co-authored-by: Gaurav Chandani Co-authored-by: Dharmesh Singh Signed-off-by: Kunal Kotwani --- CHANGELOG.md | 2 +- .../snapshots/SearchableSnapshotIT.java | 9 + .../org/opensearch/cluster/ClusterModule.java | 5 + .../cluster/routing/RoutingNodes.java | 29 + .../cluster/routing/RoutingPool.java | 69 ++ .../cluster/routing/ShardRouting.java | 23 + .../allocator/BalancedShardsAllocator.java | 8 + .../allocator/LocalShardsBalancer.java | 53 +- .../allocator/RemoteShardsBalancer.java | 646 ++++++++++++++++++ .../decider/TargetPoolAllocationDecider.java | 104 +++ .../cluster/routing/AllocationIdTests.java | 18 + .../cluster/routing/RoutingNodesTests.java | 37 + .../cluster/routing/ShardRoutingTests.java | 12 + .../RemoteShardsAllocateUnassignedTests.java | 123 ++++ .../RemoteShardsBalancerBaseTestCase.java | 302 ++++++++ .../RemoteShardsMoveShardsTests.java | 109 +++ 
.../RemoteShardsRebalanceShardsTests.java | 82 +++ .../TargetPoolAllocationDeciderTests.java | 114 ++++ .../cluster/OpenSearchAllocationTestCase.java | 4 + .../opensearch/test/InternalTestCluster.java | 39 ++ 20 files changed, 1783 insertions(+), 5 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsMoveShardsTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 91a22aab5a5b8..1432d7eec62dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,7 +49,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Renamed flaky tests ([#4912](https://github.com/opensearch-project/OpenSearch/pull/4912)) - Update previous release bwc version to 2.5.0 ([#5003](https://github.com/opensearch-project/OpenSearch/pull/5003)) - Use getParameterCount instead of getParameterTypes ([#4821](https://github.com/opensearch-project/OpenSearch/pull/4821)) - +- Remote shard balancer support for searchable snapshots ([#4870](https://github.com/opensearch-project/OpenSearch/pull/4870)) ### Dependencies - Bumps `log4j-core` from 2.18.0 to 2.19.0 - Bumps `reactor-netty-http` from 1.0.18 to 1.0.23 diff --git 
a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java index c528a311e0761..0e964fdf14109 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java @@ -53,6 +53,13 @@ protected boolean addMockInternalEngine() { return false; } + @Override + protected Settings.Builder randomRepositorySettings() { + final Settings.Builder settings = Settings.builder(); + settings.put("location", randomRepoPath()).put("compress", randomBoolean()); + return settings; + } + public void testCreateSearchableSnapshot() throws Exception { final int numReplicasIndex1 = randomIntBetween(1, 4); final int numReplicasIndex2 = randomIntBetween(0, 2); @@ -92,6 +99,8 @@ public void testCreateSearchableSnapshot() throws Exception { assertTrue(client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get().isAcknowledged()); + internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1); + logger.info("--> restore indices as 'remote_snapshot'"); client.admin() .cluster() diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 6279447485348..ee3ef0cbc4d26 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -74,6 +74,7 @@ import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider; import 
org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.ParseField; @@ -85,6 +86,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.set.Sets; import org.opensearch.common.xcontent.NamedXContentRegistry; @@ -359,6 +361,9 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings)); + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + addAllocationDecider(deciders, new TargetPoolAllocationDecider()); + } clusterPlugins.stream() .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream()) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 986df494917c0..17267d5474738 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -702,6 +702,23 @@ assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == + " was matched but wasn't removed"; } + public void swapPrimaryWithReplica( + Logger logger, + ShardRouting primaryShard, + ShardRouting replicaShard, + RoutingChangesObserver changes + ) { + assert primaryShard.primary() : "Invalid primary shard provided"; + assert !replicaShard.primary() : "Invalid Replica shard provided"; + + ShardRouting newPrimary = primaryShard.moveActivePrimaryToReplica(); 
+ ShardRouting newReplica = replicaShard.moveActiveReplicaToPrimary(); + updateAssigned(primaryShard, newPrimary); + updateAssigned(replicaShard, newReplica); + logger.info("Swap relocation performed for shard [{}]", newPrimary.shortSummary()); + changes.replicaPromoted(newPrimary); + } + private void unassignPrimaryAndPromoteActiveReplicaIfExists( ShardRouting failedShard, UnassignedInfo unassignedInfo, @@ -1127,6 +1144,18 @@ public ShardRouting[] drain() { primaries = 0; return mutableShardRoutings; } + + /** + * Drains all ignored shards and returns them. + * This method will not drain unassigned shards. + */ + public ShardRouting[] drainIgnored() { + nodes.ensureMutable(); + ShardRouting[] mutableShardRoutings = ignored.toArray(new ShardRouting[ignored.size()]); + ignored.clear(); + ignoredPrimaries = 0; + return mutableShardRoutings; + } } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java new file mode 100644 index 0000000000000..1a3c366694221 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexModule; + +/** + * {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods + * help decide the capabilities of a specific node as well as an index or shard based on the index configuration. 
+ * These methods help with allocation decisions and determining shard classification with the allocation process. + * + * @opensearch.internal + */ +public enum RoutingPool { + LOCAL_ONLY, + REMOTE_CAPABLE; + + /** + * Helps to determine the appropriate {@link RoutingPool} for a given node from the {@link RoutingNode} + */ + public static RoutingPool getNodePool(RoutingNode node) { + return getNodePool(node.node()); + } + + /** + * Helps to determine the appropriate {@link RoutingPool} for a given node from the {@link DiscoveryNode} + */ + public static RoutingPool getNodePool(DiscoveryNode node) { + if (node.isSearchNode()) { + return REMOTE_CAPABLE; + } + return LOCAL_ONLY; + } + + /** + * Can determine the appropriate {@link RoutingPool} for a given shard using the {@link IndexMetadata} for the + * index using the {@link RoutingAllocation}. + * @param shard the shard routing for which {@link RoutingPool} has to be determined. + * @param allocation the current allocation of the cluster + * @return {@link RoutingPool} for the given shard. + */ + public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation allocation) { + IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index()); + return getIndexPool(indexMetadata); + } + + /** + * Can determine the appropriate {@link RoutingPool} for a given index using the {@link IndexMetadata}. + * @param indexMetadata the index metadata object for which {@link RoutingPool} has to be determined. + * @return {@link RoutingPool} for the given index. 
+ */ + public static RoutingPool getIndexPool(IndexMetadata indexMetadata) { + Settings indexSettings = indexMetadata.getSettings(); + if (IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey().equals(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) { + return REMOTE_CAPABLE; + } + return LOCAL_ONLY; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java index e3aa2a666d454..5945c7d17a69a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java @@ -534,6 +534,29 @@ public ShardRouting moveToStarted() { ); } + /** + * Make the active primary shard as replica + * + * @throws IllegalShardRoutingStateException if shard is already a replica + */ + public ShardRouting moveActivePrimaryToReplica() { + assert active() : "expected an active shard " + this; + if (!primary) { + throw new IllegalShardRoutingStateException(this, "Not a primary shard, can't move to replica"); + } + return new ShardRouting( + shardId, + currentNodeId, + relocatingNodeId, + false, + state, + recoverySource, + unassignedInfo, + allocationId, + expectedShardSize + ); + } + /** * Make the active shard primary unless it's not primary * diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 42c8f7987bf3d..d8761e9b1a78e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -50,6 +50,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import 
org.opensearch.common.util.FeatureFlags; import java.util.HashMap; import java.util.HashSet; @@ -145,6 +146,13 @@ public void allocate(RoutingAllocation allocation) { localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); localShardsBalancer.balance(); + + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation); + remoteShardsBalancer.allocateUnassigned(); + remoteShardsBalancer.moveShards(); + remoteShardsBalancer.balance(); + } } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 53d7c827392d5..3c5e4013748af 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; @@ -27,9 +28,11 @@ import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.gateway.PriorityComparator; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -38,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import 
java.util.stream.StreamSupport; import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; @@ -102,6 +106,10 @@ public float avgShardsPerNode(String index) { */ @Override public float avgShardsPerNode() { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + float totalShards = nodes.values().stream().map(BalancedShardsAllocator.ModelNode::numShards).reduce(0, Integer::sum); + return totalShards / nodes.size(); + } return avgShardsPerNode; } @@ -172,6 +180,11 @@ void balance() { */ @Override MoveDecision decideRebalance(final ShardRouting shard) { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) + && RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) { + return MoveDecision.NOT_TAKEN; + } + if (shard.started() == false) { // we can only rebalance started shards return MoveDecision.NOT_TAKEN; @@ -441,7 +454,19 @@ private void balanceByWeights() { * to the nodes we relocated them from. */ private String[] buildWeightOrderedIndices() { - final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class); + final String[] indices; + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + final List localIndices = new ArrayList<>(); + for (String index : allocation.routingTable().indicesRouting().keys().toArray(String.class)) { + if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getIndexPool(metadata.index(index)))) { + localIndices.add(index); + } + } + indices = localIndices.toArray(new String[0]); + } else { + indices = allocation.routingTable().indicesRouting().keys().toArray(String.class); + } + final float[] deltas = new float[indices.length]; for (int i = 0; i < deltas.length; i++) { sorter.reset(indices[i]); @@ -507,7 +532,7 @@ void moveShards() { // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are // offloading the shards. 
- // Trying to eliminate target nodes so that we donot unnecessarily iterate over source nodes + // Trying to eliminate target nodes so that we do not unnecessarily iterate over source nodes // when no target is eligible for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) { checkAndAddInEligibleTargetNode(currentNode.getRoutingNode()); @@ -533,6 +558,11 @@ void moveShards() { ShardRouting shardRouting = it.next(); + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) + && RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { + continue; + } + // Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) { logger.info( @@ -593,6 +623,11 @@ void moveShards() { */ @Override MoveDecision decideMove(final ShardRouting shardRouting) { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) + && RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { + return MoveDecision.NOT_TAKEN; + } + if (shardRouting.started() == false) { // we can only move started shards return MoveDecision.NOT_TAKEN; @@ -680,7 +715,7 @@ private Map buildModelFromAssigned() for (ShardRouting shard : rn) { assert rn.nodeId().equals(shard.currentNodeId()); /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ - if (shard.state() != RELOCATING) { + if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) { node.addShard(shard); if (logger.isTraceEnabled()) { logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId()); @@ -735,7 +770,17 @@ void allocateUnassigned() { * if we allocate for instance (0, R, IDX1) we move the second replica to the secondary array and proceed with * the next replica. 
If we could not find a node to allocate (0,R,IDX1) we move all it's replicas to ignoreUnassigned. */ - ShardRouting[] primary = unassigned.drain(); + ShardRouting[] unassignedShards = unassigned.drain(); + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) { + List allUnassignedShards = Arrays.stream(unassignedShards).collect(Collectors.toList()); + List localUnassignedShards = allUnassignedShards.stream() + .filter(shard -> RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation))) + .collect(Collectors.toList()); + allUnassignedShards.removeAll(localUnassignedShards); + allUnassignedShards.forEach(shard -> routingNodes.unassigned().add(shard)); + unassignedShards = localUnassignedShards.toArray(new ShardRouting[localUnassignedShards.size()]); + } + ShardRouting[] primary = unassignedShards; ShardRouting[] secondary = new ShardRouting[primary.length]; int secondaryLength = 0; int primaryLength = primary.length; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java new file mode 100644 index 0000000000000..efb2c5a27a12f --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java @@ -0,0 +1,646 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing.allocation.allocator; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; +import org.opensearch.cluster.routing.allocation.MoveDecision; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.opensearch.common.Randomness; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +/** + * A {@link RemoteShardsBalancer} used by the {@link BalancedShardsAllocator} to perform allocation operations + * for remote shards within the cluster. 
+ * + * @opensearch.internal + */ +public final class RemoteShardsBalancer extends ShardsBalancer { + private final Logger logger; + private final RoutingAllocation allocation; + private final RoutingNodes routingNodes; + + public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) { + this.logger = logger; + this.allocation = allocation; + this.routingNodes = allocation.routingNodes(); + } + + /** + * Allocates unassigned remote shards on the routing node which are filtered using + * {@link #groupUnassignedShardsByIndex} + */ + @Override + void allocateUnassigned() { + if (routingNodes.unassigned().isEmpty()) { + logger.debug("No unassigned remote shards found."); + return; + } + + Queue nodeQueue = getShuffledRemoteNodes(); + if (nodeQueue.isEmpty()) { + logger.debug("No remote searcher nodes available for unassigned remote shards."); + failUnattemptedShards(); + return; + } + + Map unassignedShardMap = groupUnassignedShardsByIndex(); + allocateUnassignedPrimaries(nodeQueue, unassignedShardMap); + allocateUnassignedReplicas(nodeQueue, unassignedShardMap); + ignoreRemainingShards(unassignedShardMap); + } + + /** + * Performs shard movement for incompatible remote shards + */ + @Override + void moveShards() { + Queue eligibleNodes = new ArrayDeque<>(); + Queue excludedNodes = new ArrayDeque<>(); + classifyNodesForShardMovement(eligibleNodes, excludedNodes); + + if (excludedNodes.isEmpty()) { + logger.debug("No excluded nodes found. 
Returning..."); + return; + } + + while (!eligibleNodes.isEmpty() && !excludedNodes.isEmpty()) { + RoutingNode sourceNode = excludedNodes.poll(); + for (ShardRouting ineligibleShard : sourceNode) { + if (ineligibleShard.started() == false) { + continue; + } + + if (!RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(ineligibleShard, allocation))) { + continue; + } + + if (eligibleNodes.isEmpty()) { + break; + } + + tryShardMovementToEligibleNode(eligibleNodes, ineligibleShard); + } + } + } + + /** + * Classifies the nodes into eligible and excluded depending on whether node is able or unable for shard assignment + * @param eligibleNodes contains the list of classified nodes eligible to accept shards + * @param excludedNodes contains the list of classified nodes that are unable for assigning shards + */ + private void classifyNodesForShardMovement(Queue eligibleNodes, Queue excludedNodes) { + List remoteRoutingNodes = getRemoteRoutingNodes(); + int throttledNodeCount = 0; + for (RoutingNode node : remoteRoutingNodes) { + Decision nodeDecision = allocation.deciders().canAllocateAnyShardToNode(node, allocation); + /* canAllocateAnyShardToNode decision can be THROTTLE for throttled nodes. To classify + * as excluded nodes, we look for Decision.Type.NO + */ + if (nodeDecision.type() == Decision.Type.NO) { + excludedNodes.add(node); + } else if (nodeDecision.type() == Decision.Type.YES) { + eligibleNodes.add(node); + } else { + throttledNodeCount++; + } + logger.debug( + "Excluded Node Count: [{}], Eligible Node Count: [{}], Throttled Node Count: [{}]", + excludedNodes.size(), + eligibleNodes.size(), + throttledNodeCount + ); + } + } + + /** + * Tries to move a shard assigned to an excluded node to an eligible node. 
+ * + * @param eligibleNodes set of nodes that are still accepting shards + * @param shard the ineligible shard to be moved + */ + private void tryShardMovementToEligibleNode(Queue eligibleNodes, ShardRouting shard) { + Set nodesCheckedForShard = new HashSet<>(); + while (!eligibleNodes.isEmpty()) { + RoutingNode targetNode = eligibleNodes.poll(); + Decision currentShardDecision = allocation.deciders().canAllocate(shard, targetNode, allocation); + + if (currentShardDecision.type() == Decision.Type.YES) { + if (logger.isDebugEnabled()) { + logger.debug( + "Moving shard: {} from node: [{}] to node: [{}]", + shardShortSummary(shard), + shard.currentNodeId(), + targetNode.nodeId() + ); + } + routingNodes.relocateShard( + shard, + targetNode.nodeId(), + allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + allocation.changes() + ); + eligibleNodes.offer(targetNode); + break; + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "Cannot move shard: {} to node: [{}]. Decisions: [{}]", + shardShortSummary(shard), + targetNode.nodeId(), + currentShardDecision.getDecisions() + ); + } + + Decision nodeLevelDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation); + if (nodeLevelDecision.type() == Decision.Type.YES) { + logger.debug("Node: [{}] can still accept shards. Adding it back to the queue.", targetNode.nodeId()); + eligibleNodes.offer(targetNode); + nodesCheckedForShard.add(targetNode.nodeId()); + } else { + logger.debug("Node: [{}] cannot accept any more shards. Removing it from queue.", targetNode.nodeId()); + } + + // Break out if all nodes in the queue have been checked for this shard + if (eligibleNodes.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) { + break; + } + } + } + } + + /** + * Performs heuristic, naive weight-based balancing for remote shards within the cluster by using the average number of primary shards per + * node as the metric for shard distribution. 
+ * It does so without accounting for the local shards located on any nodes within the cluster. + */ + @Override + void balance() { + List remoteRoutingNodes = getRemoteRoutingNodes(); + logger.trace("Performing balancing for remote shards."); + + if (remoteRoutingNodes.isEmpty()) { + logger.info("No eligible remote nodes found to perform balancing"); + return; + } + + ObjectIntHashMap nodePrimaryShardCount = calculateNodePrimaryShardCount(remoteRoutingNodes); + int totalPrimaryShardCount = Arrays.stream(nodePrimaryShardCount.values).sum(); + + totalPrimaryShardCount += routingNodes.unassigned().getNumPrimaries(); + int avgPrimaryPerNode = (totalPrimaryShardCount + routingNodes.size() - 1) / routingNodes.size(); + + ArrayDeque sourceNodes = new ArrayDeque<>(); + ArrayDeque targetNodes = new ArrayDeque<>(); + for (RoutingNode node : remoteRoutingNodes) { + if (nodePrimaryShardCount.get(node.nodeId()) > avgPrimaryPerNode) { + sourceNodes.add(node); + } else if (nodePrimaryShardCount.get(node.nodeId()) < avgPrimaryPerNode) { + targetNodes.add(node); + } + } + + while (!sourceNodes.isEmpty() && !targetNodes.isEmpty()) { + RoutingNode sourceNode = sourceNodes.poll(); + tryRebalanceNode(sourceNode, targetNodes, avgPrimaryPerNode, nodePrimaryShardCount); + } + } + + /** + * Calculates the total number of primary shards per node. 
+ * @param remoteRoutingNodes routing nodes for which the aggregation needs to be performed + * @return map of node id to primary shard count + */ + private ObjectIntHashMap calculateNodePrimaryShardCount(List remoteRoutingNodes) { + ObjectIntHashMap primaryShardCount = new ObjectIntHashMap<>(); + for (RoutingNode node : remoteRoutingNodes) { + int totalPrimaryShardsPerNode = 0; + for (ShardRouting shard : node) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) + && shard.primary() + && (shard.initializing() || shard.started())) { + totalPrimaryShardsPerNode++; + } + } + primaryShardCount.put(node.nodeId(), totalPrimaryShardsPerNode); + } + return primaryShardCount; + } + + @Override + AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting) { + throw new UnsupportedOperationException("remote shards balancer does not support decision operations"); + } + + @Override + MoveDecision decideMove(ShardRouting shardRouting) { + throw new UnsupportedOperationException("remote shards balancer does not support decision operations"); + } + + @Override + MoveDecision decideRebalance(ShardRouting shardRouting) { + throw new UnsupportedOperationException("remote shards balancer does not support decision operations"); + } + + /** + * Groups unassigned shards within the allocation based on the index. 
+ * @return {@link UnassignedIndexShards} grouped by index name + */ + public Map groupUnassignedShardsByIndex() { + HashMap unassignedShardMap = new HashMap<>(); + unassignIgnoredRemoteShards(allocation); + for (ShardRouting shard : routingNodes.unassigned().drain()) { + String index = shard.getIndexName(); + if (!RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) { + routingNodes.unassigned().add(shard); + continue; + } + if (!unassignedShardMap.containsKey(index)) { + unassignedShardMap.put(index, new UnassignedIndexShards()); + } + unassignedShardMap.get(index).addShard(shard); + } + return unassignedShardMap; + } + + /** + * Unassigned shards from {@link LocalShardsBalancer} are ignored since the balancer cannot allocate remote shards. + * Prior to allocation operations done by {@link RemoteShardsBalancer}, the ignored remote shards are moved back to + * unassigned status. + */ + private void unassignIgnoredRemoteShards(RoutingAllocation routingAllocation) { + RoutingNodes.UnassignedShards unassignedShards = routingAllocation.routingNodes().unassigned(); + for (ShardRouting shard : unassignedShards.drainIgnored()) { + RoutingPool pool = RoutingPool.getShardPool(shard, routingAllocation); + if (pool == RoutingPool.REMOTE_CAPABLE && shard.unassigned() && (shard.primary() || !shard.unassignedInfo().isDelayed())) { + unassignedShards.add(shard); + } else { + unassignedShards.ignoreShard(shard, shard.unassignedInfo().getLastAllocationStatus(), routingAllocation.changes()); + } + } + } + + private void allocateUnassignedPrimaries(Queue nodeQueue, Map unassignedShardMap) { + allocateUnassignedShards(true, nodeQueue, unassignedShardMap); + } + + private void allocateUnassignedReplicas(Queue nodeQueue, Map unassignedShardMap) { + allocateUnassignedShards(false, nodeQueue, unassignedShardMap); + } + + private void ignoreRemainingShards(Map unassignedShardMap) { + for (UnassignedIndexShards indexShards : unassignedShardMap.values()) { + for 
(ShardRouting shard : indexShards.getPrimaries()) { + routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + } + for (ShardRouting shard : indexShards.getReplicas()) { + routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + } + } + } + + private void allocateUnassignedShards( + boolean primaries, + Queue nodeQueue, + Map unassignedShardMap + ) { + logger.debug("Allocating unassigned {}. Nodes available in queue: [{}]", (primaries ? "primaries" : "replicas"), nodeQueue.size()); + + // Iterate through all shards index by index and allocate them + for (String index : unassignedShardMap.keySet()) { + if (nodeQueue.isEmpty()) { + break; + } + + UnassignedIndexShards indexShards = unassignedShardMap.get(index); + Queue shardsToAllocate = primaries ? indexShards.getPrimaries() : indexShards.getReplicas(); + if (shardsToAllocate.isEmpty()) { + continue; + } + logger.debug("Allocating shards for index: [{}]", index); + + while (!shardsToAllocate.isEmpty() && !nodeQueue.isEmpty()) { + ShardRouting shard = shardsToAllocate.poll(); + if (shard.assignedToNode()) { + if (logger.isDebugEnabled()) { + logger.debug("Shard: {} already assigned to node: [{}]", shardShortSummary(shard), shard.currentNodeId()); + } + continue; + } + + Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation); + if (shardLevelDecision.type() == Decision.Type.NO) { + if (logger.isDebugEnabled()) { + logger.debug( + "Ignoring shard: [{}] as is cannot be allocated to any node. 
Shard level decisions: [{}][{}].", + shardShortSummary(shard), + shardLevelDecision.getDecisions(), + shardLevelDecision.getExplanation() + ); + } + routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes()); + continue; + } + + tryAllocateUnassignedShard(nodeQueue, shard); + } + } + } + + /** + * Tries to allocate an unassigned shard to one of the nodes within the node queue. + * @param nodeQueue ordered list of nodes to try allocation + * @param shard the unassigned shard which needs to be allocated + */ + private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouting shard) { + boolean allocated = false; + boolean throttled = false; + Set nodesCheckedForShard = new HashSet<>(); + while (!nodeQueue.isEmpty()) { + RoutingNode node = nodeQueue.poll(); + Decision allocateDecision = allocation.deciders().canAllocate(shard, node, allocation); + nodesCheckedForShard.add(node.nodeId()); + if (allocateDecision.type() == Decision.Type.YES) { + if (logger.isTraceEnabled()) { + logger.trace("Assigned shard [{}] to [{}]", shardShortSummary(shard), node.nodeId()); + } + final long shardSize = DiskThresholdDecider.getExpectedShardSize( + shard, + ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE, + allocation.clusterInfo(), + allocation.snapshotShardSizeInfo(), + allocation.metadata(), + allocation.routingTable() + ); + ShardRouting initShard = routingNodes.initializeShard(shard, node.nodeId(), null, shardSize, allocation.changes()); + nodeQueue.offer(node); + allocated = true; + break; + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "Cannot allocate shard: {} on node [{}]. 
Decisions: [{}]", + shardShortSummary(shard), + node.nodeId(), + allocateDecision.getDecisions() + ); + } + throttled = throttled || allocateDecision.type() == Decision.Type.THROTTLE; + + Decision nodeLevelDecision = allocation.deciders().canAllocateAnyShardToNode(node, allocation); + if (nodeLevelDecision.type() == Decision.Type.YES) { + if (logger.isTraceEnabled()) { + logger.trace( + "Node: [{}] can still accept shards, retaining it in queue - [{}]", + node.nodeId(), + nodeLevelDecision.getDecisions() + ); + } + nodeQueue.offer(node); + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]", + node.nodeId(), + nodeLevelDecision.getDecisions(), + nodeLevelDecision.getExplanation() + ); + } + } + + // Break out if all nodes in the queue have been checked for this shard + if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) { + throttled = true; + break; + } + } + } + + if (!allocated) { + UnassignedInfo.AllocationStatus status = throttled + ? 
UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED + : UnassignedInfo.AllocationStatus.DECIDERS_NO; + routingNodes.unassigned().ignoreShard(shard, status, allocation.changes()); + } + } + + private void tryRebalanceNode( + RoutingNode sourceNode, + ArrayDeque targetNodes, + int avgPrimary, + ObjectIntHashMap primaryCount + ) { + long shardsToBalance = primaryCount.get(sourceNode.nodeId()) - avgPrimary; + assert shardsToBalance >= 0 : "Shards to balance should be greater than 0, but found negative"; + Iterator shardIterator = sourceNode.copyShards().iterator(); + Set nodesCheckedForRelocation = new HashSet<>(); + + // Try to relocate the valid shards on the sourceNode, one at a time; + // until either sourceNode is balanced OR no more active primary shard available OR all the target nodes are exhausted + while (shardsToBalance > 0 && shardIterator.hasNext() && !targetNodes.isEmpty()) { + // Find an active primary shard to relocate + ShardRouting shard = shardIterator.next(); + if (!shard.started() || !shard.primary() || !RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) { + continue; + } + + while (!targetNodes.isEmpty()) { + // Find a valid target node that can accommodate the current shard relocation + RoutingNode targetNode = targetNodes.poll(); + if (primaryCount.get(targetNode.nodeId()) >= avgPrimary) { + logger.trace("Avg shard limit reached for node: [{}]. 
Removing from queue.", targetNode.nodeId()); + continue; + } + + // Try to relocate the shard on the target node + Decision rebalanceDecision = tryRelocateShard(shard, targetNode); + + if (rebalanceDecision.type() == Decision.Type.YES) { + shardsToBalance--; + primaryCount.addTo(targetNode.nodeId(), 1); + targetNodes.offer(targetNode); + break; + + // If the relocation attempt failed for the shard, check if the target node can accommodate any other shard; else remove + // the target node from the target list + } else { + Decision nodeDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation); + if (nodeDecision.type() == Decision.Type.YES) { + targetNodes.offer(targetNode); + nodesCheckedForRelocation.add(targetNode.nodeId()); + } else { + if (logger.isTraceEnabled()) { + logger.trace( + "Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]", + targetNode.nodeId(), + nodeDecision.getDecisions(), + nodeDecision.toString() + ); + } + } + } + + // If all the target nodes are exhausted for the current shard; skip to next shard + if (targetNodes.stream().allMatch(node -> nodesCheckedForRelocation.contains(node.nodeId()))) { + break; + } + } + } + } + + /** + * For every primary shard for which this method is invoked, + * a swap is attempted with the destination node in case a replica of the shard is present there. + * In case the replica is not present, relocation of the shard is performed. + */ + private Decision tryRelocateShard(ShardRouting shard, RoutingNode destinationNode) { + // Check if there is already a replica for the shard on the destination node. + // Then we can directly swap the replica with the primary shards. + // Invariant: We only allow swap relocation on remote shards.
+ ShardRouting replicaShard = destinationNode.getByShardId(shard.shardId()); + if (replicaShard != null) { + assert !replicaShard.primary() : "Primary Shard found while expected Replica during shard rebalance"; + return executeSwapShard(shard, replicaShard, allocation); + } + + // Since no replica is present on the destinationNode, try relocating the shard to the destination node + Decision allocationDecision = allocation.deciders().canAllocate(shard, destinationNode, allocation); + Decision rebalanceDecision = allocation.deciders().canRebalance(shard, allocation); + logger.trace( + "Relocating shard [{}] from node [{}] to node [{}]. AllocationDecision: [{}]. AllocationExplanation: [{}]. " + + "RebalanceDecision: [{}]. RebalanceExplanation: [{}]", + shard.id(), + shard.currentNodeId(), + destinationNode.nodeId(), + allocationDecision.type(), + allocationDecision.toString(), + rebalanceDecision.type(), + rebalanceDecision.toString() + ); + + // Perform the relocation only if both the allocation and rebalance decisions are YES + if ((allocationDecision.type() == Decision.Type.YES) && (rebalanceDecision.type() == Decision.Type.YES)) { + final long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); + ShardRouting targetShard = routingNodes.relocateShard(shard, destinationNode.nodeId(), shardSize, allocation.changes()).v2(); + logger.info("Relocated shard [{}] to node [{}] during primary Rebalance", shard, targetShard.currentNodeId()); + return Decision.YES; + } + + if ((allocationDecision.type() == Decision.Type.THROTTLE) || (rebalanceDecision.type() == Decision.Type.THROTTLE)) { + return Decision.THROTTLE; + } + + return Decision.NO; + } + + private Decision executeSwapShard(ShardRouting primaryShard, ShardRouting replicaShard, RoutingAllocation allocation) { + if (!replicaShard.started()) { + return new Decision.Single(Decision.Type.NO); + } + + allocation.routingNodes().swapPrimaryWithReplica(logger, primaryShard, replicaShard,
allocation.changes()); + return new Decision.Single(Decision.Type.YES); + } + + private void failUnattemptedShards() { + RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator(); + while (unassignedIterator.hasNext()) { + ShardRouting shard = unassignedIterator.next(); + UnassignedInfo unassignedInfo = shard.unassignedInfo(); + if (shard.primary() && unassignedInfo.getLastAllocationStatus() == UnassignedInfo.AllocationStatus.NO_ATTEMPT) { + unassignedIterator.updateUnassigned( + new UnassignedInfo( + unassignedInfo.getReason(), + unassignedInfo.getMessage(), + unassignedInfo.getFailure(), + unassignedInfo.getNumFailedAllocations(), + unassignedInfo.getUnassignedTimeInNanos(), + unassignedInfo.getUnassignedTimeInMillis(), + unassignedInfo.isDelayed(), + UnassignedInfo.AllocationStatus.DECIDERS_NO, + Collections.emptySet() + ), + shard.recoverySource(), + allocation.changes() + ); + } + } + } + + private Queue getShuffledRemoteNodes() { + List nodeList = getRemoteRoutingNodes(); + Randomness.shuffle(nodeList); + return new ArrayDeque<>(nodeList); + } + + /** + * Filters out and returns the list of {@link RoutingPool#REMOTE_CAPABLE} nodes from the routing nodes in cluster. + * @return list of {@link RoutingPool#REMOTE_CAPABLE} routing nodes. + */ + private List getRemoteRoutingNodes() { + List nodeList = new ArrayList<>(); + for (RoutingNode rNode : routingNodes) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(rNode))) { + nodeList.add(rNode); + } + } + return nodeList; + } + + /** + * {@link UnassignedIndexShards} maintains a queue of unassigned remote shards for allocation operations within + * the cluster. 
+ * + * @opensearch.internal + */ + public static class UnassignedIndexShards { + private final Queue primaries = new ArrayDeque<>(); + private final Queue replicas = new ArrayDeque<>(); + + public void addShard(ShardRouting shard) { + if (shard.primary()) { + primaries.add(shard); + } else { + replicas.add(shard); + } + } + + public Queue getPrimaries() { + return primaries; + } + + public Queue getReplicas() { + return replicas; + } + } + + private String shardShortSummary(ShardRouting shard) { + return "[" + shard.getIndexName() + "]" + "[" + shard.getId() + "]" + "[" + (shard.primary() ? "p" : "r") + "]"; + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java new file mode 100644 index 0000000000000..c87f7d16079e9 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java @@ -0,0 +1,104 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; + +/** + * {@link TargetPoolAllocationDecider} ensures that the different shard types are assigned to the nodes with + * appropriate capabilities. The node pools with respective capabilities are defined within {@link RoutingPool}. 
+ * + * @opensearch.internal + */ +public class TargetPoolAllocationDecider extends AllocationDecider { + private static final Logger logger = LogManager.getLogger(TargetPoolAllocationDecider.class); + + public static final String NAME = "target_pool"; + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + RoutingPool shardPool = RoutingPool.getShardPool(shardRouting, allocation); + RoutingPool targetNodePool = RoutingPool.getNodePool(node); + if (RoutingPool.REMOTE_CAPABLE.equals(shardPool) && RoutingPool.LOCAL_ONLY.equals(targetNodePool)) { + logger.debug( + "Shard: [{}] has target pool: [{}]. Cannot allocate on node: [{}] with target pool: [{}]", + shardRouting.shortSummary(), + shardPool, + node.node(), + targetNodePool + ); + return allocation.decision( + Decision.NO, + NAME, + "Routing pools are incompatible. Shard pool: [%s], Node Pool: [%s]", + shardPool, + targetNodePool + ); + } + return allocation.decision( + Decision.YES, + NAME, + "Routing pools are compatible. 
Shard pool: [%s], Node Pool: [%s]", + shardPool, + targetNodePool + ); + } + + @Override + public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) { + return canAllocateInTargetPool(indexMetadata, node.node(), allocation); + } + + @Override + public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + logger.debug("Evaluating force allocation for primary shard."); + return canAllocate(shardRouting, node, allocation); + } + + public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { + logger.debug("Evaluating node: {} for autoExpandReplica eligibility of index: {}", node, indexMetadata.getIndex()); + return canAllocateInTargetPool(indexMetadata, node, allocation); + } + + private Decision canAllocateInTargetPool(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { + RoutingPool indexPool = RoutingPool.getIndexPool(indexMetadata); + RoutingPool targetNodePool = RoutingPool.getNodePool(node); + if (RoutingPool.REMOTE_CAPABLE.equals(indexPool) && RoutingPool.LOCAL_ONLY.equals(targetNodePool)) { + logger.debug( + "Index: [{}] has target pool: [{}]. Cannot allocate on node: [{}] with target pool: [{}]", + indexMetadata.getIndex().getName(), + indexPool, + node, + targetNodePool + ); + return allocation.decision( + Decision.NO, + NAME, + "Routing pools are incompatible. Index pool: [%s], Node Pool: [%s]", + indexPool, + targetNodePool + ); + } + return allocation.decision( + Decision.YES, + NAME, + "Routing pools are compatible. 
Index pool: [%s], Node Pool: [%s]", + indexPool, + targetNodePool + ); + } + +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/AllocationIdTests.java b/server/src/test/java/org/opensearch/cluster/routing/AllocationIdTests.java index e266eacdc0320..fc008762edd35 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/AllocationIdTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/AllocationIdTests.java @@ -142,6 +142,24 @@ public void testMoveToUnassigned() { assertThat(shard.allocationId(), nullValue()); } + public void testMovePrimaryToReplica() { + logger.info("-- build started shard"); + ShardRouting shard = ShardRouting.newUnassigned( + new ShardId("test", "_na_", 0), + true, + ExistingStoreRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null) + ); + shard = shard.initialize("node1", null, -1); + shard = shard.moveToStarted(); + AllocationId originalAllocationId = shard.allocationId(); + + logger.info("-- move to replica"); + shard = shard.moveActivePrimaryToReplica(); + assertNotNull(shard.allocationId()); + assertEquals(originalAllocationId, shard.allocationId()); + } + public void testSerialization() throws IOException { AllocationId allocationId = AllocationId.newInitializing(); if (randomBoolean()) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java index 3e9088d63cfb4..73136a71bc12a 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingNodesTests.java @@ -33,6 +33,7 @@ package org.opensearch.cluster.routing; import org.junit.Before; +import org.mockito.Mockito; import org.opensearch.Version; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.OpenSearchAllocationTestCase; @@ -160,4 +161,40 @@ public void testInterleavedShardIterator() { } assert 
shardCount == this.totalNumberOfShards; } + + public void testSwapPrimaryWithReplica() { + // Initialize all the shards for test index 1 and 2 + initPrimaries(); + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_2); + startInitializingShards(TEST_INDEX_2); + + // Create primary shard count imbalance between two nodes + final RoutingNodes routingNodes = this.clusterState.getRoutingNodes(); + final RoutingNode node0 = routingNodes.node("node0"); + final RoutingNode node1 = routingNodes.node("node1"); + final List shardRoutingList = node0.shardsWithState(TEST_INDEX_1, ShardRoutingState.STARTED); + final RoutingChangesObserver routingChangesObserver = Mockito.mock(RoutingChangesObserver.class); + int swaps = 0; + + for (ShardRouting routing : shardRoutingList) { + if (routing.primary()) { + ShardRouting swap = node1.getByShardId(routing.shardId()); + routingNodes.swapPrimaryWithReplica(logger, routing, swap, routingChangesObserver); + swaps++; + } + } + Mockito.verify(routingChangesObserver, Mockito.times(swaps)).replicaPromoted(Mockito.any()); + + final List shards = node1.shardsWithState(TEST_INDEX_1, ShardRoutingState.STARTED); + int shardCount = 0; + for (ShardRouting shard : shards) { + if (shard.primary()) { + shardCount++; + } + } + + assertTrue(shardCount >= swaps); + } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java index 1dd27cb706c64..a914ef5da31ca 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/ShardRoutingTests.java @@ -301,6 +301,18 @@ public void testEqualsIgnoringVersion() { } } + public void testSwapPrimaryWithReplica() { + final ShardRouting unassignedShard0 = TestShardRouting.newShardRouting("test", 0, null, false, ShardRoutingState.UNASSIGNED); + 
assertThrows(AssertionError.class, unassignedShard0::moveActivePrimaryToReplica); + + final ShardRouting activeShard0 = TestShardRouting.newShardRouting("test", 0, "node-1", false, ShardRoutingState.STARTED); + assertThrows(IllegalShardRoutingStateException.class, activeShard0::moveActivePrimaryToReplica); + + final ShardRouting activeShard1 = TestShardRouting.newShardRouting("test", 0, "node-1", true, ShardRoutingState.STARTED); + final ShardRouting activeReplicaShard1 = activeShard1.moveActivePrimaryToReplica(); + assertFalse(activeReplicaShard1.primary()); + } + public void testExpectedSize() throws IOException { final int iters = randomIntBetween(10, 100); for (int i = 0; i < iters; i++) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java new file mode 100644 index 0000000000000..7c45b20ecee1f --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.allocator.RemoteShardsBalancer; + +import java.util.Map; + +public class RemoteShardsAllocateUnassignedTests extends RemoteShardsBalancerBaseTestCase { + + /** + * Test Remote Shards Balancer initialization. 
+ */ + public void testInit() { + int localOnlyNodes = 7; + int remoteCapableNodes = 3; + int localIndices = 10; + int remoteIndices = 15; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + RoutingNodes routingNodes = new RoutingNodes(clusterState, false); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + + RemoteShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation); + Map unassignedShardMap = remoteShardsBalancer.groupUnassignedShardsByIndex(); + + assertEquals(remoteIndices, unassignedShardMap.size()); + for (String index : unassignedShardMap.keySet()) { + assertTrue(index.startsWith(REMOTE_IDX_PREFIX)); + RemoteShardsBalancer.UnassignedIndexShards indexShards = unassignedShardMap.get(index); + assertEquals(5, indexShards.getPrimaries().size()); + for (ShardRouting shard : indexShards.getPrimaries()) { + assertTrue(shard.primary()); + assertEquals(shard.getIndexName(), index); + } + assertEquals(5, indexShards.getReplicas().size()); + for (ShardRouting shard : indexShards.getReplicas()) { + assertFalse(shard.primary()); + assertEquals(shard.getIndexName(), index); + } + } + } + + /** + * Test remote unassigned shard allocation for standard new cluster setup. 
+ */ + public void testPrimaryAllocation() { + int localOnlyNodes = 7; + int remoteCapableNodes = 3; + int localIndices = 10; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + + assertEquals(0, routingNodes.unassigned().size()); + + ObjectIntHashMap nodePrimariesCounter = new ObjectIntHashMap<>(); + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + assertFalse(shard.unassigned()); + RoutingNode node = routingNodes.node(shard.currentNodeId()); + RoutingPool nodePool = RoutingPool.getNodePool(node); + RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation); + if (RoutingPool.REMOTE_CAPABLE.equals(shardPool)) { + assertEquals(nodePool, shardPool); + } + if (RoutingPool.getNodePool(node) == RoutingPool.REMOTE_CAPABLE + && RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) + && shard.primary()) { + nodePrimariesCounter.putOrAdd(node.nodeId(), 1, 1); + } + } + final int indexShardLimit = (int) Math.ceil(totalPrimaries(remoteIndices) / (float) remoteCapableNodes); + for (int primaries : nodePrimariesCounter.values) { + assertTrue(primaries <= indexShardLimit); + } + } + + /** + * Test remote unassigned shard allocation when remote capable nodes fail to come up. 
+ */ + public void testAllocationRemoteCapableNodesUnavailable() { + int localOnlyNodes = 7; + int remoteCapableNodes = 0; + int localIndices = 10; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + + assertEquals(totalShards(remoteIndices), routingNodes.unassigned().size()); + + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + if (RoutingPool.getShardPool(shard, allocation) == RoutingPool.REMOTE_CAPABLE) { + assertTrue(shard.unassigned()); + } else { + assertFalse(shard.unassigned()); + RoutingNode node = routingNodes.node(shard.currentNodeId()); + assertEquals(RoutingPool.LOCAL_ONLY, RoutingPool.getNodePool(node)); + } + } + for (RoutingNode node : routingNodes) { + if (RoutingPool.getNodePool(node) == RoutingPool.REMOTE_CAPABLE) { + assertEquals(0, node.size()); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java new file mode 100644 index 0000000000000..789de474d8ce5 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java @@ -0,0 +1,302 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing.allocation; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterInfo; +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.DiskUsage; +import org.opensearch.cluster.EmptyClusterInfoService; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.cluster.routing.allocation.allocator.ShardsAllocator; +import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.collect.ImmutableOpenMap; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.transport.TransportAddress; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; +import org.opensearch.test.gateway.TestGatewayAllocator; + +import java.net.Inet4Address; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static 
org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; + +@SuppressForbidden(reason = "feature flag overrides") +public abstract class RemoteShardsBalancerBaseTestCase extends OpenSearchAllocationTestCase { + protected static final String LOCAL_NODE_PREFIX = "local-only-node"; + protected static final String REMOTE_NODE_PREFIX = "remote-capable-node"; + protected static final String LOCAL_IDX_PREFIX = "local-idx"; + protected static final String REMOTE_IDX_PREFIX = "remote-idx"; + protected static final Set MANAGER_DATA_ROLES = Set.of( + DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, + DiscoveryNodeRole.DATA_ROLE + ); + protected static final Set SEARCH_DATA_ROLES = Set.of( + DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, + DiscoveryNodeRole.DATA_ROLE, + DiscoveryNodeRole.SEARCH_ROLE + ); + + protected static final int PRIMARIES = 5; + protected static final int REPLICAS = 1; + private static final int MAX_REROUTE_ITERATIONS = 1000; + + protected ClusterSettings EMPTY_CLUSTER_SETTINGS = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + + @BeforeClass + public static void setup() { + System.setProperty(FeatureFlags.SEARCHABLE_SNAPSHOT, "true"); + } + + @AfterClass + public static void teardown() { + System.setProperty(FeatureFlags.SEARCHABLE_SNAPSHOT, "false"); + } + + public String getNodeId(int id, boolean isRemote, String prefix) { + if (isRemote) { + return REMOTE_NODE_PREFIX + "-" + prefix + "-" + id; + } + return LOCAL_NODE_PREFIX + "-" + prefix + "-" + id; + } + + public String getNodeId(int id, boolean isRemote) { + return getNodeId(id, isRemote, ""); + } + + public String getIndexName(int id, boolean isRemote, String prefix) { + if (isRemote) { + return REMOTE_IDX_PREFIX + "-" + prefix + "-" + id; + } + return LOCAL_IDX_PREFIX + "-" + prefix + "-" + id; + } + + public String getIndexName(int id, boolean isRemote) { + return getIndexName(id, isRemote, ""); + } + + public RoutingAllocation getRoutingAllocation(ClusterState 
clusterState, RoutingNodes routingNodes) { + return new RoutingAllocation( + randomAllocationDeciders(Settings.Builder.EMPTY_SETTINGS, EMPTY_CLUSTER_SETTINGS, random()), + routingNodes, + clusterState, + EmptyClusterInfoService.INSTANCE.getClusterInfo(), + null, + System.nanoTime() + ); + } + + private Map createNodeAttributes(String nodeId) { + Map attr = new HashMap<>(); + attr.put("name", nodeId); + attr.put("node_id", nodeId); + return attr; + } + + public ClusterState addNodes(ClusterState clusterState, int nodeCount, boolean isRemote) { + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()); + for (int i = 0; i < nodeCount; i++) { + String id = getNodeId(i, isRemote, "new"); + nb.add(newNode(id, id, isRemote ? SEARCH_DATA_ROLES : MANAGER_DATA_ROLES)); + } + return ClusterState.builder(clusterState).nodes(nb.build()).build(); + } + + public ClusterState addNodeWithIP(ClusterState clusterState, int nodeId, boolean isRemote, String IP) throws UnknownHostException { + TransportAddress ipAddress = new TransportAddress(Inet4Address.getByName(IP), 9200); + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()); + String id = getNodeId(nodeId, isRemote, "new"); + nb.add( + new DiscoveryNode( + id, + id, + ipAddress, + createNodeAttributes(id), + isRemote ? 
SEARCH_DATA_ROLES : MANAGER_DATA_ROLES, + Version.CURRENT + ) + ); + return ClusterState.builder(clusterState).nodes(nb.build()).build(); + } + + public ClusterState terminateNodes(ClusterState clusterState, AllocationService service, List nodesToTerminate) { + if (nodesToTerminate.isEmpty()) { + return clusterState; + } + logger.info("Terminating following nodes from cluster: [{}]", nodesToTerminate); + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(clusterState.nodes()); + nodesToTerminate.forEach(nb::remove); + clusterState = ClusterState.builder(clusterState).nodes(nb.build()).build(); + clusterState = service.disassociateDeadNodes(clusterState, false, "nodes-terminated"); + return clusterState; + } + + public ClusterState createInitialCluster(int localOnlyNodes, int remoteCapableNodes, int localIndices, int remoteIndices) { + Metadata.Builder mb = Metadata.builder(); + for (int i = 0; i < localIndices; i++) { + mb.put( + IndexMetadata.builder(getIndexName(i, false)) + .settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0")) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + ); + } + + for (int i = 0; i < remoteIndices; i++) { + mb.put( + IndexMetadata.builder(getIndexName(i, true)) + .settings( + settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0") + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + ) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + ); + } + Metadata metadata = mb.build(); + + RoutingTable.Builder rb = RoutingTable.builder(); + for (int i = 0; i < localIndices; i++) { + rb.addAsNew(metadata.index(getIndexName(i, false))); + } + for (int i = 0; i < remoteIndices; i++) { + rb.addAsNew(metadata.index(getIndexName(i, true))); + } + RoutingTable routingTable = rb.build(); + + DiscoveryNodes.Builder nb = DiscoveryNodes.builder(); + for (int i = 0; i < localOnlyNodes; 
i++) { + String name = getNodeId(i, false); + nb.add(newNode(name, name, MANAGER_DATA_ROLES)); + } + for (int i = 0; i < remoteCapableNodes; i++) { + String name = getNodeId(i, true); + nb.add(newNode(name, name, SEARCH_DATA_ROLES)); + } + DiscoveryNodes nodes = nb.build(); + return ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(routingTable).nodes(nodes).build(); + } + + protected ClusterState createRemoteIndex(ClusterState state, String indexName) { + Metadata metadata = Metadata.builder(state.metadata()) + .put( + IndexMetadata.builder(indexName) + .settings( + settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "5m") + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + ) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + ) + .build(); + RoutingTable routingTable = RoutingTable.builder(state.routingTable()).addAsNew(metadata.index(indexName)).build(); + return ClusterState.builder(state).metadata(metadata).routingTable(routingTable).build(); + } + + private AllocationDeciders remoteAllocationDeciders(Settings settings, ClusterSettings clusterSettings) { + List deciders = new ArrayList<>( + ClusterModule.createAllocationDeciders(settings, clusterSettings, Collections.emptyList()) + ); + Collections.shuffle(deciders, random()); + return new AllocationDeciders(deciders); + } + + public AllocationService createRemoteCapableAllocationService() { + Settings settings = Settings.Builder.EMPTY_SETTINGS; + return new OpenSearchAllocationTestCase.MockAllocationService( + randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()), + new TestGatewayAllocator(), + createShardAllocator(settings), + EmptyClusterInfoService.INSTANCE, + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES + ); + } + + public AllocationService createRemoteCapableAllocationService(String excludeNodes) { + Settings settings = 
Settings.builder().put("cluster.routing.allocation.exclude.node_id", excludeNodes).build(); + return new MockAllocationService( + randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()), + new TestGatewayAllocator(), + createShardAllocator(settings), + EmptyClusterInfoService.INSTANCE, + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES + ); + } + + public AllocationDeciders createAllocationDeciders() { + Settings settings = Settings.Builder.EMPTY_SETTINGS; + return randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random()); + + } + + public ClusterState allocateShardsAndBalance(ClusterState clusterState, AllocationService service) { + int iterations = 0; + do { + clusterState = service.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)); + clusterState = service.reroute(clusterState, "reroute"); + iterations++; + } while (!clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty() && iterations < MAX_REROUTE_ITERATIONS); + return clusterState; + } + + public int totalShards(int indices) { + return indices * PRIMARIES * (REPLICAS + 1); + } + + public int totalPrimaries(int indices) { + return indices * PRIMARIES; + } + + public ShardsAllocator createShardAllocator(Settings settings) { + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + return new BalancedShardsAllocator(settings, clusterSettings); + } + + /** + * ClusterInfo that always reports /dev/null for the shards' data paths. 
+ */ + public static class DevNullClusterInfo extends ClusterInfo { + public DevNullClusterInfo( + ImmutableOpenMap leastAvailableSpaceUsage, + ImmutableOpenMap mostAvailableSpaceUsage, + ImmutableOpenMap shardSizes + ) { + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, ImmutableOpenMap.of()); + } + + @Override + public String getDataPath(ShardRouting shardRouting) { + return "/dev/null"; + } + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsMoveShardsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsMoveShardsTests.java new file mode 100644 index 0000000000000..f2e79b319d0dd --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsMoveShardsTests.java @@ -0,0 +1,109 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.health.ClusterHealthStatus; +import org.opensearch.cluster.health.ClusterStateHealth; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.index.IndexModule; + +public class RemoteShardsMoveShardsTests extends RemoteShardsBalancerBaseTestCase { + + /** + * Test reroute terminates gracefully if shards cannot move out of the excluded node + */ + public void testExcludeNodeIdMoveBlocked() { + int localOnlyNodes = 7; + int remoteCapableNodes = 2; + int localIndices = 10; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + assertEquals(ClusterHealthStatus.GREEN, (new ClusterStateHealth(clusterState)).getStatus()); + + // Exclude a node + final String excludedNodeID = getNodeId(0, true); + service = createRemoteCapableAllocationService(excludedNodeID); + clusterState = allocateShardsAndBalance(clusterState, service); + + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + assertEquals(ClusterHealthStatus.GREEN, (new ClusterStateHealth(clusterState)).getStatus()); + assertEquals(0, routingNodes.unassigned().size()); + assertTrue(routingNodes.node(excludedNodeID).size() > 0); + } + + /** + * Test move operations for index level allocation settings. + * Supported for local indices, not supported for remote indices. 
+ */ + public void testIndexLevelExclusions() throws InterruptedException { + int localOnlyNodes = 7; + int remoteCapableNodes = 3; + int localIndices = 10; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + + assertEquals(ClusterHealthStatus.GREEN, (new ClusterStateHealth(clusterState)).getStatus()); + assertEquals(0, routingNodes.unassigned().size()); + + final String excludedLocalOnlyNode = getNodeId(0, false); + final String excludedRemoteCapableNode = getNodeId(0, true); + final String localIndex = routingNodes.node(excludedLocalOnlyNode).shardsWithState(ShardRoutingState.STARTED).get(0).getIndexName(); + final String remoteIndex = routingNodes.node(excludedRemoteCapableNode) + .shardsWithState(ShardRoutingState.STARTED) + .stream() + .filter(shardRouting -> shardRouting.getIndexName().startsWith(REMOTE_IDX_PREFIX)) + .findFirst() + .get() + .getIndexName(); + + Metadata.Builder mb = Metadata.builder(clusterState.metadata()); + mb.put( + IndexMetadata.builder(clusterState.metadata().index(localIndex)) + .settings( + settings(Version.CURRENT).put("index.number_of_shards", PRIMARIES) + .put("index.number_of_replicas", REPLICAS) + .put("index.routing.allocation.exclude._name", excludedLocalOnlyNode) + .build() + ) + ); + mb.put( + IndexMetadata.builder(clusterState.metadata().index(remoteIndex)) + .settings( + settings(Version.CURRENT).put("index.number_of_shards", PRIMARIES) + .put("index.number_of_replicas", REPLICAS) + .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()) + .put("index.routing.allocation.exclude._name", excludedRemoteCapableNode) + .build() + ) + ); + clusterState = 
ClusterState.builder(clusterState).metadata(mb.build()).build(); + + clusterState = allocateShardsAndBalance(clusterState, service); + assertEquals(ClusterHealthStatus.GREEN, (new ClusterStateHealth(clusterState)).getStatus()); + RoutingTable routingTable = clusterState.routingTable(); + + // No shard of the updated local index should be on the excluded local only node + assertTrue(routingTable.allShards(localIndex).stream().noneMatch(shard -> shard.currentNodeId().equals(excludedLocalOnlyNode))); + + // Since remote index shards are untouched, at least one shard should + // continue to stay on the excluded remote capable node + assertTrue(routingTable.allShards(remoteIndex).stream().anyMatch(shard -> shard.currentNodeId().equals(excludedRemoteCapableNode))); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java new file mode 100644 index 0000000000000..c4358aaf12ac2 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsRebalanceShardsTests.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import com.carrotsearch.hppc.ObjectIntHashMap; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; + +public class RemoteShardsRebalanceShardsTests extends RemoteShardsBalancerBaseTestCase { + + /** + * Test remote shard allocation and balancing for standard new cluster setup. 
+ * + * Post rebalance primaries should be balanced across all the nodes. + */ + public void testShardAllocationAndRebalance() { + int localOnlyNodes = 20; + int remoteCapableNodes = 40; + int localIndices = 40; + int remoteIndices = 80; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + + ObjectIntHashMap nodePrimariesCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, true); + ObjectIntHashMap nodeReplicaCounter = getShardCounterPerNodeForRemoteCapablePool(clusterState, allocation, false); + int avgPrimariesPerNode = getTotalShardCountAcrossNodes(nodePrimariesCounter) / remoteCapableNodes; + + // Primary and replica are balanced post first reroute + for (RoutingNode node : routingNodes) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getNodePool(node))) { + assertInRange(nodePrimariesCounter.get(node.nodeId()), avgPrimariesPerNode, remoteCapableNodes - 1); + assertTrue(nodeReplicaCounter.get(node.nodeId()) >= 0); + } + } + } + + private ObjectIntHashMap getShardCounterPerNodeForRemoteCapablePool( + ClusterState clusterState, + RoutingAllocation allocation, + boolean primary + ) { + ObjectIntHashMap nodePrimariesCounter = new ObjectIntHashMap<>(); + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) && shard.primary() == primary) { + nodePrimariesCounter.putOrAdd(shard.currentNodeId(), 1, 1); + } + } + return nodePrimariesCounter; + } + + private int getTotalShardCountAcrossNodes(ObjectIntHashMap nodePrimariesCounter) { + int totalShardCount = 0; + for (int value : 
nodePrimariesCounter.values) { + totalShardCount += value; + } + return totalShardCount; + } + + /** + * Asserts that the actual value is within the variance range of the expected mean. + * + * Used to assert the average number of shards per node. + * Variance is required in case of non-absolute mean values; + * for example, total number of remote capable nodes in a cluster. + */ + private void assertInRange(int actual, int expectedMean, int variance) { + assertTrue(actual >= expectedMean - variance); + assertTrue(actual <= expectedMean + variance); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java new file mode 100644 index 0000000000000..9a415ed0b339b --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDeciderTests.java @@ -0,0 +1,114 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.RemoteShardsBalancerBaseTestCase; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; + +import java.util.Collections; +import java.util.stream.Collectors; + +public class TargetPoolAllocationDeciderTests extends RemoteShardsBalancerBaseTestCase { + public void testTargetPoolAllocationDecisions() { + ClusterState clusterState = createInitialCluster(3, 3, 2, 2); + AllocationService service = this.createRemoteCapableAllocationService(); + clusterState = allocateShardsAndBalance(clusterState, service); + + // Add an unassigned primary shard for force allocation checks + Metadata metadata = Metadata.builder(clusterState.metadata()) + .put(IndexMetadata.builder("test_local_unassigned").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + RoutingTable routingTable = RoutingTable.builder(clusterState.routingTable()) + .addAsNew(metadata.index("test_local_unassigned")) + .build(); + clusterState = ClusterState.builder(clusterState).metadata(metadata).routingTable(routingTable).build(); + + // Add remote index unassigned primary + clusterState = createRemoteIndex(clusterState, "test_remote_unassigned"); + + RoutingNodes defaultRoutingNodes = clusterState.getRoutingNodes(); + RoutingAllocation globalAllocation = getRoutingAllocation(clusterState, defaultRoutingNodes); + + ShardRouting localShard = clusterState.routingTable() + .allShards(getIndexName(0, 
false)) + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .get(0); + ShardRouting remoteShard = clusterState.routingTable() + .allShards(getIndexName(0, true)) + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .get(0); + ShardRouting unassignedLocalShard = clusterState.routingTable() + .allShards("test_local_unassigned") + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .get(0); + ShardRouting unassignedRemoteShard = clusterState.routingTable() + .allShards("test_remote_unassigned") + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .get(0); + IndexMetadata localIdx = globalAllocation.metadata().getIndexSafe(localShard.index()); + IndexMetadata remoteIdx = globalAllocation.metadata().getIndexSafe(remoteShard.index()); + String localNodeId = LOCAL_NODE_PREFIX; + for (RoutingNode routingNode : globalAllocation.routingNodes()) { + if (routingNode.nodeId().startsWith(LOCAL_NODE_PREFIX)) { + localNodeId = routingNode.nodeId(); + break; + } + } + String remoteNodeId = remoteShard.currentNodeId(); + RoutingNode localOnlyNode = defaultRoutingNodes.node(localNodeId); + RoutingNode remoteCapableNode = defaultRoutingNodes.node(remoteNodeId); + + AllocationDeciders deciders = new AllocationDeciders(Collections.singletonList(new TargetPoolAllocationDecider())); + + // Incompatible Pools + assertEquals(Decision.NO.type(), deciders.canAllocate(remoteShard, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.canAllocate(remoteIdx, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.canForceAllocatePrimary(unassignedRemoteShard, localOnlyNode, globalAllocation).type()); + + // Compatible Pools + assertEquals(Decision.YES.type(), deciders.canAllocate(localShard, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(localIdx, remoteCapableNode, 
globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(remoteShard, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(remoteIdx, remoteCapableNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(localShard, localOnlyNode, globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.canAllocate(localIdx, localOnlyNode, globalAllocation).type()); + assertEquals( + Decision.YES.type(), + deciders.canForceAllocatePrimary(unassignedRemoteShard, remoteCapableNode, globalAllocation).type() + ); + assertEquals(Decision.YES.type(), deciders.canForceAllocatePrimary(unassignedLocalShard, localOnlyNode, globalAllocation).type()); + assertEquals( + Decision.YES.type(), + deciders.canForceAllocatePrimary(unassignedLocalShard, remoteCapableNode, globalAllocation).type() + ); + + // Verify only remote nodes are used for auto expand replica decision for remote index + assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, remoteCapableNode.node(), globalAllocation).type()); + assertEquals(Decision.NO.type(), deciders.shouldAutoExpandToNode(remoteIdx, localOnlyNode.node(), globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(localIdx, localOnlyNode.node(), globalAllocation).type()); + assertEquals(Decision.YES.type(), deciders.shouldAutoExpandToNode(remoteIdx, remoteCapableNode.node(), globalAllocation).type()); + } +} diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java index a13d337fa4d26..1d527140dc038 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java @@ -168,6 +168,10 @@ protected static DiscoveryNode newNode(String 
nodeId, Set rol return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT); } + protected static DiscoveryNode newNode(String nodeName, String nodeId, Set roles) { + return new DiscoveryNode(nodeName, nodeId, buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT); + } + protected static DiscoveryNode newNode(String nodeId, Version version) { return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), emptyMap(), CLUSTER_MANAGER_DATA_ROLES, version); } diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index d5925c03db480..62b94cd4f0106 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -199,6 +199,11 @@ public final class InternalTestCluster extends TestCluster { nodeAndClient.node.settings() ); + private static final Predicate SEARCH_NODE_PREDICATE = nodeAndClient -> DiscoveryNode.hasRole( + nodeAndClient.node.settings(), + DiscoveryNodeRole.SEARCH_ROLE + ); + private static final Predicate NO_DATA_NO_CLUSTER_MANAGER_PREDICATE = nodeAndClient -> DiscoveryNode .isClusterManagerNode(nodeAndClient.node.settings()) == false && DiscoveryNode.isDataNode(nodeAndClient.node.settings()) == false; @@ -664,6 +669,24 @@ public synchronized void ensureAtLeastNumDataNodes(int n) { } } + /** + * Ensures that at least n search nodes are present in the cluster. + * if more nodes than n are present this method will not + * stop any of the running nodes. 
+ */ + public synchronized void ensureAtLeastNumSearchNodes(int n) { + int size = numSearchNodes(); + if (size < n) { + logger.info("increasing cluster size from {} to {}", size, n); + if (numSharedDedicatedClusterManagerNodes > 0) { + startSearchOnlyNodes(n - size); + } else { + startNodes(n - size, Settings.builder().put(onlyRole(Settings.EMPTY, DiscoveryNodeRole.SEARCH_ROLE)).build()); + } + validateClusterFormed(); + } + } + /** * Ensures that at most n are up and running. * If less nodes that n are running this method @@ -2291,6 +2314,14 @@ public List startDataOnlyNodes(int numNodes, Settings settings) { return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.DATA_ROLE)).build()); } + public List startSearchOnlyNodes(int numNodes) { + return startSearchOnlyNodes(numNodes, Settings.EMPTY); + } + + public List startSearchOnlyNodes(int numNodes, Settings settings) { + return startNodes(numNodes, Settings.builder().put(onlyRole(settings, DiscoveryNodeRole.SEARCH_ROLE)).build()); + } + /** calculates a min cluster-manager nodes value based on the given number of cluster-manager nodes */ private static int getMinClusterManagerNodes(int eligibleClusterManagerNodes) { return eligibleClusterManagerNodes / 2 + 1; @@ -2347,6 +2378,10 @@ public int numDataNodes() { return dataNodeAndClients().size(); } + public int numSearchNodes() { + return searchNodeAndClients().size(); + } + @Override public int numDataAndClusterManagerNodes() { return filterNodes(nodes, DATA_NODE_PREDICATE.or(CLUSTER_MANAGER_NODE_PREDICATE)).size(); @@ -2406,6 +2441,10 @@ private Collection dataNodeAndClients() { return filterNodes(nodes, DATA_NODE_PREDICATE); } + private Collection searchNodeAndClients() { + return filterNodes(nodes, SEARCH_NODE_PREDICATE); + } + private static Collection filterNodes( Map map, Predicate predicate