Skip to content

Commit

Permalink
Fix Shallow copy snapshot failures on closed index (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#16868)

* Fix shallow v1 snapshot failures on closed index

Signed-off-by: Shubh Sahu <shubhvs@amazon.com>

* UT fix

Signed-off-by: Shubh Sahu <shubhvs@amazon.com>

* Adding UT

Signed-off-by: Shubh Sahu <shubhvs@amazon.com>

* small fix

Signed-off-by: Shubh Sahu <shubhvs@amazon.com>

* Addressing comments

Signed-off-by: Shubh Sahu <shubhvs@amazon.com>

* Addressing comments

Signed-off-by: Shubh Sahu <shubhvs@amazon.com>

* Modifying IT to restore snapshot

Signed-off-by: Shubh Sahu <shubhvs@amazon.com>

---------

Signed-off-by: Shubh Sahu <shubhvs@amazon.com>
Co-authored-by: Shubh Sahu <shubhvs@amazon.com>
  • Loading branch information
astute-decipher and Shubh Sahu authored Jan 9, 2025
1 parent 1d4b85f commit 2eadf12
Show file tree
Hide file tree
Showing 10 changed files with 346 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))
- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335))
- Always use `constant_score` query for `match_only_text` field ([#16964](https://github.com/opensearch-project/OpenSearch/pull/16964))
- Fix Shallow copy snapshot failures on closed index ([#16868](https://github.com/opensearch-project/OpenSearch/pull/16868))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -1078,4 +1081,79 @@ public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws Interrup
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}

public void testSuccessfulShallowV1SnapshotPostIndexClose() throws Exception {
internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "0ms"));

assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

logger.info("Create shallow snapshot setting enabled repo");
String shallowSnapshotRepoName = "shallow-snapshot-repo-name";
Path shallowSnapshotRepoPath = randomRepoPath();
Settings.Builder settings = Settings.builder()
.put("location", shallowSnapshotRepoPath)
.put(BlobStoreRepository.REMOTE_STORE_INDEX_SHALLOW_COPY.getKey(), Boolean.TRUE);
createRepository(shallowSnapshotRepoName, "fs", settings);

for (int i = 0; i < 10; i++) {
indexBulk(INDEX_NAME, 1);
}
flushAndRefresh(INDEX_NAME);

logger.info("Verify shallow snapshot created before close");
final String snapshot1 = "snapshot1";
SnapshotInfo snapshotInfo1 = internalCluster().client()
.admin()
.cluster()
.prepareCreateSnapshot(shallowSnapshotRepoName, snapshot1)
.setIndices(INDEX_NAME)
.setWaitForCompletion(true)
.get()
.getSnapshotInfo();

assertEquals(SnapshotState.SUCCESS, snapshotInfo1.state());
assertTrue(snapshotInfo1.successfulShards() > 0);
assertEquals(0, snapshotInfo1.failedShards());

for (int i = 0; i < 10; i++) {
indexBulk(INDEX_NAME, 1);
}

// close index
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(1000);
logger.info("Verify shallow snapshot created after close");
final String snapshot2 = "snapshot2";

SnapshotInfo snapshotInfo2 = internalCluster().client()
.admin()
.cluster()
.prepareCreateSnapshot(shallowSnapshotRepoName, snapshot2)
.setIndices(INDEX_NAME)
.setWaitForCompletion(true)
.get()
.getSnapshotInfo();

assertEquals(SnapshotState.SUCCESS, snapshotInfo2.state());
assertTrue(snapshotInfo2.successfulShards() > 0);
assertEquals(0, snapshotInfo2.failedShards());

// delete the index
cluster().wipeIndices(INDEX_NAME);
// try restoring the snapshot
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(shallowSnapshotRepoName, snapshot2)
.setWaitForCompletion(true)
.execute()
.actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen(INDEX_NAME);
flushAndRefresh(INDEX_NAME);
assertBusy(() -> { assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), 20); });
}
}
16 changes: 16 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,22 @@ public org.apache.lucene.util.Version minimumCompatibleVersion() {
return luceneVersion == null ? indexSettings.getIndexVersionCreated().luceneVersion : luceneVersion;
}

/**
* Fetches the last remote uploaded segment metadata file
* @return {@link RemoteSegmentMetadata}
* @throws IOException
*/
public RemoteSegmentMetadata fetchLastRemoteUploadedSegmentMetadata() throws IOException {
if (!indexSettings.isAssignedOnRemoteNode()) {
throw new IllegalStateException("Index is not assigned on Remote Node");
}
RemoteSegmentMetadata lastUploadedMetadata = getRemoteDirectory().readLatestMetadataFile();
if (lastUploadedMetadata == null) {
throw new FileNotFoundException("No metadata file found in remote store");
}
return lastUploadedMetadata;
}

/**
* Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this
* commit won't be freed until the commit / snapshot is closed.
Expand Down
39 changes: 39 additions & 0 deletions server/src/main/java/org/opensearch/repositories/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,45 @@ default void snapshotRemoteStoreIndexShard(
throw new UnsupportedOperationException();
}

/**
* Adds a reference of remote store data for a index commit point.
* <p>
* The index commit point can be obtained by using {@link org.opensearch.index.engine.Engine#acquireLastIndexCommit} method.
* Or for closed index can be obtained by reading last remote uploaded metadata by using {@link org.opensearch.index.shard.IndexShard#fetchLastRemoteUploadedSegmentMetadata()} method.
* Repository implementations shouldn't release the snapshot index commit point. It is done by the method caller.
* <p>
* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check
* {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted.
* @param store store to be snapshotted
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used
* to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier
* snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit}
* @param snapshotStatus snapshot status
* @param primaryTerm current Primary Term
* @param commitGeneration current commit generation
* @param startTime start time of the snapshot commit, this will be used as the start time for snapshot.
* @param indexFilesToFileLengthMap map of index files to file length
* @param listener listener invoked on completion
*/
default void snapshotRemoteStoreIndexShard(
Store store,
SnapshotId snapshotId,
IndexId indexId,
@Nullable IndexCommit snapshotIndexCommit,
@Nullable String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
long commitGeneration,
long startTime,
@Nullable Map<String, Long> indexFilesToFileLengthMap,
ActionListener<String> listener
) {
throw new UnsupportedOperationException();
}

/**
* Restores snapshot of the shard.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3714,6 +3714,33 @@ private void writeAtomic(BlobContainer container, final String blobName, final B
}
}

@Override
public void snapshotRemoteStoreIndexShard(
Store store,
SnapshotId snapshotId,
IndexId indexId,
IndexCommit snapshotIndexCommit,
@Nullable String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
long startTime,
ActionListener<String> listener
) {
snapshotRemoteStoreIndexShard(
store,
snapshotId,
indexId,
snapshotIndexCommit,
shardStateIdentifier,
snapshotStatus,
primaryTerm,
snapshotIndexCommit.getGeneration(),
startTime,
null,
listener
);
}

@Override
public void snapshotRemoteStoreIndexShard(
Store store,
Expand All @@ -3723,27 +3750,38 @@ public void snapshotRemoteStoreIndexShard(
String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus,
long primaryTerm,
long commitGeneration,
long startTime,
Map<String, Long> indexFilesToFileLengthMap,
ActionListener<String> listener
) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository"));
return;
}

final ShardId shardId = store.shardId();
try {
final String generation = snapshotStatus.generation();
logger.info("[{}] [{}] shallow copy snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
final BlobContainer shardContainer = shardContainer(indexId, shardId);

long indexTotalFileSize = 0;
// local store is being used here to fetch the files metadata instead of remote store as currently
// remote store is mirroring the local store.
List<String> fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames());
Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit);
for (String fileName : fileNames) {
indexTotalFileSize += commitSnapshotMetadata.get(fileName).length();
List<String> fileNames;

if (snapshotIndexCommit != null) {
// local store is being used here to fetch the files metadata instead of remote store as currently
// remote store is mirroring the local store.
fileNames = new ArrayList<>(snapshotIndexCommit.getFileNames());
Store.MetadataSnapshot commitSnapshotMetadata = store.getMetadata(snapshotIndexCommit);
for (String fileName : fileNames) {
indexTotalFileSize += commitSnapshotMetadata.get(fileName).length();
}
} else {
fileNames = new ArrayList<>(indexFilesToFileLengthMap.keySet());
indexTotalFileSize = indexFilesToFileLengthMap.values().stream().mapToLong(Long::longValue).sum();
}

int indexTotalNumberOfFiles = fileNames.size();

snapshotStatus.moveToStarted(
Expand All @@ -3754,7 +3792,7 @@ public void snapshotRemoteStoreIndexShard(
indexTotalFileSize
);

final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(commitGeneration);

// now create and write the commit point
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
Expand All @@ -3765,7 +3803,7 @@ public void snapshotRemoteStoreIndexShard(
snapshotId.getName(),
lastSnapshotStatus.getIndexVersion(),
primaryTerm,
snapshotIndexCommit.getGeneration(),
commitGeneration,
lastSnapshotStatus.getStartTime(),
threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
indexTotalNumberOfFiles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.opensearch.cluster.SnapshotsInProgress.ShardState;
import org.opensearch.cluster.SnapshotsInProgress.State;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
Expand All @@ -63,6 +64,7 @@
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus.Stage;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
Expand All @@ -74,7 +76,6 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -371,7 +372,9 @@ private void snapshot(
ActionListener<String> listener
) {
try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShardOrNull(shardId.id());
final boolean closedIndex = indexService.getMetadata().getState() == IndexMetadata.State.CLOSE;
if (indexShard.routingEntry().primary() == false) {
throw new IndexShardSnapshotFailedException(shardId, "snapshot should be performed only on primary");
}
Expand All @@ -398,36 +401,56 @@ private void snapshot(
if (remoteStoreIndexShallowCopy && indexShard.indexSettings().isRemoteStoreEnabled()) {
long startTime = threadPool.relativeTimeInMillis();
long primaryTerm = indexShard.getOperationPrimaryTerm();
// we flush first to make sure we get the latest writes snapshotted
wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true);
IndexCommit snapshotIndexCommit = wrappedSnapshot.get();
long commitGeneration = snapshotIndexCommit.getGeneration();
long commitGeneration = 0L;
Map<String, Long> indexFilesToFileLengthMap = null;
IndexCommit snapshotIndexCommit = null;

try {
if (closedIndex) {
RemoteSegmentMetadata lastRemoteUploadedIndexCommit = indexShard.fetchLastRemoteUploadedSegmentMetadata();
indexFilesToFileLengthMap = lastRemoteUploadedIndexCommit.getMetadata()
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getLength()));
primaryTerm = lastRemoteUploadedIndexCommit.getPrimaryTerm();
commitGeneration = lastRemoteUploadedIndexCommit.getGeneration();
} else {
wrappedSnapshot = indexShard.acquireLastIndexCommitAndRefresh(true);
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
}
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
} catch (NoSuchFileException e) {
wrappedSnapshot.close();
logger.warn(
"Exception while acquiring lock on primaryTerm = {} and generation = {}",
primaryTerm,
commitGeneration
);
indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true));
wrappedSnapshot = indexShard.acquireLastIndexCommit(false);
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
} catch (IOException e) {
if (closedIndex) {
logger.warn("Exception while reading latest metadata file from remote store");
listener.onFailure(e);
} else {
wrappedSnapshot.close();
logger.warn(
"Exception while acquiring lock on primaryTerm = {} and generation = {}",
primaryTerm,
commitGeneration
);
indexShard.flush(new FlushRequest(shardId.getIndexName()).force(true));
wrappedSnapshot = indexShard.acquireLastIndexCommit(false);
snapshotIndexCommit = wrappedSnapshot.get();
commitGeneration = snapshotIndexCommit.getGeneration();
indexShard.acquireLockOnCommitData(snapshot.getSnapshotId().getUUID(), primaryTerm, commitGeneration);
}
}
try {
repository.snapshotRemoteStoreIndexShard(
indexShard.store(),
snapshot.getSnapshotId(),
indexId,
snapshotIndexCommit,
getShardStateId(indexShard, snapshotIndexCommit),
null,
snapshotStatus,
primaryTerm,
commitGeneration,
startTime,
ActionListener.runBefore(listener, wrappedSnapshot::close)
indexFilesToFileLengthMap,
closedIndex ? listener : ActionListener.runBefore(listener, wrappedSnapshot::close)
);
} catch (IndexShardSnapshotFailedException e) {
logger.error(
Expand Down
Loading

0 comments on commit 2eadf12

Please sign in to comment.