Skip to content

Commit

Permalink
Treat last fetch timestamp of pinned timestamp as one of the pinned t…
Browse files Browse the repository at this point in the history
…imestamps

Signed-off-by: Sachin Kale <sachinpkale@gmail.com>
  • Loading branch information
sachinpkale committed Oct 19, 2024
1 parent 7c4931f commit 6d86b38
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,29 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.index.Index;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.snapshots.SnapshotRestoreException;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.junit.annotations.TestIssueLogging;
import org.junit.After;
import org.junit.Before;

Expand All @@ -53,15 +58,18 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED;
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
Expand Down Expand Up @@ -885,4 +893,197 @@ public void testRestoreOperationsUsingDifferentRepos() throws Exception {
ensureGreen(indexName1);
assertDocsPresentInIndex(client, indexName1, 3 * numDocsInIndex1);
}

public void testContinuousIndexing() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
String index = "test-index";
String snapshotRepo = "test-restore-snapshot-repo";
String baseSnapshotName = "snapshot_";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

createRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, true));

Client client = client();
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();

createIndex(index, indexSettings);
ensureGreen(index);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(index)
);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(randomIntBetween(1, 5)));
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueSeconds(randomIntBetween(1, 5)));

long totalDocs = 0;
Map<String, Long> snapshots = new HashMap<>();
int numDocs = randomIntBetween(200, 300);
totalDocs += numDocs;
try (BackgroundIndexer indexer = new BackgroundIndexer(index, MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) {
int numberOfSnapshots = 5;
for (int i = 0; i < numberOfSnapshots; i++) {
logger.info("--> waiting for {} docs to be indexed ...", numDocs);
long finalTotalDocs1 = totalDocs;
assertBusy(() -> assertEquals(finalTotalDocs1, indexer.totalIndexedDocs()), 120, TimeUnit.SECONDS);
logger.info("--> {} total docs indexed", totalDocs);
String snapshotName = baseSnapshotName + i;
createSnapshot(snapshotRepo, snapshotName, new ArrayList<>());
snapshots.put(snapshotName, totalDocs);
if (i < numberOfSnapshots - 1) {
numDocs = randomIntBetween(200, 300);
indexer.continueIndexing(numDocs);
totalDocs += numDocs;
}
}
}

logger.info("Snapshots Status: " + snapshots);

for (String snapshot : snapshots.keySet()) {
logger.info("Restoring snapshot: {}", snapshot);
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(index)).get());

RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepo, snapshot)
.setWaitForCompletion(true)
.setIndices()
.get();

assertEquals(RestStatus.OK, restoreSnapshotResponse1.status());

// Verify restored index's stats
ensureGreen(TimeValue.timeValueSeconds(60), index);
long finalTotalDocs = totalDocs;
assertBusy(() -> {
Long hits = client().prepareSearch(index)
.setQuery(matchAllQuery())
.setSize((int) finalTotalDocs)
.storedFields()
.execute()
.actionGet()
.getHits()
.getTotalHits().value;

assertEquals(snapshots.get(snapshot), hits);
});
}
}

@TestIssueLogging(value = "_root:DEBUG", issueUrl = "")
public void testHashedPrefixTranslogMetadataCombination() throws Exception {
Settings settings = Settings.builder()
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(RemoteStoreEnums.PathType.values()))
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), randomBoolean())
.build();

internalCluster().startClusterManagerOnlyNode(settings);
internalCluster().startDataOnlyNode(settings);
String index = "test-index";
String snapshotRepo = "test-restore-snapshot-repo";
String baseSnapshotName = "snapshot_";
Path absolutePath1 = randomRepoPath().toAbsolutePath();
logger.info("Snapshot Path [{}]", absolutePath1);

createRepository(snapshotRepo, "fs", getRepositorySettings(absolutePath1, true));

Client client = client();
Settings indexSettings = Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build();

createIndex(index, indexSettings);
ensureGreen(index);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(index)
);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(randomIntBetween(1, 5)));
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueSeconds(randomIntBetween(1, 5)));

long totalDocs = 0;
Map<String, Long> snapshots = new HashMap<>();
int numDocs = randomIntBetween(200, 300);
totalDocs += numDocs;
try (BackgroundIndexer indexer = new BackgroundIndexer(index, MapperService.SINGLE_MAPPING_NAME, client(), numDocs)) {
int numberOfSnapshots = 5;
for (int i = 0; i < numberOfSnapshots; i++) {
logger.info("--> waiting for {} docs to be indexed ...", numDocs);
long finalTotalDocs1 = totalDocs;
assertBusy(() -> assertEquals(finalTotalDocs1, indexer.totalIndexedDocs()), 120, TimeUnit.SECONDS);
logger.info("--> {} total docs indexed", totalDocs);
String snapshotName = baseSnapshotName + i;
createSnapshot(snapshotRepo, snapshotName, new ArrayList<>());
snapshots.put(snapshotName, totalDocs);
if (i < numberOfSnapshots - 1) {
numDocs = randomIntBetween(200, 300);
indexer.continueIndexing(numDocs);
totalDocs += numDocs;
}
}
}

logger.info("Snapshots Status: " + snapshots);

for (String snapshot : snapshots.keySet()) {
logger.info("Restoring snapshot: {}", snapshot);

if (randomBoolean()) {
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(index)).get());
} else {
assertAcked(client().admin().indices().prepareClose(index));
}

assertTrue(
internalCluster().client()
.admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder()
.put(CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), randomFrom(RemoteStoreEnums.PathType.values()))
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), randomBoolean())
)
.get()
.isAcknowledged()
);

RestoreSnapshotResponse restoreSnapshotResponse1 = client.admin()
.cluster()
.prepareRestoreSnapshot(snapshotRepo, snapshot)
.setWaitForCompletion(true)
.setIndices()
.get();

assertEquals(RestStatus.OK, restoreSnapshotResponse1.status());

// Verify restored index's stats
ensureGreen(TimeValue.timeValueSeconds(60), index);
long finalTotalDocs = totalDocs;
assertBusy(() -> {
Long hits = client().prepareSearch(index)
.setQuery(matchAllQuery())
.setSize((int) finalTotalDocs)
.storedFields()
.execute()
.actionGet()
.getHits()
.getTotalHits().value;

assertEquals(snapshots.get(snapshot), hits);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -861,9 +861,11 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException

Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();

Set<Long> pinnedTimestamps = new HashSet<>(pinnedTimestampsState.v2());
pinnedTimestamps.add(pinnedTimestampsState.v1());
Set<String> implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
sortedMetadataFileList,
pinnedTimestampsState.v2(),
pinnedTimestamps,
metadataFilePinnedTimestampMap,
MetadataFilenameUtils::getTimestamp,
MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,11 @@ protected static List<String> getMetadataFilesToBeDeleted(
);

// Get md files matching pinned timestamps
Set<Long> pinnedTimestamps = new HashSet<>(pinnedTimestampsState.v2());
pinnedTimestamps.add(pinnedTimestampsState.v1());
Set<String> implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
metadataFilesToBeDeleted,
pinnedTimestampsState.v2(),
pinnedTimestamps,
metadataFilePinnedTimestampMap,
file -> RemoteStoreUtils.invertLong(file.split(METADATA_SEPARATOR)[3]),
TranslogTransferMetadata::getNodeIdByPrimaryTermAndGen
Expand Down
Loading

0 comments on commit 6d86b38

Please sign in to comment.