Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Security Manager Replacement] Create initial Java Agent to intercept Socket::connect calls ([#17724](https://github.com/opensearch-project/OpenSearch/pull/17724))
- Add ingestion management APIs for pause, resume and get ingestion state ([#17631](https://github.com/opensearch-project/OpenSearch/pull/17631))
- [Security Manager Replacement] Enhance Java Agent to intercept System::exit ([#17746](https://github.com/opensearch-project/OpenSearch/pull/17746))
- Support AutoExpand for SearchReplica ([#17741](https://github.com/opensearch-project/OpenSearch/pull/17741))

### Changed
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.metadata;

import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class AutoExpandSearchReplicasIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
}

public void testAutoExpandSearchReplica() throws Exception {
String indexName = "test";
internalCluster().startClusterManagerOnlyNode();

// Create a cluster with 2 data nodes and 1 search node
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode();
String searchNode = internalCluster().startSearchOnlyNode();

// Create index with 1 primary, 1 replica and 1 search replica shards
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(0))
.build()
);
ensureGreen();

assertBusy(() -> assertEquals(1, getNumShards(indexName).numSearchReplicas));

// Enable auto expand for search replica
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put("index.auto_expand_search_replicas", "0-all"))
.get();

// Add 1 more search nodes
internalCluster().startSearchOnlyNode();

assertBusy(() -> assertEquals(2, getNumShards(indexName).numSearchReplicas));

// Stop a node which hosts search replica
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(searchNode));
assertBusy(() -> assertEquals(1, getNumShards(indexName).numSearchReplicas));

// Add 1 more search nodes
internalCluster().startSearchOnlyNode();
assertBusy(() -> assertEquals(2, getNumShards(indexName).numSearchReplicas));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

package org.opensearch.cluster.metadata;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.Booleans;
Expand Down Expand Up @@ -142,13 +141,14 @@ public boolean isEnabled() {

private OptionalInt getDesiredNumberOfReplicas(IndexMetadata indexMetadata, RoutingAllocation allocation) {
if (enabled) {
int numMatchingDataNodes = 0;
for (final DiscoveryNode cursor : allocation.nodes().getDataNodes().values()) {
Decision decision = allocation.deciders().shouldAutoExpandToNode(indexMetadata, cursor, allocation);
if (decision.type() != Decision.Type.NO) {
numMatchingDataNodes++;
}
}
int numMatchingDataNodes = (int) allocation.nodes()
.getDataNodes()
.values()
.stream()
.filter(node -> node.isSearchNode() == false)
.map(node -> allocation.deciders().shouldAutoExpandToNode(indexMetadata, node, allocation))
.filter(decision -> decision.type() != Decision.Type.NO)
.count();

final int min = getMinReplicas();
final int max = getMaxReplicas(numMatchingDataNodes);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.metadata;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.Booleans;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;

import static org.opensearch.cluster.metadata.MetadataIndexStateService.isIndexVerifiedBeforeClosed;

/**
* This class acts as a functional wrapper around the {@code index.auto_expand_search_replicas} setting.
* This setting's value expands into a minimum and maximum value, requiring special handling based on the
* number of search nodes in the cluster. This class handles parsing and simplifies access to these values.
*
* @opensearch.internal
*/
public final class AutoExpandSearchReplicas {
// the value we recognize in the "max" position to mean all the search nodes
private static final String ALL_NODES_VALUE = "all";

private static final AutoExpandSearchReplicas FALSE_INSTANCE = new AutoExpandSearchReplicas(0, 0, false);

public static final Setting<AutoExpandSearchReplicas> SETTING = new Setting<>(
IndexMetadata.SETTING_AUTO_EXPAND_SEARCH_REPLICAS,
"false",
AutoExpandSearchReplicas::parse,
Property.Dynamic,
Property.IndexScope
);

private static AutoExpandSearchReplicas parse(String value) {
final int min;
final int max;
if (Booleans.isFalse(value)) {
return FALSE_INSTANCE;
}
final int dash = value.indexOf('-');
if (-1 == dash) {
throw new IllegalArgumentException(
"failed to parse [" + IndexMetadata.SETTING_AUTO_EXPAND_SEARCH_REPLICAS + "] from value: [" + value + "] at index " + dash
);
}
final String sMin = value.substring(0, dash);
try {
min = Integer.parseInt(sMin);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"failed to parse [" + IndexMetadata.SETTING_AUTO_EXPAND_SEARCH_REPLICAS + "] from value: [" + value + "] at index " + dash,
e
);
}
String sMax = value.substring(dash + 1);
if (sMax.equals(ALL_NODES_VALUE)) {
max = Integer.MAX_VALUE;
} else {
try {
max = Integer.parseInt(sMax);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"failed to parse ["
+ IndexMetadata.SETTING_AUTO_EXPAND_SEARCH_REPLICAS
+ "] from value: ["
+ value
+ "] at index "
+ dash,
e
);
}
}
return new AutoExpandSearchReplicas(min, max, true);
}

private final int minSearchReplicas;
private final int maxSearchReplicas;
private final boolean enabled;

private AutoExpandSearchReplicas(int minReplicas, int maxReplicas, boolean enabled) {
if (minReplicas > maxReplicas) {
throw new IllegalArgumentException(
"["
+ IndexMetadata.SETTING_AUTO_EXPAND_SEARCH_REPLICAS
+ "] minSearchReplicas must be =< maxSearchReplicas but wasn't "
+ minReplicas
+ " > "
+ maxReplicas
);
}
this.minSearchReplicas = minReplicas;
this.maxSearchReplicas = maxReplicas;
this.enabled = enabled;
}

int getMinSearchReplicas() {
return minSearchReplicas;
}

public int getMaxSearchReplicas() {
return maxSearchReplicas;
}

public boolean isEnabled() {
return enabled;
}

private OptionalInt getDesiredNumberOfSearchReplicas(IndexMetadata indexMetadata, RoutingAllocation allocation) {
int numMatchingSearchNodes = (int) allocation.nodes()
.getDataNodes()
.values()
.stream()
.filter(DiscoveryNode::isSearchNode)
.map(node -> allocation.deciders().shouldAutoExpandToNode(indexMetadata, node, allocation))
.filter(decision -> decision.type() != Decision.Type.NO)
.count();

return calculateNumberOfSearchReplicas(numMatchingSearchNodes);
}

// package private for testing
OptionalInt calculateNumberOfSearchReplicas(int numMatchingSearchNodes) {
// Calculate the maximum possible number of search replicas
int maxPossibleReplicas = Math.min(numMatchingSearchNodes, maxSearchReplicas);

// Determine the number of search replicas
int numberOfSearchReplicas = Math.max(minSearchReplicas, maxPossibleReplicas);

// Additional check to ensure we don't exceed max possible search replicas
if (numberOfSearchReplicas <= maxPossibleReplicas) {
return OptionalInt.of(numberOfSearchReplicas);
}

return OptionalInt.empty();
}

@Override
public String toString() {
return enabled ? minSearchReplicas + "-" + maxSearchReplicas : "false";
}

/**
* Checks if there are search replicas with the auto-expand feature that need to be adapted.
* Returns a map of updates, which maps the indices to be updated to the desired number of search replicas.
* The map has the desired number of search replicas as key and the indices to update as value, as this allows the result
* of this method to be directly applied to RoutingTable.Builder#updateNumberOfSearchReplicas.
*/
public static Map<Integer, List<String>> getAutoExpandSearchReplicaChanges(Metadata metadata, RoutingAllocation allocation) {
Map<Integer, List<String>> updatedSearchReplicas = new HashMap<>();

for (final IndexMetadata indexMetadata : metadata) {
if (indexMetadata.getState() == IndexMetadata.State.OPEN || isIndexVerifiedBeforeClosed(indexMetadata)) {
AutoExpandSearchReplicas autoExpandSearchReplicas = SETTING.get(indexMetadata.getSettings());
if (autoExpandSearchReplicas.isEnabled()) {
autoExpandSearchReplicas.getDesiredNumberOfSearchReplicas(indexMetadata, allocation)
.ifPresent(numberOfSearchReplicas -> {
if (numberOfSearchReplicas != indexMetadata.getNumberOfSearchOnlyReplicas()) {
updatedSearchReplicas.computeIfAbsent(numberOfSearchReplicas, ArrayList::new)
.add(indexMetadata.getIndex().getName());
}
});
}
}
}
return updatedSearchReplicas;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,9 @@ public Iterator<Setting<?>> settings() {
);

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final String SETTING_AUTO_EXPAND_SEARCH_REPLICAS = "index.auto_expand_search_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;
public static final Setting<AutoExpandSearchReplicas> INDEX_AUTO_EXPAND_SEARCH_REPLICAS_SETTING = AutoExpandSearchReplicas.SETTING;

/**
* Blocks the API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1505,7 +1505,10 @@ List<String> getIndexSettingsValidationErrors(

Optional<String> replicaValidationError = awarenessReplicaBalance.validate(replicaCount, autoExpandReplica);
replicaValidationError.ifPresent(validationErrors::add);
Optional<String> searchReplicaValidationError = awarenessReplicaBalance.validate(searchReplicaCount);
Optional<String> searchReplicaValidationError = awarenessReplicaBalance.validate(
searchReplicaCount,
AutoExpandSearchReplicas.SETTING.get(settings)
);
searchReplicaValidationError.ifPresent(validationErrors::add);
}
return validationErrors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,10 @@ public ClusterState execute(ClusterState currentState) {
for (Index index : request.indices()) {
if (index.getName().charAt(0) != '.') {
// No replica count validation for system indices
Optional<String> error = awarenessReplicaBalance.validate(updatedNumberOfSearchReplicas);
Optional<String> error = awarenessReplicaBalance.validate(
updatedNumberOfSearchReplicas,
AutoExpandSearchReplicas.SETTING.get(openSettings)
);

if (error.isPresent()) {
ValidationException ex = new ValidationException();
Expand Down
Loading
Loading