Add remote shards allocator for searchable snapshots #4870

Merged
CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -49,7 +49,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Renamed flaky tests ([#4912](https://github.com/opensearch-project/OpenSearch/pull/4912))
- Update previous release bwc version to 2.5.0 ([#5003](https://github.com/opensearch-project/OpenSearch/pull/5003))
- Use getParameterCount instead of getParameterTypes ([#4821](https://github.com/opensearch-project/OpenSearch/pull/4821))

- Remote shard balancer support for searchable snapshots ([#4870](https://github.com/opensearch-project/OpenSearch/pull/4870))
### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
@@ -53,6 +53,13 @@ protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings.Builder randomRepositorySettings() {
final Settings.Builder settings = Settings.builder();
settings.put("location", randomRepoPath()).put("compress", randomBoolean());
return settings;
}

public void testCreateSearchableSnapshot() throws Exception {
final int numReplicasIndex1 = randomIntBetween(1, 4);
final int numReplicasIndex2 = randomIntBetween(0, 2);
@@ -92,6 +99,8 @@ public void testCreateSearchableSnapshot() throws Exception {

assertTrue(client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get().isAcknowledged());

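// Ensure enough search nodes are available to host the primary plus the maximum number of
// replicas before restoring the indices as remote_snapshot indices.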
internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);

logger.info("--> restore indices as 'remote_snapshot'");
client.admin()
.cluster()
@@ -74,6 +74,7 @@
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.ParseField;
@@ -85,6 +86,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.xcontent.NamedXContentRegistry;
@@ -359,6 +361,9 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings));
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
addAllocationDecider(deciders, new TargetPoolAllocationDecider());
}

clusterPlugins.stream()
.flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())
@@ -702,6 +702,23 @@ assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) ==
+ " was matched but wasn't removed";
}

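/**
 * Swaps the roles of an active primary shard and an active replica shard: the primary is
 * demoted to a replica and the replica is promoted to primary, and the promotion is
 * reported to the {@link RoutingChangesObserver}.
 */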
public void swapPrimaryWithReplica(
Logger logger,
ShardRouting primaryShard,
ShardRouting replicaShard,
RoutingChangesObserver changes
) {
assert primaryShard.primary() : "Invalid primary shard provided";
assert !replicaShard.primary() : "Invalid Replica shard provided";

ShardRouting newPrimary = primaryShard.moveActivePrimaryToReplica();
ShardRouting newReplica = replicaShard.moveActiveReplicaToPrimary();
updateAssigned(primaryShard, newPrimary);
updateAssigned(replicaShard, newReplica);
logger.info("Swap relocation performed for shard [{}]", newPrimary.shortSummary());
changes.replicaPromoted(newPrimary);
}

private void unassignPrimaryAndPromoteActiveReplicaIfExists(
ShardRouting failedShard,
UnassignedInfo unassignedInfo,
@@ -1127,6 +1144,18 @@ public ShardRouting[] drain() {
primaries = 0;
return mutableShardRoutings;
}

/**
* Drains all ignored shards and returns them.
* This method does not drain unassigned shards.
*/
public ShardRouting[] drainIgnored() {
nodes.ensureMutable();
ShardRouting[] mutableShardRoutings = ignored.toArray(new ShardRouting[ignored.size()]);
ignored.clear();
ignoredPrimaries = 0;
return mutableShardRoutings;
}
}

/**
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;

/**
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
* help decide the capabilities of a specific node as well as an index or shard based on the index configuration.
* These methods help with allocation decisions and determining shard classification with the allocation process.
*
* @opensearch.internal
*/
public enum RoutingPool {
LOCAL_ONLY,
REMOTE_CAPABLE;

/**
* Helps to determine the appropriate {@link RoutingPool} for a given node from the {@link RoutingNode}
*/
public static RoutingPool getNodePool(RoutingNode node) {
return getNodePool(node.node());
}

/**
* Helps to determine the appropriate {@link RoutingPool} for a given node from the {@link DiscoveryNode}
*/
public static RoutingPool getNodePool(DiscoveryNode node) {
if (node.isSearchNode()) {
return REMOTE_CAPABLE;
}
return LOCAL_ONLY;
}

/**
* Can determine the appropriate {@link RoutingPool} for a given shard using the {@link IndexMetadata} for the
* index using the {@link RoutingAllocation}.
* @param shard the shard routing for which {@link RoutingPool} has to be determined.
* @param allocation the current allocation of the cluster
* @return {@link RoutingPool} for the given shard.
*/
public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation allocation) {
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index());
return getIndexPool(indexMetadata);
}

/**
* Can determine the appropriate {@link RoutingPool} for a given index using the {@link IndexMetadata}.
* @param indexMetadata the index metadata object for which {@link RoutingPool} has to be determined.
* @return {@link RoutingPool} for the given index.
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey().equals(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
return REMOTE_CAPABLE;
}
return LOCAL_ONLY;
}
}
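Note: the TargetPoolAllocationDecider registered in ClusterModule above is not part of this view. A minimal sketch of how an allocation decider could use RoutingPool to keep shards on nodes of a matching pool might look like the following; the class name, decider label, and messages are illustrative only, not the actual implementation.

import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.AllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.Decision;

// Illustrative sketch only; not the TargetPoolAllocationDecider introduced by this change.
public class PoolMatchingAllocationDecider extends AllocationDecider {
    private static final String NAME = "pool_matching";

    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        RoutingPool shardPool = RoutingPool.getShardPool(shardRouting, allocation);
        RoutingPool nodePool = RoutingPool.getNodePool(node);
        if (shardPool == nodePool) {
            return allocation.decision(Decision.YES, NAME, "shard pool [%s] matches node pool [%s]", shardPool, nodePool);
        }
        return allocation.decision(Decision.NO, NAME, "shard pool [%s] does not match node pool [%s]", shardPool, nodePool);
    }
}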
@@ -534,6 +534,29 @@ public ShardRouting moveToStarted() {
);
}

/**
* Makes the active primary shard a replica
*
* @throws IllegalShardRoutingStateException if shard is already a replica
*/
public ShardRouting moveActivePrimaryToReplica() {
assert active() : "expected an active shard " + this;
if (!primary) {
throw new IllegalShardRoutingStateException(this, "Not a primary shard, can't move to replica");
}
return new ShardRouting(
shardId,
currentNodeId,
relocatingNodeId,
false,
state,
recoverySource,
unassignedInfo,
allocationId,
expectedShardSize
);
}

/**
* Make the active shard primary unless it's not primary
*
@@ -50,6 +50,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

import java.util.HashMap;
import java.util.HashSet;
@@ -145,6 +146,13 @@ public void allocate(RoutingAllocation allocation) {
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
localShardsBalancer.balance();

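// With the searchable snapshot feature enabled, run an additional balancing round for
// shards in the remote-capable pool.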
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation);
remoteShardsBalancer.allocateUnassigned();
remoteShardsBalancer.moveShards();
remoteShardsBalancer.balance();
}
}

@Override
@@ -15,6 +15,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
@@ -27,9 +28,11 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.gateway.PriorityComparator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -38,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;
@@ -102,6 +106,10 @@ public float avgShardsPerNode(String index) {
*/
@Override
public float avgShardsPerNode() {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
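// When the searchable snapshot feature is enabled the model nodes only track local-pool shards,
// so recompute the average from the model instead of returning the precomputed cluster-wide value.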
float totalShards = nodes.values().stream().map(BalancedShardsAllocator.ModelNode::numShards).reduce(0, Integer::sum);
return totalShards / nodes.size();
}
return avgShardsPerNode;
}

@@ -172,6 +180,11 @@ void balance() {
*/
@Override
MoveDecision decideRebalance(final ShardRouting shard) {
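// Remote-pool shards are not rebalanced here; the remote shards balancer is responsible for them.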
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) {
return MoveDecision.NOT_TAKEN;
}

if (shard.started() == false) {
// we can only rebalance started shards
return MoveDecision.NOT_TAKEN;
@@ -441,7 +454,19 @@ private void balanceByWeights() {
* to the nodes we relocated them from.
*/
private String[] buildWeightOrderedIndices() {
final String[] indices;
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
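// Only indices from the local-only pool participate in weight-based balancing; remote-capable
// indices are skipped here.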
final List<String> localIndices = new ArrayList<>();
for (String index : allocation.routingTable().indicesRouting().keys().toArray(String.class)) {
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getIndexPool(metadata.index(index)))) {
localIndices.add(index);
}
}
indices = localIndices.toArray(new String[0]);
} else {
indices = allocation.routingTable().indicesRouting().keys().toArray(String.class);
}

final float[] deltas = new float[indices.length];
for (int i = 0; i < deltas.length; i++) {
sorter.reset(indices[i]);
@@ -507,7 +532,7 @@ void moveShards() {
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards.

// Trying to eliminate target nodes so that we do not unnecessarily iterate over source nodes
// when no target is eligible
for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) {
checkAndAddInEligibleTargetNode(currentNode.getRoutingNode());
@@ -533,6 +558,11 @@

ShardRouting shardRouting = it.next();

if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
continue;
}

// Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled
if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) {
logger.info(
@@ -593,6 +623,11 @@ void moveShards() {
*/
@Override
MoveDecision decideMove(final ShardRouting shardRouting) {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
return MoveDecision.NOT_TAKEN;
}

if (shardRouting.started() == false) {
// we can only move started shards
return MoveDecision.NOT_TAKEN;
@@ -680,7 +715,7 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
for (ShardRouting shard : rn) {
assert rn.nodeId().equals(shard.currentNodeId());
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) {
node.addShard(shard);
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId());
@@ -735,7 +770,17 @@ void allocateUnassigned() {
* if we allocate for instance (0, R, IDX1) we move the second replica to the secondary array and proceed with
* the next replica. If we could not find a node to allocate (0,R,IDX1) we move all it's replicas to ignoreUnassigned.
*/
ShardRouting[] unassignedShards = unassigned.drain();
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
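// Keep only local-pool shards for this balancer and push the remaining unassigned shards
// back onto the unassigned list so that the remote shards balancer can pick them up.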
List<ShardRouting> allUnassignedShards = Arrays.stream(unassignedShards).collect(Collectors.toList());
List<ShardRouting> localUnassignedShards = allUnassignedShards.stream()
.filter(shard -> RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)))
.collect(Collectors.toList());
allUnassignedShards.removeAll(localUnassignedShards);
allUnassignedShards.forEach(shard -> routingNodes.unassigned().add(shard));
unassignedShards = localUnassignedShards.toArray(new ShardRouting[localUnassignedShards.size()]);
}
ShardRouting[] primary = unassignedShards;
ShardRouting[] secondary = new ShardRouting[primary.length];
int secondaryLength = 0;
int primaryLength = primary.length;