-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Remote Translog] Add support for downloading files from remote translog #5649
Changes from all commits
3d8d0e8
c7681bb
075d500
7d4e454
5c4970b
385e859
4c633d4
f9316a8
1be30a6
7a9c32e
032b022
d2b2611
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,11 +60,16 @@ | |
import org.opensearch.index.seqno.SequenceNumbers; | ||
import org.opensearch.index.snapshots.IndexShardRestoreFailedException; | ||
import org.opensearch.index.store.Store; | ||
import org.opensearch.index.translog.RemoteFsTranslog; | ||
import org.opensearch.index.translog.Translog; | ||
import org.opensearch.index.translog.transfer.FileTransferTracker; | ||
import org.opensearch.index.translog.transfer.TranslogTransferManager; | ||
import org.opensearch.indices.recovery.RecoveryState; | ||
import org.opensearch.indices.replication.common.ReplicationLuceneIndex; | ||
import org.opensearch.repositories.IndexId; | ||
import org.opensearch.repositories.Repository; | ||
import org.opensearch.repositories.blobstore.BlobStoreRepository; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
import java.io.IOException; | ||
import java.util.Arrays; | ||
|
@@ -114,13 +119,13 @@ void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> liste | |
} | ||
} | ||
|
||
void recoverFromRemoteStore(final IndexShard indexShard, ActionListener<Boolean> listener) { | ||
void recoverFromRemoteStore(final IndexShard indexShard, Repository repository, ActionListener<Boolean> listener) { | ||
if (canRecover(indexShard)) { | ||
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); | ||
assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType; | ||
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> { | ||
logger.debug("starting recovery from remote store ..."); | ||
recoverFromRemoteStore(indexShard); | ||
recoverFromRemoteStore(indexShard, repository); | ||
return true; | ||
}); | ||
} else { | ||
|
@@ -435,7 +440,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi | |
}); | ||
} | ||
|
||
private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException { | ||
private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
final Store remoteStore = indexShard.remoteStore(); | ||
if (remoteStore == null) { | ||
throw new IndexShardRecoveryException( | ||
|
@@ -453,9 +458,12 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco | |
// Download segments from remote segment store | ||
indexShard.syncSegmentsFromRemoteSegmentStore(true); | ||
|
||
// This creates empty trans-log for now | ||
// ToDo: Add code to restore from remote trans-log | ||
bootstrap(indexShard, store); | ||
if (repository != null) { | ||
syncTranslogFilesFromRemoteTranslog(indexShard, repository); | ||
} else { | ||
bootstrap(indexShard, store); | ||
} | ||
|
||
assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; | ||
indexShard.recoveryState().getIndex().setFileDetailsComplete(); | ||
indexShard.openEngineAndRecoverFromTranslog(); | ||
|
@@ -470,6 +478,19 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco | |
} | ||
} | ||
|
||
// Downloads this shard's translog files from the remote translog repository into the | ||
// shard's local translog directory (indexShard.shardPath().resolveTranslog()). | ||
// The repository is expected to be a BlobStoreRepository: with assertions enabled a | ||
// mismatch trips the assert below, otherwise the cast throws ClassCastException. | ||
// Throws IOException as declared; the calls that can raise it are not visible here. | ||
private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository) throws IOException { | ||
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; | ||
BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; | ||
// A fresh tracker is created for this one download; it is handed to the transfer manager below. | ||
FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); | ||
// Reuse RemoteFsTranslog's static factory, passing the shard's TRANSLOG_TRANSFER executor. | ||
TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager( | ||
blobStoreRepository, | ||
indexShard.getThreadPool().executor(ThreadPool.Names.TRANSLOG_TRANSFER), | ||
shardId, | ||
fileTransferTracker | ||
); | ||
RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog()); | ||
} | ||
|
||
Comment on lines
+481
to
+493
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There seems to be a problem here
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, agree. This was required as
The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Created a tracking issue: #5679 |
||
/** | ||
* Recovers the state of the shard from the store. | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,19 +8,25 @@ | |
|
||
package org.opensearch.index.translog; | ||
|
||
import org.opensearch.common.io.FileSystemUtils; | ||
import org.opensearch.common.lease.Releasable; | ||
import org.opensearch.common.lease.Releasables; | ||
import org.opensearch.common.util.concurrent.ReleasableLock; | ||
import org.opensearch.core.internal.io.IOUtils; | ||
import org.opensearch.index.shard.ShardId; | ||
import org.opensearch.index.translog.transfer.BlobStoreTransferService; | ||
import org.opensearch.index.translog.transfer.FileTransferTracker; | ||
import org.opensearch.index.translog.transfer.TransferSnapshot; | ||
import org.opensearch.index.translog.transfer.TranslogCheckpointTransferSnapshot; | ||
import org.opensearch.index.translog.transfer.TranslogTransferManager; | ||
import org.opensearch.index.translog.transfer.TranslogTransferMetadata; | ||
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; | ||
import org.opensearch.repositories.blobstore.BlobStoreRepository; | ||
|
||
import java.io.IOException; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.function.LongConsumer; | ||
|
@@ -54,15 +60,11 @@ public RemoteFsTranslog( | |
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); | ||
this.blobStoreRepository = blobStoreRepository; | ||
fileTransferTracker = new FileTransferTracker(shardId); | ||
this.translogTransferManager = new TranslogTransferManager( | ||
new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService), | ||
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())), | ||
fileTransferTracker, | ||
fileTransferTracker::exclusionFilter | ||
); | ||
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, executorService, shardId, fileTransferTracker); | ||
|
||
try { | ||
final Checkpoint checkpoint = readCheckpoint(location); | ||
download(translogTransferManager, location); | ||
Checkpoint checkpoint = readCheckpoint(location); | ||
this.readers.addAll(recoverFromFiles(checkpoint)); | ||
if (readers.isEmpty()) { | ||
throw new IllegalStateException("at least one reader must be recovered"); | ||
|
@@ -94,6 +96,45 @@ public RemoteFsTranslog( | |
} | ||
} | ||
|
||
// Replaces the local translog directory contents with the files referenced by the remote | ||
// translog metadata. Steps, in order: read the remote metadata; if it is null do nothing; | ||
// otherwise create `location` if missing, delete every file FileSystemUtils.files(location) | ||
// returns, download each generation from getGeneration() down to getMinTranslogGeneration() | ||
// (inclusive), and finally copy the latest generation's checkpoint file to translog.ckp. | ||
// Parameters: translogTransferManager - used to read metadata and download each generation; | ||
// location - local translog directory to populate. | ||
// Throws IOException from the file-system operations below (and possibly from the transfer | ||
// manager calls; their signatures are not visible here). | ||
public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException { | ||
|
||
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata(); | ||
// A null metadata leaves the local directory untouched. | ||
if (translogMetadata != null) { | ||
Comment on lines
+101
to
+102
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor, but if you return an Optional from readMetadata() this all becomes a bit more explicit with
and you don't need the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With this, we will not be able to throw checked exception from |
||
if (Files.notExists(location)) { | ||
Files.createDirectories(location); | ||
} | ||
// Delete translog files on local before downloading from remote | ||
for (Path file : FileSystemUtils.files(location)) { | ||
Files.delete(file); | ||
} | ||
Map<String, String> generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper(); | ||
// Walk generations newest-first, from the current generation down to the minimum one. | ||
for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if segments upload is lagging, then just downloading the most recent translog might not be enough. Can we create a tracking issue? If it exists already, pls do share. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Already tracking it here: #3754 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. More details are present in #5567 . |
||
String generation = Long.toString(i); | ||
// NOTE(review): the map lookup yields null when the generation has no entry; | ||
// how downloadTranslog handles a null primary term is not visible here - confirm. | ||
translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location); | ||
} | ||
// We copy the latest generation .ckp file to translog.ckp so that flows that depend on | ||
// existence of translog.ckp file work in the same way | ||
// NOTE(review): Files.copy without REPLACE_EXISTING fails if the target already exists; | ||
// the directory was cleared above, so this presumably relies on downloadTranslog not | ||
// creating translog.ckp itself - confirm. | ||
Files.copy( | ||
location.resolve(Translog.getCommitCheckpointFileName(translogMetadata.getGeneration())), | ||
location.resolve(Translog.CHECKPOINT_FILE_NAME) | ||
); | ||
} | ||
} | ||
|
||
// Static factory for the TranslogTransferManager. It is shared by the RemoteFsTranslog | ||
// constructor and by callers that need to download translog files without constructing | ||
// a RemoteFsTranslog instance. | ||
// Parameters: blobStoreRepository - supplies the blob store and the base path; | ||
// executorService - passed to the BlobStoreTransferService; | ||
// shardId - its index UUID and shard id are appended to the base path; | ||
// fileTransferTracker - passed as the tracker, and its exclusionFilter method as the filter. | ||
// Returns a new TranslogTransferManager on every call. | ||
public static TranslogTransferManager buildTranslogTransferManager( | ||
BlobStoreRepository blobStoreRepository, | ||
ExecutorService executorService, | ||
ShardId shardId, | ||
FileTransferTracker fileTransferTracker | ||
) { | ||
return new TranslogTransferManager( | ||
new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService), | ||
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())), | ||
fileTransferTracker, | ||
fileTransferTracker::exclusionFilter | ||
); | ||
} | ||
|
||
@Override | ||
public boolean ensureSynced(Location location) throws IOException { | ||
try (ReleasableLock ignored = writeLock.acquire()) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could set `remoteTranslogRepo` as null here and then get rid of the else block?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to push down some of this logic to the `restoreFromRemoteStore` method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`remoteTranslogRepo` needs to be final as it is provided as an argument to a lambda, so we can't initialize it twice. `restoreFromRemoteStore` does not have a reference to `repositoriesService` to fetch the repository.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we please use `Optional` instead?