Skip to content

Commit

Permalink
[Remote Translog] Use InputStream that supports mark and reset while …
Browse files Browse the repository at this point in the history
…uploading translog files (#5868) (#5900)

* Use stream that supports mark and reset for translog upload

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
opensearch-trigger-bot[bot] authored Jan 17, 2023
1 parent 29bd1ef commit d34a34b
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@
package org.opensearch.index.translog.transfer;

import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.lucene.store.InputStreamIndexInput;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.translog.BufferedChecksumStreamInput;

import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Objects;

/**
Expand Down Expand Up @@ -66,11 +67,8 @@ public long getContentLength() throws IOException {

public InputStream inputStream() throws IOException {
return fileChannel != null
? new BufferedChecksumStreamInput(
new InputStreamStreamInput(Channels.newInputStream(fileChannel), fileChannel.size()),
path.toString()
)
: new BufferedChecksumStreamInput(new BytesStreamInput(content), name);
? new BufferedInputStream(Channels.newInputStream(fileChannel))
: new InputStreamIndexInput(new ByteArrayIndexInput(this.name, content), content.length);
}

@Override
Expand All @@ -83,9 +81,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FileSnapshot other = (FileSnapshot) o;
return Objects.equals(this.name, other.name)
&& Objects.equals(this.content, other.content)
&& Objects.equals(this.path, other.path);
return Objects.equals(this.name, other.name) && Arrays.equals(this.content, other.content) && Objects.equals(this.path, other.path);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog.transfer;

import org.junit.After;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class FileSnapshotTests extends OpenSearchTestCase {

FileSnapshot fileSnapshot;

@After
public void tearDown() throws Exception {
super.tearDown();
fileSnapshot.close();
}

public void testFileSnapshotPath() throws IOException {
Path file = createTempFile();
Files.writeString(file, "hello");
fileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12);

assertFileSnapshotProperties(file);

try (FileSnapshot sameFileSnapshot = new FileSnapshot.TransferFileSnapshot(file, 12)) {
assertEquals(sameFileSnapshot, fileSnapshot);
}

try (FileSnapshot sameFileDiffPTSnapshot = new FileSnapshot.TransferFileSnapshot(file, 34)) {
assertNotEquals(sameFileDiffPTSnapshot, fileSnapshot);
}
}

public void testFileSnapshotContent() throws IOException {
Path file = createTempFile();
Files.writeString(file, "hello");
fileSnapshot = new FileSnapshot.TransferFileSnapshot(file.getFileName().toString(), Files.readAllBytes(file), 23);

assertFileSnapshotProperties(file);

try (
FileSnapshot sameFileSnapshot = new FileSnapshot.TransferFileSnapshot(
file.getFileName().toString(),
Files.readAllBytes(file),
23
)
) {
assertEquals(sameFileSnapshot, fileSnapshot);
}

try (
FileSnapshot anotherFileSnapshot = new FileSnapshot.TransferFileSnapshot(
file.getFileName().toString(),
Files.readAllBytes(createTempFile()),
23
)
) {
assertNotEquals(anotherFileSnapshot, fileSnapshot);
}
}

private void assertFileSnapshotProperties(Path file) throws IOException {
assertEquals(file.getFileName().toString(), fileSnapshot.getName());
assertEquals(Files.size(file), fileSnapshot.getContentLength());
assertTrue(fileSnapshot.inputStream().markSupported());
}
}

0 comments on commit d34a34b

Please sign in to comment.