diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java index 08ece7df457cc..2922ad33586d2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -421,8 +421,8 @@ public void testIndexDeletionNoPinnedTimestamps() throws Exception { client().admin().indices().prepareDelete(INDEX_NAME).get(); assertBusy(() -> { - assertEquals(0, Files.list(translogMetadataPath).collect(Collectors.toList()).size()); - assertEquals(0, Files.list(translogDataPath).collect(Collectors.toList()).size()); + assertEquals(1, Files.list(translogMetadataPath).collect(Collectors.toList()).size()); + assertEquals(4, Files.list(translogDataPath).collect(Collectors.toList()).size()); }); } @@ -490,7 +490,7 @@ public void testIndexDeletionWithPinnedTimestamps() throws Exception { assertBusy(() -> { List metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList()); - assertEquals(1, metadataFiles.size()); + assertEquals(2, metadataFiles.size()); verifyTranslogDataFileCount(metadataFiles, translogDataPath); }); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RestoreShallowSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RestoreShallowSnapshotV2IT.java index 24f1141ddbede..d532abaa2b0ad 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RestoreShallowSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RestoreShallowSnapshotV2IT.java @@ -27,22 +27,26 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.io.PathUtils; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.index.Index; import org.opensearch.core.rest.RestStatus; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.mapper.MapperService; import org.opensearch.index.remote.RemoteStoreEnums; import org.opensearch.index.shard.IndexShard; import org.opensearch.indices.IndicesService; import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.snapshots.AbstractSnapshotIntegTestCase; import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.snapshots.SnapshotRestoreException; import org.opensearch.snapshots.SnapshotState; +import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.junit.After; @@ -53,15 +57,18 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; +import static org.opensearch.index.query.QueryBuilders.matchAllQuery; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS; import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; @@ -885,4 +892,88 @@ public void testRestoreOperationsUsingDifferentRepos() throws Exception { ensureGreen(indexName1); assertDocsPresentInIndex(client, indexName1, 3 * numDocsInIndex1); } + + public void testContinuousIndexing() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + internalCluster().startDataOnlyNode(); + String index = "test-index"; + String snapshotRepo = "test-restore-snapshot-repo"; + String baseSnapshotName = "snapshot_"; + Path absolutePath1 = randomRepoPath().toAbsolutePath(); + logger.info("Snapshot Path [{}]", absolutePath1); + + createRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, true)); + + Client client = client(); + Settings indexSettings = Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .build(); + + createIndex(index, indexSettings); + ensureGreen(index); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(index) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(randomIntBetween(1, 5))); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueSeconds(randomIntBetween(1, 5))); + + long totalDocs = 0; + Map snapshots = new HashMap<>(); + int numDocs = randomIntBetween(200, 300); + totalDocs += numDocs; + try (BackgroundIndexer indexer = new BackgroundIndexer(index, MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) { + int numberOfSnapshots = 5; + for (int i = 0; i < numberOfSnapshots; i++) { + logger.info("--> waiting for {} docs to be indexed ...", numDocs); + long finalTotalDocs1 = totalDocs; + assertBusy(() -> assertEquals(finalTotalDocs1, indexer.totalIndexedDocs()), 120, TimeUnit.SECONDS); + logger.info("--> {} total docs indexed", totalDocs); + String snapshotName = baseSnapshotName + i; + createSnapshot(snapshotRepo, snapshotName, new ArrayList<>()); + snapshots.put(snapshotName, totalDocs); + if (i < numberOfSnapshots - 1) { + numDocs = randomIntBetween(200, 300); + indexer.continueIndexing(numDocs); + totalDocs += numDocs; + } + } + } + + logger.info("Snapshots Status: " + snapshots); + + for (String snapshot : snapshots.keySet()) { + logger.info("Restoring snapshot: {}", snapshot); + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(index)).get()); + + RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin() + .cluster() + .prepareRestoreSnapshot(snapshotRepo, snapshot) + .setWaitForCompletion(true) + .setIndices() + .get(); + + assertEquals(RestStatus.OK, restoreSnapshotResponse1.status()); + + // Verify restored index's stats + ensureGreen(TimeValue.timeValueSeconds(60), index); + long finalTotalDocs = totalDocs; + assertBusy(() -> { + Long hits = client().prepareSearch(index) + .setQuery(matchAllQuery()) + .setSize((int) finalTotalDocs) + .storedFields() + .execute() + .actionGet() + .getHits() + .getTotalHits().value; + + assertEquals(snapshots.get(snapshot), hits); + }); + } + } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 25d6e24cade14..27a78dc3ce2f6 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -39,6 +39,7 @@ import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; +import org.opensearch.indices.RemoteStoreSettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService; import org.opensearch.threadpool.ThreadPool; @@ -891,6 +892,16 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException lastSuccessfulFetchOfPinnedTimestamps ); + if (metadataFilesEligibleToDelete.isEmpty()) { + logger.debug("No metadata files are eligible to be deleted based on lastNMetadataFilesToKeep and age"); + return; + } + + // If pinned timestamps are enabled, make sure to not delete last metadata file. + if (RemoteStoreSettings.isPinnedTimestampsEnabled()) { + metadataFilesEligibleToDelete.remove(sortedMetadataFileList.get(0)); + } + List metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream() .filter(metadataFile -> allLockFiles.contains(metadataFile) == false) .collect(Collectors.toList()); @@ -905,7 +916,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException Set activeSegmentRemoteFilenames = new HashSet<>(); final Set metadataFilesToFilterActiveSegments = getMetadataFilesToFilterActiveSegments( - lastNMetadataFilesToKeep, + sortedMetadataFileList.indexOf(metadataFilesEligibleToDelete.get(0)), sortedMetadataFileList, allLockFiles ); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index 1f54c09a04cc7..54cbf8ac9a9f8 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -189,7 +189,7 @@ public void onResponse(List blobMetadata) { List metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, indexDeleted); // If index is not deleted, make sure to keep latest metadata file - if (indexDeleted == false) { + if (indexDeleted == false || RemoteStoreSettings.isPinnedTimestampsEnabled()) { metadataFilesToBeDeleted.remove(metadataFiles.get(0)); } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java index 0995f2e75a17a..838f97ade9e8e 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslogTests.java @@ -286,9 +286,9 @@ public void testIndexDeletionWithNoPinnedTimestampNoRecentMdFiles() throws Excep assertBusy(() -> assertTrue(translog.isRemoteGenerationDeletionPermitsAvailable())); assertBusy(() -> { - assertEquals(0, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); + assertEquals(1, blobStoreTransferService.listAll(getTranslogDirectory().add(METADATA_DIR)).size()); assertEquals( - 0, + 12, blobStoreTransferService.listAll(getTranslogDirectory().add(DATA_DIR).add(String.valueOf(primaryTerm.get()))).size() ); });