Skip to content

Commit

Permalink
Enhance searchable snapshots to enable a read-only view of older snap…
Browse files Browse the repository at this point in the history
…shots (#5429)

* Enhance searchable snapshots to enable a read-only view of older snapshots

This change removes the guardrails around N-1 backward compatibility and uses Lucene's "expert" APIs to read snapshots (Lucene segments) older than N-1 on a best-effort basis. The functionality is gated by an additional feature flag, separate from the searchable snapshots flag. Note that the Lucene integration is rather inefficient because the necessary "expert" Lucene APIs are still package-private.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Added some unit tests

This change also includes a test index ZIP file for the unit tests. The change also introduces a bug fix in the readAnySegmentsInfo method to close the reader before returning the SegmentInfos object - this avoids dangling/open file handles.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Incorporating PR feedback

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Incorporate PR comments from andrross

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Remove use of IndexSetting for minimum version for snapshots backwards compatibility

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Moved ES 6.3.0 test data to a subdirectory

This change also includes an update to the file name to clarify that it is an ES index, and changing the associated markdown file name to just README.md. All tests that reference this ZIP file have corresponding changes to the path they reference.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Update unit tests to use try-with-resources

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Added FeatureFlagSetter helper class

Also refactored unit test classes to use the helper class.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Incorporating PR feedback from @mch2

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

* Fix IndexSettingsTests

Updated the asserts in IndexSettingsTests to account for the new defaulting behavior.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
  • Loading branch information
kartg authored Jan 5, 2023
1 parent 53a4cae commit 85b85b1
Show file tree
Hide file tree
Showing 28 changed files with 554 additions and 86 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518)), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615)))
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Experimental support for extended backward compatiblity in searchable snapshots ([#5429](https://github.com/opensearch-project/OpenSearch/pull/5429))

### Dependencies
- Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,7 @@ public Builder addBlocks(IndexMetadata indexMetadata) {
if (IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.get(indexMetadata.getSettings())) {
addIndexBlock(indexName, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
}
if (IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey()
.equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
addIndexBlock(indexName, IndexMetadata.REMOTE_READ_ONLY_ALLOW_DELETE);
}
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1825,10 +1825,11 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
throw new IllegalArgumentException("Unexpected token " + token);
}
}
if (Assertions.ENABLED) {
// Reference:
// https://github.com/opensearch-project/OpenSearch/blob/4dde0f2a3b445b2fc61dab29c5a2178967f4a3e3/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java#L1620-L1628
Version legacyVersion = LegacyESVersion.fromId(6050099);
if (Assertions.ENABLED && Version.indexCreated(builder.settings).onOrAfter(legacyVersion)) {
assert mappingVersion : "mapping version should be present for indices";
}
if (Assertions.ENABLED) {
assert settingsVersion : "settings version should be present for indices";
}
if (Assertions.ENABLED && Version.indexCreated(builder.settings).onOrAfter(LegacyESVersion.V_7_2_0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
Settings indexSettings = indexMetadata.getSettings();
if (IndexModule.Type.REMOTE_SNAPSHOT.getSettingsKey().equals(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()))) {
return REMOTE_CAPABLE;
}
return LOCAL_ONLY;
Expand Down
20 changes: 20 additions & 0 deletions server/src/main/java/org/opensearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.VectorValues;
Expand Down Expand Up @@ -162,6 +163,25 @@ public static SegmentInfos readSegmentInfos(Directory directory) throws IOExcept
return SegmentInfos.readLatestCommit(directory);
}

/**
* A variant of {@link #readSegmentInfos(Directory)} that supports reading indices written by
* older major versions of Lucene. The underlying implementation is a workaround since the
* "expert" readLatestCommit API is currently package-private in Lucene. First, all commits in
* the given {@link Directory} are listed - this result includes older Lucene commits. Then,
* the latest index commit is opened via {@link DirectoryReader} by including a minimum supported
* Lucene major version based on the minimum compatibility of the given {@link org.opensearch.Version}.
*/
public static SegmentInfos readSegmentInfosExtendedCompatibility(Directory directory, org.opensearch.Version minimumVersion)
throws IOException {
// This list is sorted from oldest to latest
List<IndexCommit> indexCommits = DirectoryReader.listCommits(directory);
IndexCommit latestCommit = indexCommits.get(indexCommits.size() - 1);
final int minSupportedLuceneMajor = minimumVersion.minimumIndexCompatibilityVersion().luceneVersion.major;
try (StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(latestCommit, minSupportedLuceneMajor, null)) {
return reader.getSegmentInfos();
}
}

/**
* Returns an iterable that allows to iterate over all files in this segments info
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public class FeatureFlags {
*/
public static final String SEARCHABLE_SNAPSHOT = "opensearch.experimental.feature.searchable_snapshot.enabled";

/**
* Gates the ability for Searchable Snapshots to read snapshots that are older than the
* guaranteed backward compatibility for OpenSearch (one prior major version) on a best effort basis.
*/
public static final String SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY =
"opensearch.experimental.feature.searchable_snapshot.extended_compatibility.enabled";

/**
* Gates the functionality of extensions.
* Once the feature is ready for production release, this feature flag can be removed.
Expand Down
14 changes: 11 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -459,11 +459,19 @@ public boolean match(String setting) {
}

/**
* Convenience method to check whether the given IndexSettings contains
* an {@link #INDEX_STORE_TYPE_SETTING} set to the value of this type.
* Convenience method to check whether the given {@link IndexSettings}
* object contains an {@link #INDEX_STORE_TYPE_SETTING} set to the value of this type.
*/
public boolean match(IndexSettings settings) {
return match(INDEX_STORE_TYPE_SETTING.get(settings.getSettings()));
return match(settings.getSettings());
}

/**
* Convenience method to check whether the given {@link Settings}
* object contains an {@link #INDEX_STORE_TYPE_SETTING} set to the value of this type.
*/
public boolean match(Settings settings) {
return match(INDEX_STORE_TYPE_SETTING.get(settings));
}
}

Expand Down
28 changes: 28 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.ingest.IngestService;
Expand All @@ -60,11 +61,13 @@
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_DOCS_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_NESTED_FIELDS_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;

/**
* This class encapsulates all index level settings and handles settings updates.
Expand Down Expand Up @@ -585,6 +588,8 @@ public final class IndexSettings {
private final boolean isRemoteStoreEnabled;
private final String remoteStoreRepository;
private final boolean isRemoteTranslogStoreEnabled;
private final boolean isRemoteSnapshot;
private Version extendedCompatibilitySnapshotVersion;
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
private volatile IndexMetadata indexMetadata;
Expand Down Expand Up @@ -747,6 +752,13 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY);
isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false);
isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);

if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
extendedCompatibilitySnapshotVersion = SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
} else {
extendedCompatibilitySnapshotVersion = Version.CURRENT.minimumIndexCompatibilityVersion();
}
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -1012,6 +1024,22 @@ public boolean isRemoteTranslogStoreEnabled() {
return isRemoteTranslogStoreEnabled;
}

/**
* Returns true if this is remote/searchable snapshot
*/
public boolean isRemoteSnapshot() {
return isRemoteSnapshot;
}

/**
* If this is a remote snapshot and the extended compatibility
* feature flag is enabled, this returns the minimum {@link Version}
* supported. In all other cases, the return value is null.
*/
public Version getExtendedCompatibilitySnapshotVersion() {
return extendedCompatibilitySnapshotVersion;
}

/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class ReadOnlyEngine extends Engine {
private final CompletionStatsCache completionStatsCache;
private final boolean requireCompleteHistory;
private final TranslogManager translogManager;
private final Version minimumSupportedVersion;

protected volatile TranslogStats translogStats;

Expand All @@ -119,6 +120,8 @@ public ReadOnlyEngine(
) {
super(config);
this.requireCompleteHistory = requireCompleteHistory;
// fetch the minimum Version for extended backward compatibility use-cases
this.minimumSupportedVersion = config.getIndexSettings().getExtendedCompatibilitySnapshotVersion();
try {
Store store = config.getStore();
store.incRef();
Expand All @@ -130,7 +133,11 @@ public ReadOnlyEngine(
// we obtain the IW lock even though we never modify the index.
// yet this makes sure nobody else does. including some testing tools that try to be messy
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
if (isExtendedCompatibility()) {
this.lastCommittedSegmentInfos = Lucene.readSegmentInfosExtendedCompatibility(directory, this.minimumSupportedVersion);
} else {
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
}
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
Expand Down Expand Up @@ -223,7 +230,17 @@ protected final OpenSearchDirectoryReader wrapReader(

protected DirectoryReader open(IndexCommit commit) throws IOException {
assert Transports.assertNotTransportThread("opening index commit of a read-only engine");
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD);
DirectoryReader reader;
if (isExtendedCompatibility()) {
reader = DirectoryReader.open(commit, this.minimumSupportedVersion.luceneVersion.major, null);
} else {
reader = DirectoryReader.open(commit);
}
return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}

private boolean isExtendedCompatibility() {
return Version.CURRENT.minimumIndexCompatibilityVersion().onOrAfter(this.minimumSupportedVersion);
}

@Override
Expand Down
14 changes: 12 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1984,7 +1984,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
};

// Do not load the global checkpoint if this is a remote snapshot index
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings) == false) {
if (indexSettings.isRemoteSnapshot() == false) {
loadGlobalCheckpointToReplicationTracker();
}

Expand Down Expand Up @@ -2039,7 +2039,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
}

private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
final Map<String, String> userData = fetchUserData();
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid";
Expand All @@ -2054,6 +2054,16 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
return true;
}

private Map<String, String> fetchUserData() throws IOException {
if (indexSettings.isRemoteSnapshot() && indexSettings.getExtendedCompatibilitySnapshotVersion() != null) {
// Inefficient method to support reading old Lucene indexes
return Lucene.readSegmentInfosExtendedCompatibility(store.directory(), indexSettings.getExtendedCompatibilitySnapshotVersion())
.getUserData();
} else {
return SegmentInfos.readLatestCommit(store.directory()).getUserData();
}
}

private void onNewEngine(Engine newEngine) {
assert Thread.holdsLock(engineMutex);
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
Expand Down
30 changes: 28 additions & 2 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,21 @@ public Directory directory() {
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
failIfCorrupted();
try {
return readSegmentsInfo(null, directory());
if (indexSettings.isRemoteSnapshot() && indexSettings.getExtendedCompatibilitySnapshotVersion() != null) {
return readSegmentInfosExtendedCompatibility(directory(), indexSettings.getExtendedCompatibilitySnapshotVersion());
} else {
return readSegmentsInfo(null, directory());
}
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
markStoreCorrupted(ex);
throw ex;
}
}

/**
* Returns the segments info for the given commit or for the latest commit if the given commit is <code>null</code>
* Returns the segments info for the given commit or for the latest commit if the given commit is <code>null</code>.
* This method will throw an exception if the index is older than the standard backwards compatibility
* policy ( current major - 1). See also {@link #readSegmentInfosExtendedCompatibility(Directory, org.opensearch.Version)}.
*
* @throws IOException if the index is corrupted or the segments file is not present
*/
Expand All @@ -245,7 +251,27 @@ private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory direc
} catch (Exception ex) {
throw new CorruptIndexException("Hit unexpected exception while reading segment infos", "commit(" + commit + ")", ex);
}
}

/**
* Returns the segments info for the latest commit in the given directory. Unlike
* {@link #readSegmentsInfo(IndexCommit, Directory)}, this method supports reading
* older Lucene indices on a best-effort basis.
*
* @throws IOException if the index is corrupted or the segments file is not present
*/
private static SegmentInfos readSegmentInfosExtendedCompatibility(Directory directory, org.opensearch.Version minimumVersion)
throws IOException {
try {
return Lucene.readSegmentInfosExtendedCompatibility(directory, minimumVersion);
} catch (EOFException eof) {
// TODO this should be caught by lucene - EOF is almost certainly an index corruption
throw new CorruptIndexException("Read past EOF while reading segment infos", "<latest-commit>", eof);
} catch (IOException exception) {
throw exception; // IOExceptions like too many open files are not necessarily a corruption - just bubble it up
} catch (Exception ex) {
throw new CorruptIndexException("Hit unexpected exception while reading segment infos", "<latest-commit>", ex);
}
}

final void ensureOpen() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.NoLockFactory;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.file.OnDemandVirtualFileSnapshotIndexInput;
Expand All @@ -35,6 +37,9 @@
* @opensearch.internal
*/
public final class RemoteSnapshotDirectory extends Directory {

public static final Version SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION = LegacyESVersion.fromId(6000099);

private static final String VIRTUAL_FILE_PREFIX = BlobStoreRepository.VIRTUAL_DATA_BLOB_PREFIX;

private final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfoMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
if (idxSettings.isSegRepEnabled()) {
return new NRTReplicationEngineFactory();
}
if (IndexModule.Type.REMOTE_SNAPSHOT.match(idxSettings)) {
if (idxSettings.isRemoteSnapshot()) {
return config -> new ReadOnlyEngine(config, new SeqNoStats(0, 0, 0), new TranslogStats(), true, Function.identity(), false);
}
return new InternalEngineFactory();
Expand Down
Loading

0 comments on commit 85b85b1

Please sign in to comment.