From d696e19ecd844327bcc4cae5ac8985e663405c7e Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Tue, 20 Sep 2022 12:02:20 +0530 Subject: [PATCH] Changes to introduce input stream Signed-off-by: Bukhtawar Khan --- .../opensearch/index/translog/Checkpoint.java | 4 + .../transfer/BlobStoreTransferService.java | 21 ++--- .../index/translog/transfer/FileSnapshot.java | 90 +++++++++++-------- .../transfer/TransferSnapshotProvider.java | 19 +++- .../transfer/TranslogTransferManager.java | 17 ++-- .../transfer/TranslogTransferMetadata.java | 57 +++++++----- .../TranslogTransferManagerTests.java | 2 + 7 files changed, 123 insertions(+), 87 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/Checkpoint.java b/server/src/main/java/org/opensearch/index/translog/Checkpoint.java index c7339ea1dac8a..8df574ed8374f 100644 --- a/server/src/main/java/org/opensearch/index/translog/Checkpoint.java +++ b/server/src/main/java/org/opensearch/index/translog/Checkpoint.java @@ -266,6 +266,10 @@ public long getMinTranslogGeneration() { return minTranslogGeneration; } + public long getGeneration() { + return generation; + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java index 014e94673b477..0aede52939a1a 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/BlobStoreTransferService.java @@ -17,8 +17,8 @@ import org.opensearch.common.blobstore.BlobStore; import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; -import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.concurrent.ExecutorService; /** @@ -47,14 +47,9 @@ public void uploadBlobAsync( assert 
remoteTransferPath instanceof BlobPath; BlobPath blobPath = (BlobPath) remoteTransferPath; executorService.execute(ActionRunnable.wrap(listener, l -> { - try { + try (InputStream inputStream = fileSnapshot.inputStream()) { blobStore.blobContainer(blobPath) - .writeBlobAtomic( - fileSnapshot.getName(), - new ByteArrayInputStream(fileSnapshot.getContent()), - fileSnapshot.getContentLength(), - true - ); + .writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); l.onResponse(fileSnapshot); } catch (Exception e) { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e); @@ -67,14 +62,8 @@ public void uploadBlobAsync( public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable remoteTransferPath) throws IOException { assert remoteTransferPath instanceof BlobPath; BlobPath blobPath = (BlobPath) remoteTransferPath; - try { - blobStore.blobContainer(blobPath) - .writeBlobAtomic( - fileSnapshot.getName(), - new ByteArrayInputStream(fileSnapshot.getContent()), - fileSnapshot.getContentLength(), - true - ); + try (InputStream inputStream = fileSnapshot.inputStream()) { + blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true); } catch (Exception ex) { logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex); throw ex; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java index bc12cc7fe237b..da3a7cd35d4b7 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileSnapshot.java @@ -9,49 +9,50 @@ package org.opensearch.index.translog.transfer; import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.ByteBufferStreamInput; +import 
org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.InputStreamStreamInput; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.translog.BufferedChecksumStreamInput; import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.FileInputStream; import java.io.IOException; -import java.nio.file.Files; +import java.io.InputStream; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; import java.nio.file.Path; +import java.nio.file.StandardOpenOption; import java.util.Objects; -import java.util.zip.CRC32; -import java.util.zip.CheckedInputStream; /** * Snapshot of a single file that gets transferred * * @opensearch.internal */ -public class FileSnapshot { +public class FileSnapshot implements Closeable { - private final long checksum; - private final byte[] content; private final String name; - private final long contentLength; + private InputStream inputStream; @Nullable private Path path; + @Nullable + private FileChannel fileChannel; + @Nullable + private byte[] content; public FileSnapshot(Path path) throws IOException { Objects.requireNonNull(path); - this.name = path.getFileName().toString(); + this.name = path.toString(); this.path = path; - try (CheckedInputStream stream = new CheckedInputStream(Files.newInputStream(path), new CRC32())) { - this.content = stream.readAllBytes(); - this.checksum = stream.getChecksum().getValue(); - this.contentLength = content.length; - } + this.fileChannel = FileChannel.open(path, StandardOpenOption.READ); } public FileSnapshot(String name, byte[] content) throws IOException { - Objects.requireNonNull(content); Objects.requireNonNull(name); this.name = name; - try (CheckedInputStream stream = new CheckedInputStream(new ByteArrayInputStream(content), new CRC32())) { - this.content = stream.readAllBytes(); - this.checksum = stream.getChecksum().getValue(); - this.contentLength = content.length; - } + this.content = content; } 
public Path getPath() { @@ -62,21 +63,25 @@ public String getName() { return name; } - public byte[] getContent() { - return content; + public long getContentLength() throws IOException { + return fileChannel != null ? fileChannel.size() : content.length; } - public long getChecksum() { - return checksum; - } - - public long getContentLength() { - return contentLength; + public InputStream inputStream() throws IOException { + if (fileChannel != null) { + this.inputStream = new BufferedChecksumStreamInput( + new InputStreamStreamInput(Channels.newInputStream(fileChannel), fileChannel.size()), + path.toString() + ); + } else { + this.inputStream = new BufferedChecksumStreamInput(new BytesStreamInput(content), name); + } + return inputStream; } @Override public int hashCode() { - return Objects.hash(name, path, checksum, contentLength); + return Objects.hash(name, path); } @Override @@ -84,10 +89,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; FileSnapshot other = (FileSnapshot) o; - return Objects.equals(this.name, other.name) - && Objects.equals(this.path, other.path) - && Objects.equals(this.checksum, other.checksum) - && Objects.equals(this.contentLength, other.contentLength); + return Objects.equals(this.name, other.name) && Objects.equals(this.path, other.path); } @Override @@ -96,14 +98,16 @@ public String toString() { .append(name) .append(", path = ") .append(path.toUri()) - .append(", checksum = ") - .append(checksum) - .append(", contentLength = ") - .append(contentLength) .append("]") .toString(); } + @Override + public void close() throws IOException { + IOUtils.close(inputStream, fileChannel); + } + + public static class TransferFileSnapshot extends FileSnapshot { private final long primaryTerm; @@ -171,11 +175,18 @@ public boolean equals(Object o) { public static class CheckpointFileSnapshot extends TransferFileSnapshot { + private final long generation;
+ private final long minTranslogGeneration; - public CheckpointFileSnapshot(long primaryTerm, long minTranslogGeneration, Path path) throws IOException { + public CheckpointFileSnapshot(long primaryTerm, long generation, long minTranslogGeneration, Path path) throws IOException { super(path, primaryTerm); this.minTranslogGeneration = minTranslogGeneration; + this.generation = generation; + } + + public long getGeneration() { + return generation; } public long getMinTranslogGeneration() { @@ -184,7 +195,7 @@ public long getMinTranslogGeneration() { @Override public int hashCode() { - return Objects.hash(minTranslogGeneration, super.hashCode()); + return Objects.hash(generation, minTranslogGeneration, super.hashCode()); } @Override @@ -193,7 +204,8 @@ public boolean equals(Object o) { if (this == o) return true; if (getClass() != o.getClass()) return false; CheckpointFileSnapshot other = (CheckpointFileSnapshot) o; - return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration); + return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration) + && Objects.equals(this.generation, other.generation); } return false; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java index e8bfe84734ed1..b9c0f18dce1c4 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TransferSnapshotProvider.java @@ -14,11 +14,13 @@ import java.io.IOException; import java.nio.file.Path; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.LongStream; import static 
org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot; @@ -45,11 +47,12 @@ public TransferSnapshotProvider( final long readerGeneration = reader.getGeneration(); final long readerPrimaryTerm = reader.getPrimaryTerm(); final long minTranslogGeneration = reader.getCheckpoint().getMinTranslogGeneration(); + final long checkpointGeneration = reader.getCheckpoint().getGeneration(); Path translogPath = reader.path(); Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration)); translogTransferSnapshot.add( new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath), - new CheckpointFileSnapshot(readerPrimaryTerm, minTranslogGeneration, checkpointPath) + new CheckpointFileSnapshot(readerPrimaryTerm, checkpointGeneration, minTranslogGeneration, checkpointPath) ); } } @@ -63,32 +66,46 @@ static class TranslogCheckpointTransferSnapshot implements TransferSnapshot { private final Set> translogCheckpointFileInfoTupleSet; private final int size; + private final List generations; private CheckpointFileSnapshot latestCheckPointFileSnapshot; private TranslogFileSnapshot latestTranslogFileSnapshot; private final long generation; private long highestGeneration; + private long lowestGeneration; private final long primaryTerm; TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size) { translogCheckpointFileInfoTupleSet = new HashSet<>(size); this.size = size; + this.generations = new LinkedList<>(); this.generation = generation; this.primaryTerm = primaryTerm; + this.highestGeneration = Long.MIN_VALUE; + this.lowestGeneration = Long.MAX_VALUE; } private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) { translogCheckpointFileInfoTupleSet.add(Tuple.tuple(translogFileSnapshot, checkPointFileSnapshot)); + assert translogFileSnapshot.getGeneration() == 
checkPointFileSnapshot.getGeneration(); + generations.add(translogFileSnapshot.getGeneration()); if (highestGeneration < translogFileSnapshot.getGeneration()) { latestCheckPointFileSnapshot = checkPointFileSnapshot; latestTranslogFileSnapshot = translogFileSnapshot; highestGeneration = translogFileSnapshot.getGeneration(); } + this.lowestGeneration = Math.min(lowestGeneration, translogFileSnapshot.getGeneration()); } private void assertInvariants() { assert this.primaryTerm == latestTranslogFileSnapshot.getPrimaryTerm() : "inconsistent primary term"; assert this.generation == highestGeneration : "inconsistent generation"; assert translogCheckpointFileInfoTupleSet.size() == size : "inconsistent translog and checkpoint file count"; + assert lowestGeneration <= highestGeneration : "lowest generation is greater than highest generation"; + assert LongStream.iterate(lowestGeneration, i -> i + 1) + .limit(highestGeneration - lowestGeneration + 1) + .boxed() + .collect(Collectors.toList()) + .equals(generations.stream().sorted().collect(Collectors.toList())) == true : "generation gaps found"; } @Override diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index edf8bbeb584a8..057956b0dbc57 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -14,10 +14,9 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobPath; -import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.index.translog.transfer.listener.FileTransferListener; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; + import java.io.IOException; import
java.util.ArrayList; import java.util.HashMap; @@ -132,16 +131,12 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) ); TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata(); translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); - TransferFileSnapshot fileSnapshot; - try (BytesStreamOutput output = new BytesStreamOutput()) { - translogTransferMetadata.writeTo(output); - fileSnapshot = new TransferFileSnapshot( - translogTransferMetadata.getMetadataFileName(), - BytesReference.toBytes(output.bytes()), - -1 - ); + TransferFileSnapshot fileSnapshot = new TransferFileSnapshot( + translogTransferMetadata.getFileName(), + translogTransferMetadata.createMetadataBytes(), + translogTransferMetadata.getPrimaryTerm() + ); - } return fileSnapshot; } } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 77aaee1d0ad20..bfb237721e4bf 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -8,10 +8,12 @@ package org.opensearch.index.translog.transfer; +import org.apache.lucene.codecs.CodecUtil; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.util.SetOnce; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; import java.io.IOException; import java.util.Arrays; @@ -25,7 +27,7 @@ * * @opensearch.internal */ -public class TranslogTransferMetadata implements Writeable { +public class 
TranslogTransferMetadata { private final long primaryTerm; @@ -37,10 +39,16 @@ public class TranslogTransferMetadata implements Writeable { private final int count; - private final SetOnce> generationToPrimaryTermMapper = new SetOnce<>(); + private final SetOnce> generationToPrimaryTermMapper = new SetOnce<>(); private static final String METADATA_SEPARATOR = "__"; + private static final int BUFFER_SIZE = 4096; + + private static final int CURRENT_VERSION = 1; + + private static final String METADATA_CODEC = "md"; + public TranslogTransferMetadata(long primaryTerm, long generation, long minTranslogGeneration, int count) { this.primaryTerm = primaryTerm; this.generation = generation; @@ -49,16 +57,6 @@ public TranslogTransferMetadata(long primaryTerm, long generation, long minTrans this.count = count; } - TranslogTransferMetadata(StreamInput in) throws IOException { - this.primaryTerm = in.readLong(); - this.generation = in.readLong(); - this.minTranslogGeneration = in.readLong(); - this.count = in.readInt(); - this.timeStamp = in.readLong(); - this.generationToPrimaryTermMapper.set(in.readMap()); - - } - public long getPrimaryTerm() { return primaryTerm; } @@ -75,22 +73,42 @@ public int getCount() { return count; } - public void setGenerationToPrimaryTermMapper(Map generationToPrimaryTermMap) { + public void setGenerationToPrimaryTermMapper(Map generationToPrimaryTermMap) { generationToPrimaryTermMapper.set(generationToPrimaryTermMap); } - public String getMetadataFileName() { + public String getFileName() { return String.join( METADATA_SEPARATOR, Arrays.asList(String.valueOf(primaryTerm), String.valueOf(generation), String.valueOf(timeStamp)) ); } + public byte[] createMetadataBytes() throws IOException { + try (BytesStreamOutput output = new BytesStreamOutput()) { + try ( + OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( + "translog transfer metadata "+ getPrimaryTerm(), + getFileName(), + output, + BUFFER_SIZE + ) + ) { + 
CodecUtil.writeHeader(indexOutput, METADATA_CODEC, CURRENT_VERSION); + write(indexOutput); + CodecUtil.writeFooter(indexOutput); + } + return BytesReference.toBytes(output.bytes()); + } + } + + @Override public int hashCode() { return Objects.hash(primaryTerm, generation, timeStamp); } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -101,12 +119,11 @@ public boolean equals(Object o) { && Objects.equals(this.timeStamp, other.timeStamp); } - @Override - public void writeTo(StreamOutput out) throws IOException { + private void write(DataOutput out) throws IOException { out.writeLong(primaryTerm); out.writeLong(generation); out.writeLong(minTranslogGeneration); out.writeLong(timeStamp); - out.writeMap(generationToPrimaryTermMapper.get()); + out.writeMapOfStrings(generationToPrimaryTermMapper.get()); } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index d662333fb32df..d0f6bd86c032d 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -102,11 +102,13 @@ public Set getCheckpointFileSnapshots() { return Set.of( new CheckpointFileSnapshot( primaryTerm, + generation, minTranslogGeneration, createTempFile(Translog.TRANSLOG_FILE_PREFIX + generation, Translog.CHECKPOINT_SUFFIX) ), new CheckpointFileSnapshot( primaryTerm, + generation, minTranslogGeneration, createTempFile(Translog.TRANSLOG_FILE_PREFIX + (generation - 1), Translog.CHECKPOINT_SUFFIX) )