Skip to content

Commit

Permalink
Enhance searchable snapshots to enable a read-only view of older snap…
Browse files Browse the repository at this point in the history
…shots

This change removes the guardrails around N-1 backward compatibility and uses Lucene's "expert" APIs to read snapshots (Lucene segments) older than N-1 on a best-effort basis. The functionality is gated by an additional feature flag, separate from the searchable snapshots flag. Note that the Lucene integration is rather inefficient because the necessary "expert" Lucene APIs are still package-private.

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
  • Loading branch information
kartg committed Dec 1, 2022
1 parent f7e2d8e commit dfe549a
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 10 deletions.
6 changes: 6 additions & 0 deletions server/src/main/java/org/opensearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.monitor.jvm.JvmInfo;
Expand All @@ -51,6 +52,8 @@
import java.util.Locale;
import java.util.Objects;

import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_BWC;

/**
* OpenSearch Version Class
*
Expand Down Expand Up @@ -413,6 +416,9 @@ private Version computeMinIndexCompatVersion() {
} else if (major == 7 || major == 1) {
return LegacyESVersion.fromId(6000026);
} else if (major == 2) {
if (FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_BWC)) {
return LegacyESVersion.fromId(6030099);
}
return LegacyESVersion.V_7_0_0;
} else {
bwcMajor = major - 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1825,10 +1825,13 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
throw new IllegalArgumentException("Unexpected token " + token);
}
}
if (Assertions.ENABLED) {
// Reference:
// https://github.com/opensearch-project/OpenSearch/blob/4dde0f2a3b445b2fc61dab29c5a2178967f4a3e3/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java#L1620-L1628
Version legacyVersion = LegacyESVersion.fromId(6050099);
if (Assertions.ENABLED && Version.indexCreated(builder.settings).onOrAfter(legacyVersion)) {
assert mappingVersion : "mapping version should be present for indices";
}
if (Assertions.ENABLED) {
if (Assertions.ENABLED && Version.indexCreated(builder.settings).onOrAfter(legacyVersion)) {
assert settingsVersion : "settings version should be present for indices";
}
if (Assertions.ENABLED && Version.indexCreated(builder.settings).onOrAfter(LegacyESVersion.V_7_2_0)) {
Expand Down
18 changes: 18 additions & 0 deletions server/src/main/java/org/opensearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.VectorValues;
Expand Down Expand Up @@ -162,6 +163,23 @@ public static SegmentInfos readSegmentInfos(Directory directory) throws IOExcept
return SegmentInfos.readLatestCommit(directory);
}

/**
* A variant of {@link #readSegmentInfos(Directory)} that supports reading indices written by
* older major versions of Lucene. The underlying implementation is a workaround since the
* "expert" readLatestCommit API is currently package-private in Lucene. First, all commits in
* the given {@link Directory} are listed - this result includes older Lucene commits. Then,
* the latest index commit is opened via {@link DirectoryReader} by including a minimum supported
* Lucene major version - this is based on the minimum index compatiblity version of OpenSearch.
*/
public static SegmentInfos readAnySegmentInfos(Directory directory) throws IOException {
// This list is sorted from oldest to latest
List<IndexCommit> indexCommits = DirectoryReader.listCommits(directory);
IndexCommit latestCommit = indexCommits.get(indexCommits.size() - 1);
int minSupportedLuceneMajor = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion.major;
StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(latestCommit, minSupportedLuceneMajor, null);
return reader.getSegmentInfos();
}

/**
* Returns an iterable that allows to iterate over all files in this segments info
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public class FeatureFlags {
*/
public static final String SEARCHABLE_SNAPSHOT = "opensearch.experimental.feature.searchable_snapshot.enabled";

/**
* Gates the ability for Searchable Snapshots to read snapshots that are older than the
* guaranteed backward compatibilty for OpenSearch (one prior major version) on a best effort basis.
*/
public static final String SEARCHABLE_SNAPSHOT_EXTENDED_BWC =
"opensearch.experimental.feature.searchable_snapshot.extended_bwc.enabled";

/**
* Gates the functionality of extensions.
* Once the feature is ready for production release, this feature flag can be removed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,24 @@ public class ReadOnlyEngine extends Engine {
private final CompletionStatsCache completionStatsCache;
private final boolean requireCompleteHistory;
private final TranslogManager translogManager;
private final boolean allowOldIndices;

protected volatile TranslogStats translogStats;

/**
* Wrapper constructor that turns off support for reading old indices.
*/
public ReadOnlyEngine(
EngineConfig config,
SeqNoStats seqNoStats,
TranslogStats translogStats,
boolean obtainLock,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction,
boolean requireCompleteHistory
) {
this(config, seqNoStats, translogStats, obtainLock, readerWrapperFunction, requireCompleteHistory, false);
}

/**
* Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
* read-write engine. It allows to optionally obtain the writer locks for the shard which would time-out if another
Expand All @@ -115,10 +130,12 @@ public ReadOnlyEngine(
TranslogStats translogStats,
boolean obtainLock,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction,
boolean requireCompleteHistory
boolean requireCompleteHistory,
boolean allowOldIndices
) {
super(config);
this.requireCompleteHistory = requireCompleteHistory;
this.allowOldIndices = allowOldIndices;
try {
Store store = config.getStore();
store.incRef();
Expand All @@ -130,7 +147,11 @@ public ReadOnlyEngine(
// we obtain the IW lock even though we never modify the index.
// yet this makes sure nobody else does. including some testing tools that try to be messy
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
if (this.allowOldIndices) {
this.lastCommittedSegmentInfos = Lucene.readAnySegmentInfos(directory);
} else {
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
}
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
Expand Down Expand Up @@ -223,7 +244,14 @@ protected final OpenSearchDirectoryReader wrapReader(

protected DirectoryReader open(IndexCommit commit) throws IOException {
assert Transports.assertNotTransportThread("opening index commit of a read-only engine");
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(commit), Lucene.SOFT_DELETES_FIELD);
DirectoryReader reader;
if (this.allowOldIndices) {
int minSupportedLuceneMajor = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion.major;
reader = DirectoryReader.open(commit, minSupportedLuceneMajor, null);
} else {
reader = DirectoryReader.open(commit);
}
return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}

@Override
Expand Down
15 changes: 13 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.AsyncIOProcessor;
import org.opensearch.common.util.concurrent.RunOnce;
Expand Down Expand Up @@ -1984,7 +1985,7 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
};

// Do not load the global checkpoint if this is a remote snapshot index
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings) == false) {
if (isRemoteSnapshotIndex() == false) {
loadGlobalCheckpointToReplicationTracker();
}

Expand Down Expand Up @@ -2039,7 +2040,13 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
}

private boolean assertSequenceNumbersInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
final Map<String, String> userData;
if (isRemoteSnapshotIndex() && FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_BWC)) {
// Inefficient method to support reading old Lucene indexes
userData = Lucene.readAnySegmentInfos(store.directory()).getUserData();
} else {
userData = Lucene.readSegmentInfos(store.directory()).getUserData();
}
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid";
Expand All @@ -2054,6 +2061,10 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
return true;
}

private boolean isRemoteSnapshotIndex() {
return IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings);
}

private void onNewEngine(Engine newEngine) {
assert Thread.holdsLock(engineMutex);
refreshListeners.setCurrentRefreshLocationSupplier(newEngine::getTranslogLastWriteLocation);
Expand Down
32 changes: 30 additions & 2 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,15 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.common.util.concurrent.RefCounted;
import org.opensearch.common.util.iterable.Iterables;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.env.ShardLock;
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.CombinedDeletionPolicy;
import org.opensearch.index.engine.Engine;
Expand Down Expand Up @@ -221,15 +223,22 @@ public Directory directory() {
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
failIfCorrupted();
try {
return readSegmentsInfo(null, directory());
if (IndexModule.Type.REMOTE_SNAPSHOT.match(indexSettings)
&& FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_BWC)) {
return readAnySegmentsInfo(directory());
} else {
return readSegmentsInfo(null, directory());
}
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
markStoreCorrupted(ex);
throw ex;
}
}

/**
* Returns the segments info for the given commit or for the latest commit if the given commit is <code>null</code>
* Returns the segments info for the given commit or for the latest commit if the given commit is <code>null</code>.
* This method will throw an exception if the index is older than the standard backwards compatibility
* policy ( current major - 1). See also {@link #readAnySegmentsInfo(Directory)}.
*
* @throws IOException if the index is corrupted or the segments file is not present
*/
Expand All @@ -245,7 +254,26 @@ private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory direc
} catch (Exception ex) {
throw new CorruptIndexException("Hit unexpected exception while reading segment infos", "commit(" + commit + ")", ex);
}
}

/**
* Returns the segments info for the latest commit in the given directory. Unlike
* {@link #readSegmentsInfo(IndexCommit, Directory)}, this method supports reading
* older Lucene indices on a best-effort basis.
*
* @throws IOException if the index is corrupted or the segments file is not present
*/
private static SegmentInfos readAnySegmentsInfo(Directory directory) throws IOException {
try {
return Lucene.readAnySegmentInfos(directory);
} catch (EOFException eof) {
// TODO this should be caught by lucene - EOF is almost certainly an index corruption
throw new CorruptIndexException("Read past EOF while reading segment infos", "<latest-commit>", eof);
} catch (IOException exception) {
throw exception; // IOExceptions like too many open files are not necessarily a corruption - just bubble it up
} catch (Exception ex) {
throw new CorruptIndexException("Hit unexpected exception while reading segment infos", "<latest-commit>", ex);
}
}

final void ensureOpen() {
Expand Down
22 changes: 21 additions & 1 deletion server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
Expand Down Expand Up @@ -768,7 +769,26 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
return new NRTReplicationEngineFactory();
}
if (IndexModule.Type.REMOTE_SNAPSHOT.match(idxSettings)) {
return config -> new ReadOnlyEngine(config, new SeqNoStats(0, 0, 0), new TranslogStats(), true, Function.identity(), false);
if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_BWC)) {
return config -> new ReadOnlyEngine(
config,
new SeqNoStats(0, 0, 0),
new TranslogStats(),
true,
Function.identity(),
false,
true
);
} else {
return config -> new ReadOnlyEngine(
config,
new SeqNoStats(0, 0, 0),
new TranslogStats(),
true,
Function.identity(),
false
);
}
}
return new InternalEngineFactory();
} else if (engineFactories.size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.snapshots.SnapshotId;

Expand Down Expand Up @@ -198,6 +199,10 @@ public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener<Voi
}

private void afterRestore(SnapshotFiles snapshotFiles, Store store, StoreFileMetadata restoredSegmentsFile) {
// no cleanup needed for searchable snapshots
if (store.directory() instanceof RemoteSnapshotDirectory) {
return;
}
// read the snapshot data persisted
try {
Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
Expand Down
21 changes: 21 additions & 0 deletions server/src/test/java/org/opensearch/VersionTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@
package org.opensearch;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
import org.hamcrest.Matchers;

import java.lang.reflect.Modifier;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -245,6 +249,23 @@ public void testOpenSearchMinIndexCompatVersion() {
assertEquals(expected.revision, actual.revision);
}

@SuppressForbidden(reason = "tests minimum index compatiblity with a feature flag enabled")
public void testMinimumIndexCompatWithFeatureFlag() {
AccessController.doPrivileged(
(PrivilegedAction<String>) () -> System.setProperty(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_BWC, "true")
);
try {
Version expected = LegacyESVersion.fromId(6000099);
// affects version 2+
Version actual = Version.fromId(2000099);
assertEquals(expected.major, actual.minimumIndexCompatibilityVersion().major);
} finally {
AccessController.doPrivileged(
(PrivilegedAction<String>) () -> System.clearProperty(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_BWC)
);
}
}

/** test first version of opensearch compatibility that does not support legacy versions */
public void testOpenSearchPreLegacyRemoval() {
Version opensearchVersion = Version.fromString("3.0.0");
Expand Down

0 comments on commit dfe549a

Please sign in to comment.