[Backport 2.x] [Writable Warm] Composite Directory implementation and integrating it with FileCache #14489

Merged: 2 commits merged on Jun 24, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([#12782](https://github.com/opensearch-project/OpenSearch/pull/12782))

### Dependencies
- Update to Apache Lucene 9.11.0 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042))
@@ -0,0 +1,160 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
// Uncomment the below line to enable trace level logs for this test for better debugging
// @TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE")
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase {

protected static final String INDEX_NAME = "test-idx-1";
protected static final int NUM_DOCS_IN_BULK = 1000;

/*
Disabling the MockFSIndexStore plugin because MockFSDirectoryFactory wraps the FSDirectory in an OpenSearchMockDirectoryWrapper, which extends FilterDirectory (whereas FSDirectory extends BaseDirectory).
As a result of this wrapping, the local directory of the CompositeDirectory no longer satisfies the assertion that the local directory must be of type FSDirectory.
*/
@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);
return featureSettings.build();
}

public void testWritableWarmFeatureFlagDisabled() {
Settings clusterSettings = Settings.builder().put(super.nodeSettings(0)).put(FeatureFlags.TIERED_REMOTE_INDEX, false).build();
InternalTestCluster internalTestCluster = internalCluster();
internalTestCluster.startClusterManagerOnlyNode(clusterSettings);
internalTestCluster.startDataOnlyNode(clusterSettings);

Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();

try {
prepareCreate(INDEX_NAME).setSettings(indexSettings).get();
fail("Should have thrown Exception as setting should not be registered if Feature Flag is Disabled");
} catch (SettingsException | IllegalArgumentException ex) {
assertEquals(
"unknown setting ["
+ IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()
+ "] please check that any required plugins are installed, or check the "
+ "breaking changes documentation for removed settings",
ex.getMessage()
);
}
}

public void testWritableWarmBasic() throws Exception {
InternalTestCluster internalTestCluster = internalCluster();
internalTestCluster.startClusterManagerOnlyNode();
internalTestCluster.startDataOnlyNode();
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME).setSettings(settings).get());

// Verify from the index settings that the data locality is partial
GetIndexResponse getIndexResponse = client().admin()
.indices()
.getIndex(new GetIndexRequest().indices(INDEX_NAME).includeDefaults(true))
.get();
Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME);
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));

// Ingesting some docs
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK);
flushAndRefresh(INDEX_NAME);

// Ensure the cluster is green before querying
ensureGreen();

SearchResponse searchResponse = client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
// Assert that search returns the same number of docs as ingested
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);

// Ingesting docs again before force merge
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK);
flushAndRefresh(INDEX_NAME);

FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache();
IndexShard shard = internalTestCluster.getDataNodeInstance(IndicesService.class)
.indexService(resolveIndex(INDEX_NAME))
.getShardOrNull(0);
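// Peel off the two FilterDirectory wrappers around the shard's store directory to reach the underlying CompositeDirectory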
Directory directory = (((FilterDirectory) (((FilterDirectory) (shard.store().directory())).getDelegate())).getDelegate());

// Force merging the index
Set<String> filesBeforeMerge = new HashSet<>(Arrays.asList(directory.listAll()));
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).get();
flushAndRefresh(INDEX_NAME);
Set<String> filesAfterMerge = new HashSet<>(Arrays.asList(directory.listAll()));

Set<String> filesFromPreviousGenStillPresent = filesBeforeMerge.stream()
.filter(filesAfterMerge::contains)
.filter(file -> !FileTypeUtils.isLockFile(file))
.filter(file -> !FileTypeUtils.isSegmentsFile(file))
.collect(Collectors.toUnmodifiableSet());

// Assert that after the merge, no files from the previous generation remain in the directory
assertTrue(filesFromPreviousGenStillPresent.isEmpty());

// Assert that files from the previous generation are not present in the FileCache either
filesBeforeMerge.stream()
.filter(file -> !FileTypeUtils.isLockFile(file))
.filter(file -> !FileTypeUtils.isSegmentsFile(file))
.forEach(file -> assertNull(fileCache.get(((CompositeDirectory) directory).getFilePath(file))));

// Delete the index (so that the ref count drops to zero for all files) and then prune the cache to avoid any file leaks
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
fileCache.prune();
}
}
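For reference, the two knobs exercised above can be combined as in the following minimal sketch, distilled from the test; the index name and client boilerplate are purely illustrative:

// Node level: the tiered remote index feature flag must be enabled, otherwise
// index.store.data_locality is not a registered setting (see IndexScopedSettings below).
Settings nodeSettings = Settings.builder().put(FeatureFlags.TIERED_REMOTE_INDEX, true).build();
internalCluster().startDataOnlyNode(nodeSettings);

// Index level: request partial data locality so the shard's store uses a
// CompositeDirectory backed by the node's FileCache.
Settings warmIndexSettings = Settings.builder()
    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
    .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
    .build();
client().admin().indices().prepareCreate("warm-index").setSettings(warmIndexSettings).get();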
@@ -41,6 +41,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexSortConfig;
@@ -260,7 +261,10 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
* is ready for production release, the feature flag can be removed, and the
* setting should be moved to {@link #BUILT_IN_INDEX_SETTINGS}.
*/
public static final Map<String, List<Setting>> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of();
public static final Map<String, List<Setting>> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(
FeatureFlags.TIERED_REMOTE_INDEX,
List.of(IndexModule.INDEX_STORE_LOCALITY_SETTING)
);

public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);

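For context, settings listed in FEATURE_FLAGGED_INDEX_SETTINGS are only registered when their feature flag is enabled; with TIERED_REMOTE_INDEX off, index.store.data_locality stays unknown, which is exactly the "unknown setting" failure asserted in WritableWarmIT above. A simplified sketch of that gating (illustrative only, not the actual registration wiring):

// Illustrative only: a feature-flagged index setting becomes visible to the cluster
// only when FeatureFlags reports its flag as enabled.
Set<Setting> effectiveIndexSettings = new HashSet<>();
effectiveIndexSettings.addAll(IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.forEach((flag, settings) -> {
    if (FeatureFlags.isEnabled(flag)) {
        effectiveIndexSettings.addAll(settings);
    }
});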
78 changes: 76 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
@@ -107,6 +108,8 @@
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.logging.log4j.util.Strings.toRootUpperCase;

/**
* IndexModule represents the central extension point for index level custom implementations like:
* <ul>
@@ -141,6 +143,17 @@
Property.NodeScope
);

/**
* Index setting used to determine whether the data is cached locally in full or only partially
*/
public static final Setting<DataLocalityType> INDEX_STORE_LOCALITY_SETTING = new Setting<>(
"index.store.data_locality",
DataLocalityType.FULL.name(),
DataLocalityType::getValueOf,
Property.IndexScope,
Property.NodeScope
);

public static final Setting<String> INDEX_RECOVERY_TYPE_SETTING = new Setting<>(
"index.recovery.type",
"",
@@ -297,6 +310,7 @@
private final AtomicBoolean frozen = new AtomicBoolean(false);
private final BooleanSupplier allowExpensiveQueries;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
private final FileCache fileCache;

/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@@ -315,7 +329,8 @@
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
final FileCache fileCache
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
Expand All @@ -327,6 +342,30 @@
this.allowExpensiveQueries = allowExpensiveQueries;
this.expressionResolver = expressionResolver;
this.recoveryStateFactories = recoveryStateFactories;
this.fileCache = fileCache;
}

public IndexModule(
final IndexSettings indexSettings,
final AnalysisRegistry analysisRegistry,
final EngineFactory engineFactory,
final EngineConfigFactory engineConfigFactory,
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
) {
this(
indexSettings,
analysisRegistry,
engineFactory,
engineConfigFactory,
directoryFactories,
allowExpensiveQueries,
expressionResolver,
recoveryStateFactories,
null
);
}

/**
@@ -577,6 +616,40 @@
}
}

/**
* Indicates the locality of the data - whether it will be cached fully or partially
*/
public enum DataLocalityType {
/**
* Indicates that all the data will be cached locally
*/
FULL,
/**
* Indicates that only a subset of the data will be cached locally
*/
PARTIAL;

private static final Map<String, DataLocalityType> LOCALITY_TYPES;

static {
final Map<String, DataLocalityType> localityTypes = new HashMap<>(values().length);
for (final DataLocalityType dataLocalityType : values()) {
localityTypes.put(dataLocalityType.name(), dataLocalityType);
}
LOCALITY_TYPES = Collections.unmodifiableMap(localityTypes);
}

public static DataLocalityType getValueOf(final String localityType) {
Objects.requireNonNull(localityType, "No locality type given.");
final String localityTypeName = toRootUpperCase(localityType.trim());
final DataLocalityType type = LOCALITY_TYPES.get(localityTypeName);
if (type != null) {
return type;
}
throw new IllegalArgumentException("Unknown locality type constant [" + localityType + "].");
}
}

public static Type defaultStoreType(final boolean allowMmap) {
if (allowMmap && Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) {
return Type.HYBRIDFS;
@@ -665,7 +738,8 @@
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
recoverySettings,
remoteStoreSettings
remoteStoreSettings,
fileCache
);
success = true;
return indexService;
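The new setting's values are parsed by DataLocalityType.getValueOf, which trims the input and upper-cases it before matching the enum constants. A few illustrative calls (not part of the diff):

// Parsing is case-insensitive and whitespace-tolerant; unknown values are rejected.
IndexModule.DataLocalityType.getValueOf("partial");   // -> DataLocalityType.PARTIAL
IndexModule.DataLocalityType.getValueOf(" Full ");    // -> DataLocalityType.FULL
IndexModule.DataLocalityType.getValueOf("remote");    // -> IllegalArgumentException: Unknown locality type constant [remote].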