Skip to content

Commit

Permalink
Address PR review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed Nov 29, 2022
1 parent 1643c89 commit 58a125f
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;

import java.io.IOException;
Expand All @@ -37,6 +38,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;

/**
* RefreshListener implementation to upload newly created segment files to the remote store
*
Expand All @@ -47,7 +50,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
// Visible for testing
static final int LAST_N_METADATA_FILES_TO_KEEP = 10;
public static final String SEGMENT_INFO_SNAPSHOT_FILENAME = "segment_infos_snapshot_filename";
static final String SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX = "segment_infos_snapshot_filename";

private final IndexShard indexShard;
private final Directory storeDirectory;
Expand Down Expand Up @@ -92,15 +95,13 @@ public void afterRefresh(boolean didRefresh) {
this.remoteDirectory.init();
}
try {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
if (lastCommittedLocalSegmentFileName != null
&& !remoteDirectory.containsFile(
lastCommittedLocalSegmentFileName,
getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)
)) {
// if a new segments_N file is present in local that is not uploaded to remote store yet, it
// is considered as a first refresh post commit. A cleanup of stale commit files is triggered.
// Ideally, we want this to be done in async flow. (GitHub issue #4315)
if (isRefreshAfterCommit()) {
deleteStaleCommits();
}
String segment_info_snapshot_filename = null;
String segmentInfoSnapshotFilename = null;
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();

Expand All @@ -122,8 +123,8 @@ public void afterRefresh(boolean didRefresh) {
boolean uploadStatus = uploadNewSegments(refreshedLocalFiles);
if (uploadStatus) {
if (segmentFilesFromSnapshot.equals(new HashSet<>(refreshedLocalFiles))) {
segment_info_snapshot_filename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos);
refreshedLocalFiles.add(segment_info_snapshot_filename);
segmentInfoSnapshotFilename = uploadSegmentInfosSnapshot(latestSegmentInfos.get(), segmentInfos);
refreshedLocalFiles.add(segmentInfoSnapshotFilename);
}

remoteDirectory.uploadMetadata(
Expand All @@ -143,11 +144,11 @@ public void afterRefresh(boolean didRefresh) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
} finally {
try {
if (segment_info_snapshot_filename != null) {
storeDirectory.deleteFile(segment_info_snapshot_filename);
if (segmentInfoSnapshotFilename != null) {
storeDirectory.deleteFile(segmentInfoSnapshotFilename);
}
} catch (IOException e) {
logger.warn("Exception while deleting: " + segment_info_snapshot_filename, e);
logger.warn("Exception while deleting: " + segmentInfoSnapshotFilename, e);
}
}
} catch (IOException e) {
Expand All @@ -162,16 +163,33 @@ public void afterRefresh(boolean didRefresh) {
}
}

/**
*
* @return true
* @throws IOException
*/
private boolean isRefreshAfterCommit() throws IOException {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
return (lastCommittedLocalSegmentFileName != null
&& !remoteDirectory.containsFile(lastCommittedLocalSegmentFileName, getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)));
}

String uploadSegmentInfosSnapshot(String latestSegmentsNFilename, SegmentInfos segmentInfosSnapshot) throws IOException {
long localCheckpoint = indexShard.getEngine().getProcessedLocalCheckpoint();
String commitGeneration = latestSegmentsNFilename.substring((IndexFileNames.SEGMENTS + "_").length());
String segment_info_snapshot_filename = SEGMENT_INFO_SNAPSHOT_FILENAME + "__" + commitGeneration + "__" + localCheckpoint;
IndexOutput indexOutput = storeDirectory.createOutput(segment_info_snapshot_filename, IOContext.DEFAULT);
segmentInfosSnapshot.write(indexOutput);
indexOutput.close();
storeDirectory.sync(Collections.singleton(segment_info_snapshot_filename));
remoteDirectory.copyFrom(storeDirectory, segment_info_snapshot_filename, segment_info_snapshot_filename, IOContext.DEFAULT, true);
return segment_info_snapshot_filename;
long processedLocalCheckpoint = indexShard.getEngine().getProcessedLocalCheckpoint();

Map<String, String> userData = segmentInfosSnapshot.getUserData();
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(processedLocalCheckpoint));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(processedLocalCheckpoint));
segmentInfosSnapshot.setUserData(userData, false);

long commitGeneration = IndexFileNames.parseGeneration(latestSegmentsNFilename);
String segmentInfoSnapshotFilename = SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + commitGeneration;
try (IndexOutput indexOutput = storeDirectory.createOutput(segmentInfoSnapshotFilename, IOContext.DEFAULT)) {
segmentInfosSnapshot.write(indexOutput);
}
storeDirectory.sync(Collections.singleton(segmentInfoSnapshotFilename));
remoteDirectory.copyFrom(storeDirectory, segmentInfoSnapshotFilename, segmentInfoSnapshotFilename, IOContext.DEFAULT, true);
return segmentInfoSnapshotFilename;
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@
import java.util.stream.Collectors;

import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;

/**
* This package private utility class encapsulates the logic to recover an index shard from either an existing index on
Expand Down Expand Up @@ -469,24 +470,24 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
String segmentInfosSnapshotFilename = null;
for (String file : remoteDirectory.listAll()) {
storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT);
if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME)) {
if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) {
segmentInfosSnapshotFilename = file;
}
}

if (segmentInfosSnapshotFilename != null) {
String[] filenameTokens = segmentInfosSnapshotFilename.split("__");
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT)
)
) {
SegmentInfos infos_snapshot = SegmentInfos.readCommit(
SegmentInfos infosSnapshot = SegmentInfos.readCommit(
store.directory(),
indexInput,
Integer.parseInt(filenameTokens[1])
Integer.parseInt(segmentInfosSnapshotFilename.split("__")[1])
);
store.commitSegmentInfos(infos_snapshot, Long.parseLong(filenameTokens[2]), Long.parseLong(filenameTokens[2]));
long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
} catch (IOException e) {
logger.info("Exception while reading {}, falling back to commit level restore", segmentInfosSnapshotFilename);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public final class RemoteSegmentStoreDirectory extends FilterDirectory {
* To prevent explosion of refresh metadata files, we replace refresh files for the given primary term and generation
* This is achieved by uploading refresh metadata file with the same UUID suffix.
*/
private String metadataFileUniqueSuffix;
private String commonFilenameSuffix;

/**
* Keeps track of local segment filename to uploaded filename along with other attributes like checksum.
Expand All @@ -92,7 +92,7 @@ public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDi
* @throws IOException if there were any failures in reading the metadata file
*/
public void init() throws IOException {
this.metadataFileUniqueSuffix = UUIDs.base64UUID();
this.commonFilenameSuffix = UUIDs.base64UUID();
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(readLatestMetadataFile());
}

Expand Down Expand Up @@ -293,10 +293,10 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
}
}

public void copyFrom(Directory from, String src, String dest, IOContext context, boolean override) throws IOException {
public void copyFrom(Directory from, String src, String dest, IOContext context, boolean useCommonSuffix) throws IOException {
String remoteFilename;
if (override && segmentsUploadedToRemoteStore.containsKey(dest)) {
remoteFilename = segmentsUploadedToRemoteStore.get(dest).uploadedFilename;
if (useCommonSuffix) {
remoteFilename = dest + SEGMENT_NAME_UUID_SEPARATOR + this.commonFilenameSuffix;
} else {
remoteFilename = getNewRemoteSegmentFilename(dest);
}
Expand Down Expand Up @@ -339,7 +339,7 @@ public boolean containsFile(String localFilename, String checksum) {
public void uploadMetadata(Collection<String> segmentFiles, Directory storeDirectory, long primaryTerm, long generation)
throws IOException {
synchronized (this) {
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.metadataFileUniqueSuffix);
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.commonFilenameSuffix);
IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;

public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
private IndexShard indexShard;
Expand Down Expand Up @@ -222,7 +222,9 @@ private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentSto
if (segmentsNFilename != null) {
String commitGeneration = segmentsNFilename.substring((IndexFileNames.SEGMENTS + "_").length());
assertTrue(
uploadedSegments.keySet().stream().anyMatch(s -> s.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME + "__" + commitGeneration))
uploadedSegments.keySet()
.stream()
.anyMatch(s -> s.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX + "__" + commitGeneration))
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ public void testCopyFromException() throws IOException {
storeDirectory.close();
}

public void testCpoyFromOverride() throws IOException {
public void testCopyFromOverride() throws IOException {
String filename = "_100.si";
populateMetadata();
remoteSegmentStoreDirectory.init();
Expand Down

0 comments on commit 58a125f

Please sign in to comment.