Add remote allocator for searchable snapshots
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>

Part of this commit was developed from code and concepts initially implemented
in Amazon OpenSearch Service as part of the UltraWarm feature. Thank you to the
following developers and the entire UltraWarm team.

Co-authored-by: Min Zhou <minzho@amazon.com>
Co-authored-by: Ankit Malpani <malpani@amazon.com>
Co-authored-by: Rohit Nair <rohinair@amazon.com>
Co-authored-by: Sorabh Hamirwasia <hsorabh@amazon.com>
Co-authored-by: Ankit Jain <akjain@amazon.com>
Co-authored-by: Tianru Zhou <tianruz@amazon.com>
Co-authored-by: Neetika Singhal <neetiks@amazon.com>
Co-authored-by: Amit Khandelwal <mkhnde@amazon.com>
Co-authored-by: Vigya Sharma <vigyas@amazon.com>
Co-authored-by: Prateek Sharma <shrprat@amazon.com>
Co-authored-by: Venkata Jyothsna Donapati <donapv@amazon.com>
Co-authored-by: Vlad Rozov <vrozov@amazon.com>
13 people committed Oct 27, 2022
1 parent fe0b917 commit a54f7a3
Showing 20 changed files with 1,735 additions and 5 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -45,7 +45,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added changes for graceful node decommission ([#4586](https://github.com/opensearch-project/OpenSearch/pull/4586))
- Build no-jdk distributions as part of release build ([#4902](https://github.com/opensearch-project/OpenSearch/pull/4902))
- Renamed flaky tests ([#4912](https://github.com/opensearch-project/OpenSearch/pull/4912))

- Remote shard balancer support for searchable snapshots ([#4870](https://github.com/opensearch-project/OpenSearch/pull/4870))
### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
@@ -48,6 +48,13 @@ protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings.Builder randomRepositorySettings() {
final Settings.Builder settings = Settings.builder();
settings.put("location", randomRepoPath()).put("compress", randomBoolean());
return settings;
}
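
The override above points the test's snapshot repository at a random filesystem path and randomly toggles compression; both are settings of the filesystem ("fs") repository type. For reference, a minimal sketch (not part of this commit) of registering an equivalent repository by hand, assuming an integration-test context where client() is available; the repository name and location below are placeholders.

import org.opensearch.common.settings.Settings;

// Registers an "fs" snapshot repository with the same two settings the override produces.
client().admin()
    .cluster()
    .preparePutRepository("test-repo")   // placeholder repository name
    .setType("fs")
    .setSettings(Settings.builder().put("location", "/tmp/snapshot-repo").put("compress", true))
    .get();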

public void testCreateSearchableSnapshot() throws Exception {
final int numReplicasIndex1 = randomIntBetween(1, 4);
final int numReplicasIndex2 = randomIntBetween(0, 2);
@@ -87,6 +94,8 @@ public void testCreateSearchableSnapshot() throws Exception {

assertTrue(client.admin().indices().prepareDelete("test-idx-1", "test-idx-2").get().isAcknowledged());

internalCluster().ensureAtLeastNumSearchNodes(Math.max(numReplicasIndex1, numReplicasIndex2) + 1);

logger.info("--> restore indices as 'remote_snapshot'");
client.admin()
.cluster()
@@ -74,6 +74,7 @@
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.ParseField;
@@ -85,6 +86,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
import org.opensearch.common.xcontent.NamedXContentRegistry;
@@ -359,6 +361,9 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings));
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
addAllocationDecider(deciders, new TargetPoolAllocationDecider());
}
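
The TargetPoolAllocationDecider registered above is not included in this excerpt. As a rough illustration only, and not the actual implementation, a pool-matching decider built on the RoutingPool helpers added by this commit could look like the sketch below; the class name is hypothetical.

import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.AllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.Decision;

// Hypothetical sketch: only allow a shard onto a node whose routing pool matches the shard's pool.
public class PoolMatchingAllocationDeciderSketch extends AllocationDecider {
    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        RoutingPool shardPool = RoutingPool.getShardPool(shardRouting, allocation);
        RoutingPool targetNodePool = RoutingPool.getNodePool(node);
        return shardPool == targetNodePool ? Decision.YES : Decision.NO;
    }
}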

clusterPlugins.stream()
.flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())
@@ -702,6 +702,23 @@ assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) ==
+ " was matched but wasn't removed";
}

public void swapPrimaryWithReplica(
Logger logger,
ShardRouting primaryShard,
ShardRouting replicaShard,
RoutingChangesObserver changes
) {
assert primaryShard.primary() : "Invalid primary shard provided";
assert !replicaShard.primary() : "Invalid Replica shard provided";

ShardRouting newPrimary = primaryShard.moveActivePrimaryToReplica();
ShardRouting newReplica = replicaShard.moveActiveReplicaToPrimary();
updateAssigned(primaryShard, newPrimary);
updateAssigned(replicaShard, newReplica);
logger.info("Swap relocation performed for shard [{}]", newPrimary.shortSummary());
changes.replicaPromoted(newPrimary);
}
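
swapPrimaryWithReplica assumes the caller hands it an active primary and an active replica of the same shard, as the assertions above indicate. A hypothetical caller-side guard (not from this commit) that makes the contract explicit:

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.routing.RoutingChangesObserver;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;

// Hypothetical helper: swap only when both routings are active copies of the same shard
// and the first one is the primary.
final class SwapGuardSketch {
    static void swapIfValid(RoutingNodes routingNodes, ShardRouting primary, ShardRouting replica,
                            RoutingChangesObserver observer, Logger logger) {
        if (primary.primary()
            && replica.primary() == false
            && primary.active()
            && replica.active()
            && primary.shardId().equals(replica.shardId())) {
            routingNodes.swapPrimaryWithReplica(logger, primary, replica, observer);
        }
    }
}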

private void unassignPrimaryAndPromoteActiveReplicaIfExists(
ShardRouting failedShard,
UnassignedInfo unassignedInfo,
@@ -1127,6 +1144,18 @@ public ShardRouting[] drain() {
primaries = 0;
return mutableShardRoutings;
}

/**
 * Drains all ignored shards and returns them.
* This method will not drain unassigned shards.
*/
public ShardRouting[] drainIgnored() {
nodes.ensureMutable();
ShardRouting[] mutableShardRoutings = ignored.toArray(new ShardRouting[ignored.size()]);
ignored.clear();
ignoredPrimaries = 0;
return mutableShardRoutings;
}
}

/**
@@ -0,0 +1,69 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;

/**
 * {@link RoutingPool} defines the different node types based on their assigned capabilities. Its methods
 * determine the pool of a specific node, as well as of an index or shard based on the index configuration.
 * These classifications drive allocation decisions and shard placement during the allocation process.
*
* @opensearch.internal
*/
public enum RoutingPool {
LOCAL_ONLY,
REMOTE_CAPABLE;

/**
* Helps to determine the appropriate {@link RoutingPool} for a given node from the {@link RoutingNode}
*/
public static RoutingPool getNodePool(RoutingNode node) {
return getNodePool(node.node());
}

/**
* Helps to determine the appropriate {@link RoutingPool} for a given node from the {@link DiscoveryNode}
*/
public static RoutingPool getNodePool(DiscoveryNode node) {
if (node.isSearchNode()) {
return REMOTE_CAPABLE;
}
return LOCAL_ONLY;
}

/**
 * Determines the appropriate {@link RoutingPool} for a given shard by resolving the {@link IndexMetadata} of its
 * index through the {@link RoutingAllocation}.
* @param shard the shard routing for which {@link RoutingPool} has to be determined.
* @param allocation the current allocation of the cluster
* @return {@link RoutingPool} for the given shard.
*/
public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation allocation) {
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index());
return getIndexPool(indexMetadata);
}

/**
 * Determines the appropriate {@link RoutingPool} for a given index using its {@link IndexMetadata}.
* @param indexMetadata the index metadata object for which {@link RoutingPool} has to be determined.
* @return {@link RoutingPool} for the given index.
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey().equals(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
return REMOTE_CAPABLE;
}
return LOCAL_ONLY;
}
}
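
A short illustrative example (not part of this commit) of the classification getIndexPool performs: an index whose store type is remote_snapshot lands in the REMOTE_CAPABLE pool, anything else in LOCAL_ONLY. The index name is a placeholder.

import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;

public class RoutingPoolExample {
    public static void main(String[] args) {
        // Index metadata for a searchable snapshot index (remote_snapshot store type).
        IndexMetadata remoteIndex = IndexMetadata.builder("searchable-idx")
            .settings(
                Settings.builder()
                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
                    .put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey())
            )
            .numberOfShards(1)
            .numberOfReplicas(0)
            .build();

        // Prints REMOTE_CAPABLE for the index above; a regular index would print LOCAL_ONLY.
        System.out.println(RoutingPool.getIndexPool(remoteIndex));
    }
}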
@@ -534,6 +534,29 @@ public ShardRouting moveToStarted() {
);
}

/**
 * Makes the active primary shard a replica.
*
* @throws IllegalShardRoutingStateException if shard is already a replica
*/
public ShardRouting moveActivePrimaryToReplica() {
assert active() : "expected an active shard " + this;
if (!primary) {
throw new IllegalShardRoutingStateException(this, "Not a primary shard, can't move to replica");
}
return new ShardRouting(
shardId,
currentNodeId,
relocatingNodeId,
false,
state,
recoverySource,
unassignedInfo,
allocationId,
expectedShardSize
);
}

/**
* Make the active shard primary unless it's not primary
*
@@ -50,6 +50,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

import java.util.HashMap;
import java.util.HashSet;
@@ -145,6 +146,13 @@ public void allocate(RoutingAllocation allocation) {
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
localShardsBalancer.balance();

if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation);
remoteShardsBalancer.allocateUnassigned();
remoteShardsBalancer.moveShards();
remoteShardsBalancer.balance();
}
}
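
When the searchable snapshot feature flag is enabled, the allocate pass above runs two balancers back to back: the existing LocalShardsBalancer for LOCAL_ONLY shards, then a RemoteShardsBalancer for REMOTE_CAPABLE shards. The RemoteShardsBalancer implementation is not part of this excerpt; the sketch below only records the contract implied by the three calls, and the comments are assumptions about intent rather than documented behavior.

// Contract implied by the call sites above (sketch, not the actual ShardsBalancer class).
abstract class ShardsBalancerContractSketch {
    /** Place shards that currently have no assigned node. */
    abstract void allocateUnassigned();

    /** Move shards that are no longer allowed to remain on their current node. */
    abstract void moveShards();

    /** Even out shard counts across the nodes of the balancer's pool. */
    abstract void balance();
}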

@Override
@@ -15,6 +15,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.UnassignedInfo;
@@ -27,9 +28,11 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.gateway.PriorityComparator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -38,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;
@@ -102,6 +106,10 @@ public float avgShardsPerNode(String index) {
*/
@Override
public float avgShardsPerNode() {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
float totalShards = nodes.values().stream().map(BalancedShardsAllocator.ModelNode::numShards).reduce(0, Integer::sum);
return totalShards / nodes.size();
}
return avgShardsPerNode;
}

@@ -172,6 +180,11 @@ void balance()
*/
@Override
MoveDecision decideRebalance(final ShardRouting shard) {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) {
return MoveDecision.NOT_TAKEN;
}

if (shard.started() == false) {
// we can only rebalance started shards
return MoveDecision.NOT_TAKEN;
@@ -441,7 +454,19 @@ private void balanceByWeights() {
* to the nodes we relocated them from.
*/
private String[] buildWeightOrderedIndices() {
final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class);
final String[] indices;
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
final List<String> localIndices = new ArrayList<>();
for (String index : allocation.routingTable().indicesRouting().keys().toArray(String.class)) {
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getIndexPool(metadata.index(index)))) {
localIndices.add(index);
}
}
indices = localIndices.toArray(new String[0]);
} else {
indices = allocation.routingTable().indicesRouting().keys().toArray(String.class);
}

final float[] deltas = new float[indices.length];
for (int i = 0; i < deltas.length; i++) {
sorter.reset(indices[i]);
@@ -507,7 +532,7 @@ void moveShards() {
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards.

// Trying to eliminate target nodes so that we donot unnecessarily iterate over source nodes
// Trying to eliminate target nodes so that we do not unnecessarily iterate over source nodes
// when no target is eligible
for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) {
checkAndAddInEligibleTargetNode(currentNode.getRoutingNode());
@@ -533,6 +558,11 @@

ShardRouting shardRouting = it.next();

if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
continue;
}

// Ensure that replicas don't relocate if primaries are being throttled and primary first is enabled
if (movePrimaryFirst && primariesThrottled && !shardRouting.primary()) {
logger.info(
@@ -593,6 +623,11 @@ void moveShards()
*/
@Override
MoveDecision decideMove(final ShardRouting shardRouting) {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
return MoveDecision.NOT_TAKEN;
}

if (shardRouting.started() == false) {
// we can only move started shards
return MoveDecision.NOT_TAKEN;
@@ -680,7 +715,7 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
for (ShardRouting shard : rn) {
assert rn.nodeId().equals(shard.currentNodeId());
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
if (shard.state() != RELOCATING) {
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) {
node.addShard(shard);
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to node [{}]", shard, node.getNodeId());
@@ -735,7 +770,17 @@ void allocateUnassigned() {
* if we allocate for instance (0, R, IDX1) we move the second replica to the secondary array and proceed with
* the next replica. If we could not find a node to allocate (0,R,IDX1) we move all it's replicas to ignoreUnassigned.
*/
ShardRouting[] primary = unassigned.drain();
ShardRouting[] unassignedShards = unassigned.drain();
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
List<ShardRouting> allUnassignedShards = Arrays.stream(unassignedShards).collect(Collectors.toList());
List<ShardRouting> localUnassignedShards = allUnassignedShards.stream()
.filter(shard -> RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)))
.collect(Collectors.toList());
allUnassignedShards.removeAll(localUnassignedShards);
allUnassignedShards.forEach(shard -> routingNodes.unassigned().add(shard));
unassignedShards = localUnassignedShards.toArray(new ShardRouting[localUnassignedShards.size()]);
}
ShardRouting[] primary = unassignedShards;
ShardRouting[] secondary = new ShardRouting[primary.length];
int secondaryLength = 0;
int primaryLength = primary.length;
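
The feature-flagged block above partitions the drained unassigned shards by pool: LOCAL_ONLY shards stay with this balancer, while REMOTE_CAPABLE shards are returned to the unassigned list, which the RemoteShardsBalancer introduced earlier in this commit then processes in the same allocation pass. An equivalent standalone form of that partition, with an illustrative helper name, is sketched below.

import java.util.ArrayList;
import java.util.List;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;

// Sketch: keep LOCAL_ONLY shards for the local balancer, hand everything else back
// to the unassigned list for the remote balancer.
final class UnassignedPartitionSketch {
    static ShardRouting[] keepLocalOnly(ShardRouting[] drained, RoutingAllocation allocation, RoutingNodes routingNodes) {
        final List<ShardRouting> local = new ArrayList<>();
        for (ShardRouting shard : drained) {
            if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation))) {
                local.add(shard);
            } else {
                routingNodes.unassigned().add(shard); // requeued for the remote balancer
            }
        }
        return local.toArray(new ShardRouting[0]);
    }
}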