Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce interface changes to support read/write blob with object metadata #13023

Merged
merged 21 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
);
}
}
listener.onResponse(new ReadContext(blobSize, blobPartInputStreamFutures, blobChecksum));
listener.onResponse(new ReadContext.Builder(blobSize, blobPartInputStreamFutures).blobChecksum(blobChecksum).build());
} catch (Exception ex) {
listener.onFailure(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import org.apache.lucene.store.IndexInput;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.stream.write.StreamContextSupplier;
Expand All @@ -49,6 +49,7 @@
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

import java.io.IOException;
Expand Down Expand Up @@ -466,24 +467,30 @@ private void testWriteBlobByStreams(boolean expectException, boolean throwExcept
exceptionRef.set(ex);
countDownLatch.countDown();
});
blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() {
@Override
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
}
}, bytes.length, false, WritePriority.NORMAL, uploadSuccess -> {

StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));

CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
assertTrue(uploadSuccess);
if (throwExceptionOnFinalizeUpload) {
throw new RuntimeException();
}
}, false, null), completionListener);
};

WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries")
.streamContextSupplier(streamContextSupplier)
.fileSize(bytes.length)
.failIfAlreadyExists(false)
.writePriority(WritePriority.NORMAL)
.uploadFinalizer(uploadFinalizer)
.doRemoteDataIntegrityCheck(false)
.build();

blobContainer.asyncBlobUpload(writeContext, completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
// wait for completableFuture to finish
Expand Down Expand Up @@ -516,24 +523,30 @@ private void testWriteBlobByStreamsLargeBlob(boolean expectException, boolean th
countDownLatch.countDown();
});
List<InputStream> openInputStreams = new ArrayList<>();
blobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() {
@Override
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}
}, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
}
}, blobSize, false, WritePriority.NORMAL, uploadSuccess -> {

StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

CheckedConsumer<Boolean, IOException> uploadFinalizer = uploadSuccess -> {
assertTrue(uploadSuccess);
if (throwExceptionOnFinalizeUpload) {
throw new RuntimeException();
}
}, false, null), completionListener);
};

WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
.streamContextSupplier(streamContextSupplier)
.fileSize(blobSize)
.failIfAlreadyExists(false)
.writePriority(WritePriority.NORMAL)
.uploadFinalizer(uploadFinalizer)
.doRemoteDataIntegrityCheck(false)
.build();

blobContainer.asyncBlobUpload(writeContext, completionListener);

assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
if (expectException || throwExceptionOnFinalizeUpload) {
Expand Down Expand Up @@ -632,20 +645,23 @@ private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) t

List<InputStream> openInputStreams = new ArrayList<>();
final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore));
s3BlobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() {
@Override
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}
}, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize));
}
}, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener);

StreamContextSupplier streamContextSupplier = partSize1 -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}, partSize1, calculateLastPartSize(blobSize, partSize1), calculateNumberOfParts(blobSize, partSize1));

WriteContext writeContext = new WriteContext.Builder().fileName("write_large_blob")
.streamContextSupplier(streamContextSupplier)
.fileSize(blobSize)
.failIfAlreadyExists(false)
.writePriority(WritePriority.HIGH)
.uploadFinalizer(Assert::assertTrue)
.doRemoteDataIntegrityCheck(false)
.build();

s3BlobContainer.asyncBlobUpload(writeContext, completionListener);
assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));
if (expectException) {
assertNotNull(exceptionRef.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

import org.apache.http.HttpStatus;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.common.CheckedTriFunction;
import org.opensearch.common.Nullable;
import org.opensearch.common.StreamContext;
import org.opensearch.common.SuppressForbidden;
Expand Down Expand Up @@ -332,22 +331,24 @@ public void testWriteBlobByStreamsWithRetries() throws Exception {
exceptionRef.set(ex);
countDownLatch.countDown();
});
blobContainer.asyncBlobUpload(new WriteContext("write_blob_by_streams_max_retries", new StreamContextSupplier() {
@Override
public StreamContext supplyStreamContext(long partSize) {
return new StreamContext(new CheckedTriFunction<Integer, Long, Long, InputStreamContainer, IOException>() {
@Override
public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));
}
}, bytes.length, false, WritePriority.NORMAL, Assert::assertTrue, false, null), completionListener);

StreamContextSupplier streamContextSupplier = partSize -> new StreamContext((partNo, size, position) -> {
InputStream inputStream = new OffsetRangeIndexInputStream(new ByteArrayIndexInput("desc", bytes), size, position);
openInputStreams.add(inputStream);
return new InputStreamContainer(inputStream, size, position);
}, partSize, calculateLastPartSize(bytes.length, partSize), calculateNumberOfParts(bytes.length, partSize));

WriteContext writeContext = new WriteContext.Builder().fileName("write_blob_by_streams_max_retries")
.streamContextSupplier(streamContextSupplier)
.fileSize(bytes.length)
.failIfAlreadyExists(false)
.writePriority(WritePriority.NORMAL)
.uploadFinalizer(Assert::assertTrue)
.doRemoteDataIntegrityCheck(false)
.build();

blobContainer.asyncBlobUpload(writeContext, completionListener);
assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS));

assertThat(countDown.isCountedDown(), is(true));

openInputStreams.forEach(inputStream -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void readBlobAsync(String blobName, ActionListener<ReadContext> listener)
InputStreamContainer blobPartStream = new InputStreamContainer(readBlob(blobName, offset, partSize), partSize, offset);
blobPartStreams.add(() -> CompletableFuture.completedFuture(blobPartStream));
}
ReadContext blobReadContext = new ReadContext(contentLength, blobPartStreams, null);
ReadContext blobReadContext = new ReadContext.Builder(contentLength, blobPartStreams).build();
listener.onResponse(blobReadContext);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
Expand Down Expand Up @@ -77,6 +78,20 @@
*/
InputStream readBlob(String blobName) throws IOException;

/**
* Creates a new {@link BlobDownloadResponse} for the given blob name.
*
* @param blobName
* The name of the blob to get an {@link InputStream} for.
* @return The {@link BlobDownloadResponse} of the blob.
* @throws NoSuchFileException if the blob does not exist
* @throws IOException if the blob can not be read.
*/
@ExperimentalApi
default BlobDownloadResponse readBlobWithMetadata(String blobName) throws IOException {
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
throw new UnsupportedOperationException("readBlobWithMetadata is not implemented yet");

Check warning on line 92 in server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java#L92

Added line #L92 was not covered by tests
};

/**
* Creates a new {@link InputStream} that can be used to read the given blob starting from
* a specific {@code position} in the blob. The {@code length} is an indication of the
Expand Down Expand Up @@ -128,6 +143,36 @@
*/
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata.
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param metadata
* The metadata to be associate with the blob upload.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
@ExperimentalApi
default void writeBlobWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
throw new UnsupportedOperationException("writeBlobWithMetadata is not implemented yet");

Check warning on line 173 in server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java#L173

Added line #L173 was not covered by tests
};

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it.
Expand All @@ -149,6 +194,38 @@
*/
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;

/**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name, and metadata
* using an atomic write operation if the implementation supports it.
* <p>
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
*
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param metadata
* The metadata to be associate with the blob upload.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @param failIfAlreadyExists
* whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
@ExperimentalApi
default void writeBlobAtomicWithMetadata(
String blobName,
InputStream inputStream,
Map<String, String> metadata,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
throw new UnsupportedOperationException("writeBlobAtomicWithMetadata is not implemented yet");

Check warning on line 226 in server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/BlobContainer.java#L226

Added line #L226 was not covered by tests
};

/**
* Deletes this container and all its contents from the repository.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.blobstore;

import java.io.InputStream;
import java.util.Map;

/**
* Represents the response from a blob download operation, containing both the
* input stream of the blob content and the associated metadata.
*
* @opensearch.experimental
*/
public class BlobDownloadResponse {
skumawat2025 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Downloaded blob InputStream
*/
private final InputStream inputStream;

/**
* Metadata of the downloaded blob
*/
private final Map<String, String> metadata;
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

public InputStream getInputStream() {
return inputStream;

Check warning on line 33 in server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java#L33

Added line #L33 was not covered by tests
}

public Map<String, String> getMetadata() {
return metadata;

Check warning on line 37 in server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java#L37

Added line #L37 was not covered by tests
}

public BlobDownloadResponse(InputStream inputStream, Map<String, String> metadata) {
this.inputStream = inputStream;
this.metadata = metadata;
}

Check warning on line 43 in server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/common/blobstore/BlobDownloadResponse.java#L40-L43

Added lines #L40 - L43 were not covered by tests

}
Loading
Loading