Skip to content

Commit

Permalink
[Remote Store] Add support for refresh level durability (#5253)
Browse files Browse the repository at this point in the history
* Add support for refresh level durability

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale authored Dec 16, 2022
1 parent cb26035 commit d27c30a
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 25 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `opencensus-contrib-http-util` from 0.18.0 to 0.31.1 ([#3633](https://github.com/opensearch-project/OpenSearch/pull/3633))
- Bump `geoip2` from 3.0.1 to 3.0.2 ([#5103](https://github.com/opensearch-project/OpenSearch/pull/5103))
- Bump gradle-extra-configurations-plugin from 7.0.0 to 8.0.0 ([#4808](https://github.com/opensearch-project/OpenSearch/pull/4808))

### Changed
- Add support for refresh level durability ([#5253](https://github.com/opensearch-project/OpenSearch/pull/5253))

### Deprecated
- Refactor fuzziness interface on query builders ([#5433](https://github.com/opensearch-project/OpenSearch/pull/5433))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2748,7 +2748,7 @@ public long tryDeleteDocument(IndexReader readerIn, int docID) {
/**
* Returns the last local checkpoint value that has been refreshed internally.
*/
final long lastRefreshedCheckpoint() {
public final long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand All @@ -34,6 +38,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;

/**
* RefreshListener implementation to upload newly created segment files to the remote store
*
Expand All @@ -44,6 +50,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
// Visible for testing
static final int LAST_N_METADATA_FILES_TO_KEEP = 10;
static final String SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX = "segment_infos_snapshot_filename";

private final IndexShard indexShard;
private final Directory storeDirectory;
Expand Down Expand Up @@ -88,46 +95,67 @@ public void afterRefresh(boolean didRefresh) {
this.remoteDirectory.init();
}
try {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
if (!remoteDirectory.containsFile(
lastCommittedLocalSegmentFileName,
getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)
)) {
// if a new segments_N file is present in local that is not uploaded to remote store yet, it
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// This is done to avoid delete post each refresh.
// Ideally, we want this to be done in async flow. (GitHub issue #4315)
if (isRefreshAfterCommit()) {
deleteStaleCommits();
}

String segmentInfoSnapshotFilename = null;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
Collection<String> refreshedLocalFiles = segmentInfos.files(true);

List<String> segmentInfosFiles = refreshedLocalFiles.stream()
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

List<String> segmentInfosFiles = localSegmentsPostRefresh.stream()
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
.collect(Collectors.toList());
Optional<String> latestSegmentInfos = segmentInfosFiles.stream()
.max(Comparator.comparingLong(IndexFileNames::parseGeneration));
.max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName));

if (latestSegmentInfos.isPresent()) {
refreshedLocalFiles.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true));
// SegmentInfosSnapshot is a snapshot of reader's view of segments and may not contain
// all the segments from last commit if they are merged away but not yet committed.
// Each metadata file in the remote segment store represents a commit and the following
// statement makes sure that each metadata file will always contain all the segments from the last commit + refreshed
// segments.
localSegmentsPostRefresh.addAll(
SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)
);
segmentInfosFiles.stream()
.filter(file -> !file.equals(latestSegmentInfos.get()))
.forEach(refreshedLocalFiles::remove);
.forEach(localSegmentsPostRefresh::remove);

boolean uploadStatus = uploadNewSegments(refreshedLocalFiles);
boolean uploadStatus = uploadNewSegments(localSegmentsPostRefresh);
if (uploadStatus) {
segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos);
localSegmentsPostRefresh.add(segmentInfoSnapshotFilename);

remoteDirectory.uploadMetadata(
refreshedLocalFiles,
localSegmentsPostRefresh,
storeDirectory,
indexShard.getOperationPrimaryTerm(),
segmentInfos.getGeneration()
);
localSegmentChecksumMap.keySet()
.stream()
.filter(file -> !refreshedLocalFiles.contains(file))
.filter(file -> !localSegmentsPostRefresh.contains(file))
.collect(Collectors.toSet())
.forEach(localSegmentChecksumMap::remove);
}
}
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
} finally {
try {
if (segmentInfoSnapshotFilename != null) {
storeDirectory.deleteFile(segmentInfoSnapshotFilename);
}
} catch (IOException e) {
logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e);
}
}
} catch (IOException e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
Expand All @@ -141,6 +169,39 @@ public void afterRefresh(boolean didRefresh) {
}
}

/**
 * Reports whether the latest local commit file is still missing from the remote directory.
 * <p>
 * The name of the last committed segments file is looked up in {@code storeDirectory}; the remote
 * directory is then asked whether it already holds a file with that name and the same checksum.
 *
 * @return {@code true} when a local commit file name exists and the remote directory does not contain
 *         a matching (name, checksum) entry; {@code false} otherwise, including when no commit file
 *         name is returned
 * @throws IOException if looking up the commit file name or computing the local checksum fails
 */
private boolean isRefreshAfterCommit() throws IOException {
    final String latestLocalCommitFile = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
    if (latestLocalCommitFile == null) {
        // Nothing has been committed locally, so this cannot be a refresh following a commit.
        return false;
    }
    final String localChecksum = getChecksumOfLocalFile(latestLocalCommitFile);
    return remoteDirectory.containsFile(latestLocalCommitFile, localChecksum) == false;
}

/**
 * Persists the given {@link SegmentInfos} snapshot to a local file, stamped with the last refreshed
 * checkpoint, and uploads that file to the remote directory.
 * <p>
 * The file is named {@code SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + <commit generation>}, where the
 * generation is parsed from {@code latestSegmentsNFilename}. The local copy is NOT deleted here; the caller
 * is responsible for removing it once it is no longer needed.
 *
 * @param latestSegmentsNFilename name of the latest {@code segments_N} file; supplies the generation suffix
 * @param segmentInfosSnapshot    snapshot to persist; its user data is replaced with a copy that carries the
 *                                checkpoint entries
 * @return the local name of the snapshot file that was written and uploaded
 * @throws IOException if writing, syncing or uploading the snapshot file fails
 */
String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException {
    // We use lastRefreshedCheckpoint as local checkpoint for the SegmentInfosSnapshot. This is better than using
    // getProcessedLocalCheckpoint() as processedCheckpoint can advance between reading the value and setting up
    // in SegmentInfos.userData. This could lead to data loss as, during recovery, translog will be replayed based on
    // LOCAL_CHECKPOINT_KEY.
    // lastRefreshedCheckpoint is updated after refresh listeners are executed, this means, InternalEngine.lastRefreshedCheckpoint()
    // will return checkpoint of last refresh but that does not impact the correctness as duplicate sequence numbers
    // will not be replayed.
    assert indexShard.getEngine() instanceof InternalEngine : "Expected shard with InternalEngine, got: "
        + indexShard.getEngine().getClass();
    final long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
    final String checkpointValue = Long.toString(lastRefreshedCheckpoint);

    // Work on a copy instead of mutating the map handed back by getUserData() in place: that map may be
    // unmodifiable (NOTE(review): Lucene's SegmentInfos starts with an immutable empty map - confirm for the
    // Lucene version in use) and may also be referenced by other holders of the same SegmentInfos.
    final Map<String, String> userData = new HashMap<>(segmentInfosSnapshot.getUserData());
    userData.put(LOCAL_CHECKPOINT_KEY, checkpointValue);
    userData.put(SequenceNumbers.MAX_SEQ_NO, checkpointValue);
    segmentInfosSnapshot.setUserData(userData, false);

    final long commitGeneration = SegmentInfos.generationFromSegmentsFileName(latestSegmentsNFilename);
    final String segmentInfoSnapshotFilename = SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + commitGeneration;
    try (IndexOutput indexOutput = storeDirectory.createOutput(segmentInfoSnapshotFilename, IOContext.DEFAULT)) {
        segmentInfosSnapshot.write(indexOutput);
    }
    // Sync the file locally before it is copied to the remote directory.
    storeDirectory.sync(Collections.singleton(segmentInfoSnapshotFilename));
    // 'true' requests the remote directory's common filename suffix for the uploaded name.
    remoteDirectory.copyFrom(storeDirectory, segmentInfoSnapshotFilename, segmentInfoSnapshotFilename, IOContext.DEFAULT, true);
    return segmentInfoSnapshotFilename;
}

// Visible for testing
boolean uploadNewSegments(Collection<String> localFiles) throws IOException {
AtomicBoolean uploadSuccess = new AtomicBoolean(true);
Expand Down
25 changes: 25 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.Sort;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
Expand Down Expand Up @@ -76,6 +78,8 @@
import java.util.stream.Collectors;

import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;

/**
* This package private utility class encapsulates the logic to recover an index shard from either an existing index on
Expand Down Expand Up @@ -463,9 +467,30 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
for (String file : storeDirectory.listAll()) {
storeDirectory.deleteFile(file);
}
String segmentInfosSnapshotFilename = null;
for (String file : remoteDirectory.listAll()) {
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) {
segmentInfosSnapshotFilename = file;
}
}

if (segmentInfosSnapshotFilename != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT)
)
) {
SegmentInfos infosSnapshot = SegmentInfos.readCommit(
store.directory(),
indexInput,
Long.parseLong(segmentInfosSnapshotFilename.split("__")[1])
);
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
}
}

// This creates empty trans-log for now
// ToDo: Add code to restore from remote trans-log
bootstrap(indexShard, store);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory {
* To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation
* This is achieved by uploading refresh metadata file with the same UUID suffix.
*/
private String metadataFileUniqueSuffix;
private String commonFilenameSuffix;

/**
* Keeps track of local segment filename to uploaded filename along with other attributes like checksum.
Expand All @@ -92,7 +92,7 @@ public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDi
* @throws IOException if there were any failures in reading the metadata file
*/
public void init() throws IOException {
this.metadataFileUniqueSuffix = UUIDs.base64UUID();
this.commonFilenameSuffix = UUIDs.base64UUID();
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(readLatestMetadataFile());
}

Expand Down Expand Up @@ -293,17 +293,26 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
}
}

/**
 * Copies {@code src} from the given directory into the remote data directory and records the upload
 * in the uploaded-segments cache, keyed by {@code src}.
 *
 * @param from            directory that holds the file to upload
 * @param src             name of the file in {@code from}
 * @param dest            base name used to build the remote file name
 * @param context         IO context passed through to the underlying copy
 * @param useCommonSuffix when {@code true}, the remote name is {@code dest} followed by the separator and
 *                        this directory's common filename suffix; when {@code false}, a new remote segment
 *                        filename is generated for {@code dest}
 * @throws IOException if the copy or the checksum computation fails
 */
public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException {
    final String uploadedName = useCommonSuffix
        ? dest + SEGMENT_NAME_UUID_SEPARATOR + this.commonFilenameSuffix
        : getNewRemoteSegmentFilename(dest);
    remoteDataDirectory.copyFrom(from, src, uploadedName, context);
    // The cache entry is only recorded after the remote copy has completed without throwing.
    segmentsUploadedToRemoteStore.put(src, new UploadedSegmentMetadata(src, uploadedName, getChecksumOfLocalFile(from, src)));
}

/**
* Copies an existing src file from directory from to a non-existent file dest in this directory.
* Once the segment is uploaded to remote segment store, update the cache accordingly.
*/
@Override
public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
String remoteFilename = getNewRemoteSegmentFilename(dest);
remoteDataDirectory.copyFrom(from, src, remoteFilename, context);
String checksum = getChecksumOfLocalFile(from, src);
UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum);
segmentsUploadedToRemoteStore.put(src, segmentMetadata);
copyFrom(from, src, dest, context, false);
}

/**
Expand All @@ -330,7 +339,7 @@ public boolean containsFile(String localFilename, String checksum) {
public void uploadMetadata(Collection<String> segmentFiles, Directory storeDirectory, long primaryTerm, long generation)
throws IOException {
synchronized (this) {
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.metadataFileUniqueSuffix);
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.commonFilenameSuffix);
IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2736,18 +2736,31 @@ public void restoreShard(
closeShards(target);
}

public void testRestoreShardFromRemoteStore() throws IOException {
public void testRefreshLevelRestoreShardFromRemoteStore() throws IOException {
testRestoreShardFromRemoteStore(false);
}

public void testCommitLevelRestoreShardFromRemoteStore() throws IOException {
testRestoreShardFromRemoteStore(true);
}

public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOException {
IndexShard target = newStartedShard(
true,
Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(),
Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.build(),
new InternalEngineFactory()
);

indexDoc(target, "_doc", "1");
indexDoc(target, "_doc", "2");
target.refresh("test");
assertDocs(target, "1", "2");
flushShard(target);
if (performFlush) {
flushShard(target);
}

ShardRouting routing = ShardRoutingHelper.initWithSameId(
target.routingEntry(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.shard;

import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
Expand All @@ -30,6 +31,8 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;

public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
private IndexShard indexShard;
private RemoteStoreRefreshListener remoteStoreRefreshListener;
Expand Down Expand Up @@ -204,13 +207,29 @@ public void onFailure(Exception e) {
/**
 * Asserts that every file referenced by the shard's current SegmentInfos snapshot (other than the
 * excluded files) is present in the remote directory's uploaded-segments map, and that a
 * segment-infos snapshot file for the commit generation has been uploaded as well.
 *
 * @param remoteSegmentStoreDirectory remote directory whose uploaded-segments map is checked
 * @throws IOException if the SegmentInfos snapshot cannot be obtained or released
 */
private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentStoreDirectory) throws IOException {
    Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentStoreDirectory
        .getSegmentsUploadedToRemoteStore();
    String segmentsNFilename = null;
    try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
        SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
        for (String file : segmentInfos.files(true)) {
            if (!RemoteStoreRefreshListener.EXCLUDE_FILES.contains(file)) {
                assertTrue("Expected file to be uploaded: " + file, uploadedSegments.containsKey(file));
            }
            if (file.startsWith(IndexFileNames.SEGMENTS)) {
                segmentsNFilename = file;
            }
        }
    }
    if (segmentsNFilename != null) {
        // Derive the generation the same way the listener does when it names the snapshot file, and
        // build the expected prefix once rather than re-parsing it for every uploaded file name.
        final String expectedSnapshotPrefix = SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX
            + "__"
            + SegmentInfos.generationFromSegmentsFileName(segmentsNFilename);
        assertTrue(
            "Expected an uploaded file starting with: " + expectedSnapshotPrefix,
            uploadedSegments.keySet().stream().anyMatch(name -> name.startsWith(expectedSnapshotPrefix))
        );
    }
}
}
Loading

0 comments on commit d27c30a

Please sign in to comment.