Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Translog] Add support for downloading files from remote translog #5649

Merged
merged 12 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2319,10 +2319,10 @@ public void recoverFromStore(ActionListener<Boolean> listener) {
storeRecovery.recoverFromStore(this, listener);
}

public void restoreFromRemoteStore(ActionListener<Boolean> listener) {
public void restoreFromRemoteStore(Repository repository, ActionListener<Boolean> listener) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromRemoteStore(this, listener);
storeRecovery.recoverFromRemoteStore(this, repository, listener);
}

public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
Expand Down Expand Up @@ -3098,7 +3098,14 @@ public void startRecovery(
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
break;
case REMOTE_STORE:
executeRecovery("from remote store", recoveryState, recoveryListener, this::restoreFromRemoteStore);
final Repository remoteTranslogRepo;
Copy link
Member

@ashking94 ashking94 Dec 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could set remoteTranslogRepo as null here and then get rid of else block?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to push down some of this logic to restoreFromRemoteStore method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remoteTranslogRepo needs to be final as it is provided as argument to a lambda, so we can't initialize it twice.

restoreFromRemoteStore does not have reference to repositoriesService to fetch the repository.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please use Optional instead?

final String remoteTranslogRepoName = indexSettings.getRemoteStoreTranslogRepository();
if (remoteTranslogRepoName != null) {
remoteTranslogRepo = repositoriesService.repository(remoteTranslogRepoName);
} else {
remoteTranslogRepo = null;
}
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(remoteTranslogRepo, l));
break;
case PEER:
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,16 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.FileTransferTracker;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -114,13 +119,13 @@ void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> liste
}
}

void recoverFromRemoteStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
void recoverFromRemoteStore(final IndexShard indexShard, Repository repository, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType;
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
logger.debug("starting recovery from remote store ...");
recoverFromRemoteStore(indexShard);
recoverFromRemoteStore(indexShard, repository);
return true;
});
} else {
Expand Down Expand Up @@ -435,7 +440,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
});
}

private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Optional here as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional<Repository> as a parameter? But optional generally used in the return type, right?

final Store remoteStore = indexShard.remoteStore();
if (remoteStore == null) {
throw new IndexShardRecoveryException(
Expand All @@ -453,9 +458,12 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true);

// This creates empty trans-log for now
// ToDo: Add code to restore from remote trans-log
bootstrap(indexShard, store);
if (repository != null) {
syncTranslogFilesFromRemoteTranslog(indexShard, repository);
} else {
bootstrap(indexShard, store);
}

assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.recoveryState().getIndex().setFileDetailsComplete();
indexShard.openEngineAndRecoverFromTranslog();
Expand All @@ -470,6 +478,19 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
}
}

private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager(
blobStoreRepository,
indexShard.getThreadPool().executor(ThreadPool.Names.TRANSLOG_TRANSFER),
shardId,
fileTransferTracker
);
RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog());
}

Comment on lines +481 to +493
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be a problem here

  1. There will be two different instances of FileTransferTracker one created here and one created by RemoteFsTranslog which will cause inconsistency in file tracking.
  2. The approach looks like a quick fix to workaround the dependency management and instance creation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approach looks like a quick fix to workaround the dependency management and instance creation.

Yes, agree. This was required as RemoteFsTranslog.download is a static method. It needs to be static as, at some places in the OpenSearch code, we refer to translog files without using Translog class. In the long term, we need to change this behavior and make sure translog files are always accessed via Translog object.

There will be two different instances of FileTransferTracker one created here and one created by RemoteFsTranslog which will cause inconsistency in file tracking.

The FileTransferTracker instance we create here will not be used to track the upload of files, the one created in RemoteFsTranslog will be used. This is not ideal way to handle it but will be fixed with the solution proposed in the first reply above.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a tracking issue: #5679

/**
* Recovers the state of the shard from the store.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,25 @@

package org.opensearch.index.translog;

import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.index.translog.transfer.FileTransferTracker;
import org.opensearch.index.translog.transfer.TransferSnapshot;
import org.opensearch.index.translog.transfer.TranslogCheckpointTransferSnapshot;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.LongConsumer;
Expand Down Expand Up @@ -54,15 +60,11 @@ public RemoteFsTranslog(
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
this.blobStoreRepository = blobStoreRepository;
fileTransferTracker = new FileTransferTracker(shardId);
this.translogTransferManager = new TranslogTransferManager(
new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())),
fileTransferTracker,
fileTransferTracker::exclusionFilter
);
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, executorService, shardId, fileTransferTracker);

try {
final Checkpoint checkpoint = readCheckpoint(location);
download(translogTransferManager, location);
Checkpoint checkpoint = readCheckpoint(location);
this.readers.addAll(recoverFromFiles(checkpoint));
if (readers.isEmpty()) {
throw new IllegalStateException("at least one reader must be recovered");
Expand Down Expand Up @@ -94,6 +96,45 @@ public RemoteFsTranslog(
}
}

public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {

TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
if (translogMetadata != null) {
Comment on lines +101 to +102
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor, but if you return an Optional from readMetadata() this all becomes a bit more explicit with

translogTransferManager.readMetadata().ifPresent(translogMetadata -> {
   ...
});

and you don't need the orElse(null) below or to worry about NPEs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this, we will not be able to throw checked exception from download method. IMO, the caller should handle failures in the download method.

if (Files.notExists(location)) {
Files.createDirectories(location);
}
// Delete translog files on local before downloading from remote
for (Path file : FileSystemUtils.files(location)) {
Files.delete(file);
}
Map<String, String> generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper();
for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if segments upload is lagging, then just downloading the most recent translog might not be enough. Can we create a tracking issue? If it exists already, pls do share.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already tracking it here: #3754

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More details are present in #5567 .

String generation = Long.toString(i);
translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location);
}
// We copy the latest generation .ckp file to translog.ckp so that flows that depend on
// existence of translog.ckp file work in the same way
Files.copy(
location.resolve(Translog.getCommitCheckpointFileName(translogMetadata.getGeneration())),
location.resolve(Translog.CHECKPOINT_FILE_NAME)
);
}
}

public static TranslogTransferManager buildTranslogTransferManager(
BlobStoreRepository blobStoreRepository,
ExecutorService executorService,
ShardId shardId,
FileTransferTracker fileTransferTracker
) {
return new TranslogTransferManager(
new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())),
fileTransferTracker,
fileTransferTracker::exclusionFilter
);
}

@Override
public boolean ensureSynced(Location location) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.listener.FileTransferListener;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -128,6 +134,50 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
}
}

public boolean downloadTranslog(String primaryTerm, String generation, Path location) throws IOException {
logger.info(
"Downloading translog files with: Primary Term = {}, Generation = {}, Location = {}",
primaryTerm,
generation,
location
);
// Download Checkpoint file from remote to local FS
String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
downloadToFS(ckpFileName, location, primaryTerm);
// Download translog file from remote to local FS
String translogFilename = Translog.getFilename(Long.parseLong(generation));
downloadToFS(translogFilename, location, primaryTerm);
return true;
}

private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException {
Path filePath = location.resolve(fileName);
// Here, we always override the existing file if present.
// We need to change this logic when we introduce incremental download
if (Files.exists(filePath)) {
Files.delete(filePath);
}
try (InputStream inputStream = transferService.downloadBlob(remoteBaseTransferPath.add(primaryTerm), fileName)) {
Files.copy(inputStream, filePath);
}
}

public TranslogTransferMetadata readMetadata() throws IOException {
return transferService.listAll(remoteMetadaTransferPath)
.stream()
.max(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR)
.map(filename -> {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadaTransferPath, filename);) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
return new TranslogTransferMetadata(indexInput);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
return null;
}
})
.orElse(null);
}

private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException {
Map<String, String> generationPrimaryTermMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> {
assert s instanceof TranslogFileSnapshot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.util.SetOnce;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

Expand All @@ -37,7 +40,7 @@ public class TranslogTransferMetadata {

private final long timeStamp;

private final int count;
private int count;

private final SetOnce<Map<String, String>> generationToPrimaryTermMapper = new SetOnce<>();

Expand All @@ -49,6 +52,8 @@ public class TranslogTransferMetadata {

private static final String METADATA_CODEC = "md";

public static final Comparator<String> METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator();

public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) {
this.primaryTerm = primaryTerm;
this.generation = generation;
Expand All @@ -57,6 +62,16 @@ public TranslogTransferMetadata(long primaryTerm, long generation, long minTrans
this.count = count;
}

public TranslogTransferMetadata(IndexInput indexInput) throws IOException {
CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, METADATA_CODEC, CURRENT_VERSION, CURRENT_VERSION);
this.primaryTerm = indexInput.readLong();
this.generation = indexInput.readLong();
this.minTranslogGeneration = indexInput.readLong();
this.timeStamp = indexInput.readLong();
this.generationToPrimaryTermMapper.set(indexInput.readMapOfStrings());
}

public long getPrimaryTerm() {
return primaryTerm;
}
Expand All @@ -77,6 +92,10 @@ public void setGenerationToPrimaryTermMapper(Map<String, String> generationToPri
generationToPrimaryTermMapper.set(generationToPrimaryTermMap);
}

public Map<String, String> getGenerationToPrimaryTermMapper() {
return generationToPrimaryTermMapper.get();
}

public String getFileName() {
return String.join(
METADATA_SEPARATOR,
Expand Down Expand Up @@ -122,6 +141,29 @@ private void write(DataOutput out) throws IOException {
out.writeLong(generation);
out.writeLong(minTranslogGeneration);
out.writeLong(timeStamp);
out.writeMapOfStrings(generationToPrimaryTermMapper.get());
if (generationToPrimaryTermMapper.get() != null) {
out.writeMapOfStrings(generationToPrimaryTermMapper.get());
} else {
out.writeMapOfStrings(new HashMap<>());
}
}

private static class MetadataFilenameComparator implements Comparator<String> {
@Override
public int compare(String first, String second) {
// Format of metadata filename is <Primary Term>__<Generation>__<Timestamp>
String[] filenameTokens1 = first.split(METADATA_SEPARATOR);
String[] filenameTokens2 = second.split(METADATA_SEPARATOR);
// Here, we are not comparing only primary term and generation.
// Timestamp is not a good measure of comparison in case primary term and generation are same.
for (int i = 0; i < filenameTokens1.length - 1; i++) {
if (filenameTokens1[i].equals(filenameTokens2[i]) == false) {
return Long.compare(Long.parseLong(filenameTokens1[i]), Long.parseLong(filenameTokens2[i]));
}
}
throw new IllegalArgumentException(
"TranslogTransferMetadata files " + first + " and " + second + " have same primary term and generation"
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2812,7 +2812,7 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null));
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
target.restoreFromRemoteStore(future);
target.restoreFromRemoteStore(null, future);
target.remoteStore().decRef();

assertTrue(future.actionGet());
Expand Down
Loading