Skip to content

Commit

Permalink
[Remote Translog] Add support for downloading files from remote trans…
Browse files Browse the repository at this point in the history
…log (#5649)

* Add support to download translog from remote store during recovery

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale authored Jan 4, 2023
1 parent be64e82 commit 28e9b11
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 23 deletions.
13 changes: 10 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2319,10 +2319,10 @@ public void recoverFromStore(ActionListener<Boolean> listener) {
storeRecovery.recoverFromStore(this, listener);
}

public void restoreFromRemoteStore(ActionListener<Boolean> listener) {
public void restoreFromRemoteStore(Repository repository, ActionListener<Boolean> listener) {
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromRemoteStore(this, listener);
storeRecovery.recoverFromRemoteStore(this, repository, listener);
}

public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
Expand Down Expand Up @@ -3098,7 +3098,14 @@ public void startRecovery(
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
break;
case REMOTE_STORE:
executeRecovery("from remote store", recoveryState, recoveryListener, this::restoreFromRemoteStore);
final Repository remoteTranslogRepo;
final String remoteTranslogRepoName = indexSettings.getRemoteStoreTranslogRepository();
if (remoteTranslogRepoName != null) {
remoteTranslogRepo = repositoriesService.repository(remoteTranslogRepoName);
} else {
remoteTranslogRepo = null;
}
executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(remoteTranslogRepo, l));
break;
case PEER:
try {
Expand Down
33 changes: 27 additions & 6 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,16 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.snapshots.IndexShardRestoreFailedException;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.RemoteFsTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.FileTransferTracker;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -114,13 +119,13 @@ void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> liste
}
}

void recoverFromRemoteStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
void recoverFromRemoteStore(final IndexShard indexShard, Repository repository, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType;
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
logger.debug("starting recovery from remote store ...");
recoverFromRemoteStore(indexShard);
recoverFromRemoteStore(indexShard, repository);
return true;
});
} else {
Expand Down Expand Up @@ -435,7 +440,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
});
}

private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException {
private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException {
final Store remoteStore = indexShard.remoteStore();
if (remoteStore == null) {
throw new IndexShardRecoveryException(
Expand All @@ -453,9 +458,12 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true);

// This creates empty trans-log for now
// ToDo: Add code to restore from remote trans-log
bootstrap(indexShard, store);
if (repository != null) {
syncTranslogFilesFromRemoteTranslog(indexShard, repository);
} else {
bootstrap(indexShard, store);
}

assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.recoveryState().getIndex().setFileDetailsComplete();
indexShard.openEngineAndRecoverFromTranslog();
Expand All @@ -470,6 +478,19 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
}
}

private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository) throws IOException {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId);
TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager(
blobStoreRepository,
indexShard.getThreadPool().executor(ThreadPool.Names.TRANSLOG_TRANSFER),
shardId,
fileTransferTracker
);
RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog());
}

/**
* Recovers the state of the shard from the store.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,25 @@

package org.opensearch.index.translog;

import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.index.translog.transfer.FileTransferTracker;
import org.opensearch.index.translog.transfer.TransferSnapshot;
import org.opensearch.index.translog.transfer.TranslogCheckpointTransferSnapshot;
import org.opensearch.index.translog.transfer.TranslogTransferManager;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.LongConsumer;
Expand Down Expand Up @@ -54,15 +60,11 @@ public RemoteFsTranslog(
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
this.blobStoreRepository = blobStoreRepository;
fileTransferTracker = new FileTransferTracker(shardId);
this.translogTransferManager = new TranslogTransferManager(
new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())),
fileTransferTracker,
fileTransferTracker::exclusionFilter
);
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, executorService, shardId, fileTransferTracker);

try {
final Checkpoint checkpoint = readCheckpoint(location);
download(translogTransferManager, location);
Checkpoint checkpoint = readCheckpoint(location);
this.readers.addAll(recoverFromFiles(checkpoint));
if (readers.isEmpty()) {
throw new IllegalStateException("at least one reader must be recovered");
Expand Down Expand Up @@ -94,6 +96,45 @@ public RemoteFsTranslog(
}
}

public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {

TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
if (translogMetadata != null) {
if (Files.notExists(location)) {
Files.createDirectories(location);
}
// Delete translog files on local before downloading from remote
for (Path file : FileSystemUtils.files(location)) {
Files.delete(file);
}
Map<String, String> generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper();
for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) {
String generation = Long.toString(i);
translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location);
}
// We copy the latest generation .ckp file to translog.ckp so that flows that depend on
// existence of translog.ckp file work in the same way
Files.copy(
location.resolve(Translog.getCommitCheckpointFileName(translogMetadata.getGeneration())),
location.resolve(Translog.CHECKPOINT_FILE_NAME)
);
}
}

public static TranslogTransferManager buildTranslogTransferManager(
BlobStoreRepository blobStoreRepository,
ExecutorService executorService,
ShardId shardId,
FileTransferTracker fileTransferTracker
) {
return new TranslogTransferManager(
new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())),
fileTransferTracker,
fileTransferTracker::exclusionFilter
);
}

@Override
public boolean ensureSynced(Location location) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IndexInput;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.transfer.listener.FileTransferListener;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -128,6 +134,50 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
}
}

public boolean downloadTranslog(String primaryTerm, String generation, Path location) throws IOException {
logger.info(
"Downloading translog files with: Primary Term = {}, Generation = {}, Location = {}",
primaryTerm,
generation,
location
);
// Download Checkpoint file from remote to local FS
String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation));
downloadToFS(ckpFileName, location, primaryTerm);
// Download translog file from remote to local FS
String translogFilename = Translog.getFilename(Long.parseLong(generation));
downloadToFS(translogFilename, location, primaryTerm);
return true;
}

private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException {
Path filePath = location.resolve(fileName);
// Here, we always override the existing file if present.
// We need to change this logic when we introduce incremental download
if (Files.exists(filePath)) {
Files.delete(filePath);
}
try (InputStream inputStream = transferService.downloadBlob(remoteBaseTransferPath.add(primaryTerm), fileName)) {
Files.copy(inputStream, filePath);
}
}

public TranslogTransferMetadata readMetadata() throws IOException {
return transferService.listAll(remoteMetadaTransferPath)
.stream()
.max(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR)
.map(filename -> {
try (InputStream inputStream = transferService.downloadBlob(remoteMetadaTransferPath, filename);) {
IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes());
return new TranslogTransferMetadata(indexInput);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e);
return null;
}
})
.orElse(null);
}

private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException {
Map<String, String> generationPrimaryTermMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> {
assert s instanceof TranslogFileSnapshot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.util.SetOnce;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;

import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

Expand All @@ -37,7 +40,7 @@ public class TranslogTransferMetadata {

private final long timeStamp;

private final int count;
private int count;

private final SetOnce<Map<String, String>> generationToPrimaryTermMapper = new SetOnce<>();

Expand All @@ -49,6 +52,8 @@ public class TranslogTransferMetadata {

private static final String METADATA_CODEC = "md";

public static final Comparator<String> METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator();

public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) {
this.primaryTerm = primaryTerm;
this.generation = generation;
Expand All @@ -57,6 +62,16 @@ public TranslogTransferMetadata(long primaryTerm, long generation, long minTrans
this.count = count;
}

public TranslogTransferMetadata(IndexInput indexInput) throws IOException {
CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, METADATA_CODEC, CURRENT_VERSION, CURRENT_VERSION);
this.primaryTerm = indexInput.readLong();
this.generation = indexInput.readLong();
this.minTranslogGeneration = indexInput.readLong();
this.timeStamp = indexInput.readLong();
this.generationToPrimaryTermMapper.set(indexInput.readMapOfStrings());
}

public long getPrimaryTerm() {
return primaryTerm;
}
Expand All @@ -77,6 +92,10 @@ public void setGenerationToPrimaryTermMapper(Map<String, String> generationToPri
generationToPrimaryTermMapper.set(generationToPrimaryTermMap);
}

public Map<String, String> getGenerationToPrimaryTermMapper() {
return generationToPrimaryTermMapper.get();
}

public String getFileName() {
return String.join(
METADATA_SEPARATOR,
Expand Down Expand Up @@ -122,6 +141,29 @@ private void write(DataOutput out) throws IOException {
out.writeLong(generation);
out.writeLong(minTranslogGeneration);
out.writeLong(timeStamp);
out.writeMapOfStrings(generationToPrimaryTermMapper.get());
if (generationToPrimaryTermMapper.get() != null) {
out.writeMapOfStrings(generationToPrimaryTermMapper.get());
} else {
out.writeMapOfStrings(new HashMap<>());
}
}

private static class MetadataFilenameComparator implements Comparator<String> {
@Override
public int compare(String first, String second) {
// Format of metadata filename is <Primary Term>__<Generation>__<Timestamp>
String[] filenameTokens1 = first.split(METADATA_SEPARATOR);
String[] filenameTokens2 = second.split(METADATA_SEPARATOR);
// Here, we are not comparing only primary term and generation.
// Timestamp is not a good measure of comparison in case primary term and generation are same.
for (int i = 0; i < filenameTokens1.length - 1; i++) {
if (filenameTokens1[i].equals(filenameTokens2[i]) == false) {
return Long.compare(Long.parseLong(filenameTokens1[i]), Long.parseLong(filenameTokens2[i]));
}
}
throw new IllegalArgumentException(
"TranslogTransferMetadata files " + first + " and " + second + " have same primary term and generation"
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2812,7 +2812,7 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null));
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
target.restoreFromRemoteStore(future);
target.restoreFromRemoteStore(null, future);
target.remoteStore().decRef();

assertTrue(future.actionGet());
Expand Down
Loading

0 comments on commit 28e9b11

Please sign in to comment.