From f6a86db7aa4641171996ffae4b9d5d40334593e3 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <85113518+gbbafna@users.noreply.github.com> Date: Mon, 9 Jan 2023 12:16:50 +0530 Subject: [PATCH] [Remote Translog] Introduce remote translog transfer support (#4480) (#5693) * [Backport 2.x] Introduce remote translog transfer support Signed-off-by: Bukhtawar Khan --- CHANGELOG.md | 1 + .../translog/BufferedChecksumStreamInput.java | 7 +- .../opensearch/index/translog/Checkpoint.java | 10 +- .../index/translog/TranslogReader.java | 2 +- .../transfer/BlobStoreTransferService.java | 71 ++++++ .../index/translog/transfer/FileSnapshot.java | 223 ++++++++++++++++++ .../transfer/FileTransferException.java | 30 +++ .../translog/transfer/TransferService.java | 43 ++++ .../translog/transfer/TransferSnapshot.java | 42 ++++ .../TranslogCheckpointTransferSnapshot.java | 148 ++++++++++++ .../transfer/TranslogTransferManager.java | 147 ++++++++++++ .../transfer/TranslogTransferMetadata.java | 127 ++++++++++ .../listener/FileTransferListener.java | 32 +++ .../listener/TranslogTransferListener.java | 36 +++ .../transfer/listener/package-info.java | 10 + .../index/translog/transfer/package-info.java | 10 + .../BlobStoreTransferServiceTests.java | 123 ++++++++++ .../TranslogTransferManagerTests.java | 150 ++++++++++++ 18 files changed, 1209 insertions(+), 3 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/listener/package-info.java create mode 100644 server/src/main/java/org/opensearch/index/translog/transfer/package-info.java create mode 100644 server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java create mode 100644 server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d6f439b02998..a4a777191a35d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283)) - Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955)) +- Support remote translog transfer for request level durability([#4480](https://github.com/opensearch-project/OpenSearch/pull/4480)) ### Deprecated - Refactor fuzziness interface on query builders ([#5433](https://github.com/opensearch-project/OpenSearch/pull/5433)) diff --git a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java b/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java index 5feb994171b65..f299da0c1ac1e 100644 --- a/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java +++ b/server/src/main/java/org/opensearch/index/translog/BufferedChecksumStreamInput.java @@ -36,6 +36,7 @@ import org.opensearch.common.io.stream.FilterStreamInput; import org.opensearch.common.io.stream.StreamInput; +import java.io.EOFException; import java.io.IOException; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -117,7 +118,11 @@ public void reset() throws IOException { @Override public int read() throws IOException { - return readByte() & 0xFF; + try { + return readByte() & 0xFF; + } catch (EOFException e) { + return -1; + } } @Override diff --git a/server/src/main/java/org/opensearch/index/translog/Checkpoint.java b/server/src/main/java/org/opensearch/index/translog/Checkpoint.java index ade28791b2e27..8df574ed8374f 100644 --- a/server/src/main/java/org/opensearch/index/translog/Checkpoint.java +++ b/server/src/main/java/org/opensearch/index/translog/Checkpoint.java @@ -59,7 +59,7 @@ * * @opensearch.internal */ -final class Checkpoint { +final public class Checkpoint { final long offset; final int numOps; @@ -262,6 +262,14 @@ public synchronized byte[] toByteArray() { return byteOutputStream.toByteArray(); } + public long getMinTranslogGeneration() { + return minTranslogGeneration; + } + + public long getGeneration() { + return generation; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java index 9d22fe0a498eb..205229949da77 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogReader.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogReader.java @@ -138,7 +138,7 @@ public int totalOperations() { } @Override - final Checkpoint getCheckpoint() { + final public Checkpoint getCheckpoint() { return checkpoint; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java new file mode 100644 index 0000000000000..36d9d71217837 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionRunnable; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.ExecutorService; + +/** + * Service that handles remote transfer of translog and checkpoint files + * + * @opensearch.internal + */ +public class BlobStoreTransferService implements TransferService { + + private final BlobStore blobStore; + private final ExecutorService executorService; + + private static final Logger logger = LogManager.getLogger(BlobStoreTransferService.class); + + public BlobStoreTransferService(BlobStore blobStore, ExecutorService executorService) { + this.blobStore = blobStore; + this.executorService = executorService; + } + + @Override + public void uploadBlobAsync( + final TransferFileSnapshot fileSnapshot, + Iterable remoteTransferPath, + ActionListener listener + ) { + assert remoteTransferPath instanceof BlobPath; + BlobPath blobPath = (BlobPath) remoteTransferPath; + executorService.execute(ActionRunnable.wrap(listener, l -> { + try (InputStream inputStream = fileSnapshot.inputStream()) { + blobStore.blobContainer(blobPath) + .writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); + l.onResponse(fileSnapshot); + } catch (Exception e) { + logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); + l.onFailure(new FileTransferException(fileSnapshot, e)); + } + })); + } + + @Override + public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath) throws IOException { + assert remoteTransferPath instanceof BlobPath; + BlobPath blobPath = (BlobPath) remoteTransferPath; + try (InputStream inputStream = fileSnapshot.inputStream()) { + blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); + } catch (Exception ex) { + throw ex; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java new file mode 100644 index 0000000000000..e8c06e3d251c7 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -0,0 +1,223 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.translog.BufferedChecksumStreamInput; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Objects; + +/** + * Snapshot of a single file that gets transferred + * + * @opensearch.internal + */ +public class FileSnapshot implements Closeable { + + private final String name; + @Nullable + private final FileChannel fileChannel; + @Nullable + private Path path; + @Nullable + private byte[] content; + + private FileSnapshot(Path path) throws IOException { + Objects.requireNonNull(path); + this.name = path.getFileName().toString(); + this.path = path; + this.fileChannel = FileChannel.open(path, StandardOpenOption.READ); + } + + private FileSnapshot(String name, byte[] content) { + Objects.requireNonNull(name); + this.name = name; + this.content = content; + this.fileChannel = null; + } + + public Path getPath() { + return path; + } + + public String getName() { + return name; + } + + public long getContentLength() throws IOException { + return fileChannel == null ? content.length : fileChannel.size(); + } + + public InputStream inputStream() throws IOException { + return fileChannel != null + ? new BufferedChecksumStreamInput( + new InputStreamStreamInput(Channels.newInputStream(fileChannel), fileChannel.size()), + path.toString() + ) + : new BufferedChecksumStreamInput(new BytesStreamInput(content), name); + } + + @Override + public int hashCode() { + return Objects.hash(name, content, path); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FileSnapshot other = (FileSnapshot) o; + return Objects.equals(this.name, other.name) + && Objects.equals(this.content, other.content) + && Objects.equals(this.path, other.path); + } + + @Override + public String toString() { + return new StringBuilder("FileInfo [").append(" name = ") + .append(name) + .append(", path = ") + .append(path.toUri()) + .append("]") + .toString(); + } + + @Override + public void close() throws IOException { + IOUtils.close(fileChannel); + } + + /** + * Snapshot of a single file with primary term that gets transferred + * + * @opensearch.internal + */ + public static class TransferFileSnapshot extends FileSnapshot { + + private final long primaryTerm; + + public TransferFileSnapshot(Path path, long primaryTerm) throws IOException { + super(path); + this.primaryTerm = primaryTerm; + } + + public TransferFileSnapshot(String name, byte[] content, long primaryTerm) throws IOException { + super(name, content); + this.primaryTerm = primaryTerm; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + + @Override + public int hashCode() { + return Objects.hash(primaryTerm, super.hashCode()); + } + + @Override + public boolean equals(Object o) { + if (super.equals(o)) { + if (this == o) return true; + if (getClass() != o.getClass()) return false; + TransferFileSnapshot other = (TransferFileSnapshot) o; + return Objects.equals(this.primaryTerm, other.primaryTerm); + } + return false; + } + } + + /** + * Snapshot of a single .tlg file that gets transferred + * + * @opensearch.internal + */ + public static final class TranslogFileSnapshot extends TransferFileSnapshot { + + private final long generation; + + public TranslogFileSnapshot(long primaryTerm, long generation, Path path) throws IOException { + super(path, primaryTerm); + this.generation = generation; + } + + public long getGeneration() { + return generation; + } + + @Override + public int hashCode() { + return Objects.hash(generation, super.hashCode()); + } + + @Override + public boolean equals(Object o) { + if (super.equals(o)) { + if (this == o) return true; + if (getClass() != o.getClass()) return false; + TranslogFileSnapshot other = (TranslogFileSnapshot) o; + return Objects.equals(this.generation, other.generation); + } + return false; + } + } + + /** + * Snapshot of a single .ckp file that gets transferred + * + * @opensearch.internal + */ + public static final class CheckpointFileSnapshot extends TransferFileSnapshot { + + private final long generation; + + private final long minTranslogGeneration; + + public CheckpointFileSnapshot(long primaryTerm, long generation, long minTranslogGeneration, Path path) throws IOException { + super(path, primaryTerm); + this.minTranslogGeneration = minTranslogGeneration; + this.generation = generation; + } + + public long getGeneration() { + return generation; + } + + public long getMinTranslogGeneration() { + return minTranslogGeneration; + } + + @Override + public int hashCode() { + return Objects.hash(generation, minTranslogGeneration, super.hashCode()); + } + + @Override + public boolean equals(Object o) { + if (super.equals(o)) { + if (this == o) return true; + if (getClass() != o.getClass()) return false; + CheckpointFileSnapshot other = (CheckpointFileSnapshot) o; + return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration) + && Objects.equals(this.generation, other.generation); + } + return false; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java new file mode 100644 index 0000000000000..89a4135d2409b --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferException.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +/** + * Exception when a single file transfer encounters a failure + * + * @opensearch.internal + */ +public class FileTransferException extends RuntimeException { + + private final TransferFileSnapshot fileSnapshot; + + public FileTransferException(TransferFileSnapshot fileSnapshot, Throwable cause) { + super(cause); + this.fileSnapshot = fileSnapshot; + } + + public TransferFileSnapshot getFileSnapshot() { + return fileSnapshot; + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java new file mode 100644 index 0000000000000..ed6c185352833 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferService.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.action.ActionListener; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +import java.io.IOException; + +/** + * Interface for the translog transfer service responsible for interacting with a remote store + * + * @opensearch.internal + */ +public interface TransferService { + + /** + * Uploads the {@link TransferFileSnapshot} async, once the upload is complete the callback is invoked + * @param fileSnapshot the file snapshot to upload + * @param remotePath the remote path where upload should be made + * @param listener the callback to be invoked once upload completes successfully/fails + */ + void uploadBlobAsync( + final TransferFileSnapshot fileSnapshot, + Iterable remotePath, + ActionListener listener + ); + + /** + * Uploads the {@link TransferFileSnapshot} blob + * @param fileSnapshot the file snapshot to upload + * @param remotePath the remote path where upload should be made + * @throws IOException the exception while transferring the data + */ + void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remotePath) throws IOException; + +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java new file mode 100644 index 0000000000000..b4c1c97f04a7d --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshot.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; + +import java.util.Set; + +/** + * The snapshot of the generational translog and checkpoint files and it's corresponding metadata that is transferred + * to the {@link TransferService} + * + * @opensearch.internal + */ +public interface TransferSnapshot { + + /** + * The snapshot of the checkpoint generational files + * @return the set of {@link CheckpointFileSnapshot} + */ + Set getCheckpointFileSnapshots(); + + /** + * The snapshot of the translog generational files + * @return the set of {@link TranslogFileSnapshot} + */ + Set getTranslogFileSnapshots(); + + /** + * The translog transfer metadata of this {@link TransferSnapshot} + * @return the translog transfer metadata + */ + TranslogTransferMetadata getTranslogTransferMetadata(); +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java new file mode 100644 index 0000000000000..30b81627614b7 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java @@ -0,0 +1,148 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.common.collect.Tuple; +import org.opensearch.index.translog.TranslogReader; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; + +/** + * Implementation for a {@link TransferSnapshot} which builds the snapshot from the translog and checkpoint files present on the local-disk + * + * @opensearch.internal + */ +public class TranslogCheckpointTransferSnapshot implements TransferSnapshot { + + private final Set> translogCheckpointFileInfoTupleSet; + private final int size; + private final long generation; + private final long primaryTerm; + private long minTranslogGeneration; + + TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size) { + translogCheckpointFileInfoTupleSet = new HashSet<>(size); + this.size = size; + this.generation = generation; + this.primaryTerm = primaryTerm; + } + + private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) { + translogCheckpointFileInfoTupleSet.add(Tuple.tuple(translogFileSnapshot, checkPointFileSnapshot)); + assert translogFileSnapshot.getGeneration() == checkPointFileSnapshot.getGeneration(); + } + + private void setMinTranslogGeneration(long minTranslogGeneration) { + this.minTranslogGeneration = minTranslogGeneration; + } + + @Override + public Set getTranslogFileSnapshots() { + return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v1).collect(Collectors.toSet()); + } + + @Override + public TranslogTransferMetadata getTranslogTransferMetadata() { + return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, translogCheckpointFileInfoTupleSet.size() * 2); + } + + @Override + public Set getCheckpointFileSnapshots() { + return translogCheckpointFileInfoTupleSet.stream().map(Tuple::v2).collect(Collectors.toSet()); + } + + @Override + public String toString() { + return new StringBuilder("TranslogTransferSnapshot [").append(" primary term = ") + .append(primaryTerm) + .append(", generation = ") + .append(generation) + .append(" ]") + .toString(); + } + + /** + * Builder for {@link TranslogCheckpointTransferSnapshot} + */ + public static class Builder { + private final long primaryTerm; + private final long generation; + private final List readers; + private final Function checkpointGenFileNameMapper; + private final Path location; + + public Builder( + long primaryTerm, + long generation, + Path location, + List readers, + Function checkpointGenFileNameMapper + ) { + this.primaryTerm = primaryTerm; + this.generation = generation; + this.readers = readers; + this.checkpointGenFileNameMapper = checkpointGenFileNameMapper; + this.location = location; + } + + public TranslogCheckpointTransferSnapshot build() throws IOException { + final List generations = new LinkedList<>(); + long highestGeneration = Long.MIN_VALUE; + long highestGenPrimaryTerm = Long.MIN_VALUE; + long lowestGeneration = Long.MAX_VALUE; + long highestGenMinTranslogGeneration = Long.MIN_VALUE; + TranslogCheckpointTransferSnapshot translogTransferSnapshot = new TranslogCheckpointTransferSnapshot( + primaryTerm, + generation, + readers.size() + ); + for (TranslogReader reader : readers) { + final long readerGeneration = reader.getGeneration(); + final long readerPrimaryTerm = reader.getPrimaryTerm(); + final long minTranslogGeneration = reader.getCheckpoint().getMinTranslogGeneration(); + final long checkpointGeneration = reader.getCheckpoint().getGeneration(); + Path translogPath = reader.path(); + Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration)); + generations.add(readerGeneration); + translogTransferSnapshot.add( + new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath), + new CheckpointFileSnapshot(readerPrimaryTerm, checkpointGeneration, minTranslogGeneration, checkpointPath) + ); + if (readerGeneration > highestGeneration) { + highestGeneration = readerGeneration; + highestGenMinTranslogGeneration = minTranslogGeneration; + highestGenPrimaryTerm = readerPrimaryTerm; + } + lowestGeneration = Math.min(lowestGeneration, readerGeneration); + } + translogTransferSnapshot.setMinTranslogGeneration(highestGenMinTranslogGeneration); + + assert this.primaryTerm == highestGenPrimaryTerm : "inconsistent primary term"; + assert this.generation == highestGeneration : "inconsistent generation"; + assert LongStream.iterate(lowestGeneration, i -> i + 1) + .limit(highestGeneration) + .boxed() + .collect(Collectors.toList()) + .equals(generations.stream().sorted().collect(Collectors.toList())) == true : "generation gaps found"; + return translogTransferSnapshot; + } + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java new file mode 100644 index 0000000000000..02ebab8ed6826 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -0,0 +1,147 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.index.translog.transfer.listener.FileTransferListener; +import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; + +import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; +import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; + +/** + * The class responsible for orchestrating the transfer of a {@link TransferSnapshot} via a {@link TransferService} + * + * @opensearch.internal + */ +public class TranslogTransferManager { + + private final TransferService transferService; + private final BlobPath remoteBaseTransferPath; + private final FileTransferListener fileTransferListener; + private final UnaryOperator> exclusionFilter; + + private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; + + private static final Logger logger = LogManager.getLogger(TranslogTransferManager.class); + + public TranslogTransferManager( + TransferService transferService, + BlobPath remoteBaseTransferPath, + FileTransferListener fileTransferListener, + UnaryOperator> exclusionFilter + ) { + this.transferService = transferService; + this.remoteBaseTransferPath = remoteBaseTransferPath; + this.fileTransferListener = fileTransferListener; + this.exclusionFilter = exclusionFilter; + } + + public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener) + throws IOException { + List exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount()); + Set toUpload = new HashSet<>(transferSnapshot.getTranslogTransferMetadata().getCount()); + try { + toUpload.addAll(exclusionFilter.apply(transferSnapshot.getTranslogFileSnapshots())); + toUpload.addAll(exclusionFilter.apply(transferSnapshot.getCheckpointFileSnapshots())); + final CountDownLatch latch = new CountDownLatch(toUpload.size()); + LatchedActionListener latchedActionListener = new LatchedActionListener<>( + ActionListener.wrap(fileTransferListener::onSuccess, ex -> { + assert ex instanceof FileTransferException; + logger.error( + () -> new ParameterizedMessage( + "Exception during transfer for file {}", + ((FileTransferException) ex).getFileSnapshot().getName() + ), + ex + ); + FileTransferException e = (FileTransferException) ex; + fileTransferListener.onFailure(e.getFileSnapshot(), ex); + exceptionList.add(ex); + }), + latch + ); + toUpload.forEach( + fileSnapshot -> transferService.uploadBlobAsync( + fileSnapshot, + remoteBaseTransferPath.add(String.valueOf(fileSnapshot.getPrimaryTerm())), + latchedActionListener + ) + ); + try { + if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) { + Exception ex = new TimeoutException("Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"); + exceptionList.forEach(ex::addSuppressed); + throw ex; + } + } catch (InterruptedException ex) { + exceptionList.forEach(ex::addSuppressed); + Thread.currentThread().interrupt(); + throw ex; + } + if (exceptionList.isEmpty()) { + final TransferFileSnapshot transferFileSnapshot = prepareMetadata(transferSnapshot); + transferService.uploadBlob( + prepareMetadata(transferSnapshot), + remoteBaseTransferPath.add(String.valueOf(transferFileSnapshot.getPrimaryTerm())) + ); + translogTransferListener.onUploadComplete(transferSnapshot); + return true; + } else { + Exception ex = new RuntimeException("Failed to upload some files during transfer"); + exceptionList.forEach(ex::addSuppressed); + throw ex; + } + } catch (Exception ex) { + logger.error(() -> new ParameterizedMessage("Transfer failed for snapshot {}", transferSnapshot), ex); + translogTransferListener.onUploadFailed(transferSnapshot, ex); + return false; + } + } + + private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException { + Map generationPrimaryTermMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> { + assert s instanceof TranslogFileSnapshot; + return (TranslogFileSnapshot) s; + }) + .collect( + Collectors.toMap( + snapshot -> String.valueOf(snapshot.getGeneration()), + snapshot -> String.valueOf(snapshot.getPrimaryTerm()) + ) + ); + TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata(); + translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); + TransferFileSnapshot fileSnapshot = new TransferFileSnapshot( + translogTransferMetadata.getFileName(), + translogTransferMetadata.createMetadataBytes(), + translogTransferMetadata.getPrimaryTerm() + ); + + return fileSnapshot; + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java new file mode 100644 index 0000000000000..0aae773f593fd --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -0,0 +1,127 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.OutputStreamIndexOutput; +import org.apache.lucene.util.SetOnce; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; + +/** + * The metadata associated with every transfer {@link TransferSnapshot}. The metadata is uploaded at the end of the + * tranlog and generational checkpoint uploads to mark the latest generation and the translog/checkpoint files that are + * still referenced by the last checkpoint. + * + * @opensearch.internal + */ +public class TranslogTransferMetadata { + + private final long primaryTerm; + + private final long generation; + + private final long minTranslogGeneration; + + private final long timeStamp; + + private final int count; + + private final SetOnce> generationToPrimaryTermMapper = new SetOnce<>(); + + private static final String METADATA_SEPARATOR = "__"; + + private static final int BUFFER_SIZE = 4096; + + private static final int CURRENT_VERSION = 1; + + private static final String METADATA_CODEC = "md"; + + public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) { + this.primaryTerm = primaryTerm; + this.generation = generation; + this.minTranslogGeneration = minTranslogGeneration; + this.timeStamp = System.currentTimeMillis(); + this.count = count; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + + public long getGeneration() { + return generation; + } + + public long getMinTranslogGeneration() { + return minTranslogGeneration; + } + + public int getCount() { + return count; + } + + public void setGenerationToPrimaryTermMapper(Map generationToPrimaryTermMap) { + generationToPrimaryTermMapper.set(generationToPrimaryTermMap); + } + + public String getFileName() { + return String.join( + METADATA_SEPARATOR, + Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation), String.valueOf(timeStamp)) + ); + } + + public byte[] createMetadataBytes() throws IOException { + try (BytesStreamOutput output = new BytesStreamOutput()) { + try ( + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( + "translog transfer metadata " + primaryTerm, + getFileName(), + output, + BUFFER_SIZE + ) + ) { + CodecUtil.writeHeader(indexOutput, METADATA_CODEC, CURRENT_VERSION); + write(indexOutput); + CodecUtil.writeFooter(indexOutput); + } + return BytesReference.toBytes(output.bytes()); + } + } + + @Override + public int hashCode() { + return Objects.hash(primaryTerm, generation, timeStamp); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TranslogTransferMetadata other = (TranslogTransferMetadata) o; + return Objects.equals(this.primaryTerm, other.primaryTerm) + && Objects.equals(this.generation, other.generation) + && Objects.equals(this.timeStamp, other.timeStamp); + } + + private void write(DataOutput out) throws IOException { + out.writeLong(primaryTerm); + out.writeLong(generation); + out.writeLong(minTranslogGeneration); + out.writeLong(timeStamp); + out.writeMapOfStrings(generationToPrimaryTermMapper.get()); + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java new file mode 100644 index 0000000000000..939b56f109a36 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/FileTransferListener.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer.listener; + +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +/** + * The listener to be invoked on the completion or failure of a {@link TransferFileSnapshot} + * + * @opensearch.internal + */ +public interface FileTransferListener { + + /** + * Invoked when the transfer of a single {@link TransferFileSnapshot} succeeds + * @param fileSnapshot the corresponding file snapshot + */ + void onSuccess(TransferFileSnapshot fileSnapshot); + + /** + * Invoked when the transfer of a single {@link TransferFileSnapshot} fails + * @param fileSnapshot the corresponding file snapshot + * @param e the exception while processing the {@link TransferFileSnapshot} + */ + void onFailure(TransferFileSnapshot fileSnapshot, Exception e); +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java new file mode 100644 index 0000000000000..c09fd8798e505 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer.listener; + +import org.opensearch.index.translog.transfer.TransferSnapshot; + +import java.io.IOException; + +/** + * The listener to be invoked on the completion or failure of a {@link TransferSnapshot} + * + * @opensearch.internal + */ +public interface TranslogTransferListener { + + /** + * Invoked when the transfer of {@link TransferSnapshot} succeeds + * @param transferSnapshot the transfer snapshot + * @throws IOException the exception during the transfer of data + */ + void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException; + + /** + * Invoked when the transfer of {@link TransferSnapshot} fails + * @param transferSnapshot the transfer snapshot + * @param ex the exception while processing the {@link TransferSnapshot} + * @throws IOException the exception during the transfer of data + */ + void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException; +} diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/listener/package-info.java b/server/src/main/java/org/opensearch/index/translog/transfer/listener/package-info.java new file mode 100644 index 0000000000000..edb7f453515b1 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/listener/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Core classes responsible for handling all translog operations */ +package org.opensearch.index.translog.transfer.listener; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/package-info.java b/server/src/main/java/org/opensearch/index/translog/transfer/package-info.java new file mode 100644 index 0000000000000..2ac96b01b0673 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/transfer/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Core classes responsible for handling all translog operations */ +package org.opensearch.index.translog.transfer; diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java new file mode 100644 index 0000000000000..adca47bf64c64 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.env.Environment; +import org.opensearch.env.TestEnvironment; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.blobstore.BlobStoreTestUtil; +import org.opensearch.repositories.fs.FsRepository; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class BlobStoreTransferServiceTests extends OpenSearchTestCase { + + private ExecutorService executorService; + + private BlobStoreRepository repository; + + @Override + public void setUp() throws Exception { + super.setUp(); + repository = createRepository(); + executorService = Executors.newFixedThreadPool(1); + } + + public void testUploadBlob() throws IOException { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), executorService); + transferService.uploadBlob(transferFileSnapshot, repository.basePath()); + } + + public void testUploadBlobAsync() throws IOException, InterruptedException { + Path testFile = createTempFile(); + Files.write(testFile, randomByteArrayOfLength(128), StandardOpenOption.APPEND); + AtomicBoolean succeeded = new AtomicBoolean(false); + FileSnapshot.TransferFileSnapshot transferFileSnapshot = new FileSnapshot.TransferFileSnapshot(testFile, randomNonNegativeLong()); + CountDownLatch latch = new CountDownLatch(1); + TransferService transferService = new BlobStoreTransferService(repository.blobStore(), executorService); + transferService.uploadBlobAsync(transferFileSnapshot, repository.basePath(), new LatchedActionListener<>(new ActionListener<>() { + @Override + public void onResponse(FileSnapshot.TransferFileSnapshot fileSnapshot) { + assert succeeded.compareAndSet(false, true); + assertEquals(transferFileSnapshot.getPrimaryTerm(), fileSnapshot.getPrimaryTerm()); + assertEquals(transferFileSnapshot.getName(), fileSnapshot.getName()); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Failed to perform uploadBlobAsync", e); + } + }, latch)); + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + assertTrue(succeeded.get()); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + repository.stop(); + executorService.shutdown(); + executorService.awaitTermination(1000, TimeUnit.MILLISECONDS); + } + + /** Create a {@link Repository} with a random name **/ + private BlobStoreRepository createRepository() { + Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); + RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final FsRepository repository = new FsRepository( + repositoryMetadata, + createEnvironment(), + xContentRegistry(), + clusterService, + new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) + ) { + @Override + protected void assertSnapshotOrGenericThread() { + // eliminate thread name check as we create repo manually + } + }; + clusterService.addStateApplier(event -> repository.updateState(event.state())); + // Apply state once to initialize repo properly like RepositoriesService would + repository.updateState(clusterService.state()); + repository.start(); + return repository; + } + + /** Create a {@link Environment} with random path.home and path.repo **/ + private Environment createEnvironment() { + Path home = createTempDir(); + return TestEnvironment.newEnvironment( + Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), home.toAbsolutePath()) + .put(Environment.PATH_REPO_SETTING.getKey(), home.resolve("repo").toAbsolutePath()) + .build() + ); + } +} diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java new file mode 100644 index 0000000000000..60b7029f18fa6 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog.transfer; + +import org.apache.lucene.tests.util.LuceneTestCase; +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.transfer.listener.FileTransferListener; +import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; +import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; + +@LuceneTestCase.SuppressFileSystems("*") +public class TranslogTransferManagerTests extends OpenSearchTestCase { + + private TransferService transferService; + private BlobPath remoteBaseTransferPath; + private long primaryTerm; + private long generation; + private long minTranslogGeneration; + + @Override + public void setUp() throws Exception { + super.setUp(); + primaryTerm = randomNonNegativeLong(); + generation = randomNonNegativeLong(); + minTranslogGeneration = randomLongBetween(0, generation); + remoteBaseTransferPath = new BlobPath().add("base_path"); + transferService = mock(TransferService.class); + } + + @SuppressWarnings("unchecked") + public void testTransferSnapshot() throws IOException { + AtomicInteger fileTransferSucceeded = new AtomicInteger(); + AtomicInteger fileTransferFailed = new AtomicInteger(); + AtomicInteger translogTransferSucceeded = new AtomicInteger(); + AtomicInteger translogTransferFailed = new AtomicInteger(); + + doNothing().when(transferService) + .uploadBlob(any(TransferFileSnapshot.class), Mockito.eq(remoteBaseTransferPath.add(String.valueOf(primaryTerm)))); + doAnswer(invocationOnMock -> { + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse((TransferFileSnapshot) invocationOnMock.getArguments()[0]); + return null; + }).when(transferService).uploadBlobAsync(any(TransferFileSnapshot.class), any(BlobPath.class), any(ActionListener.class)); + + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + transferService, + remoteBaseTransferPath, + new FileTransferListener() { + @Override + public void onSuccess(TransferFileSnapshot fileSnapshot) { + fileTransferSucceeded.incrementAndGet(); + } + + @Override + public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { + fileTransferFailed.incrementAndGet(); + } + }, + r -> r + ); + + assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { + @Override + public void onUploadComplete(TransferSnapshot transferSnapshot) { + translogTransferSucceeded.incrementAndGet(); + } + + @Override + public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { + translogTransferFailed.incrementAndGet(); + } + })); + assertEquals(4, fileTransferSucceeded.get()); + assertEquals(0, fileTransferFailed.get()); + assertEquals(1, translogTransferSucceeded.get()); + assertEquals(0, translogTransferFailed.get()); + } + + private TransferSnapshot createTransferSnapshot() { + return new TransferSnapshot() { + @Override + public Set getCheckpointFileSnapshots() { + try { + return Set.of( + new CheckpointFileSnapshot( + primaryTerm, + generation, + minTranslogGeneration, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX) + ), + new CheckpointFileSnapshot( + primaryTerm, + generation, + minTranslogGeneration, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX) + ) + ); + } catch (IOException e) { + throw new AssertionError("Failed to create temp file", e); + } + } + + @Override + public Set getTranslogFileSnapshots() { + try { + return Set.of( + new TranslogFileSnapshot( + primaryTerm, + generation, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.TRANSLOG_FILE_SUFFIX) + ), + new TranslogFileSnapshot( + primaryTerm, + generation - 1, + createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.TRANSLOG_FILE_SUFFIX) + ) + ); + } catch (IOException e) { + throw new AssertionError("Failed to create temp file", e); + } + } + + @Override + public TranslogTransferMetadata getTranslogTransferMetadata() { + return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, randomInt(5)); + } + }; + } +}