Skip to content

Commit

Permalink
Remove feature flag for searchable snapshots (#7117) (#7150)
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
(cherry picked from commit 1c84218)
  • Loading branch information
kotwanikunal authored Apr 13, 2023
1 parent dfc4258 commit a960ec3
Show file tree
Hide file tree
Showing 18 changed files with 78 additions and 141 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577))
- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644))
- Add retry delay as dynamic setting for cluster maanger throttling. ([#6998](https://github.com/opensearch-project/OpenSearch/pull/6998))
- Introduce full support for searchable snapshots ([#5087](https://github.com/opensearch-project/OpenSearch/issues/5087))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
Expand Down Expand Up @@ -60,11 +59,6 @@ protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(FeatureFlags.SEARCHABLE_SNAPSHOT, "true").build();
}

@Override
protected Settings.Builder randomRepositorySettings() {
final Settings.Builder settings = Settings.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.discovery.DiscoveryStats;
Expand Down Expand Up @@ -189,7 +188,7 @@ public NodeStats(StreamInput in) throws IOException {
} else {
weightedRoutingStats = null;
}
if (in.getVersion().onOrAfter(Version.V_2_7_0) && FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
fileCacheStats = in.readOptionalWriteable(FileCacheStats::new);
} else {
fileCacheStats = null;
Expand Down Expand Up @@ -409,7 +408,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeOptionalWriteable(weightedRoutingStats);
}
if (out.getVersion().onOrAfter(Version.V_2_7_0) && FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeOptionalWriteable(fileCacheStats);
}
}
Expand Down Expand Up @@ -493,7 +492,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getWeightedRoutingStats() != null) {
getWeightedRoutingStats().toXContent(builder, params);
}
if (getFileCacheStats() != null && FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
if (getFileCacheStats() != null) {
getFileCacheStats().toXContent(builder, params);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentType;
Expand Down Expand Up @@ -152,7 +151,7 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_10_0)) {
snapshotUuid = in.readOptionalString();
}
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && in.getVersion().onOrAfter(Version.V_2_4_0)) {
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
storageType = in.readEnum(StorageType.class);
}
}
Expand Down Expand Up @@ -182,7 +181,7 @@ public void writeTo(StreamOutput out) throws IOException {
"restricting the snapshot UUID is forbidden in a cluster with version [" + out.getVersion() + "] nodes"
);
}
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && out.getVersion().onOrAfter(Version.V_2_4_0)) {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeEnum(storageType);
}
}
Expand Down Expand Up @@ -595,17 +594,13 @@ public RestoreSnapshotRequest source(Map<String, Object> source) {
throw new IllegalArgumentException("malformed ignore_index_settings section, should be an array of strings");
}
} else if (name.equals("storage_type")) {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
if (entry.getValue() instanceof String) {
storageType(StorageType.fromString((String) entry.getValue()));
} else {
throw new IllegalArgumentException("malformed storage_type");
}

if (entry.getValue() instanceof String) {
storageType(StorageType.fromString((String) entry.getValue()));
} else {
throw new IllegalArgumentException(
"Unsupported parameter " + name + ". Feature flag is not enabled for this experimental feature"
);
throw new IllegalArgumentException("malformed storage_type");
}

} else {
if (IndicesOptions.isIndicesOptions(name) == false) {
throw new IllegalArgumentException("Unknown parameter " + name);
Expand Down Expand Up @@ -648,7 +643,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.value(ignoreIndexSetting);
}
builder.endArray();
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && storageType != null) {
if (storageType != null) {
storageType.toXContent(builder);
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -126,38 +125,30 @@ protected ClusterBlockException checkBlock(UpdateSettingsRequest request, Cluste
return null;
}

if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
final Index[] requestIndices = indexNameExpressionResolver.concreteIndices(state, request);
boolean allowSearchableSnapshotSettingsUpdate = true;
// check if all indices in the request are remote snapshot
for (Index index : requestIndices) {
if (state.blocks().indexBlocked(ClusterBlockLevel.METADATA_WRITE, index.getName())) {
allowSearchableSnapshotSettingsUpdate = allowSearchableSnapshotSettingsUpdate
&& IndexModule.Type.REMOTE_SNAPSHOT.match(
state.getMetadata().getIndexSafe(index).getSettings().get(INDEX_STORE_TYPE_SETTING.getKey())
);
}
final Index[] requestIndices = indexNameExpressionResolver.concreteIndices(state, request);
boolean allowSearchableSnapshotSettingsUpdate = true;
// check if all indices in the request are remote snapshot
for (Index index : requestIndices) {
if (state.blocks().indexBlocked(ClusterBlockLevel.METADATA_WRITE, index.getName())) {
allowSearchableSnapshotSettingsUpdate = allowSearchableSnapshotSettingsUpdate
&& IndexModule.Type.REMOTE_SNAPSHOT.match(
state.getMetadata().getIndexSafe(index).getSettings().get(INDEX_STORE_TYPE_SETTING.getKey())
);
}
// check if all settings in the request are in the allow list
if (allowSearchableSnapshotSettingsUpdate) {
for (String setting : request.settings().keySet()) {
allowSearchableSnapshotSettingsUpdate = allowSearchableSnapshotSettingsUpdate
&& (ALLOWLIST_REMOTE_SNAPSHOT_SETTINGS.contains(setting)
|| Stream.of(ALLOWLIST_REMOTE_SNAPSHOT_SETTINGS_PREFIXES).anyMatch(setting::startsWith));
}
}
// check if all settings in the request are in the allow list
if (allowSearchableSnapshotSettingsUpdate) {
for (String setting : request.settings().keySet()) {
allowSearchableSnapshotSettingsUpdate = allowSearchableSnapshotSettingsUpdate
&& (ALLOWLIST_REMOTE_SNAPSHOT_SETTINGS.contains(setting)
|| Stream.of(ALLOWLIST_REMOTE_SNAPSHOT_SETTINGS_PREFIXES).anyMatch(setting::startsWith));
}

return allowSearchableSnapshotSettingsUpdate
? null
: state.blocks()
.indicesBlockedException(
ClusterBlockLevel.METADATA_WRITE,
indexNameExpressionResolver.concreteIndexNames(state, request)
);
}

return state.blocks()
.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indexNameExpressionResolver.concreteIndexNames(state, request));
return allowSearchableSnapshotSettingsUpdate
? null
: state.blocks()
.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, indexNameExpressionResolver.concreteIndexNames(state, request));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -370,9 +369,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings));
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
addAllocationDecider(deciders, new TargetPoolAllocationDecider());
}
addAllocationDecider(deciders, new TargetPoolAllocationDecider());

clusterPlugins.stream()
.flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
Expand Down Expand Up @@ -287,7 +286,7 @@ public SnapshotRecoverySource(
} else {
index = new IndexId(in.readString(), IndexMetadata.INDEX_UUID_NA_VALUE);
}
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && in.getVersion().onOrAfter(Version.V_2_4_0)) {
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
isSearchableSnapshot = in.readBoolean();
} else {
isSearchableSnapshot = false;
Expand Down Expand Up @@ -330,7 +329,7 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException {
} else {
out.writeString(index.getName());
}
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && out.getVersion().onOrAfter(Version.V_2_4_0)) {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeBoolean(isSearchableSnapshot);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -193,12 +192,11 @@ public void allocate(RoutingAllocation allocation) {
localShardsBalancer.moveShards();
localShardsBalancer.balance();

if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation);
remoteShardsBalancer.allocateUnassigned();
remoteShardsBalancer.moveShards();
remoteShardsBalancer.balance();
}
final ShardsBalancer remoteShardsBalancer = new RemoteShardsBalancer(logger, allocation);
remoteShardsBalancer.allocateUnassigned();
remoteShardsBalancer.moveShards();
remoteShardsBalancer.balance();

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.gateway.PriorityComparator;

import java.util.ArrayList;
Expand Down Expand Up @@ -126,11 +125,8 @@ public float avgPrimaryShardsPerNode() {
*/
@Override
public float avgShardsPerNode() {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
float totalShards = nodes.values().stream().map(BalancedShardsAllocator.ModelNode::numShards).reduce(0, Integer::sum);
return totalShards / nodes.size();
}
return avgShardsPerNode;
float totalShards = nodes.values().stream().map(BalancedShardsAllocator.ModelNode::numShards).reduce(0, Integer::sum);
return totalShards / nodes.size();
}

/**
Expand Down Expand Up @@ -200,8 +196,7 @@ void balance() {
*/
@Override
MoveDecision decideRebalance(final ShardRouting shard) {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation))) {
return MoveDecision.NOT_TAKEN;
}

Expand Down Expand Up @@ -474,18 +469,14 @@ private void balanceByWeights() {
* to the nodes we relocated them from.
*/
private String[] buildWeightOrderedIndices() {
final String[] indices;
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
final List<String> localIndices = new ArrayList<>();
for (String index : allocation.routingTable().indicesRouting().keys().toArray(String.class)) {
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getIndexPool(metadata.index(index)))) {
localIndices.add(index);
}

final List<String> localIndices = new ArrayList<>();
for (String index : allocation.routingTable().indicesRouting().keys().toArray(String.class)) {
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getIndexPool(metadata.index(index)))) {
localIndices.add(index);
}
indices = localIndices.toArray(new String[0]);
} else {
indices = allocation.routingTable().indicesRouting().keys().toArray(String.class);
}
final String[] indices = localIndices.toArray(new String[0]);

final float[] deltas = new float[indices.length];
for (int i = 0; i < deltas.length; i++) {
Expand Down Expand Up @@ -578,8 +569,7 @@ void moveShards() {

ShardRouting shardRouting = it.next();

if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
continue;
}

Expand Down Expand Up @@ -643,8 +633,7 @@ void moveShards() {
*/
@Override
MoveDecision decideMove(final ShardRouting shardRouting) {
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)
&& RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
return MoveDecision.NOT_TAKEN;
}

Expand Down Expand Up @@ -791,15 +780,13 @@ void allocateUnassigned() {
* the next replica. If we could not find a node to allocate (0,R,IDX1) we move all it's replicas to ignoreUnassigned.
*/
ShardRouting[] unassignedShards = unassigned.drain();
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT)) {
List<ShardRouting> allUnassignedShards = Arrays.stream(unassignedShards).collect(Collectors.toList());
List<ShardRouting> localUnassignedShards = allUnassignedShards.stream()
.filter(shard -> RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)))
.collect(Collectors.toList());
allUnassignedShards.removeAll(localUnassignedShards);
allUnassignedShards.forEach(shard -> routingNodes.unassigned().add(shard));
unassignedShards = localUnassignedShards.toArray(new ShardRouting[localUnassignedShards.size()]);
}
List<ShardRouting> allUnassignedShards = Arrays.stream(unassignedShards).collect(Collectors.toList());
List<ShardRouting> localUnassignedShards = allUnassignedShards.stream()
.filter(shard -> RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)))
.collect(Collectors.toList());
allUnassignedShards.removeAll(localUnassignedShards);
allUnassignedShards.forEach(shard -> routingNodes.unassigned().add(shard));
unassignedShards = localUnassignedShards.toArray(new ShardRouting[0]);
ShardRouting[] primary = unassignedShards;
ShardRouting[] secondary = new ShardRouting[primary.length];
int secondaryLength = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,10 @@ public void apply(Settings value, Settings current, Settings previous) {
SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED,
SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS,
SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING,
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS,

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING
)
)
);
Expand All @@ -647,8 +650,6 @@ public void apply(Settings value, Settings current, Settings previous) {
* setting should be moved to {@link #BUILT_IN_CLUSTER_SETTINGS}.
*/
public static final Map<String, List<Setting>> FEATURE_FLAGGED_CLUSTER_SETTINGS = Map.of(
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING),
FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL,
List.of(IndicesService.CLUSTER_REPLICATION_TYPE_SETTING)
);
Expand Down
Loading

0 comments on commit a960ec3

Please sign in to comment.